You are viewing a plain-text version of this content; the link to its canonical version (originally labeled "here") was not preserved in this copy.
Posted to mapreduce-commits@hadoop.apache.org by bo...@apache.org on 2012/04/03 18:59:27 UTC
svn commit: r1309037 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java...
Author: bobby
Date: Tue Apr 3 16:59:26 2012
New Revision: 1309037
URL: http://svn.apache.org/viewvc?rev=1309037&view=rev
Log:
MAPREDUCE-4062. AM Launcher thread can hang forever (tgraves via bobby)
Modified:
hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagerPBClientImpl.java
Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1309037&r1=1309036&r2=1309037&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Tue Apr 3 16:59:26 2012
@@ -226,6 +226,8 @@ Release 0.23.3 - UNRELEASED
MAPREDUCE-4012 Hadoop Job setup error leaves no useful info to users
(when LinuxTaskController is used). (tgraves)
+ MAPREDUCE-4062. AM Launcher thread can hang forever (tgraves via bobby)
+
Release 0.23.2 - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java?rev=1309037&r1=1309036&r2=1309037&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java Tue Apr 3 16:59:26 2012
@@ -30,11 +30,4 @@ public interface ContainerLauncher
CONTAINER_REMOTE_CLEANUP
}
- // Not a documented config. Only used for tests
- static final String MR_AM_NM_COMMAND_TIMEOUT = MRJobConfig.MR_AM_PREFIX
- + "nm-command-timeout";
- /**
- * Maximum of 1 minute timeout for a Node to react to the command
- */
- static final int DEFAULT_NM_COMMAND_TIMEOUT = 60000;
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java?rev=1309037&r1=1309036&r2=1309037&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java Tue Apr 3 16:59:26 2012
@@ -23,8 +23,6 @@ import java.nio.ByteBuffer;
import java.security.PrivilegedAction;
import java.util.HashSet;
import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
@@ -72,8 +70,6 @@ public class ContainerLauncherImpl exten
static final Log LOG = LogFactory.getLog(ContainerLauncherImpl.class);
- int nmTimeOut;
-
private ConcurrentHashMap<ContainerId, Container> containers =
new ConcurrentHashMap<ContainerId, Container>();
private AppContext context;
@@ -83,7 +79,6 @@ public class ContainerLauncherImpl exten
private Thread eventHandlingThread;
protected BlockingQueue<ContainerLauncherEvent> eventQueue =
new LinkedBlockingQueue<ContainerLauncherEvent>();
- final Timer commandTimer = new Timer(true);
YarnRPC rpc;
private Container getContainer(ContainerId id) {
@@ -130,30 +125,18 @@ public class ContainerLauncherImpl exten
"Container was killed before it was launched");
return;
}
- CommandTimerTask timerTask = new CommandTimerTask(Thread
- .currentThread(), event);
+
final String containerManagerBindAddr = event.getContainerMgrAddress();
ContainerId containerID = event.getContainerID();
ContainerToken containerToken = event.getContainerToken();
ContainerManager proxy = null;
try {
- commandTimer.schedule(timerTask, nmTimeOut);
proxy = getCMProxy(containerID, containerManagerBindAddr,
containerToken);
- // Interrupted during getProxy, but that didn't throw exception
- if (Thread.interrupted()) {
- // The timer canceled the command in the mean while.
- String message = "Container launch failed for " + containerID
- + " : Start-container for " + event.getContainerID()
- + " got interrupted. Returning.";
- this.state = ContainerState.FAILED;
- sendContainerLaunchFailedMsg(taskAttemptID, message);
- return;
- }
// Construct the actual Container
ContainerLaunchContext containerLaunchContext =
event.getContainer();
@@ -164,19 +147,6 @@ public class ContainerLauncherImpl exten
startRequest.setContainerLaunchContext(containerLaunchContext);
StartContainerResponse response = proxy.startContainer(startRequest);
- // container started properly. Stop the timer
- timerTask.cancel();
- if (Thread.interrupted()) {
- // The timer canceled the command in the mean while, but
- // startContainer didn't throw exception
- String message = "Container launch failed for " + containerID
- + " : Start-container for " + event.getContainerID()
- + " got interrupted. Returning.";
- this.state = ContainerState.FAILED;
- sendContainerLaunchFailedMsg(taskAttemptID, message);
- return;
- }
-
ByteBuffer portInfo = response
.getServiceResponse(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID);
int port = -1;
@@ -198,17 +168,11 @@ public class ContainerLauncherImpl exten
new TaskAttemptContainerLaunchedEvent(taskAttemptID, port));
this.state = ContainerState.RUNNING;
} catch (Throwable t) {
- if (Thread.interrupted()) {
- // The timer canceled the command in the mean while.
- LOG.info("Start-container for " + event.getContainerID()
- + " got interrupted.");
- }
String message = "Container launch failed for " + containerID + " : "
+ StringUtils.stringifyException(t);
this.state = ContainerState.FAILED;
sendContainerLaunchFailedMsg(taskAttemptID, message);
} finally {
- timerTask.cancel();
if (proxy != null) {
ContainerLauncherImpl.this.rpc.stopProxy(proxy, getConfig());
}
@@ -220,41 +184,24 @@ public class ContainerLauncherImpl exten
if(this.state == ContainerState.PREP) {
this.state = ContainerState.KILLED_BEFORE_LAUNCH;
} else {
- CommandTimerTask timerTask = new CommandTimerTask(Thread
- .currentThread(), event);
-
final String containerManagerBindAddr = event.getContainerMgrAddress();
ContainerId containerID = event.getContainerID();
ContainerToken containerToken = event.getContainerToken();
TaskAttemptId taskAttemptID = event.getTaskAttemptID();
LOG.info("KILLING " + taskAttemptID);
- commandTimer.schedule(timerTask, nmTimeOut);
ContainerManager proxy = null;
try {
proxy = getCMProxy(containerID, containerManagerBindAddr,
containerToken);
- if (Thread.interrupted()) {
- // The timer canceled the command in the mean while. No need to
- // return, send cleaned up event anyways.
- LOG.info("Stop-container for " + event.getContainerID()
- + " got interrupted.");
- } else {
// kill the remote container if already launched
StopContainerRequest stopRequest = Records
.newRecord(StopContainerRequest.class);
stopRequest.setContainerId(event.getContainerID());
proxy.stopContainer(stopRequest);
- }
- } catch (Throwable t) {
- if (Thread.interrupted()) {
- // The timer canceled the command in the mean while, clear the
- // interrupt flag
- LOG.info("Stop-container for " + event.getContainerID()
- + " got interrupted.");
- }
+ } catch (Throwable t) {
// ignore the cleanup failure
String message = "cleanup failed for container "
@@ -264,15 +211,6 @@ public class ContainerLauncherImpl exten
new TaskAttemptDiagnosticsUpdateEvent(taskAttemptID, message));
LOG.warn(message);
} finally {
- timerTask.cancel();
- if (Thread.interrupted()) {
- LOG.info("Stop-container for " + event.getContainerID()
- + " got interrupted.");
- // ignore the cleanup failure
- context.getEventHandler().handle(
- new TaskAttemptDiagnosticsUpdateEvent(taskAttemptID,
- "cleanup failed for container " + event.getContainerID()));
- }
if (proxy != null) {
ContainerLauncherImpl.this.rpc.stopProxy(proxy, getConfig());
}
@@ -303,8 +241,6 @@ public class ContainerLauncherImpl exten
MRJobConfig.MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT,
MRJobConfig.DEFAULT_MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT);
LOG.info("Upper limit on the thread pool size is " + this.limitOnPoolSize);
- this.nmTimeOut = conf.getInt(ContainerLauncher.MR_AM_NM_COMMAND_TIMEOUT,
- ContainerLauncher.DEFAULT_NM_COMMAND_TIMEOUT);
this.rpc = createYarnRPC(conf);
super.init(conf);
}
@@ -409,44 +345,6 @@ public class ContainerLauncherImpl exten
return proxy;
}
- private static class CommandTimerTask extends TimerTask {
- private final Thread commandThread;
- protected final String message;
- private boolean cancelled = false;
-
- public CommandTimerTask(Thread thread, ContainerLauncherEvent event) {
- super();
- this.commandThread = thread;
- this.message = "Couldn't complete " + event.getType() + " on "
- + event.getContainerID() + "/" + event.getTaskAttemptID()
- + ". Interrupting and returning";
- }
-
- @Override
- public void run() {
- synchronized (this) {
- if (this.cancelled) {
- return;
- }
- LOG.warn(this.message);
- StackTraceElement[] trace = this.commandThread.getStackTrace();
- StringBuilder logMsg = new StringBuilder();
- for (int i = 0; i < trace.length; i++) {
- logMsg.append("\n\tat " + trace[i]);
- }
- LOG.info("Stack trace of the command-thread: \n" + logMsg.toString());
- this.commandThread.interrupt();
- }
- }
-
- @Override
- public boolean cancel() {
- synchronized (this) {
- this.cancelled = true;
- return super.cancel();
- }
- }
- }
/**
* Setup and start the container on remote nodemanager.
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java?rev=1309037&r1=1309036&r2=1309037&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java Tue Apr 3 16:59:26 2012
@@ -21,6 +21,8 @@ package org.apache.hadoop.mapreduce.v2.a
import static org.mockito.Mockito.mock;
import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
@@ -30,6 +32,7 @@ import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
@@ -44,18 +47,39 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.api.ContainerManager;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainerResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ContainerToken;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.factory.providers.YarnRemoteExceptionFactoryProvider;
+import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.Test;
public class TestContainerLauncher {
- static final Log LOG = LogFactory
- .getLog(TestContainerLauncher.class);
+ private static final RecordFactory recordFactory = RecordFactoryProvider
+ .getRecordFactory(null);
+ Configuration conf;
+ Server server;
+
+ static final Log LOG = LogFactory.getLog(TestContainerLauncher.class);
@Test
public void testPoolSize() throws InterruptedException {
@@ -104,10 +128,10 @@ public class TestContainerLauncher {
Assert.assertEquals(10, containerLauncher.numEventsProcessed.get());
containerLauncher.finishEventHandling = false;
for (int i = 0; i < 10; i++) {
- ContainerId containerId =
- BuilderUtils.newContainerId(appAttemptId, i + 10);
- TaskAttemptId taskAttemptId =
- MRBuilderUtils.newTaskAttemptId(taskId, i + 10);
+ ContainerId containerId = BuilderUtils.newContainerId(appAttemptId,
+ i + 10);
+ TaskAttemptId taskAttemptId = MRBuilderUtils.newTaskAttemptId(taskId,
+ i + 10);
containerLauncher.handle(new ContainerLauncherEvent(taskAttemptId,
containerId, "host" + i + ":1234", null,
ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH));
@@ -119,8 +143,7 @@ public class TestContainerLauncher {
// Different hosts, there should be an increase in core-thread-pool size to
// 21(11hosts+10buffer)
// Core pool size should be 21 but the live pool size should be only 11.
- containerLauncher.expectedCorePoolSize =
- 11 + ContainerLauncherImpl.INITIAL_POOL_SIZE;
+ containerLauncher.expectedCorePoolSize = 11 + ContainerLauncherImpl.INITIAL_POOL_SIZE;
containerLauncher.finishEventHandling = false;
ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 21);
TaskAttemptId taskAttemptId = MRBuilderUtils.newTaskAttemptId(taskId, 21);
@@ -200,26 +223,28 @@ public class TestContainerLauncher {
@Test
public void testSlowNM() throws Exception {
- test(false);
- }
-
- @Test
- public void testSlowNMWithInterruptsSwallowed() throws Exception {
- test(true);
+ test();
}
- private void test(boolean swallowInterrupts) throws Exception {
+ private void test() throws Exception {
- MRApp app = new MRAppWithSlowNM(swallowInterrupts);
-
- Configuration conf = new Configuration();
+ conf = new Configuration();
int maxAttempts = 1;
conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, maxAttempts);
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+ // set timeout low for the test
+ conf.setInt("yarn.rpc.nm-command-timeout", 3000);
+ conf.set(YarnConfiguration.IPC_RPC_IMPL, HadoopYarnProtoRPC.class.getName());
+ YarnRPC rpc = YarnRPC.create(conf);
+ String bindAddr = "localhost:0";
+ InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr);
+ server = rpc.getServer(ContainerManager.class, new DummyContainerManager(),
+ addr, conf, null, 1);
+ server.start();
- // Set low timeout for NM commands
- conf.setInt(ContainerLauncher.MR_AM_NM_COMMAND_TIMEOUT, 3000);
+ MRApp app = new MRAppWithSlowNM();
+ try {
Job job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
@@ -231,8 +256,8 @@ public class TestContainerLauncher {
Map<TaskAttemptId, TaskAttempt> attempts = tasks.values().iterator()
.next().getAttempts();
- Assert.assertEquals("Num attempts is not correct", maxAttempts, attempts
- .size());
+ Assert.assertEquals("Num attempts is not correct", maxAttempts,
+ attempts.size());
TaskAttempt attempt = attempts.values().iterator().next();
app.waitForState(attempt, TaskAttemptState.ASSIGNED);
@@ -241,20 +266,18 @@ public class TestContainerLauncher {
String diagnostics = attempt.getDiagnostics().toString();
LOG.info("attempt.getDiagnostics: " + diagnostics);
- if (swallowInterrupts) {
- Assert.assertEquals("[Container launch failed for "
- + "container_0_0000_01_000000 : Start-container for "
- + "container_0_0000_01_000000 got interrupted. Returning.]",
- diagnostics);
- } else {
+
Assert.assertTrue(diagnostics.contains("Container launch failed for "
+ "container_0_0000_01_000000 : "));
- Assert.assertTrue(diagnostics
- .contains(": java.lang.InterruptedException"));
- }
+ Assert
+ .assertTrue(diagnostics
+ .contains("java.net.SocketTimeoutException: 3000 millis timeout while waiting for channel"));
+ } finally {
+ server.stop();
app.stop();
}
+ }
private final class CustomContainerLauncher extends ContainerLauncherImpl {
@@ -317,13 +340,10 @@ public class TestContainerLauncher {
}
}
- private static class MRAppWithSlowNM extends MRApp {
-
- final boolean swallowInterrupts;
+ private class MRAppWithSlowNM extends MRApp {
- public MRAppWithSlowNM(boolean swallowInterrupts) {
+ public MRAppWithSlowNM() {
super(1, 0, false, "TestContainerLauncher", true);
- this.swallowInterrupts = swallowInterrupts;
}
@Override
@@ -333,20 +353,57 @@ public class TestContainerLauncher {
protected ContainerManager getCMProxy(ContainerId containerID,
String containerManagerBindAddr, ContainerToken containerToken)
throws IOException {
+ // make proxy connect to our local containerManager server
+ ContainerManager proxy = (ContainerManager) rpc.getProxy(
+ ContainerManager.class,
+ NetUtils.createSocketAddr("localhost:" + server.getPort()), conf);
+ return proxy;
+ }
+ };
+
+ };
+ }
+
+ public class DummyContainerManager implements ContainerManager {
+
+ private ContainerStatus status = null;
+
+ @Override
+ public GetContainerStatusResponse getContainerStatus(
+ GetContainerStatusRequest request) throws YarnRemoteException {
+ GetContainerStatusResponse response = recordFactory
+ .newRecordInstance(GetContainerStatusResponse.class);
+ response.setStatus(status);
+ return response;
+ }
+
+ @Override
+ public StartContainerResponse startContainer(StartContainerRequest request)
+ throws YarnRemoteException {
+ ContainerLaunchContext container = request.getContainerLaunchContext();
+ StartContainerResponse response = recordFactory
+ .newRecordInstance(StartContainerResponse.class);
+ status = recordFactory.newRecordInstance(ContainerStatus.class);
try {
- synchronized (this) {
- wait(); // Just hang the thread simulating a very slow NM.
+ // make the thread sleep to look like its not going to respond
+ Thread.sleep(15000);
+ } catch (Exception e) {
+ LOG.error(e);
+ throw new UndeclaredThrowableException(e);
}
- } catch (InterruptedException e) {
- LOG.info(e);
- if (!MRAppWithSlowNM.this.swallowInterrupts) {
- throw new IOException(e);
+ status.setState(ContainerState.RUNNING);
+ status.setContainerId(container.getContainerId());
+ status.setExitStatus(0);
+ return response;
}
- Thread.currentThread().interrupt();
+
+ @Override
+ public StopContainerResponse stopContainer(StopContainerRequest request)
+ throws YarnRemoteException {
+ Exception e = new Exception("Dummy function", new Exception(
+ "Dummy function cause"));
+ throw YarnRemoteExceptionFactoryProvider.getYarnRemoteExceptionFactory(
+ null).createYarnRemoteException(e);
}
- return null;
}
- };
- };
}
-}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagerPBClientImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagerPBClientImpl.java?rev=1309037&r1=1309036&r2=1309037&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagerPBClientImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagerPBClientImpl.java Tue Apr 3 16:59:26 2012
@@ -24,6 +24,8 @@ import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ContainerManager;
import org.apache.hadoop.yarn.api.ContainerManagerPB;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
@@ -38,6 +40,7 @@ import org.apache.hadoop.yarn.api.protoc
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainerResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainerRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainerResponsePBImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.exceptions.impl.pb.YarnRemoteExceptionPBImpl;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusRequestProto;
@@ -48,12 +51,25 @@ import com.google.protobuf.ServiceExcept
public class ContainerManagerPBClientImpl implements ContainerManager {
+ // Not a documented config. Only used for tests
+ static final String NM_COMMAND_TIMEOUT = YarnConfiguration.YARN_PREFIX
+ + "rpc.nm-command-timeout";
+
+ /**
+ * Maximum of 1 minute timeout for a Node to react to the command
+ */
+ static final int DEFAULT_COMMAND_TIMEOUT = 60000;
+
private ContainerManagerPB proxy;
public ContainerManagerPBClientImpl(long clientVersion, InetSocketAddress addr, Configuration conf) throws IOException {
RPC.setProtocolEngine(conf, ContainerManagerPB.class, ProtobufRpcEngine.class);
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+
+ int expireIntvl = conf.getInt(NM_COMMAND_TIMEOUT, DEFAULT_COMMAND_TIMEOUT);
proxy = (ContainerManagerPB)RPC.getProxy(
- ContainerManagerPB.class, clientVersion, addr, conf);
+ ContainerManagerPB.class, clientVersion, addr, ugi, conf,
+ NetUtils.getDefaultSocketFactory(conf), expireIntvl);
}
public void close() {