Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2014/10/25 08:05:36 UTC

git commit: YARN-2314. Disable ContainerManagementProtocolProxy cache by default to prevent creating thousands of threads in a large cluster. Contributed by Jason Lowe

Repository: hadoop
Updated Branches:
  refs/heads/trunk 5864dd99a -> f44cf9959


YARN-2314. Disable ContainerManagementProtocolProxy cache by default to prevent creating thousands of threads in a large cluster. Contributed by Jason Lowe


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f44cf995
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f44cf995
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f44cf995

Branch: refs/heads/trunk
Commit: f44cf99599119b5e989be724eeab447b2dc4fe53
Parents: 5864dd9
Author: Jian He <ji...@apache.org>
Authored: Fri Oct 24 23:05:16 2014 -0700
Committer: Jian He <ji...@apache.org>
Committed: Fri Oct 24 23:05:16 2014 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |  4 +
 .../hadoop/yarn/conf/YarnConfiguration.java     | 40 ++++++----
 .../impl/ContainerManagementProtocolProxy.java  | 80 ++++++++++++++++----
 .../src/main/resources/yarn-default.xml         | 34 +++++----
 4 files changed, 115 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/f44cf995/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 26ca5d0..0b620ec 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -746,6 +746,10 @@ Release 2.6.0 - UNRELEASED
     to contact with AM before AM actually receives the ClientToAMTokenMasterKey.
     (Jason Lowe via jianhe)
 
+    YARN-2314. Disable ContainerManagementProtocolProxy cache by default to
+    prevent creating thousands of threads in a large cluster. (Jason Lowe via
+    jianhe)
+
 Release 2.5.1 - 2014-09-05
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f44cf995/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 59e303f..2d08fde 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -70,10 +70,18 @@ public class YarnConfiguration extends Configuration {
   public static final int APPLICATION_MAX_TAG_LENGTH = 100;
 
   static {
+    addDeprecatedKeys();
     Configuration.addDefaultResource(YARN_DEFAULT_CONFIGURATION_FILE);
     Configuration.addDefaultResource(YARN_SITE_CONFIGURATION_FILE);
   }
 
+  private static void addDeprecatedKeys() {
+    Configuration.addDeprecations(new DeprecationDelta[] {
+        new DeprecationDelta("yarn.client.max-nodemanagers-proxies",
+            NM_CLIENT_MAX_NM_PROXIES)
+    });
+  }
+
   //Configurations
 
   public static final String YARN_PREFIX = "yarn.";
@@ -1446,21 +1454,27 @@ public class YarnConfiguration extends Configuration {
   public static final int DEFAULT_NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE = 500;
 
   /**
-   * Maximum number of proxy connections for node manager. It should always be
-   * more than 1. NMClient and MRAppMaster will use this to cache connection
-   * with node manager. There will be at max one connection per node manager.
-   * Ex. configuring it to a value of 5 will make sure that client will at
-   * max have 5 connections cached with 5 different node managers. These
-   * connections will be timed out if idle for more than system wide idle
-   * timeout period. The token if used for authentication then it will be used
-   * only at connection creation time. If new token is received then earlier
-   * connection should be closed in order to use newer token.
-   * Note: {@link YarnConfiguration#NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE}
-   * are related to each other.
+   * Maximum number of proxy connections to cache for node managers. If set
+   * to a value greater than zero then the cache is enabled and the NMClient
+   * and MRAppMaster will cache the specified number of node manager proxies.
+   * There will be at max one proxy per node manager. Ex. configuring it to a
+   * value of 5 will make sure that client will at max have 5 proxies cached
+   * with 5 different node managers. These connections for these proxies will
+   * be timed out if idle for more than the system wide idle timeout period.
+   * Note that this could cause issues on large clusters as many connections
+   * could linger simultaneously and lead to a large number of connection
+   * threads. The token used for authentication will be used only at
+   * connection creation time. If a new token is received then the earlier
+   * connection should be closed in order to use the new token. This and
+   * {@link YarnConfiguration#NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE} are related
+   * and should be in sync (no need for them to be equal).
+   * If the value of this property is zero then the connection cache is
+   * disabled and connections will use a zero idle timeout to prevent too
+   * many connection threads on large clusters.
    */
   public static final String NM_CLIENT_MAX_NM_PROXIES =
-      YARN_PREFIX + "client.max-nodemanagers-proxies";
-  public static final int DEFAULT_NM_CLIENT_MAX_NM_PROXIES = 500;
+      YARN_PREFIX + "client.max-cached-nodemanagers-proxies";
+  public static final int DEFAULT_NM_CLIENT_MAX_NM_PROXIES = 0;
 
   /** Max time to wait to establish a connection to NM */
   public static final String CLIENT_NM_CONNECT_MAX_WAIT_MS =

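The DeprecationDelta registered above keeps existing configurations working: Hadoop's Configuration layer transparently forwards the old key, yarn.client.max-nodemanagers-proxies, to the renamed yarn.client.max-cached-nodemanagers-proxies and logs a deprecation warning. A minimal sketch of that behavior (the class name and the value of 5 are illustrative, not part of the commit):

    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    public class DeprecatedKeyCheck {
      public static void main(String[] args) {
        YarnConfiguration conf = new YarnConfiguration();
        // Set the pre-change key name; Configuration maps it to the
        // new key and emits a deprecation warning.
        conf.set("yarn.client.max-nodemanagers-proxies", "5");
        // The value is visible when read under the renamed key.
        int max = conf.getInt(YarnConfiguration.NM_CLIENT_MAX_NM_PROXIES,
            YarnConfiguration.DEFAULT_NM_CLIENT_MAX_NM_PROXIES);
        System.out.println(max);  // prints 5
      }
    }
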
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f44cf995/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/ContainerManagementProtocolProxy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/ContainerManagementProtocolProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/ContainerManagementProtocolProxy.java
index daeae92..eaf048d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/ContainerManagementProtocolProxy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/ContainerManagementProtocolProxy.java
@@ -20,14 +20,17 @@ package org.apache.hadoop.yarn.client.api.impl;
 
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
@@ -53,7 +56,7 @@ public class ContainerManagementProtocolProxy {
   static final Log LOG = LogFactory.getLog(ContainerManagementProtocolProxy.class);
 
   private final int maxConnectedNMs;
-  private final LinkedHashMap<String, ContainerManagementProtocolProxyData> cmProxy;
+  private final Map<String, ContainerManagementProtocolProxyData> cmProxy;
   private final Configuration conf;
   private final YarnRPC rpc;
   private NMTokenCache nmTokenCache;
@@ -70,16 +73,25 @@ public class ContainerManagementProtocolProxy {
     maxConnectedNMs =
         conf.getInt(YarnConfiguration.NM_CLIENT_MAX_NM_PROXIES,
             YarnConfiguration.DEFAULT_NM_CLIENT_MAX_NM_PROXIES);
-    if (maxConnectedNMs < 1) {
+    if (maxConnectedNMs < 0) {
       throw new YarnRuntimeException(
           YarnConfiguration.NM_CLIENT_MAX_NM_PROXIES
-              + " (" + maxConnectedNMs + ") can not be less than 1.");
+              + " (" + maxConnectedNMs + ") can not be less than 0.");
     }
     LOG.info(YarnConfiguration.NM_CLIENT_MAX_NM_PROXIES + " : "
         + maxConnectedNMs);
 
-    cmProxy =
-        new LinkedHashMap<String, ContainerManagementProtocolProxyData>();
+    if (maxConnectedNMs > 0) {
+      cmProxy =
+          new LinkedHashMap<String, ContainerManagementProtocolProxyData>();
+    } else {
+      cmProxy = Collections.emptyMap();
+      // Connections are not being cached so ensure connections close quickly
+      // to avoid creating thousands of RPC client threads on large clusters.
+      conf.setInt(
+          CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
+          0);
+    }
     rpc = YarnRPC.create(conf);
   }
   
@@ -117,13 +129,9 @@ public class ContainerManagementProtocolProxy {
       proxy =
           new ContainerManagementProtocolProxyData(rpc, containerManagerBindAddr,
               containerId, nmTokenCache.getToken(containerManagerBindAddr));
-      if (cmProxy.size() > maxConnectedNMs) {
-        // Number of existing proxy exceed the limit.
-        String cmAddr = cmProxy.keySet().iterator().next();
-        removeProxy(cmProxy.get(cmAddr));
+      if (maxConnectedNMs > 0) {
+        addProxyToCache(containerManagerBindAddr, proxy);
       }
-      
-      cmProxy.put(containerManagerBindAddr, proxy);
     }
     // This is to track active users of this proxy.
     proxy.activeCallers++;
@@ -131,15 +139,52 @@ public class ContainerManagementProtocolProxy {
     
     return proxy;
   }
+
+  private void addProxyToCache(String containerManagerBindAddr,
+      ContainerManagementProtocolProxyData proxy) {
+    while (cmProxy.size() >= maxConnectedNMs) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Cleaning up the proxy cache, size=" + cmProxy.size()
+            + " max=" + maxConnectedNMs);
+      }
+      boolean removedProxy = false;
+      for (ContainerManagementProtocolProxyData otherProxy : cmProxy.values()) {
+        removedProxy = removeProxy(otherProxy);
+        if (removedProxy) {
+          break;
+        }
+      }
+      if (!removedProxy) {
+        // all of the proxies are currently in use and already scheduled
+        // for removal, so we need to wait until at least one of them closes
+        try {
+          this.wait();
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+      }
+    }
+
+    if (maxConnectedNMs > 0) {
+      cmProxy.put(containerManagerBindAddr, proxy);
+    }
+  }
   
   private void updateLRUCache(String containerManagerBindAddr) {
-    ContainerManagementProtocolProxyData proxy =
-        cmProxy.remove(containerManagerBindAddr);
-    cmProxy.put(containerManagerBindAddr, proxy);
+    if (maxConnectedNMs > 0) {
+      ContainerManagementProtocolProxyData proxy =
+          cmProxy.remove(containerManagerBindAddr);
+      cmProxy.put(containerManagerBindAddr, proxy);
+    }
   }
 
   public synchronized void mayBeCloseProxy(
       ContainerManagementProtocolProxyData proxy) {
+    tryCloseProxy(proxy);
+  }
+
+  private boolean tryCloseProxy(
+      ContainerManagementProtocolProxyData proxy) {
     proxy.activeCallers--;
     if (proxy.scheduledForClose && proxy.activeCallers < 0) {
       LOG.info("Closing proxy : " + proxy.containerManagerBindAddr);
@@ -149,15 +194,18 @@ public class ContainerManagementProtocolProxy {
       } finally {
         this.notifyAll();
       }
+      return true;
     }
+    return false;
   }
 
-  private synchronized void removeProxy(
+  private synchronized boolean removeProxy(
       ContainerManagementProtocolProxyData proxy) {
     if (!proxy.scheduledForClose) {
       proxy.scheduledForClose = true;
-      mayBeCloseProxy(proxy);
+      return tryCloseProxy(proxy);
     }
+    return false;
   }
   
   public synchronized void stopAllProxies() {

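With the new default of 0, cmProxy becomes an immutable empty map and the constructor forces ipc.client.connection.maxidletime to 0, so each connection (and its RPC client thread) is torn down as soon as it goes idle instead of lingering once per node manager. An application that only contacts a bounded set of node managers can opt back into caching through the client configuration; a hedged sketch (the class name and the bound of 100 are illustrative, not from the commit):

    import org.apache.hadoop.yarn.client.api.NMClient;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    public class CachedNMClientSetup {
      public static void main(String[] args) {
        YarnConfiguration conf = new YarnConfiguration();
        // A positive value re-enables the LRU proxy cache with this bound.
        conf.setInt(YarnConfiguration.NM_CLIENT_MAX_NM_PROXIES, 100);
        NMClient nmClient = NMClient.createNMClient();
        nmClient.init(conf);
        nmClient.start();
        // ... startContainer()/stopContainer() calls here reuse up to
        // 100 cached node manager proxies ...
        nmClient.stop();
      }
    }

Opting in restores the trade-off the commit message warns about: on a large cluster, each cached proxy can keep an idle connection and its client thread alive until the system-wide idle timeout fires.
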
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f44cf995/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 3c3d7e3..a64ed73 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -1095,21 +1095,27 @@
   </property>
 
   <property>
-  	<description>
-  	  Maximum number of proxy connections for node manager. It should always be
-      more than 1. NMClient and MRAppMaster will use this to cache connection
-      with node manager. There will be at max one connection per node manager.
-      Ex. configuring it to a value of 5 will make sure that client will at
-      max have 5 connections cached with 5 different node managers. These
-      connections will be timed out if idle for more than system wide idle
-      timeout period. The token if used for authentication then it will be used
-      only at connection creation time. If new token is received then earlier
-      connection should be closed in order to use newer token. This and
+    <description>
+      Maximum number of proxy connections to cache for node managers. If set
+      to a value greater than zero then the cache is enabled and the NMClient
+      and MRAppMaster will cache the specified number of node manager proxies.
+      There will be at max one proxy per node manager. Ex. configuring it to a
+      value of 5 will make sure that client will at max have 5 proxies cached
+      with 5 different node managers. These connections for these proxies will
+      be timed out if idle for more than the system wide idle timeout period.
+      Note that this could cause issues on large clusters as many connections
+      could linger simultaneously and lead to a large number of connection
+      threads. The token used for authentication will be used only at
+      connection creation time. If a new token is received then the earlier
+      connection should be closed in order to use the new token. This and
       (yarn.client.nodemanager-client-async.thread-pool-max-size) are related
-      and should be sync (no need for them to be equal).
-  	</description>
-  	<name>yarn.client.max-nodemanagers-proxies</name>
-  	<value>500</value>
+      and should be in sync (no need for them to be equal).
+      If the value of this property is zero then the connection cache is
+      disabled and connections will use a zero idle timeout to prevent too
+      many connection threads on large clusters.
+    </description>
+    <name>yarn.client.max-cached-nodemanagers-proxies</name>
+    <value>0</value>
   </property>
   
   <property>
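
Since yarn-default.xml now ships with 0, clusters that want the previous caching behavior must override the property in yarn-site.xml. For example (500 simply restores the old default; size it to the number of node managers a client actually contacts):

    <property>
      <name>yarn.client.max-cached-nodemanagers-proxies</name>
      <value>500</value>
    </property>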