You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ju...@apache.org on 2014/10/02 01:49:00 UTC

git commit: YARN-2613. Support retry in NMClient for rolling-upgrades. (Contributed by Jian He)

Repository: hadoop
Updated Branches:
  refs/heads/trunk 8dfe54f6d -> 0708827a9


YARN-2613. Support retry in NMClient for rolling-upgrades. (Contributed by Jian He)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0708827a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0708827a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0708827a

Branch: refs/heads/trunk
Commit: 0708827a935d190d439854e08bb5a655d7daa606
Parents: 8dfe54f
Author: junping_du <ju...@apache.org>
Authored: Wed Oct 1 16:47:47 2014 -0700
Committer: junping_du <ju...@apache.org>
Committed: Wed Oct 1 16:50:30 2014 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../hadoop/yarn/conf/YarnConfiguration.java     |  14 +-
 .../impl/ContainerManagementProtocolProxy.java  |  14 +-
 .../org/apache/hadoop/yarn/client/NMProxy.java  |  49 +++++++
 .../org/apache/hadoop/yarn/client/RMProxy.java  |  16 ++-
 .../apache/hadoop/yarn/client/ServerProxy.java  |  94 +++++++++++++
 .../impl/pb/RpcClientFactoryPBImpl.java         |  25 +++-
 .../src/main/resources/yarn-default.xml         |  14 +-
 .../containermanager/TestNMProxy.java           | 141 +++++++++++++++++++
 .../server/TestContainerManagerSecurity.java    |  16 +--
 10 files changed, 351 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/0708827a/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 4d05757..34588e0 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -121,6 +121,9 @@ Release 2.6.0 - UNRELEASED
     YARN-1972. Added a secure container-executor for Windows. (Remus Rusanu via
     vinodkv)
 
+    YARN-2613. Support retry in NMClient for rolling-upgrades. (Jian He via 
+    junping_du)
+
   IMPROVEMENTS
 
     YARN-2197. Add a link to YARN CHANGES.txt in the left side of doc

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0708827a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 25df4ef..1a201c4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1033,7 +1033,7 @@ public class YarnConfiguration extends Configuration {
   /** Max time to wait to establish a connection to RM */
   public static final String RESOURCEMANAGER_CONNECT_MAX_WAIT_MS =
       RM_PREFIX + "connect.max-wait.ms";
-  public static final int DEFAULT_RESOURCEMANAGER_CONNECT_MAX_WAIT_MS =
+  public static final long DEFAULT_RESOURCEMANAGER_CONNECT_MAX_WAIT_MS =
       15 * 60 * 1000;
 
   /** Time interval between each attempt to connect to RM */
@@ -1370,6 +1370,18 @@ public class YarnConfiguration extends Configuration {
       YARN_PREFIX + "client.max-nodemanagers-proxies";
   public static final int DEFAULT_NM_CLIENT_MAX_NM_PROXIES = 500;
 
+  /** Max time to wait to establish a connection to NM */
+  public static final String CLIENT_NM_CONNECT_MAX_WAIT_MS =
+      YARN_PREFIX + "client.nodemanager-connect.max-wait-ms";
+  public static final long DEFAULT_CLIENT_NM_CONNECT_MAX_WAIT_MS =
+      15 * 60 * 1000;
+
+  /** Time interval between each attempt to connect to NM */
+  public static final String CLIENT_NM_CONNECT_RETRY_INTERVAL_MS =
+      YARN_PREFIX + "client.nodemanager-connect.retry-interval-ms";
+  public static final long DEFAULT_CLIENT_NM_CONNECT_RETRY_INTERVAL_MS
+      = 10 * 1000;
+
   public static final String YARN_HTTP_POLICY_KEY = YARN_PREFIX + "http.policy";
   public static final String YARN_HTTP_POLICY_DEFAULT = HttpConfig.Policy.HTTP_ONLY
       .name();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0708827a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/ContainerManagementProtocolProxy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/ContainerManagementProtocolProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/ContainerManagementProtocolProxy.java
index fbc772f..daeae92 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/ContainerManagementProtocolProxy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/ContainerManagementProtocolProxy.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.yarn.client.api.impl;
 
 import java.net.InetSocketAddress;
-import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -35,6 +34,7 @@ import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.NMProxy;
 import org.apache.hadoop.yarn.client.api.NMTokenCache;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -219,16 +219,8 @@ public class ContainerManagementProtocolProxy {
           ConverterUtils.convertFromYarn(token, cmAddr);
       user.addToken(nmToken);
 
-      ContainerManagementProtocol proxy = user
-          .doAs(new PrivilegedAction<ContainerManagementProtocol>() {
-
-            @Override
-            public ContainerManagementProtocol run() {
-              return (ContainerManagementProtocol) rpc.getProxy(
-                  ContainerManagementProtocol.class, cmAddr, conf);
-            }
-          });
-      return proxy;
+      return NMProxy.createNMProxy(conf, ContainerManagementProtocol.class,
+        user, rpc, cmAddr);
     }
 
     public ContainerManagementProtocol getContainerManagementProtocol() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0708827a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/NMProxy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/NMProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/NMProxy.java
new file mode 100644
index 0000000..dd40b45
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/NMProxy.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client;
+
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+
+@Public
+@Unstable
+public class NMProxy extends ServerProxy {
+
+  public static <T> T createNMProxy(final Configuration conf,
+      final Class<T> protocol, final UserGroupInformation ugi,
+      final YarnRPC rpc, final InetSocketAddress serverAddress) {
+
+    RetryPolicy retryPolicy =
+        createRetryPolicy(conf,
+          YarnConfiguration.CLIENT_NM_CONNECT_MAX_WAIT_MS,
+          YarnConfiguration.DEFAULT_CLIENT_NM_CONNECT_MAX_WAIT_MS,
+          YarnConfiguration.CLIENT_NM_CONNECT_RETRY_INTERVAL_MS,
+          YarnConfiguration.DEFAULT_CLIENT_NM_CONNECT_RETRY_INTERVAL_MS);
+
+    return createRetriableProxy(conf, protocol, ugi, rpc, serverAddress,
+      retryPolicy);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0708827a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java
index c15018b..ee09973 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java
@@ -21,6 +21,9 @@ package org.apache.hadoop.yarn.client;
 import java.io.IOException;
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
+import java.net.NoRouteToHostException;
+import java.net.SocketException;
+import java.net.UnknownHostException;
 import java.security.PrivilegedAction;
 import java.util.HashMap;
 import java.util.Map;
@@ -35,6 +38,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.ipc.RetriableException;
+import org.apache.hadoop.net.ConnectTimeoutException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.yarn.conf.HAUtil;
@@ -165,7 +170,7 @@ public class RMProxy<T> {
   @VisibleForTesting
   public static RetryPolicy createRetryPolicy(Configuration conf) {
     long rmConnectWaitMS =
-        conf.getInt(
+        conf.getLong(
             YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS,
             YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_MAX_WAIT_MS);
     long rmConnectionRetryIntervalMS =
@@ -234,9 +239,14 @@ public class RMProxy<T> {
 
     Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
         new HashMap<Class<? extends Exception>, RetryPolicy>();
+
     exceptionToPolicyMap.put(ConnectException.class, retryPolicy);
-    //TO DO: after HADOOP-9576,  IOException can be changed to EOFException
-    exceptionToPolicyMap.put(IOException.class, retryPolicy);
+    exceptionToPolicyMap.put(NoRouteToHostException.class, retryPolicy);
+    exceptionToPolicyMap.put(UnknownHostException.class, retryPolicy);
+    exceptionToPolicyMap.put(ConnectTimeoutException.class, retryPolicy);
+    exceptionToPolicyMap.put(RetriableException.class, retryPolicy);
+    exceptionToPolicyMap.put(SocketException.class, retryPolicy);
+
     return RetryPolicies.retryByException(
         RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0708827a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ServerProxy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ServerProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ServerProxy.java
new file mode 100644
index 0000000..6c72dc0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ServerProxy.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client;
+
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.net.NoRouteToHostException;
+import java.net.SocketException;
+import java.net.UnknownHostException;
+import java.security.PrivilegedAction;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.ipc.RetriableException;
+import org.apache.hadoop.net.ConnectTimeoutException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+
+import com.google.common.base.Preconditions;
+
+@Public
+@Unstable
+public class ServerProxy {
+
+  protected static RetryPolicy createRetryPolicy(Configuration conf,
+      String maxWaitTimeStr, long defMaxWaitTime,
+      String connectRetryIntervalStr, long defRetryInterval) {
+    long maxWaitTime = conf.getLong(maxWaitTimeStr, defMaxWaitTime);
+    long retryIntervalMS =
+        conf.getLong(connectRetryIntervalStr, defRetryInterval);
+    if (maxWaitTime == -1) {
+      // wait forever.
+      return RetryPolicies.RETRY_FOREVER;
+    }
+
+    Preconditions.checkArgument(maxWaitTime > 0, "Invalid Configuration. "
+        + maxWaitTimeStr + " should be a positive value.");
+    Preconditions.checkArgument(retryIntervalMS > 0, "Invalid Configuration. "
+        + connectRetryIntervalStr + " should be a positive value.");
+
+    RetryPolicy retryPolicy =
+        RetryPolicies.retryUpToMaximumTimeWithFixedSleep(maxWaitTime,
+          retryIntervalMS, TimeUnit.MILLISECONDS);
+
+    Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
+        new HashMap<Class<? extends Exception>, RetryPolicy>();
+    exceptionToPolicyMap.put(ConnectException.class, retryPolicy);
+    exceptionToPolicyMap.put(NoRouteToHostException.class, retryPolicy);
+    exceptionToPolicyMap.put(UnknownHostException.class, retryPolicy);
+    exceptionToPolicyMap.put(ConnectTimeoutException.class, retryPolicy);
+    exceptionToPolicyMap.put(RetriableException.class, retryPolicy);
+    exceptionToPolicyMap.put(SocketException.class, retryPolicy);
+
+    return RetryPolicies.retryByException(RetryPolicies.TRY_ONCE_THEN_FAIL,
+      exceptionToPolicyMap);
+  }
+
+  @SuppressWarnings("unchecked")
+  protected static <T> T createRetriableProxy(final Configuration conf,
+      final Class<T> protocol, final UserGroupInformation user,
+      final YarnRPC rpc, final InetSocketAddress serverAddress,
+      RetryPolicy retryPolicy) {
+    T proxy = user.doAs(new PrivilegedAction<T>() {
+      @Override
+      public T run() {
+        return (T) rpc.getProxy(protocol, serverAddress, conf);
+      }
+    });
+    return (T) RetryProxy.create(protocol, proxy, retryPolicy);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0708827a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/factories/impl/pb/RpcClientFactoryPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/factories/impl/pb/RpcClientFactoryPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/factories/impl/pb/RpcClientFactoryPBImpl.java
index e7c737c..062fa66 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/factories/impl/pb/RpcClientFactoryPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/factories/impl/pb/RpcClientFactoryPBImpl.java
@@ -18,15 +18,18 @@
 
 package org.apache.hadoop.yarn.factories.impl.pb;
 
+import java.io.Closeable;
 import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
 import java.net.InetSocketAddress;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -87,15 +90,23 @@ public class RpcClientFactoryPBImpl implements RpcClientFactory {
   @Override
   public void stopClient(Object proxy) {
     try {
-      Method closeMethod = proxy.getClass().getMethod("close");
-      closeMethod.invoke(proxy);
-    } catch (InvocationTargetException e) {
-      throw new YarnRuntimeException(e);
+      if (proxy instanceof Closeable) {
+        ((Closeable) proxy).close();
+        return;
+      } else {
+        InvocationHandler handler = Proxy.getInvocationHandler(proxy);
+        if (handler instanceof Closeable) {
+          ((Closeable) handler).close();
+          return;
+        }
+      }
     } catch (Exception e) {
-      LOG.error("Cannot call close method due to Exception. "
-          + "Ignoring.", e);
+      LOG.error("Cannot call close method due to Exception.", e);
       throw new YarnRuntimeException(e);
     }
+    throw new HadoopIllegalArgumentException(
+      "Cannot close proxy - is not Closeable or "
+          + "does not provide closeable invocation handler " + proxy.getClass());
   }
 
   private String getPBImplClassName(Class<?> clazz) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0708827a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 866aee7..79244ad 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -1081,7 +1081,19 @@
     <name>yarn.client.nodemanager-client-async.thread-pool-max-size</name>
     <value>500</value>
   </property>
-  
+
+  <property>
+    <description>Max time to wait to establish a connection to NM</description>
+    <name>yarn.client.nodemanager-connect.max-wait-ms</name>
+    <value>900000</value>
+  </property>
+
+  <property>
+    <description>Time interval between each attempt to connect to NM</description>
+    <name>yarn.client.nodemanager-connect.retry-interval-ms</name>
+    <value>10000</value>
+  </property>
+
   <property>
   	<description>
   	  Maximum number of proxy connections for node manager. It should always be

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0708827a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestNMProxy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestNMProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestNMProxy.java
new file mode 100644
index 0000000..67f540c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestNMProxy.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.client.NMProxy;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.NMTokenIdentifier;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestNMProxy extends BaseContainerManagerTest {
+
+  public TestNMProxy() throws UnsupportedFileSystemException {
+    super();
+  }
+
+  int retryCount = 0;
+
+  @Before
+  public void setUp() throws Exception {
+    conf.setLong(YarnConfiguration.CLIENT_NM_CONNECT_MAX_WAIT_MS, 10000);
+    conf.setLong(YarnConfiguration.CLIENT_NM_CONNECT_RETRY_INTERVAL_MS, 100);
+  }
+
+  @Override
+  protected ContainerManagerImpl
+      createContainerManager(DeletionService delSrvc) {
+    return new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater,
+      metrics, new ApplicationACLsManager(conf), dirsHandler) {
+
+      @Override
+      public StartContainersResponse startContainers(
+          StartContainersRequest requests) throws YarnException, IOException {
+        if (retryCount < 5) {
+          retryCount++;
+          throw new java.net.ConnectException("start container exception");
+        }
+        return super.startContainers(requests);
+      }
+
+      @Override
+      public StopContainersResponse stopContainers(
+          StopContainersRequest requests) throws YarnException, IOException {
+        if (retryCount < 5) {
+          retryCount++;
+          throw new java.net.ConnectException("stop container exception");
+        }
+        return super.stopContainers(requests);
+      }
+
+      @Override
+      public GetContainerStatusesResponse getContainerStatuses(
+          GetContainerStatusesRequest request) throws YarnException,
+          IOException {
+        if (retryCount < 5) {
+          retryCount++;
+          throw new java.net.ConnectException("get container status exception");
+        }
+        return super.getContainerStatuses(request);
+      }
+    };
+  }
+
+  @Test(timeout = 20000)
+  public void testNMProxyRetry() throws Exception {
+    containerManager.start();
+    containerManager.setBlockNewContainerRequests(false);
+    StartContainersRequest allRequests =
+        Records.newRecord(StartContainersRequest.class);
+    ApplicationId appId = ApplicationId.newInstance(1, 1);
+    ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
+
+    org.apache.hadoop.yarn.api.records.Token nmToken =
+        context.getNMTokenSecretManager().createNMToken(attemptId,
+          context.getNodeId(), user);
+    final InetSocketAddress address =
+        conf.getSocketAddr(YarnConfiguration.NM_BIND_HOST,
+          YarnConfiguration.NM_ADDRESS, YarnConfiguration.DEFAULT_NM_ADDRESS,
+          YarnConfiguration.DEFAULT_NM_PORT);
+    Token<NMTokenIdentifier> token =
+        ConverterUtils.convertFromYarn(nmToken,
+          SecurityUtil.buildTokenService(address));
+    UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
+    ugi.addToken(token);
+
+    ContainerManagementProtocol proxy =
+        NMProxy.createNMProxy(conf, ContainerManagementProtocol.class, ugi,
+          YarnRPC.create(conf), address);
+
+    proxy.startContainers(allRequests);
+    Assert.assertEquals(5, retryCount);
+
+    retryCount = 0;
+    proxy.stopContainers(Records.newRecord(StopContainersRequest.class));
+    Assert.assertEquals(5, retryCount);
+
+    retryCount = 0;
+    proxy.getContainerStatuses(Records
+      .newRecord(GetContainerStatusesRequest.class));
+    Assert.assertEquals(5, retryCount);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0708827a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java
index 0726a3a..de8d302 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java
@@ -23,7 +23,6 @@ import static org.junit.Assert.fail;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -57,6 +56,7 @@ import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.SerializedException;
 import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.NMProxy;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -607,17 +607,9 @@ public class TestContainerManagerSecurity extends KerberosSecurityTestcase {
     if (nmToken != null) {
       ugi.addToken(ConverterUtils.convertFromYarn(nmToken, addr));      
     }
-
-    proxy = ugi
-        .doAs(new PrivilegedAction<ContainerManagementProtocol>() {
-
-          @Override
-          public ContainerManagementProtocol run() {
-            return (ContainerManagementProtocol) rpc.getProxy(
-                ContainerManagementProtocol.class,
-                addr, conf);
-          }
-        });
+    proxy =
+        NMProxy.createNMProxy(conf, ContainerManagementProtocol.class, ugi,
+          rpc, addr);
     return proxy;
   }