You are viewing a plain-text version of this content; the hyperlink to its canonical version is not preserved in this rendering.
Posted to common-commits@hadoop.apache.org by ju...@apache.org on 2014/10/02 01:49:00 UTC
git commit: YARN-2613. Support retry in NMClient for rolling-upgrades. (Contributed by Jian He)
Repository: hadoop
Updated Branches:
refs/heads/trunk 8dfe54f6d -> 0708827a9
YARN-2613. Support retry in NMClient for rolling-upgrades. (Contributed by Jian He)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0708827a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0708827a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0708827a
Branch: refs/heads/trunk
Commit: 0708827a935d190d439854e08bb5a655d7daa606
Parents: 8dfe54f
Author: junping_du <ju...@apache.org>
Authored: Wed Oct 1 16:47:47 2014 -0700
Committer: junping_du <ju...@apache.org>
Committed: Wed Oct 1 16:50:30 2014 -0700
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../hadoop/yarn/conf/YarnConfiguration.java | 14 +-
.../impl/ContainerManagementProtocolProxy.java | 14 +-
.../org/apache/hadoop/yarn/client/NMProxy.java | 49 +++++++
.../org/apache/hadoop/yarn/client/RMProxy.java | 16 ++-
.../apache/hadoop/yarn/client/ServerProxy.java | 94 +++++++++++++
.../impl/pb/RpcClientFactoryPBImpl.java | 25 +++-
.../src/main/resources/yarn-default.xml | 14 +-
.../containermanager/TestNMProxy.java | 141 +++++++++++++++++++
.../server/TestContainerManagerSecurity.java | 16 +--
10 files changed, 351 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0708827a/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 4d05757..34588e0 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -121,6 +121,9 @@ Release 2.6.0 - UNRELEASED
YARN-1972. Added a secure container-executor for Windows. (Remus Rusanu via
vinodkv)
+ YARN-2613. Support retry in NMClient for rolling-upgrades. (Jian He via
+ junping_du)
+
IMPROVEMENTS
YARN-2197. Add a link to YARN CHANGES.txt in the left side of doc
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0708827a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 25df4ef..1a201c4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1033,7 +1033,7 @@ public class YarnConfiguration extends Configuration {
/** Max time to wait to establish a connection to RM */
public static final String RESOURCEMANAGER_CONNECT_MAX_WAIT_MS =
RM_PREFIX + "connect.max-wait.ms";
- public static final int DEFAULT_RESOURCEMANAGER_CONNECT_MAX_WAIT_MS =
+ public static final long DEFAULT_RESOURCEMANAGER_CONNECT_MAX_WAIT_MS =
15 * 60 * 1000;
/** Time interval between each attempt to connect to RM */
@@ -1370,6 +1370,18 @@ public class YarnConfiguration extends Configuration {
YARN_PREFIX + "client.max-nodemanagers-proxies";
public static final int DEFAULT_NM_CLIENT_MAX_NM_PROXIES = 500;
+ /** Max time to wait to establish a connection to NM */
+ public static final String CLIENT_NM_CONNECT_MAX_WAIT_MS =
+ YARN_PREFIX + "client.nodemanager-connect.max-wait-ms";
+ public static final long DEFAULT_CLIENT_NM_CONNECT_MAX_WAIT_MS =
+ 15 * 60 * 1000;
+
+ /** Time interval between each attempt to connect to NM */
+ public static final String CLIENT_NM_CONNECT_RETRY_INTERVAL_MS =
+ YARN_PREFIX + "client.nodemanager-connect.retry-interval-ms";
+ public static final long DEFAULT_CLIENT_NM_CONNECT_RETRY_INTERVAL_MS
+ = 10 * 1000;
+
public static final String YARN_HTTP_POLICY_KEY = YARN_PREFIX + "http.policy";
public static final String YARN_HTTP_POLICY_DEFAULT = HttpConfig.Policy.HTTP_ONLY
.name();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0708827a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/ContainerManagementProtocolProxy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/ContainerManagementProtocolProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/ContainerManagementProtocolProxy.java
index fbc772f..daeae92 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/ContainerManagementProtocolProxy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/ContainerManagementProtocolProxy.java
@@ -19,7 +19,6 @@
package org.apache.hadoop.yarn.client.api.impl;
import java.net.InetSocketAddress;
-import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
@@ -35,6 +34,7 @@ import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.NMProxy;
import org.apache.hadoop.yarn.client.api.NMTokenCache;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -219,16 +219,8 @@ public class ContainerManagementProtocolProxy {
ConverterUtils.convertFromYarn(token, cmAddr);
user.addToken(nmToken);
- ContainerManagementProtocol proxy = user
- .doAs(new PrivilegedAction<ContainerManagementProtocol>() {
-
- @Override
- public ContainerManagementProtocol run() {
- return (ContainerManagementProtocol) rpc.getProxy(
- ContainerManagementProtocol.class, cmAddr, conf);
- }
- });
- return proxy;
+ return NMProxy.createNMProxy(conf, ContainerManagementProtocol.class,
+ user, rpc, cmAddr);
}
public ContainerManagementProtocol getContainerManagementProtocol() {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0708827a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/NMProxy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/NMProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/NMProxy.java
new file mode 100644
index 0000000..dd40b45
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/NMProxy.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client;
+
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+
+@Public
+@Unstable
+public class NMProxy extends ServerProxy {
+
+ public static <T> T createNMProxy(final Configuration conf,
+ final Class<T> protocol, final UserGroupInformation ugi,
+ final YarnRPC rpc, final InetSocketAddress serverAddress) {
+
+ RetryPolicy retryPolicy =
+ createRetryPolicy(conf,
+ YarnConfiguration.CLIENT_NM_CONNECT_MAX_WAIT_MS,
+ YarnConfiguration.DEFAULT_CLIENT_NM_CONNECT_MAX_WAIT_MS,
+ YarnConfiguration.CLIENT_NM_CONNECT_RETRY_INTERVAL_MS,
+ YarnConfiguration.DEFAULT_CLIENT_NM_CONNECT_RETRY_INTERVAL_MS);
+
+ return createRetriableProxy(conf, protocol, ugi, rpc, serverAddress,
+ retryPolicy);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0708827a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java
index c15018b..ee09973 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java
@@ -21,6 +21,9 @@ package org.apache.hadoop.yarn.client;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
+import java.net.NoRouteToHostException;
+import java.net.SocketException;
+import java.net.UnknownHostException;
import java.security.PrivilegedAction;
import java.util.HashMap;
import java.util.Map;
@@ -35,6 +38,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.ipc.RetriableException;
+import org.apache.hadoop.net.ConnectTimeoutException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.conf.HAUtil;
@@ -165,7 +170,7 @@ public class RMProxy<T> {
@VisibleForTesting
public static RetryPolicy createRetryPolicy(Configuration conf) {
long rmConnectWaitMS =
- conf.getInt(
+ conf.getLong(
YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS,
YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_MAX_WAIT_MS);
long rmConnectionRetryIntervalMS =
@@ -234,9 +239,14 @@ public class RMProxy<T> {
Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
new HashMap<Class<? extends Exception>, RetryPolicy>();
+
exceptionToPolicyMap.put(ConnectException.class, retryPolicy);
- //TO DO: after HADOOP-9576, IOException can be changed to EOFException
- exceptionToPolicyMap.put(IOException.class, retryPolicy);
+ exceptionToPolicyMap.put(NoRouteToHostException.class, retryPolicy);
+ exceptionToPolicyMap.put(UnknownHostException.class, retryPolicy);
+ exceptionToPolicyMap.put(ConnectTimeoutException.class, retryPolicy);
+ exceptionToPolicyMap.put(RetriableException.class, retryPolicy);
+ exceptionToPolicyMap.put(SocketException.class, retryPolicy);
+
return RetryPolicies.retryByException(
RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0708827a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ServerProxy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ServerProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ServerProxy.java
new file mode 100644
index 0000000..6c72dc0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ServerProxy.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client;
+
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.net.NoRouteToHostException;
+import java.net.SocketException;
+import java.net.UnknownHostException;
+import java.security.PrivilegedAction;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.ipc.RetriableException;
+import org.apache.hadoop.net.ConnectTimeoutException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+
+import com.google.common.base.Preconditions;
+
+@Public
+@Unstable
+public class ServerProxy {
+
+ protected static RetryPolicy createRetryPolicy(Configuration conf,
+ String maxWaitTimeStr, long defMaxWaitTime,
+ String connectRetryIntervalStr, long defRetryInterval) {
+ long maxWaitTime = conf.getLong(maxWaitTimeStr, defMaxWaitTime);
+ long retryIntervalMS =
+ conf.getLong(connectRetryIntervalStr, defRetryInterval);
+ if (maxWaitTime == -1) {
+ // wait forever.
+ return RetryPolicies.RETRY_FOREVER;
+ }
+
+ Preconditions.checkArgument(maxWaitTime > 0, "Invalid Configuration. "
+ + maxWaitTimeStr + " should be a positive value.");
+ Preconditions.checkArgument(retryIntervalMS > 0, "Invalid Configuration. "
+ + connectRetryIntervalStr + "should be a positive value.");
+
+ RetryPolicy retryPolicy =
+ RetryPolicies.retryUpToMaximumTimeWithFixedSleep(maxWaitTime,
+ retryIntervalMS, TimeUnit.MILLISECONDS);
+
+ Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
+ new HashMap<Class<? extends Exception>, RetryPolicy>();
+ exceptionToPolicyMap.put(ConnectException.class, retryPolicy);
+ exceptionToPolicyMap.put(NoRouteToHostException.class, retryPolicy);
+ exceptionToPolicyMap.put(UnknownHostException.class, retryPolicy);
+ exceptionToPolicyMap.put(ConnectTimeoutException.class, retryPolicy);
+ exceptionToPolicyMap.put(RetriableException.class, retryPolicy);
+ exceptionToPolicyMap.put(SocketException.class, retryPolicy);
+
+ return RetryPolicies.retryByException(RetryPolicies.TRY_ONCE_THEN_FAIL,
+ exceptionToPolicyMap);
+ }
+
+ @SuppressWarnings("unchecked")
+ protected static <T> T createRetriableProxy(final Configuration conf,
+ final Class<T> protocol, final UserGroupInformation user,
+ final YarnRPC rpc, final InetSocketAddress serverAddress,
+ RetryPolicy retryPolicy) {
+ T proxy = user.doAs(new PrivilegedAction<T>() {
+ @Override
+ public T run() {
+ return (T) rpc.getProxy(protocol, serverAddress, conf);
+ }
+ });
+ return (T) RetryProxy.create(protocol, proxy, retryPolicy);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0708827a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/factories/impl/pb/RpcClientFactoryPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/factories/impl/pb/RpcClientFactoryPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/factories/impl/pb/RpcClientFactoryPBImpl.java
index e7c737c..062fa66 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/factories/impl/pb/RpcClientFactoryPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/factories/impl/pb/RpcClientFactoryPBImpl.java
@@ -18,15 +18,18 @@
package org.apache.hadoop.yarn.factories.impl.pb;
+import java.io.Closeable;
import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -87,15 +90,23 @@ public class RpcClientFactoryPBImpl implements RpcClientFactory {
@Override
public void stopClient(Object proxy) {
try {
- Method closeMethod = proxy.getClass().getMethod("close");
- closeMethod.invoke(proxy);
- } catch (InvocationTargetException e) {
- throw new YarnRuntimeException(e);
+ if (proxy instanceof Closeable) {
+ ((Closeable) proxy).close();
+ return;
+ } else {
+ InvocationHandler handler = Proxy.getInvocationHandler(proxy);
+ if (handler instanceof Closeable) {
+ ((Closeable) handler).close();
+ return;
+ }
+ }
} catch (Exception e) {
- LOG.error("Cannot call close method due to Exception. "
- + "Ignoring.", e);
+ LOG.error("Cannot call close method due to Exception. " + "Ignoring.", e);
throw new YarnRuntimeException(e);
}
+ throw new HadoopIllegalArgumentException(
+ "Cannot close proxy - is not Closeable or "
+ + "does not provide closeable invocation handler " + proxy.getClass());
}
private String getPBImplClassName(Class<?> clazz) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0708827a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 866aee7..79244ad 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -1081,7 +1081,19 @@
<name>yarn.client.nodemanager-client-async.thread-pool-max-size</name>
<value>500</value>
</property>
-
+
+ <property>
+ <description>Max time to wait to establish a connection to NM</description>
+ <name>yarn.client.nodemanager-connect.max-wait-ms</name>
+ <value>900000</value>
+ </property>
+
+ <property>
+ <description>Time interval between each attempt to connect to NM</description>
+ <name>yarn.client.nodemanager-connect.retry-interval-ms</name>
+ <value>10000</value>
+ </property>
+
<property>
<description>
Maximum number of proxy connections for node manager. It should always be
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0708827a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestNMProxy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestNMProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestNMProxy.java
new file mode 100644
index 0000000..67f540c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestNMProxy.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.client.NMProxy;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.NMTokenIdentifier;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestNMProxy extends BaseContainerManagerTest {
+
+ public TestNMProxy() throws UnsupportedFileSystemException {
+ super();
+ }
+
+ int retryCount = 0;
+
+ @Before
+ public void setUp() throws Exception {
+ conf.setLong(YarnConfiguration.CLIENT_NM_CONNECT_MAX_WAIT_MS, 10000);
+ conf.setLong(YarnConfiguration.CLIENT_NM_CONNECT_RETRY_INTERVAL_MS, 100);
+ }
+
+ @Override
+ protected ContainerManagerImpl
+ createContainerManager(DeletionService delSrvc) {
+ return new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater,
+ metrics, new ApplicationACLsManager(conf), dirsHandler) {
+
+ @Override
+ public StartContainersResponse startContainers(
+ StartContainersRequest requests) throws YarnException, IOException {
+ if (retryCount < 5) {
+ retryCount++;
+ throw new java.net.ConnectException("start container exception");
+ }
+ return super.startContainers(requests);
+ }
+
+ @Override
+ public StopContainersResponse stopContainers(
+ StopContainersRequest requests) throws YarnException, IOException {
+ if (retryCount < 5) {
+ retryCount++;
+ throw new java.net.ConnectException("stop container exception");
+ }
+ return super.stopContainers(requests);
+ }
+
+ @Override
+ public GetContainerStatusesResponse getContainerStatuses(
+ GetContainerStatusesRequest request) throws YarnException,
+ IOException {
+ if (retryCount < 5) {
+ retryCount++;
+ throw new java.net.ConnectException("get container status exception");
+ }
+ return super.getContainerStatuses(request);
+ }
+ };
+ }
+
+ @Test(timeout = 20000)
+ public void testNMProxyRetry() throws Exception {
+ containerManager.start();
+ containerManager.setBlockNewContainerRequests(false);
+ StartContainersRequest allRequests =
+ Records.newRecord(StartContainersRequest.class);
+ ApplicationId appId = ApplicationId.newInstance(1, 1);
+ ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
+
+ org.apache.hadoop.yarn.api.records.Token nmToken =
+ context.getNMTokenSecretManager().createNMToken(attemptId,
+ context.getNodeId(), user);
+ final InetSocketAddress address =
+ conf.getSocketAddr(YarnConfiguration.NM_BIND_HOST,
+ YarnConfiguration.NM_ADDRESS, YarnConfiguration.DEFAULT_NM_ADDRESS,
+ YarnConfiguration.DEFAULT_NM_PORT);
+ Token<NMTokenIdentifier> token =
+ ConverterUtils.convertFromYarn(nmToken,
+ SecurityUtil.buildTokenService(address));
+ UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
+ ugi.addToken(token);
+
+ ContainerManagementProtocol proxy =
+ NMProxy.createNMProxy(conf, ContainerManagementProtocol.class, ugi,
+ YarnRPC.create(conf), address);
+
+ proxy.startContainers(allRequests);
+ Assert.assertEquals(5, retryCount);
+
+ retryCount = 0;
+ proxy.stopContainers(Records.newRecord(StopContainersRequest.class));
+ Assert.assertEquals(5, retryCount);
+
+ retryCount = 0;
+ proxy.getContainerStatuses(Records
+ .newRecord(GetContainerStatusesRequest.class));
+ Assert.assertEquals(5, retryCount);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0708827a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java
index 0726a3a..de8d302 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java
@@ -23,7 +23,6 @@ import static org.junit.Assert.fail;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -57,6 +56,7 @@ import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.SerializedException;
import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.NMProxy;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -607,17 +607,9 @@ public class TestContainerManagerSecurity extends KerberosSecurityTestcase {
if (nmToken != null) {
ugi.addToken(ConverterUtils.convertFromYarn(nmToken, addr));
}
-
- proxy = ugi
- .doAs(new PrivilegedAction<ContainerManagementProtocol>() {
-
- @Override
- public ContainerManagementProtocol run() {
- return (ContainerManagementProtocol) rpc.getProxy(
- ContainerManagementProtocol.class,
- addr, conf);
- }
- });
+ proxy =
+ NMProxy.createNMProxy(conf, ContainerManagementProtocol.class, ugi,
+ rpc, addr);
return proxy;
}