You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2013/06/19 01:19:51 UTC
svn commit: r1494369 [1/3] - in /hadoop/common/trunk/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/
hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/...
Author: vinodkv
Date: Tue Jun 18 23:19:49 2013
New Revision: 1494369
URL: http://svn.apache.org/r1494369
Log:
YARN-694. Starting to use NMTokens to authenticate all communication with NodeManagers. Contributed by Omkar Vinit Joshi.
Added:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/ContainerManagementProtocolProxy.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/NMTokenSelector.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/resources/core-site.xml
Modified:
hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestNMClientAsync.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerManagerSecurityInfo.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/NMTokenIdentifier.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/BaseNMTokenSecretManager.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/security/LocalizerTokenSecretManager.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java
Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1494369&r1=1494368&r2=1494369&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Tue Jun 18 23:19:49 2013
@@ -199,6 +199,9 @@ Release 2.1.0-beta - UNRELEASED
YARN-850. Rename getClusterAvailableResources to getAvailableResources in
AMRMClients (Jian He via bikas)
+ YARN-694. Starting to use NMTokens to authenticate all communication with
+ NodeManagers. (Omkar Vinit Joshi via vinodkv)
+
NEW FEATURES
YARN-482. FS: Extend SchedulingMode to intermediate queues.
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java?rev=1494369&r1=1494368&r2=1494369&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java Tue Jun 18 23:19:49 2013
@@ -728,6 +728,23 @@ public class YarnConfiguration extends C
YARN_PREFIX + "client.nodemanager-client-async.thread-pool-max-size";
public static final int DEFAULT_NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE = 500;
+ /**
+ * Maximum number of proxy connections for node manager. It should always be
+ * more than 1. NMClient and MRAppMaster will use this to cache connection
+ * with node manager. There will be at max one connection per node manager.
+ * Ex. configuring it to a value of 5 will make sure that client will at
+ * max have 5 connections cached with 5 different node managers. These
+ * connections will be timed out if idle for more than system wide idle
+ * timeout period. The token if used for authentication then it will be used
+ * only at connection creation time. If new token is received then earlier
+ * connection should be closed in order to use newer token.
+ * Note: {@link YarnConfiguration#NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE}
+ * are related to each other.
+ */
+ public static final String NM_CLIENT_MAX_NM_PROXIES =
+ YARN_PREFIX + "client.max-nodemanagers-proxies";
+ public static final int DEFAULT_NM_CLIENT_MAX_NM_PROXIES = 500;
+
public YarnConfiguration() {
super();
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java?rev=1494369&r1=1494368&r2=1494369&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java Tue Jun 18 23:19:49 2013
@@ -70,6 +70,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -447,7 +448,8 @@ public class ApplicationMaster {
resourceManager.start();
containerListener = new NMCallbackHandler();
- nmClientAsync = NMClientAsync.createNMClientAsync(containerListener);
+ nmClientAsync =
+ new NMClientAsyncImpl(containerListener, resourceManager.getNMTokens());
nmClientAsync.init(conf);
nmClientAsync.start();
@@ -683,8 +685,7 @@ public class ApplicationMaster {
}
Container container = containers.get(containerId);
if (container != null) {
- nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId(),
- container.getContainerToken());
+ nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId());
}
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java?rev=1494369&r1=1494368&r2=1494369&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java Tue Jun 18 23:19:49 2013
@@ -26,6 +26,7 @@ import java.io.OutputStream;
import java.net.URL;
import junit.framework.Assert;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -33,6 +34,8 @@ import org.apache.hadoop.util.JarFinder;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.junit.AfterClass;
@@ -50,7 +53,7 @@ public class TestDistributedShell {
protected static String APPMASTER_JAR = JarFinder.getJar(ApplicationMaster.class);
@BeforeClass
- public static void setup() throws InterruptedException, IOException {
+ public static void setup() throws InterruptedException, Exception {
LOG.info("Starting up YARN cluster");
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 128);
conf.setClass(YarnConfiguration.RM_SCHEDULER,
@@ -60,6 +63,9 @@ public class TestDistributedShell {
TestDistributedShell.class.getSimpleName(), 1, 1, 1);
yarnCluster.init(conf);
yarnCluster.start();
+ NodeManager nm = yarnCluster.getNodeManager(0);
+ waitForNMToRegister(nm);
+
URL url = Thread.currentThread().getContextClassLoader().getResource("yarn-site.xml");
if (url == null) {
throw new RuntimeException("Could not find 'yarn-site.xml' dummy file in classpath");
@@ -195,5 +201,14 @@ public class TestDistributedShell {
}
}
+ protected static void waitForNMToRegister(NodeManager nm)
+ throws Exception {
+ int attempt = 60;
+ ContainerManagerImpl cm =
+ ((ContainerManagerImpl) nm.getNMContext().getContainerManager());
+ while (cm.getBlockNewContainerRequestsStatus() && attempt-- > 0) {
+ Thread.sleep(2000);
+ }
+ }
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java?rev=1494369&r1=1494368&r2=1494369&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java Tue Jun 18 23:19:49 2013
@@ -1,4 +1,5 @@
/**
+
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -21,16 +22,19 @@ package org.apache.hadoop.yarn.client.ap
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.api.impl.NMClientImpl;
@@ -42,19 +46,30 @@ public abstract class NMClient extends A
/**
* Create a new instance of NMClient.
+ * @param nmTokens need to pass map of NMTokens which are received on
+ * {@link AMRMClient#allocate(float)} call as a part of
+ * {@link AllocateResponse}.
+ * key :- NodeAddr (host:port)
+ * Value :- Token {@link NMToken#getToken()}
*/
@Public
- public static NMClient createNMClient() {
- NMClient client = new NMClientImpl();
+ public static NMClient createNMClient(ConcurrentMap<String, Token> nmTokens) {
+ NMClient client = new NMClientImpl(nmTokens);
return client;
}
/**
* Create a new instance of NMClient.
+ * @param nmTokens need to pass map of NMTokens which are received on
+ * {@link AMRMClient#allocate(float)} call as a part of
+ * {@link AllocateResponse}.
+ * key :- NodeAddr (host:port)
+ * Value :- Token {@link NMToken#getToken()}
*/
@Public
- public static NMClient createNMClient(String name) {
- NMClient client = new NMClientImpl(name);
+ public static NMClient createNMClient(String name,
+ ConcurrentMap<String, Token> nmTokens) {
+ NMClient client = new NMClientImpl(name, nmTokens);
return client;
}
@@ -89,35 +104,33 @@ public abstract class NMClient extends A
*
* @param containerId the Id of the started container
* @param nodeId the Id of the <code>NodeManager</code>
- * @param containerToken the security token to verify authenticity of the
- * started container
+ *
* @throws YarnException
* @throws IOException
*/
- public abstract void stopContainer(ContainerId containerId, NodeId nodeId,
- Token containerToken) throws YarnException, IOException;
+ public abstract void stopContainer(ContainerId containerId, NodeId nodeId)
+ throws YarnException, IOException;
/**
* <p>Query the status of a container.</p>
*
* @param containerId the Id of the started container
* @param nodeId the Id of the <code>NodeManager</code>
- * @param containerToken the security token to verify authenticity of the
- * started container
+ *
* @return the status of a container
* @throws YarnException
* @throws IOException
*/
- public abstract ContainerStatus getContainerStatus(ContainerId containerId, NodeId nodeId,
- Token containerToken) throws YarnException, IOException;
+ public abstract ContainerStatus getContainerStatus(ContainerId containerId,
+ NodeId nodeId) throws YarnException, IOException;
/**
* <p>Set whether the containers that are started by this client, and are
* still running should be stopped when the client stops. By default, the
- * feature should be enabled.</p>
+ * feature should be enabled.</p> However, containers will be stopped only
+ * when service is stopped. i.e. after {@link NMClient#stop()}.
*
* @param enabled whether the feature is enabled or not
*/
public abstract void cleanupRunningContainersOnStop(boolean enabled);
-
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java?rev=1494369&r1=1494368&r2=1494369&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java Tue Jun 18 23:19:49 2013
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.client.ap
import java.nio.ByteBuffer;
import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
@@ -110,16 +111,19 @@ public abstract class NMClientAsync exte
protected NMClient client;
protected CallbackHandler callbackHandler;
- public static NMClientAsync createNMClientAsync(CallbackHandler callbackHandler) {
- return new NMClientAsyncImpl(callbackHandler);
+ public static NMClientAsync createNMClientAsync(
+ CallbackHandler callbackHandler, ConcurrentMap<String, Token> nmTokens) {
+ return new NMClientAsyncImpl(callbackHandler, nmTokens);
}
- protected NMClientAsync(CallbackHandler callbackHandler) {
- this (NMClientAsync.class.getName(), callbackHandler);
+ protected NMClientAsync(CallbackHandler callbackHandler,
+ ConcurrentMap<String, Token> nmTokens) {
+ this (NMClientAsync.class.getName(), callbackHandler, nmTokens);
}
- protected NMClientAsync(String name, CallbackHandler callbackHandler) {
- this (name, new NMClientImpl(), callbackHandler);
+ protected NMClientAsync(String name, CallbackHandler callbackHandler,
+ ConcurrentMap<String, Token> nmTokens) {
+ this (name, new NMClientImpl(nmTokens), callbackHandler);
}
@Private
@@ -135,10 +139,10 @@ public abstract class NMClientAsync exte
Container container, ContainerLaunchContext containerLaunchContext);
public abstract void stopContainerAsync(
- ContainerId containerId, NodeId nodeId, Token containerToken);
+ ContainerId containerId, NodeId nodeId);
public abstract void getContainerStatusAsync(
- ContainerId containerId, NodeId nodeId, Token containerToken);
+ ContainerId containerId, NodeId nodeId);
public NMClient getClient() {
return client;
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java?rev=1494369&r1=1494368&r2=1494369&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java Tue Jun 18 23:19:49 2013
@@ -82,12 +82,14 @@ public class NMClientAsyncImpl extends N
protected ConcurrentMap<ContainerId, StatefulContainer> containers =
new ConcurrentHashMap<ContainerId, StatefulContainer>();
- public NMClientAsyncImpl(CallbackHandler callbackHandler) {
- this (NMClientAsyncImpl.class.getName(), callbackHandler);
+ public NMClientAsyncImpl(CallbackHandler callbackHandler,
+ ConcurrentMap<String, Token> nmTokens) {
+ this(NMClientAsync.class.getName(), callbackHandler, nmTokens);
}
- public NMClientAsyncImpl(String name, CallbackHandler callbackHandler) {
- this (name, new NMClientImpl(), callbackHandler);
+ public NMClientAsyncImpl(String name, CallbackHandler callbackHandler,
+ ConcurrentMap<String, Token> nmTokens) {
+ this(name, new NMClientImpl(nmTokens), callbackHandler);
}
@Private
@@ -229,15 +231,14 @@ public class NMClientAsyncImpl extends N
}
}
- public void stopContainerAsync(ContainerId containerId, NodeId nodeId,
- Token containerToken) {
+ public void stopContainerAsync(ContainerId containerId, NodeId nodeId) {
if (containers.get(containerId) == null) {
callbackHandler.onStopContainerError(containerId,
RPCUtil.getRemoteException("Container " + containerId +
" is neither started nor scheduled to start"));
}
try {
- events.put(new ContainerEvent(containerId, nodeId, containerToken,
+ events.put(new ContainerEvent(containerId, nodeId, null,
ContainerEventType.STOP_CONTAINER));
} catch (InterruptedException e) {
LOG.warn("Exception when scheduling the event of stopping Container " +
@@ -246,10 +247,9 @@ public class NMClientAsyncImpl extends N
}
}
- public void getContainerStatusAsync(ContainerId containerId, NodeId nodeId,
- Token containerToken) {
+ public void getContainerStatusAsync(ContainerId containerId, NodeId nodeId) {
try {
- events.put(new ContainerEvent(containerId, nodeId, containerToken,
+ events.put(new ContainerEvent(containerId, nodeId, null,
ContainerEventType.QUERY_CONTAINER));
} catch (InterruptedException e) {
LOG.warn("Exception when scheduling the event of querying the status" +
@@ -421,9 +421,9 @@ public class NMClientAsyncImpl extends N
StatefulContainer container, ContainerEvent event) {
ContainerId containerId = event.getContainerId();
try {
- container.nmClientAsync.getClient().stopContainer(
- containerId, event.getNodeId(), event.getContainerToken());
- try {
+ container.nmClientAsync.getClient().stopContainer(
+ containerId, event.getNodeId());
+ try {
container.nmClientAsync.getCallbackHandler().onContainerStopped(
event.getContainerId());
} catch (Throwable thr) {
@@ -534,7 +534,7 @@ public class NMClientAsyncImpl extends N
if (event.getType() == ContainerEventType.QUERY_CONTAINER) {
try {
ContainerStatus containerStatus = client.getContainerStatus(
- containerId, event.getNodeId(), event.getContainerToken());
+ containerId, event.getNodeId());
try {
callbackHandler.onContainerStatusReceived(
containerId, containerStatus);
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/ContainerManagementProtocolProxy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/ContainerManagementProtocolProxy.java?rev=1494369&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/ContainerManagementProtocolProxy.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/ContainerManagementProtocolProxy.java Tue Jun 18 23:19:49 2013
@@ -0,0 +1,237 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import java.net.InetSocketAddress;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.NMTokenIdentifier;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+
+
+/**
+ * Helper class to manage container manager proxies
+ */
+@LimitedPrivate({ "MapReduce", "YARN" })
+public class ContainerManagementProtocolProxy {
+ static final Log LOG = LogFactory.getLog(ContainerManagementProtocolProxy.class);
+
+ private final int maxConnectedNMs;
+ private final LinkedHashMap<String, ContainerManagementProtocolProxyData> cmProxy;
+ private Map<String, Token> nmTokens;
+ private final Configuration conf;
+ private final YarnRPC rpc;
+
+ public ContainerManagementProtocolProxy(Configuration conf,
+ Map<String, Token> nmTokens) {
+ this.nmTokens = nmTokens;
+ this.conf = conf;
+
+ maxConnectedNMs =
+ conf.getInt(YarnConfiguration.NM_CLIENT_MAX_NM_PROXIES,
+ YarnConfiguration.DEFAULT_NM_CLIENT_MAX_NM_PROXIES);
+ if (maxConnectedNMs < 1) {
+ throw new YarnRuntimeException(
+ YarnConfiguration.NM_CLIENT_MAX_NM_PROXIES
+ + " (" + maxConnectedNMs + ") can not be less than 1.");
+ }
+ LOG.info(YarnConfiguration.NM_CLIENT_MAX_NM_PROXIES + " : "
+ + maxConnectedNMs);
+
+ cmProxy =
+ new LinkedHashMap<String, ContainerManagementProtocolProxyData>();
+ rpc = YarnRPC.create(conf);
+ }
+
+ public synchronized ContainerManagementProtocolProxyData getProxy(
+ String containerManagerBindAddr, ContainerId containerId)
+ throws InvalidToken {
+
+ // This get call will update the map which is working as LRU cache.
+ ContainerManagementProtocolProxyData proxy =
+ cmProxy.get(containerManagerBindAddr);
+
+ while (proxy != null
+ && !proxy.token.getIdentifier().equals(
+ nmTokens.get(containerManagerBindAddr).getIdentifier())) {
+ LOG.info("Refreshing proxy as NMToken got updated for node : "
+ + containerManagerBindAddr);
+ // Token is updated. check if anyone has already tried closing it.
+ if (!proxy.scheduledForClose) {
+ // try closing the proxy. Here if someone is already using it
+ // then we might not close it. In which case we will wait.
+ removeProxy(proxy);
+ } else {
+ try {
+ this.wait();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ if (proxy.activeCallers < 0) {
+ proxy = cmProxy.get(containerManagerBindAddr);
+ }
+ }
+
+ if (proxy == null) {
+ proxy =
+ new ContainerManagementProtocolProxyData(rpc, containerManagerBindAddr,
+ containerId, nmTokens.get(containerManagerBindAddr));
+ if (cmProxy.size() > maxConnectedNMs) {
+ // Number of existing proxy exceed the limit.
+ String cmAddr = cmProxy.keySet().iterator().next();
+ removeProxy(cmProxy.get(cmAddr));
+ }
+
+ cmProxy.put(containerManagerBindAddr, proxy);
+ }
+ // This is to track active users of this proxy.
+ proxy.activeCallers++;
+ updateLRUCache(containerManagerBindAddr);
+
+ return proxy;
+ }
+
+ private void updateLRUCache(String containerManagerBindAddr) {
+ ContainerManagementProtocolProxyData proxy =
+ cmProxy.remove(containerManagerBindAddr);
+ cmProxy.put(containerManagerBindAddr, proxy);
+ }
+
+ public synchronized void mayBeCloseProxy(
+ ContainerManagementProtocolProxyData proxy) {
+ proxy.activeCallers--;
+ if (proxy.scheduledForClose && proxy.activeCallers < 0) {
+ LOG.info("Closing proxy : " + proxy.containerManagerBindAddr);
+ cmProxy.remove(proxy.containerManagerBindAddr);
+ try {
+ rpc.stopProxy(proxy.getContainerManagementProtocol(), conf);
+ } finally {
+ this.notifyAll();
+ }
+ }
+ }
+
+ private synchronized void removeProxy(
+ ContainerManagementProtocolProxyData proxy) {
+ if (!proxy.scheduledForClose) {
+ proxy.scheduledForClose = true;
+ mayBeCloseProxy(proxy);
+ }
+ }
+
+ public synchronized void stopAllProxies() {
+ List<String> nodeIds = new ArrayList<String>();
+ nodeIds.addAll(this.cmProxy.keySet());
+ for (String nodeId : nodeIds) {
+ ContainerManagementProtocolProxyData proxy = cmProxy.get(nodeId);
+ // Explicitly reducing the proxy count to allow stopping proxy.
+ proxy.activeCallers = 0;
+ try {
+ removeProxy(proxy);
+ } catch (Throwable t) {
+ LOG.error("Error closing connection", t);
+ }
+ }
+ cmProxy.clear();
+ }
+
+ public synchronized void setNMTokens(Map<String, Token> nmTokens) {
+ this.nmTokens = nmTokens;
+ }
+
+ public class ContainerManagementProtocolProxyData {
+ private final String containerManagerBindAddr;
+ private final ContainerManagementProtocol proxy;
+ private int activeCallers;
+ private boolean scheduledForClose;
+ private final Token token;
+
+ @Private
+ @VisibleForTesting
+ public ContainerManagementProtocolProxyData(YarnRPC rpc,
+ String containerManagerBindAddr,
+ ContainerId containerId, Token token) throws InvalidToken {
+ this.containerManagerBindAddr = containerManagerBindAddr;
+ ;
+ this.activeCallers = 0;
+ this.scheduledForClose = false;
+ this.token = token;
+ this.proxy = newProxy(rpc, containerManagerBindAddr, containerId, token);
+ }
+
+ @Private
+ @VisibleForTesting
+ protected ContainerManagementProtocol newProxy(final YarnRPC rpc,
+ String containerManagerBindAddr, ContainerId containerId, Token token)
+ throws InvalidToken {
+ if (token == null) {
+ throw new InvalidToken("No NMToken sent for "
+ + containerManagerBindAddr);
+ }
+ final InetSocketAddress cmAddr =
+ NetUtils.createSocketAddr(containerManagerBindAddr);
+ LOG.info("Opening proxy : " + containerManagerBindAddr);
+ // the user in createRemoteUser in this context has to be ContainerID
+ UserGroupInformation user =
+ UserGroupInformation.createRemoteUser(containerId
+ .getApplicationAttemptId().toString());
+
+ org.apache.hadoop.security.token.Token<NMTokenIdentifier> nmToken =
+ ConverterUtils.convertFromYarn(token, cmAddr);
+ user.addToken(nmToken);
+
+ ContainerManagementProtocol proxy = user
+ .doAs(new PrivilegedAction<ContainerManagementProtocol>() {
+
+ @Override
+ public ContainerManagementProtocol run() {
+ return (ContainerManagementProtocol) rpc.getProxy(
+ ContainerManagementProtocol.class, cmAddr, conf);
+ }
+ });
+ return proxy;
+ }
+
+ public ContainerManagementProtocol getContainerManagementProtocol() {
+ return proxy;
+ }
+ }
+
+}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java?rev=1494369&r1=1494368&r2=1494369&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java Tue Jun 18 23:19:49 2013
@@ -19,9 +19,7 @@
package org.apache.hadoop.yarn.client.api.impl;
import java.io.IOException;
-import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
-import java.security.PrivilegedAction;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -29,31 +27,23 @@ import java.util.concurrent.atomic.Atomi
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.util.Records;
/**
* <p>
@@ -91,14 +81,18 @@ public class NMClientImpl extends NMClie
new ConcurrentHashMap<ContainerId, StartedContainer>();
//enabled by default
- private final AtomicBoolean cleanupRunningContainers = new AtomicBoolean(true);
+ private final AtomicBoolean cleanupRunningContainers = new AtomicBoolean(true);
+ private ContainerManagementProtocolProxy cmProxy;
+ private ConcurrentMap<String, Token> nmTokens;
- public NMClientImpl() {
+ public NMClientImpl(ConcurrentMap<String, Token> nmTokens) {
super(NMClientImpl.class.getName());
+ this.nmTokens = nmTokens;
}
- public NMClientImpl(String name) {
+ public NMClientImpl(String name, ConcurrentMap<String, Token> nmTokens) {
super(name);
+ this.nmTokens = nmTokens;
}
@Override
@@ -108,6 +102,7 @@ public class NMClientImpl extends NMClie
if (getCleanupRunningContainers().get()) {
cleanupRunningContainers();
}
+ cmProxy.stopAllProxies();
super.serviceStop();
}
@@ -115,8 +110,7 @@ public class NMClientImpl extends NMClie
for (StartedContainer startedContainer : startedContainers.values()) {
try {
stopContainer(startedContainer.getContainerId(),
- startedContainer.getNodeId(),
- startedContainer.getContainerToken());
+ startedContainer.getNodeId());
} catch (YarnException e) {
LOG.error("Failed to stop Container " +
startedContainer.getContainerId() +
@@ -130,22 +124,28 @@ public class NMClientImpl extends NMClie
}
@Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ super.serviceInit(conf);
+ cmProxy =
+ new ContainerManagementProtocolProxy(conf, nmTokens);
+ }
+
+ @Override
public void cleanupRunningContainersOnStop(boolean enabled) {
getCleanupRunningContainers().set(enabled);
}
-
+
protected static class StartedContainer {
private ContainerId containerId;
private NodeId nodeId;
- private Token containerToken;
- private boolean stopped;
-
+ private ContainerState state;
+
+
public StartedContainer(ContainerId containerId, NodeId nodeId,
Token containerToken) {
this.containerId = containerId;
this.nodeId = nodeId;
- this.containerToken = containerToken;
- stopped = false;
+ state = ContainerState.NEW;
}
public ContainerId getContainerId() {
@@ -155,137 +155,17 @@ public class NMClientImpl extends NMClie
public NodeId getNodeId() {
return nodeId;
}
-
- public Token getContainerToken() {
- return containerToken;
- }
}
- protected static final class NMCommunicator extends AbstractService {
- private ContainerId containerId;
- private NodeId nodeId;
- private Token containerToken;
- private ContainerManagementProtocol containerManager;
-
- public NMCommunicator(ContainerId containerId, NodeId nodeId,
- Token containerToken) {
- super(NMCommunicator.class.getName());
- this.containerId = containerId;
- this.nodeId = nodeId;
- this.containerToken = containerToken;
- }
-
- @Override
- protected synchronized void serviceStart() throws Exception {
- final YarnRPC rpc = YarnRPC.create(getConfig());
-
- final InetSocketAddress containerAddress =
- NetUtils.createSocketAddr(nodeId.toString());
-
- // the user in createRemoteUser in this context has to be ContainerId
- UserGroupInformation currentUser =
- UserGroupInformation.createRemoteUser(containerId.toString());
-
- org.apache.hadoop.security.token.Token<ContainerTokenIdentifier> token =
- ConverterUtils.convertFromYarn(containerToken, containerAddress);
- currentUser.addToken(token);
-
- containerManager = currentUser
- .doAs(new PrivilegedAction<ContainerManagementProtocol>() {
- @Override
- public ContainerManagementProtocol run() {
- return (ContainerManagementProtocol) rpc.getProxy(ContainerManagementProtocol.class,
- containerAddress, getConfig());
- }
- });
-
- LOG.debug("Connecting to ContainerManager at " + containerAddress);
- super.serviceStart();
- }
-
- @Override
- protected synchronized void serviceStop() throws Exception {
- if (this.containerManager != null) {
- RPC.stopProxy(this.containerManager);
-
- if (LOG.isDebugEnabled()) {
- InetSocketAddress containerAddress =
- NetUtils.createSocketAddr(nodeId.toString());
- LOG.debug("Disconnecting from ContainerManager at " +
- containerAddress);
- }
- }
- super.serviceStop();
- }
-
- public synchronized Map<String, ByteBuffer> startContainer(
- Container container, ContainerLaunchContext containerLaunchContext)
- throws YarnException, IOException {
- if (!container.getId().equals(containerId)) {
- throw new IllegalArgumentException(
- "NMCommunicator's containerId mismatches the given Container's");
- }
- StartContainerResponse startResponse = null;
- try {
- StartContainerRequest startRequest =
- Records.newRecord(StartContainerRequest.class);
- startRequest.setContainerToken(container.getContainerToken());
- startRequest.setContainerLaunchContext(containerLaunchContext);
- startResponse = containerManager.startContainer(startRequest);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Started Container " + containerId);
- }
- } catch (YarnException e) {
- LOG.warn("Container " + containerId + " failed to start", e);
- throw e;
- } catch (IOException e) {
- LOG.warn("Container " + containerId + " failed to start", e);
- throw e;
- }
- return startResponse.getAllServicesMetaData();
- }
-
- public synchronized void stopContainer() throws YarnException,
- IOException {
- try {
- StopContainerRequest stopRequest =
- Records.newRecord(StopContainerRequest.class);
- stopRequest.setContainerId(containerId);
- containerManager.stopContainer(stopRequest);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Stopped Container " + containerId);
- }
- } catch (YarnException e) {
- LOG.warn("Container " + containerId + " failed to stop", e);
- throw e;
- } catch (IOException e) {
- LOG.warn("Container " + containerId + " failed to stop", e);
- throw e;
- }
- }
-
- public synchronized ContainerStatus getContainerStatus()
- throws YarnException, IOException {
- GetContainerStatusResponse statusResponse = null;
- try {
- GetContainerStatusRequest statusRequest =
- Records.newRecord(GetContainerStatusRequest.class);
- statusRequest.setContainerId(containerId);
- statusResponse = containerManager.getContainerStatus(statusRequest);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Got the status of Container " + containerId);
- }
- } catch (YarnException e) {
- LOG.warn(
- "Unable to get the status of Container " + containerId, e);
- throw e;
- } catch (IOException e) {
- LOG.warn(
- "Unable to get the status of Container " + containerId, e);
- throw e;
- }
- return statusResponse.getStatus();
+ private void addStartingContainer(StartedContainer startedContainer)
+ throws YarnException {
+ if (startedContainers.putIfAbsent(startedContainer.containerId,
+ startedContainer) != null) {
+ throw RPCUtil.getRemoteException("Container "
+ + startedContainer.containerId.toString() + " is already started");
}
+ startedContainers
+ .put(startedContainer.getContainerId(), startedContainer);
}
@Override
@@ -293,108 +173,112 @@ public class NMClientImpl extends NMClie
Container container, ContainerLaunchContext containerLaunchContext)
throws YarnException, IOException {
// Do synchronization on StartedContainer to prevent race condition
- // between startContainer and stopContainer
- synchronized (addStartedContainer(container)) {
+ // between startContainer and stopContainer only when startContainer is
+ // in progress for a given container.
+ StartedContainer startingContainer = createStartedContainer(container);
+ synchronized (startingContainer) {
+ addStartingContainer(startingContainer);
+
Map<String, ByteBuffer> allServiceResponse;
- NMCommunicator nmCommunicator = null;
+ ContainerManagementProtocolProxyData proxy = null;
try {
- nmCommunicator = new NMCommunicator(container.getId(),
- container.getNodeId(), container.getContainerToken());
- nmCommunicator.init(getConfig());
- nmCommunicator.start();
+ proxy =
+ cmProxy.getProxy(container.getNodeId().toString(),
+ container.getId());
allServiceResponse =
- nmCommunicator.startContainer(container, containerLaunchContext);
+ proxy
+ .getContainerManagementProtocol().startContainer(
+ StartContainerRequest.newInstance(containerLaunchContext,
+ container.getContainerToken())).getAllServicesMetaData();
+ startingContainer.state = ContainerState.RUNNING;
} catch (YarnException e) {
+ startingContainer.state = ContainerState.COMPLETE;
// Remove the started container if it failed to start
- removeStartedContainer(container.getId());
+ removeStartedContainer(startingContainer);
throw e;
} catch (IOException e) {
- removeStartedContainer(container.getId());
+ startingContainer.state = ContainerState.COMPLETE;
+ removeStartedContainer(startingContainer);
throw e;
} catch (Throwable t) {
- removeStartedContainer(container.getId());
+ startingContainer.state = ContainerState.COMPLETE;
+ removeStartedContainer(startingContainer);
throw RPCUtil.getRemoteException(t);
} finally {
- if (nmCommunicator != null) {
- nmCommunicator.stop();
+ if (proxy != null) {
+ cmProxy.mayBeCloseProxy(proxy);
}
}
return allServiceResponse;
}
-
- // Three choices:
- // 1. starting and releasing the proxy before and after each interaction
- // 2. starting the proxy when starting the container and releasing it when
- // stopping the container
- // 3. starting the proxy when starting the container and releasing it when
- // stopping the client
- // Adopt 1 currently
}
@Override
- public void stopContainer(ContainerId containerId, NodeId nodeId,
- Token containerToken) throws YarnException, IOException {
+ public void stopContainer(ContainerId containerId, NodeId nodeId)
+ throws YarnException, IOException {
StartedContainer startedContainer = getStartedContainer(containerId);
- if (startedContainer == null) {
- throw RPCUtil.getRemoteException("Container " + containerId +
- " is either not started yet or already stopped");
- }
+
// Only allow one request of stopping the container to move forward
// When entering the block, check whether the precursor has already stopped
// the container
- synchronized (startedContainer) {
- if (startedContainer.stopped) {
- return;
- }
- NMCommunicator nmCommunicator = null;
- try {
- nmCommunicator =
- new NMCommunicator(containerId, nodeId, containerToken);
- nmCommunicator.init(getConfig());
- nmCommunicator.start();
- nmCommunicator.stopContainer();
- } finally {
- if (nmCommunicator != null) {
- nmCommunicator.stop();
+ if (startedContainer != null) {
+ synchronized (startedContainer) {
+ if (startedContainer.state != ContainerState.RUNNING) {
+ return;
}
- startedContainer.stopped = true;
- removeStartedContainer(containerId);
+ stopContainerInternal(containerId, nodeId);
+ // Only after successful
+ startedContainer.state = ContainerState.COMPLETE;
+ removeStartedContainer(startedContainer);
}
+ } else {
+ stopContainerInternal(containerId, nodeId);
}
+
}
@Override
public ContainerStatus getContainerStatus(ContainerId containerId,
- NodeId nodeId, Token containerToken)
- throws YarnException, IOException {
- NMCommunicator nmCommunicator = null;
+ NodeId nodeId) throws YarnException, IOException {
+
+ ContainerManagementProtocolProxyData proxy = null;
try {
- nmCommunicator = new NMCommunicator(containerId, nodeId, containerToken);
- nmCommunicator.init(getConfig());
- nmCommunicator.start();
- ContainerStatus containerStatus = nmCommunicator.getContainerStatus();
+ proxy = cmProxy.getProxy(nodeId.toString(), containerId);
+ ContainerStatus containerStatus =
+ proxy.getContainerManagementProtocol().getContainerStatus(
+ GetContainerStatusRequest.newInstance(containerId)).getStatus();
return containerStatus;
} finally {
- if (nmCommunicator != null) {
- nmCommunicator.stop();
+ if (proxy != null) {
+ cmProxy.mayBeCloseProxy(proxy);
}
}
}
- protected synchronized StartedContainer addStartedContainer(
- Container container) throws YarnException, IOException {
- if (startedContainers.containsKey(container.getId())) {
- throw RPCUtil.getRemoteException("Container " + container.getId() +
- " is already started");
+ private void stopContainerInternal(ContainerId containerId, NodeId nodeId)
+ throws IOException, YarnException {
+ ContainerManagementProtocolProxyData proxy = null;
+ try {
+ proxy = cmProxy.getProxy(nodeId.toString(), containerId);
+ proxy.getContainerManagementProtocol().stopContainer(
+ StopContainerRequest.newInstance(containerId));
+ } finally {
+ if (proxy != null) {
+ cmProxy.mayBeCloseProxy(proxy);
+ }
}
+ }
+
+ protected synchronized StartedContainer createStartedContainer(
+ Container container) throws YarnException, IOException {
StartedContainer startedContainer = new StartedContainer(container.getId(),
container.getNodeId(), container.getContainerToken());
- startedContainers.put(startedContainer.getContainerId(), startedContainer);
return startedContainer;
}
- protected synchronized void removeStartedContainer(ContainerId containerId) {
- startedContainers.remove(containerId);
+ protected synchronized void
+ removeStartedContainer(StartedContainer container) {
+ startedContainers.remove(container.containerId);
}
protected synchronized StartedContainer getStartedContainer(
@@ -405,5 +289,4 @@ public class NMClientImpl extends NMClie
public AtomicBoolean getCleanupRunningContainers() {
return cleanupRunningContainers;
}
-
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestNMClientAsync.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestNMClientAsync.java?rev=1494369&r1=1494368&r2=1494369&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestNMClientAsync.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestNMClientAsync.java Tue Jun 18 23:19:49 2013
@@ -232,10 +232,10 @@ public class TestNMClientAsync {
actualStartSuccessArray.set(containerId.getId(), 1);
// move on to the following success tests
- asyncClient.getContainerStatusAsync(containerId, nodeId, containerToken);
+ asyncClient.getContainerStatusAsync(containerId, nodeId);
} else {
// move on to the following failure tests
- asyncClient.stopContainerAsync(containerId, nodeId, containerToken);
+ asyncClient.stopContainerAsync(containerId, nodeId);
}
// Shouldn't crash the test thread
@@ -253,7 +253,7 @@ public class TestNMClientAsync {
actualQuerySuccess.addAndGet(1);
actualQuerySuccessArray.set(containerId.getId(), 1);
// move on to the following success tests
- asyncClient.stopContainerAsync(containerId, nodeId, containerToken);
+ asyncClient.stopContainerAsync(containerId, nodeId);
// Shouldn't crash the test thread
throw new RuntimeException("Ignorable Exception");
@@ -290,7 +290,7 @@ public class TestNMClientAsync {
actualStartFailure.addAndGet(1);
actualStartFailureArray.set(containerId.getId() - expectedSuccess, 1);
// move on to the following failure tests
- asyncClient.getContainerStatusAsync(containerId, nodeId, containerToken);
+ asyncClient.getContainerStatusAsync(containerId, nodeId);
// Shouldn't crash the test thread
throw new RuntimeException("Ignorable Exception");
@@ -383,33 +383,30 @@ public class TestNMClientAsync {
when(client.startContainer(any(Container.class),
any(ContainerLaunchContext.class))).thenReturn(
Collections.<String, ByteBuffer>emptyMap());
- when(client.getContainerStatus(any(ContainerId.class), any(NodeId.class),
- any(Token.class))).thenReturn(
+ when(client.getContainerStatus(any(ContainerId.class),
+ any(NodeId.class))).thenReturn(
recordFactory.newRecordInstance(ContainerStatus.class));
doNothing().when(client).stopContainer(any(ContainerId.class),
- any(NodeId.class), any(Token.class));
+ any(NodeId.class));
break;
case 1:
doThrow(RPCUtil.getRemoteException("Start Exception")).when(client)
.startContainer(any(Container.class),
any(ContainerLaunchContext.class));
doThrow(RPCUtil.getRemoteException("Query Exception")).when(client)
- .getContainerStatus(any(ContainerId.class), any(NodeId.class),
- any(Token.class));
+ .getContainerStatus(any(ContainerId.class), any(NodeId.class));
doThrow(RPCUtil.getRemoteException("Stop Exception")).when(client)
- .stopContainer(any(ContainerId.class), any(NodeId.class),
- any(Token.class));
+ .stopContainer(any(ContainerId.class), any(NodeId.class));
break;
case 2:
when(client.startContainer(any(Container.class),
any(ContainerLaunchContext.class))).thenReturn(
Collections.<String, ByteBuffer>emptyMap());
- when(client.getContainerStatus(any(ContainerId.class), any(NodeId.class),
- any(Token.class))).thenReturn(
+ when(client.getContainerStatus(any(ContainerId.class),
+ any(NodeId.class))).thenReturn(
recordFactory.newRecordInstance(ContainerStatus.class));
doThrow(RPCUtil.getRemoteException("Stop Exception")).when(client)
- .stopContainer(any(ContainerId.class), any(NodeId.class),
- any(Token.class));
+ .stopContainer(any(ContainerId.class), any(NodeId.class));
}
return client;
}
@@ -437,8 +434,7 @@ public class TestNMClientAsync {
t.start();
barrierA.await();
- asyncClient.stopContainerAsync(container.getId(), container.getNodeId(),
- container.getContainerToken());
+ asyncClient.stopContainerAsync(container.getId(), container.getNodeId());
barrierC.await();
Assert.assertFalse("Starting and stopping should be out of order",
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java?rev=1494369&r1=1494368&r2=1494369&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java Tue Jun 18 23:19:49 2013
@@ -29,6 +29,7 @@ import java.nio.ByteBuffer;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataOutputBuffer;
@@ -46,10 +47,12 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.NMClient;
@@ -75,7 +78,8 @@ public class TestNMClient {
List<NodeReport> nodeReports = null;
ApplicationAttemptId attemptId = null;
int nodeCount = 3;
-
+ ConcurrentHashMap<String, Token> nmTokens;
+
@Before
public void setup() throws YarnException, IOException {
// start minicluster
@@ -140,6 +144,7 @@ public class TestNMClient {
if (iterationsLeft == 0) {
fail("Application hasn't bee started");
}
+ nmTokens = new ConcurrentHashMap<String, Token>();
// start am rm client
rmClient =
@@ -151,7 +156,7 @@ public class TestNMClient {
assertEquals(STATE.STARTED, rmClient.getServiceState());
// start am nm client
- nmClient = (NMClientImpl) NMClient.createNMClient();
+ nmClient = (NMClientImpl) NMClient.createNMClient(nmTokens);
nmClient.init(conf);
nmClient.start();
assertNotNull(nmClient);
@@ -194,14 +199,13 @@ public class TestNMClient {
assertEquals(0, nmClient.startedContainers.size());
}
- @Test (timeout = 60000)
+ @Test (timeout = 200000)
public void testNMClient()
throws YarnException, IOException {
-
rmClient.registerApplicationMaster("Host", 10000, "");
testContainerManagement(nmClient, allocateContainers(rmClient, 5));
-
+
rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
null, null);
// stop the running containers on close
@@ -243,6 +247,11 @@ public class TestNMClient {
for(Container container : allocResponse.getAllocatedContainers()) {
containers.add(container);
}
+ if (!allocResponse.getNMTokens().isEmpty()) {
+ for (NMToken token : allocResponse.getNMTokens()) {
+ nmTokens.put(token.getNodeId().toString(), token.getToken());
+ }
+ }
if(allocatedContainerCount < containersRequestedAny) {
// sleep to let NM's heartbeat to RM and trigger allocations
sleep(1000);
@@ -261,8 +270,7 @@ public class TestNMClient {
// getContainerStatus shouldn't be called before startContainer,
// otherwise, NodeManager cannot find the container
try {
- nmClient.getContainerStatus(container.getId(), container.getNodeId(),
- container.getContainerToken());
+ nmClient.getContainerStatus(container.getId(), container.getNodeId());
fail("Exception is expected");
} catch (YarnException e) {
assertTrue("The thrown exception is not expected",
@@ -272,12 +280,11 @@ public class TestNMClient {
// stopContainer shouldn't be called before startContainer,
// otherwise, an exception will be thrown
try {
- nmClient.stopContainer(container.getId(), container.getNodeId(),
- container.getContainerToken());
+ nmClient.stopContainer(container.getId(), container.getNodeId());
fail("Exception is expected");
} catch (YarnException e) {
if (!e.getMessage()
- .contains("is either not started yet or already stopped")) {
+ .contains("is not handled by this NodeManager")) {
throw (AssertionError)
(new AssertionError("Exception is not expected: " + e).initCause(
e));
@@ -306,8 +313,7 @@ public class TestNMClient {
-1000);
try {
- nmClient.stopContainer(container.getId(), container.getNodeId(),
- container.getContainerToken());
+ nmClient.stopContainer(container.getId(), container.getNodeId());
} catch (YarnException e) {
throw (AssertionError)
(new AssertionError("Exception is not expected: " + e)
@@ -335,8 +341,7 @@ public class TestNMClient {
while (true) {
try {
ContainerStatus status = nmClient.getContainerStatus(
- container.getId(), container.getNodeId(),
- container.getContainerToken());
+ container.getId(), container.getNodeId());
// NodeManager may still need some time to get the stable
// container status
if (status.getState() == state) {
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerManagerSecurityInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerManagerSecurityInfo.java?rev=1494369&r1=1494368&r2=1494369&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerManagerSecurityInfo.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerManagerSecurityInfo.java Tue Jun 18 23:19:49 2013
@@ -55,7 +55,7 @@ public class ContainerManagerSecurityInf
@Override
public Class<? extends TokenSelector<? extends TokenIdentifier>>
value() {
- return ContainerTokenSelector.class;
+ return NMTokenSelector.class;
}
};
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/NMTokenIdentifier.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/NMTokenIdentifier.java?rev=1494369&r1=1494368&r2=1494369&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/NMTokenIdentifier.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/NMTokenIdentifier.java Tue Jun 18 23:19:49 2013
@@ -21,21 +21,17 @@ package org.apache.hadoop.yarn.security;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.net.InetSocketAddress;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Token;
@Public
@Evolving
@@ -48,14 +44,14 @@ public class NMTokenIdentifier extends T
private ApplicationAttemptId appAttemptId;
private NodeId nodeId;
private String appSubmitter;
- private int masterKeyId;
+ private int keyId;
public NMTokenIdentifier(ApplicationAttemptId appAttemptId, NodeId nodeId,
String applicationSubmitter, int masterKeyId) {
this.appAttemptId = appAttemptId;
this.nodeId = nodeId;
this.appSubmitter = applicationSubmitter;
- this.masterKeyId = masterKeyId;
+ this.keyId = masterKeyId;
}
/**
@@ -76,8 +72,8 @@ public class NMTokenIdentifier extends T
return appSubmitter;
}
- public int getMastKeyId() {
- return masterKeyId;
+ public int getKeyId() {
+ return keyId;
}
@Override
@@ -89,7 +85,7 @@ public class NMTokenIdentifier extends T
out.writeInt(appAttemptId.getAttemptId());
out.writeUTF(this.nodeId.toString());
out.writeUTF(this.appSubmitter);
- out.writeInt(this.masterKeyId);
+ out.writeInt(this.keyId);
}
@Override
@@ -101,7 +97,7 @@ public class NMTokenIdentifier extends T
String[] hostAddr = in.readUTF().split(":");
nodeId = NodeId.newInstance(hostAddr[0], Integer.parseInt(hostAddr[1]));
appSubmitter = in.readUTF();
- masterKeyId = in.readInt();
+ keyId = in.readInt();
}
@Override
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/NMTokenSelector.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/NMTokenSelector.java?rev=1494369&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/NMTokenSelector.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/NMTokenSelector.java Tue Jun 18 23:19:49 2013
@@ -0,0 +1,56 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.security;
+
+import java.util.Collection;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenSelector;
+
+public class NMTokenSelector implements
+ TokenSelector<NMTokenIdentifier> {
+
+ private static final Log LOG = LogFactory
+ .getLog(NMTokenSelector.class);
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Token<NMTokenIdentifier> selectToken(Text service,
+ Collection<Token<? extends TokenIdentifier>> tokens) {
+ if (service == null) {
+ return null;
+ }
+ for (Token<? extends TokenIdentifier> token : tokens) {
+ if (LOG.isDebugEnabled()) {
+ LOG.info("Looking for service: " + service + ". Current token is "
+ + token);
+ }
+ if (NMTokenIdentifier.KIND.equals(token.getKind()) &&
+ service.equals(token.getService())) {
+ return (Token<NMTokenIdentifier>) token;
+ }
+ }
+ return null;
+ }
+
+}
\ No newline at end of file
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier?rev=1494369&r1=1494368&r2=1494369&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier Tue Jun 18 23:19:49 2013
@@ -15,3 +15,4 @@ org.apache.hadoop.yarn.security.Containe
org.apache.hadoop.yarn.security.AMRMTokenIdentifier
org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier
org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier
+org.apache.hadoop.yarn.security.NMTokenIdentifier
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml?rev=1494369&r1=1494368&r2=1494369&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml Tue Jun 18 23:19:49 2013
@@ -659,7 +659,25 @@
<name>yarn.client.nodemanager-client-async.thread-pool-max-size</name>
<value>500</value>
</property>
-
+
+ <property>
+ <description>
+ Maximum number of proxy connections for node manager. It should always be
+ more than 1. NMClient and MRAppMaster will use this to cache connection
+ with node manager. There will be at max one connection per node manager.
+ Ex. configuring it to a value of 5 will make sure that client will at
+ max have 5 connections cached with 5 different node managers. These
+ connections will be timed out if idle for more than system wide idle
+ timeout period. The token if used for authentication then it will be used
+ only at connection creation time. If new token is received then earlier
+ connection should be closed in order to use newer token. This and
+ (yarn.client.nodemanager-client-async.thread-pool-max-size) are related
+ and should be sync (no need for them to be equal).
+ </description>
+ <name>yarn.client.max-nodemanagers-proxies</name>
+ <value>500</value>
+ </property>
+
<!--Map Reduce configuration-->
<property>
<name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/BaseNMTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/BaseNMTokenSecretManager.java?rev=1494369&r1=1494368&r2=1494369&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/BaseNMTokenSecretManager.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/BaseNMTokenSecretManager.java Tue Jun 18 23:19:49 2013
@@ -114,4 +114,38 @@ public class BaseNMTokenSecretManager ex
public NMTokenIdentifier createIdentifier() {
return new NMTokenIdentifier();
}
+
+ /**
+ * Helper function for creating NMTokens.
+ */
+ public Token createNMToken(ApplicationAttemptId applicationAttemptId,
+ NodeId nodeId, String applicationSubmitter) {
+ byte[] password;
+ NMTokenIdentifier identifier;
+
+ this.readLock.lock();
+ try {
+ identifier =
+ new NMTokenIdentifier(applicationAttemptId, nodeId,
+ applicationSubmitter, this.currentMasterKey.getMasterKey()
+ .getKeyId());
+ password = this.createPassword(identifier);
+ } finally {
+ this.readLock.unlock();
+ }
+ return newInstance(password, identifier);
+ }
+
+ public static Token newInstance(byte[] password,
+ NMTokenIdentifier identifier) {
+ NodeId nodeId = identifier.getNodeId();
+ // RPC layer client expects ip:port as service for tokens
+ InetSocketAddress addr =
+ NetUtils.createSocketAddrForHost(nodeId.getHost(), nodeId.getPort());
+ Token nmToken =
+ Token.newInstance(identifier.getBytes(),
+ NMTokenIdentifier.KIND.toString(), password, SecurityUtil
+ .buildTokenService(addr).toString());
+ return nmToken;
+ }
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java?rev=1494369&r1=1494368&r2=1494369&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java Tue Jun 18 23:19:49 2013
@@ -27,6 +27,7 @@ import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
@@ -63,6 +64,8 @@ import org.apache.hadoop.yarn.factory.pr
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.util.ConverterUtils;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* Builder utilities to construct various objects.
*
@@ -152,7 +155,8 @@ public class BuilderUtils {
int port, String user, Resource r, long expiryTime, int masterKeyId,
byte[] password, long rmIdentifier) throws IOException {
ContainerTokenIdentifier identifier =
- new ContainerTokenIdentifier(cId, host, user, r, expiryTime,
+ new ContainerTokenIdentifier(cId, host + ":" + port, user, r,
+ expiryTime,
masterKeyId, rmIdentifier);
return newContainerToken(BuilderUtils.newNodeId(host, port), password,
identifier);
@@ -228,6 +232,8 @@ public class BuilderUtils {
return newToken(Token.class, identifier, kind, password, service);
}
+ @Private
+ @VisibleForTesting
public static Token newContainerToken(NodeId nodeId,
byte[] password, ContainerTokenIdentifier tokenIdentifier) {
// RPC layer client expects ip:port as service for tokens
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java?rev=1494369&r1=1494368&r2=1494369&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java Tue Jun 18 23:19:49 2013
@@ -435,7 +435,7 @@ public class NodeManager extends Composi
}
@VisibleForTesting
- Context getNMContext() {
+ public Context getNMContext() {
return this.context;
}