You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2013/08/12 23:26:19 UTC
svn commit: r1513258 [6/10] - in
/hadoop/common/branches/YARN-321/hadoop-yarn-project: ./ hadoop-yarn/
hadoop-yarn/conf/ hadoop-yarn/dev-support/ hadoop-yarn/hadoop-yarn-api/
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/
hadoop-...
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java Mon Aug 12 21:25:49 2013
@@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.server.no
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CyclicBarrier;
@@ -31,15 +33,16 @@ import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.exceptions.NMNotYetReadyException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.NMNotYetReadyException;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.junit.After;
@@ -282,15 +285,18 @@ public class TestNodeManagerResync {
try {
while (!isStopped && numContainers < 10) {
ContainerId cId = TestNodeManagerShutdown.createContainerId();
- StartContainerRequest startRequest =
- recordFactory.newRecordInstance(StartContainerRequest.class);
- startRequest.setContainerLaunchContext(containerLaunchContext);
- startRequest.setContainerToken(null);
+ StartContainerRequest scRequest =
+ StartContainerRequest.newInstance(containerLaunchContext,
+ null);
+ List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+ list.add(scRequest);
+ StartContainersRequest allRequests =
+ StartContainersRequest.newInstance(list);
System.out.println("no. of containers to be launched: "
+ numContainers);
numContainers++;
try {
- getContainerManager().startContainer(startRequest);
+ getContainerManager().startContainers(allRequests);
} catch (YarnException e) {
numContainersRejected++;
Assert.assertTrue(e.getMessage().contains(
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java Mon Aug 12 21:25:49 2013
@@ -25,6 +25,7 @@ import java.io.IOException;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.security.PrivilegedAction;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@@ -40,8 +41,9 @@ import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -53,7 +55,6 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.URL;
-import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -181,12 +182,6 @@ public class TestNodeManagerShutdown {
containerLaunchContext.setLocalResources(localResources);
List<String> commands = Arrays.asList(Shell.getRunScriptCommand(scriptFile));
containerLaunchContext.setCommands(commands);
- StartContainerRequest startRequest =
- recordFactory.newRecordInstance(StartContainerRequest.class);
- startRequest.setContainerLaunchContext(containerLaunchContext);
- startRequest
- .setContainerToken(TestContainerManager.createContainerToken(cId, 0,
- nodeId, user, nm.getNMContext().getContainerTokenSecretManager()));
final InetSocketAddress containerManagerBindAddress =
NetUtils.createSocketAddrForHost("127.0.0.1", 12345);
UserGroupInformation currentUser = UserGroupInformation
@@ -210,13 +205,22 @@ public class TestNodeManagerShutdown {
containerManagerBindAddress, conf);
}
});
- containerManager.startContainer(startRequest);
+ StartContainerRequest scRequest =
+ StartContainerRequest.newInstance(containerLaunchContext,
+ TestContainerManager.createContainerToken(cId, 0,
+ nodeId, user, nm.getNMContext().getContainerTokenSecretManager()));
+ List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+ list.add(scRequest);
+ StartContainersRequest allRequests =
+ StartContainersRequest.newInstance(list);
+ containerManager.startContainers(allRequests);
- GetContainerStatusRequest request =
- recordFactory.newRecordInstance(GetContainerStatusRequest.class);
- request.setContainerId(cId);
+ List<ContainerId> containerIds = new ArrayList<ContainerId>();
+ containerIds.add(cId);
+ GetContainerStatusesRequest request =
+ GetContainerStatusesRequest.newInstance(containerIds);
ContainerStatus containerStatus =
- containerManager.getContainerStatus(request).getStatus();
+ containerManager.getContainerStatuses(request).getContainerStatuses().get(0);
Assert.assertEquals(ContainerState.RUNNING, containerStatus.getState());
}
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java Mon Aug 12 21:25:49 2013
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.no
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
@@ -34,6 +35,7 @@ import java.util.concurrent.ConcurrentMa
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
@@ -41,10 +43,12 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.service.ServiceOperations;
import org.apache.hadoop.service.Service.STATE;
+import org.apache.hadoop.service.ServiceOperations;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -53,6 +57,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.RMProxy;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -60,7 +65,6 @@ import org.apache.hadoop.yarn.exceptions
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
@@ -95,24 +99,40 @@ public class TestNodeStatusUpdater {
}
static final Log LOG = LogFactory.getLog(TestNodeStatusUpdater.class);
- static final Path basedir =
- new Path("target", TestNodeStatusUpdater.class.getName());
+ static final File basedir =
+ new File("target", TestNodeStatusUpdater.class.getName());
+ static final File nmLocalDir = new File(basedir, "nm0");
+ static final File tmpDir = new File(basedir, "tmpDir");
+ static final File remoteLogsDir = new File(basedir, "remotelogs");
+ static final File logsDir = new File(basedir, "logs");
private static final RecordFactory recordFactory = RecordFactoryProvider
.getRecordFactory(null);
volatile int heartBeatID = 0;
volatile Throwable nmStartError = null;
private final List<NodeId> registeredNodes = new ArrayList<NodeId>();
- private final Configuration conf = createNMConfig();
+ private boolean triggered = false;
+ private Configuration conf;
private NodeManager nm;
private boolean containerStatusBackupSuccessfully = true;
private List<ContainerStatus> completedContainerStatusList = new ArrayList<ContainerStatus>();
+ private AtomicBoolean assertionFailedInThread = new AtomicBoolean(false);
+
+ @Before
+ public void setUp() {
+ nmLocalDir.mkdirs();
+ tmpDir.mkdirs();
+ logsDir.mkdirs();
+ remoteLogsDir.mkdirs();
+ conf = createNMConfig();
+ }
@After
public void tearDown() {
this.registeredNodes.clear();
heartBeatID = 0;
ServiceOperations.stop(nm);
+ assertionFailedInThread.set(false);
DefaultMetricsSystem.shutdown();
}
@@ -274,6 +294,11 @@ public class TestNodeStatusUpdater {
protected ResourceTracker getRMClient() {
return resourceTracker;
}
+
+ @Override
+ protected void stopRMProxy() {
+ return;
+ }
}
private class MyNodeStatusUpdater2 extends NodeStatusUpdaterImpl {
@@ -290,6 +315,10 @@ public class TestNodeStatusUpdater {
return resourceTracker;
}
+ @Override
+ protected void stopRMProxy() {
+ return;
+ }
}
private class MyNodeStatusUpdater3 extends NodeStatusUpdaterImpl {
@@ -307,7 +336,12 @@ public class TestNodeStatusUpdater {
protected ResourceTracker getRMClient() {
return resourceTracker;
}
-
+
+ @Override
+ protected void stopRMProxy() {
+ return;
+ }
+
@Override
protected boolean isTokenKeepAliveEnabled(Configuration conf) {
return true;
@@ -315,21 +349,16 @@ public class TestNodeStatusUpdater {
}
private class MyNodeStatusUpdater4 extends NodeStatusUpdaterImpl {
- public ResourceTracker resourceTracker =
- new MyResourceTracker(this.context);
+
private Context context;
- private long waitStartTime;
private final long rmStartIntervalMS;
private final boolean rmNeverStart;
- private volatile boolean triggered = false;
- private long durationWhenTriggered = -1;
-
+ public ResourceTracker resourceTracker;
public MyNodeStatusUpdater4(Context context, Dispatcher dispatcher,
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics,
long rmStartIntervalMS, boolean rmNeverStart) {
super(context, dispatcher, healthChecker, metrics);
this.context = context;
- this.waitStartTime = System.currentTimeMillis();
this.rmStartIntervalMS = rmStartIntervalMS;
this.rmNeverStart = rmNeverStart;
}
@@ -337,25 +366,16 @@ public class TestNodeStatusUpdater {
@Override
protected void serviceStart() throws Exception {
//record the startup time
- this.waitStartTime = System.currentTimeMillis();
super.serviceStart();
}
@Override
- protected ResourceTracker getRMClient() {
- if (!triggered) {
- long t = System.currentTimeMillis();
- long duration = t - waitStartTime;
- if (duration <= rmStartIntervalMS
- || rmNeverStart) {
- throw new YarnRuntimeException("Faking RM start failure as start " +
- "delay timer has not expired.");
- } else {
- //triggering
- triggered = true;
- durationWhenTriggered = duration;
- }
- }
+ protected ResourceTracker getRMClient() throws IOException {
+ RetryPolicy retryPolicy = RMProxy.createRetryPolicy(conf);
+ resourceTracker =
+ (ResourceTracker) RetryProxy.create(ResourceTracker.class,
+ new MyResourceTracker6(this.context, rmStartIntervalMS,
+ rmNeverStart), retryPolicy);
return resourceTracker;
}
@@ -363,37 +383,35 @@ public class TestNodeStatusUpdater {
return triggered;
}
- private long getWaitStartTime() {
- return waitStartTime;
- }
-
- private long getDurationWhenTriggered() {
- return durationWhenTriggered;
- }
-
@Override
- public String toString() {
- return "MyNodeStatusUpdater4{" +
- "rmNeverStart=" + rmNeverStart +
- ", triggered=" + triggered +
- ", duration=" + durationWhenTriggered +
- ", rmStartIntervalMS=" + rmStartIntervalMS +
- '}';
+ protected void stopRMProxy() {
+ return;
}
}
+
+
private class MyNodeStatusUpdater5 extends NodeStatusUpdaterImpl {
private ResourceTracker resourceTracker;
+ private Configuration conf;
public MyNodeStatusUpdater5(Context context, Dispatcher dispatcher,
- NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
+ NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, Configuration conf) {
super(context, dispatcher, healthChecker, metrics);
resourceTracker = new MyResourceTracker5();
+ this.conf = conf;
}
@Override
protected ResourceTracker getRMClient() {
- return resourceTracker;
+ RetryPolicy retryPolicy = RMProxy.createRetryPolicy(conf);
+ return (ResourceTracker) RetryProxy.create(ResourceTracker.class,
+ resourceTracker, retryPolicy);
+ }
+
+ @Override
+ protected void stopRMProxy() {
+ return;
}
}
@@ -408,7 +426,7 @@ public class TestNodeStatusUpdater {
return this.nodeStatusUpdater;
}
- protected MyNodeStatusUpdater3 getNodeStatusUpdater() {
+ public MyNodeStatusUpdater3 getNodeStatusUpdater() {
return this.nodeStatusUpdater;
}
}
@@ -417,15 +435,18 @@ public class TestNodeStatusUpdater {
public boolean isStopped = false;
private NodeStatusUpdater nodeStatusUpdater;
private CyclicBarrier syncBarrier;
- public MyNodeManager2 (CyclicBarrier syncBarrier) {
+ private Configuration conf;
+
+ public MyNodeManager2 (CyclicBarrier syncBarrier, Configuration conf) {
this.syncBarrier = syncBarrier;
+ this.conf = conf;
}
@Override
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
nodeStatusUpdater =
new MyNodeStatusUpdater5(context, dispatcher, healthChecker,
- metrics);
+ metrics, conf);
return nodeStatusUpdater;
}
@@ -433,6 +454,13 @@ public class TestNodeStatusUpdater {
protected void serviceStop() throws Exception {
super.serviceStop();
isStopped = true;
+ ConcurrentMap<ContainerId, org.apache.hadoop.yarn.server.nodemanager
+ .containermanager.container.Container> containers =
+ getNMContext().getContainers();
+ // ensure that containers are empty
+ if(!containers.isEmpty()) {
+ assertionFailedInThread.set(true);
+ }
syncBarrier.await(10000, TimeUnit.MILLISECONDS);
}
}
@@ -577,7 +605,7 @@ public class TestNodeStatusUpdater {
.get(4).getState() == ContainerState.RUNNING
&& request.getNodeStatus().getContainersStatuses().get(4)
.getContainerId().getId() == 5);
- throw new YarnRuntimeException("Lost the heartbeat response");
+ throw new java.net.ConnectException("Lost the heartbeat response");
} else if (heartBeatID == 2) {
Assert.assertEquals(request.getNodeStatus().getContainersStatuses()
.size(), 7);
@@ -646,7 +674,63 @@ public class TestNodeStatusUpdater {
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
throws YarnException, IOException {
heartBeatID++;
- throw RPCUtil.getRemoteException("NodeHeartbeat exception");
+ throw new java.net.ConnectException(
+ "NodeHeartbeat exception");
+ }
+ }
+
+ private class MyResourceTracker6 implements ResourceTracker {
+
+ private final Context context;
+ private long rmStartIntervalMS;
+ private boolean rmNeverStart;
+ private final long waitStartTime;
+
+ public MyResourceTracker6(Context context, long rmStartIntervalMS,
+ boolean rmNeverStart) {
+ this.context = context;
+ this.rmStartIntervalMS = rmStartIntervalMS;
+ this.rmNeverStart = rmNeverStart;
+ this.waitStartTime = System.currentTimeMillis();
+ }
+
+ @Override
+ public RegisterNodeManagerResponse registerNodeManager(
+ RegisterNodeManagerRequest request) throws YarnException, IOException,
+ IOException {
+ if (System.currentTimeMillis() - waitStartTime <= rmStartIntervalMS
+ || rmNeverStart) {
+ throw new java.net.ConnectException("Faking RM start failure as start "
+ + "delay timer has not expired.");
+ } else {
+ NodeId nodeId = request.getNodeId();
+ Resource resource = request.getResource();
+ LOG.info("Registering " + nodeId.toString());
+ // NOTE: this really should be checking against the config value
+ InetSocketAddress expected = NetUtils.getConnectAddress(
+ conf.getSocketAddr(YarnConfiguration.NM_ADDRESS, null, -1));
+ Assert.assertEquals(NetUtils.getHostPortString(expected),
+ nodeId.toString());
+ Assert.assertEquals(5 * 1024, resource.getMemory());
+ registeredNodes.add(nodeId);
+
+ RegisterNodeManagerResponse response = recordFactory
+ .newRecordInstance(RegisterNodeManagerResponse.class);
+ triggered = true;
+ return response;
+ }
+ }
+
+ @Override
+ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
+ throws YarnException, IOException {
+ NodeStatus nodeStatus = request.getNodeStatus();
+ nodeStatus.setResponseId(heartBeatID++);
+
+ NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils.
+ newNodeHeartbeatResponse(heartBeatID, NodeAction.NORMAL, null,
+ null, null, null, 1000L);
+ return nhResponse;
}
}
@@ -658,9 +742,43 @@ public class TestNodeStatusUpdater {
@After
public void deleteBaseDir() throws IOException {
FileContext lfs = FileContext.getLocalFSFileContext();
- lfs.delete(basedir, true);
+ lfs.delete(new Path(basedir.getPath()), true);
}
+ @Test(timeout = 90000)
+ public void testRecentlyFinishedContainers() throws Exception {
+ NodeManager nm = new NodeManager();
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.set(
+ NodeStatusUpdaterImpl.YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS,
+ "10000");
+ nm.init(conf);
+ NodeStatusUpdaterImpl nodeStatusUpdater =
+ (NodeStatusUpdaterImpl) nm.getNodeStatusUpdater();
+ ApplicationId appId = ApplicationId.newInstance(0, 0);
+ ApplicationAttemptId appAttemptId =
+ ApplicationAttemptId.newInstance(appId, 0);
+ ContainerId cId = ContainerId.newInstance(appAttemptId, 0);
+
+
+ nodeStatusUpdater.addStoppedContainersToCache(cId);
+ Assert.assertTrue(nodeStatusUpdater.isContainerRecentlyStopped(cId));
+
+ long time1 = System.currentTimeMillis();
+ int waitInterval = 15;
+ while (waitInterval-- > 0
+ && nodeStatusUpdater.isContainerRecentlyStopped(cId)) {
+ nodeStatusUpdater.removeVeryOldStoppedContainersFromCache();
+ Thread.sleep(1000);
+ }
+ long time2 = System.currentTimeMillis();
+ // By this time the container will be removed from cache. need to verify.
+ Assert.assertFalse(nodeStatusUpdater.isContainerRecentlyStopped(cId));
+ Assert.assertTrue((time2 - time1) >= 10000 && (time2 -time1) <= 250000);
+ }
+
+
+
@Test
public void testNMRegistration() throws InterruptedException {
nm = new NodeManager() {
@@ -843,8 +961,7 @@ public class TestNodeStatusUpdater {
final long connectionRetryIntervalSecs = 1;
//Waiting for rmStartIntervalMS, RM will be started
final long rmStartIntervalMS = 2*1000;
- YarnConfiguration conf = createNMConfig();
- conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS,
+ conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_SECS,
connectionWaitSecs);
conf.setLong(YarnConfiguration
.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS,
@@ -907,8 +1024,6 @@ public class TestNodeStatusUpdater {
}
long duration = System.currentTimeMillis() - waitStartTime;
MyNodeStatusUpdater4 myUpdater = (MyNodeStatusUpdater4) updater;
- Assert.assertTrue("Updater was never started",
- myUpdater.getWaitStartTime()>0);
Assert.assertTrue("NM started before updater triggered",
myUpdater.isTriggered());
Assert.assertTrue("NM should have connected to RM after "
@@ -1033,23 +1148,32 @@ public class TestNodeStatusUpdater {
@Test(timeout = 200000)
public void testNodeStatusUpdaterRetryAndNMShutdown()
- throws InterruptedException {
+ throws Exception {
final long connectionWaitSecs = 1;
final long connectionRetryIntervalSecs = 1;
YarnConfiguration conf = createNMConfig();
- conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS,
+ conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_SECS,
connectionWaitSecs);
conf.setLong(YarnConfiguration
.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS,
connectionRetryIntervalSecs);
+ conf.setLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS, 5000);
CyclicBarrier syncBarrier = new CyclicBarrier(2);
- nm = new MyNodeManager2(syncBarrier);
+ nm = new MyNodeManager2(syncBarrier, conf);
nm.init(conf);
nm.start();
+ // start a container
+ ContainerId cId = TestNodeManagerShutdown.createContainerId();
+ FileContext localFS = FileContext.getLocalFSFileContext();
+ TestNodeManagerShutdown.startContainer(nm, cId, localFS, nmLocalDir,
+ new File("start_file.txt"));
+
try {
syncBarrier.await(10000, TimeUnit.MILLISECONDS);
} catch (Exception e) {
}
+ Assert.assertFalse("Containers not cleaned up when NM stopped",
+ assertionFailedInThread.get());
Assert.assertTrue(((MyNodeManager2) nm).isStopped);
Assert.assertTrue("calculate heartBeatCount based on" +
" connectionWaitSecs and RetryIntervalSecs", heartBeatID == 2);
@@ -1167,15 +1291,13 @@ public class TestNodeStatusUpdater {
private YarnConfiguration createNMConfig() {
YarnConfiguration conf = new YarnConfiguration();
- conf.setInt(YarnConfiguration.NM_PMEM_MB, 5*1024); // 5GB
+ conf.setInt(YarnConfiguration.NM_PMEM_MB, 5 * 1024); // 5GB
conf.set(YarnConfiguration.NM_ADDRESS, "localhost:12345");
conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, "localhost:12346");
- conf.set(YarnConfiguration.NM_LOG_DIRS, new Path(basedir, "logs").toUri()
- .getPath());
- conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, new Path(basedir,
- "remotelogs").toUri().getPath());
- conf.set(YarnConfiguration.NM_LOCAL_DIRS, new Path(basedir, "nm0")
- .toUri().getPath());
+ conf.set(YarnConfiguration.NM_LOG_DIRS, logsDir.getAbsolutePath());
+ conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+ remoteLogsDir.getAbsolutePath());
+ conf.set(YarnConfiguration.NM_LOCAL_DIRS, nmLocalDir.getAbsolutePath());
return conf;
}
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/TestPBRecordImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/TestPBRecordImpl.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/TestPBRecordImpl.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/TestPBRecordImpl.java Mon Aug 12 21:25:49 2013
@@ -21,16 +21,21 @@ import static org.junit.Assert.assertEqu
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
+import junit.framework.Assert;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.SerializedException;
import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalResourceStatusProto;
@@ -42,7 +47,6 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.ResourceStatusType;
-import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.Test;
@@ -80,7 +84,7 @@ public class TestPBRecordImpl {
e.setStackTrace(new StackTraceElement[] {
new StackTraceElement("foo", "bar", "baz", 10),
new StackTraceElement("sbb", "one", "onm", 10) });
- ret.setException(YarnServerBuilderUtils.newSerializedException(e));
+ ret.setException(SerializedException.newInstance(e));
return ret;
}
@@ -176,4 +180,33 @@ public class TestPBRecordImpl {
assertEquals(createResource(), rsrcD.getResourceSpecs().get(0).getResource());
}
+
+ @Test(timeout=10000)
+ public void testSerializedExceptionDeSer() throws Exception{
+ // without cause
+ YarnException yarnEx = new YarnException("Yarn_Exception");
+ SerializedException serEx = SerializedException.newInstance(yarnEx);
+ Throwable throwable = serEx.deSerialize();
+ Assert.assertEquals(yarnEx.getClass(), throwable.getClass());
+ Assert.assertEquals(yarnEx.getMessage(), throwable.getMessage());
+
+ // with cause
+ IOException ioe = new IOException("Test_IOException");
+ RuntimeException runtimeException =
+ new RuntimeException("Test_RuntimeException", ioe);
+ YarnException yarnEx2 =
+ new YarnException("Test_YarnException", runtimeException);
+
+ SerializedException serEx2 = SerializedException.newInstance(yarnEx2);
+ Throwable throwable2 = serEx2.deSerialize();
+ throwable2.printStackTrace();
+ Assert.assertEquals(yarnEx2.getClass(), throwable2.getClass());
+ Assert.assertEquals(yarnEx2.getMessage(), throwable2.getMessage());
+
+ Assert.assertEquals(runtimeException.getClass(), throwable2.getCause().getClass());
+ Assert.assertEquals(runtimeException.getMessage(), throwable2.getCause().getMessage());
+
+ Assert.assertEquals(ioe.getClass(), throwable2.getCause().getCause().getClass());
+ Assert.assertEquals(ioe.getMessage(), throwable2.getCause().getCause().getMessage());
+ }
}
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java Mon Aug 12 21:25:49 2013
@@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.server.no
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import junit.framework.Assert;
@@ -32,7 +34,7 @@ import org.apache.hadoop.fs.UnsupportedF
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
@@ -118,6 +120,11 @@ public abstract class BaseContainerManag
};
@Override
+ protected void stopRMProxy() {
+ return;
+ }
+
+ @Override
protected void startStatusUpdater() {
return; // Don't start any updating thread.
}
@@ -184,15 +191,18 @@ public abstract class BaseContainerManag
@Override
protected void authorizeGetAndStopContainerRequest(ContainerId containerId,
- Container container, boolean stopRequest) throws YarnException {
+ Container container, boolean stopRequest, NMTokenIdentifier identifier) throws YarnException {
// do nothing
}
-
+ @Override
+ protected void authorizeUser(UserGroupInformation remoteUgi,
+ NMTokenIdentifier nmTokenIdentifier) {
+ // do nothing
+ }
@Override
protected void authorizeStartRequest(
NMTokenIdentifier nmTokenIdentifier,
- ContainerTokenIdentifier containerTokenIdentifier,
- UserGroupInformation ugi) throws YarnException {
+ ContainerTokenIdentifier containerTokenIdentifier) throws YarnException {
// do nothing
}
@@ -233,18 +243,20 @@ public abstract class BaseContainerManag
public static void waitForContainerState(ContainerManagementProtocol containerManager,
ContainerId containerID, ContainerState finalState, int timeOutMax)
throws InterruptedException, YarnException, IOException {
- GetContainerStatusRequest request =
- recordFactory.newRecordInstance(GetContainerStatusRequest.class);
- request.setContainerId(containerID);
- ContainerStatus containerStatus =
- containerManager.getContainerStatus(request).getStatus();
- int timeoutSecs = 0;
+ List<ContainerId> list = new ArrayList<ContainerId>();
+ list.add(containerID);
+ GetContainerStatusesRequest request =
+ GetContainerStatusesRequest.newInstance(list);
+ ContainerStatus containerStatus =
+ containerManager.getContainerStatuses(request).getContainerStatuses()
+ .get(0);
+ int timeoutSecs = 0;
while (!containerStatus.getState().equals(finalState)
&& timeoutSecs++ < timeOutMax) {
Thread.sleep(1000);
LOG.info("Waiting for container to get into state " + finalState
+ ". Current state is " + containerStatus.getState());
- containerStatus = containerManager.getContainerStatus(request).getStatus();
+ containerStatus = containerManager.getContainerStatuses(request).getContainerStatuses().get(0);
}
LOG.info("Container state is " + containerStatus.getState());
Assert.assertEquals("ContainerState is not correct (timedout)",
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java Mon Aug 12 21:25:49 2013
@@ -24,6 +24,7 @@ import java.io.FileReader;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.InetAddress;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@@ -37,9 +38,13 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Shell;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -51,8 +56,10 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.SerializedException;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.exceptions.InvalidContainerException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
@@ -62,6 +69,7 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
@@ -87,11 +95,11 @@ public class TestContainerManager extend
super.setup();
}
- private ContainerId createContainerId() {
+ private ContainerId createContainerId(int id) {
ApplicationId appId = ApplicationId.newInstance(0, 0);
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 1);
- ContainerId containerId = ContainerId.newInstance(appAttemptId, 0);
+ ContainerId containerId = ContainerId.newInstance(appAttemptId, id);
return containerId;
}
@@ -118,6 +126,14 @@ public class TestContainerManager extend
.getKeyId()));
return ugi;
}
+
+ @Override
+ protected void authorizeGetAndStopContainerRequest(ContainerId containerId,
+ Container container, boolean stopRequest, NMTokenIdentifier identifier) throws YarnException {
+ if(container == null || container.getUser().equals("Fail")){
+ throw new YarnException("Reject this container");
+ }
+ }
};
}
@@ -137,12 +153,17 @@ public class TestContainerManager extend
// Just do a query for a non-existing container.
boolean throwsException = false;
try {
- GetContainerStatusRequest request =
- recordFactory.newRecordInstance(GetContainerStatusRequest.class);
- ContainerId cId = createContainerId();
- request.setContainerId(cId);
- containerManager.getContainerStatus(request);
- } catch (YarnException e) {
+ List<ContainerId> containerIds = new ArrayList<ContainerId>();
+ ContainerId id =createContainerId(0);
+ containerIds.add(id);
+ GetContainerStatusesRequest request =
+ GetContainerStatusesRequest.newInstance(containerIds);
+ GetContainerStatusesResponse response =
+ containerManager.getContainerStatuses(request);
+ if(response.getFailedRequests().containsKey(id)){
+ throw response.getFailedRequests().get(id).deSerialize();
+ }
+ } catch (Throwable e) {
throwsException = true;
}
Assert.assertTrue(throwsException);
@@ -162,7 +183,7 @@ public class TestContainerManager extend
fileWriter.close();
// ////// Construct the Container-id
- ContainerId cId = createContainerId();
+ ContainerId cId = createContainerId(0);
// ////// Construct the container-spec.
ContainerLaunchContext containerLaunchContext =
@@ -181,14 +202,17 @@ public class TestContainerManager extend
new HashMap<String, LocalResource>();
localResources.put(destinationFile, rsrc_alpha);
containerLaunchContext.setLocalResources(localResources);
- StartContainerRequest startRequest =
- recordFactory.newRecordInstance(StartContainerRequest.class);
- startRequest.setContainerLaunchContext(containerLaunchContext);
- startRequest.setContainerToken(createContainerToken(cId,
- DUMMY_RM_IDENTIFIER, context.getNodeId(), user,
- context.getContainerTokenSecretManager()));
- containerManager.startContainer(startRequest);
+ StartContainerRequest scRequest =
+ StartContainerRequest.newInstance(
+ containerLaunchContext,
+ createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(),
+ user, context.getContainerTokenSecretManager()));
+ List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+ list.add(scRequest);
+ StartContainersRequest allRequests =
+ StartContainersRequest.newInstance(list);
+ containerManager.startContainers(allRequests);
BaseContainerManagerTest.waitForContainerState(containerManager, cId,
ContainerState.COMPLETE);
@@ -236,7 +260,7 @@ public class TestContainerManager extend
new File(tmpDir, "start_file.txt").getAbsoluteFile();
// ////// Construct the Container-id
- ContainerId cId = createContainerId();
+ ContainerId cId = createContainerId(0);
if (Shell.WINDOWS) {
fileWriter.println("@echo Hello World!> " + processStartFile);
@@ -271,13 +295,17 @@ public class TestContainerManager extend
List<String> commands = Arrays.asList(Shell.getRunScriptCommand(scriptFile));
containerLaunchContext.setCommands(commands);
- StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class);
- startRequest.setContainerLaunchContext(containerLaunchContext);
- startRequest.setContainerToken(createContainerToken(cId,
- DUMMY_RM_IDENTIFIER, context.getNodeId(), user,
- context.getContainerTokenSecretManager()));
- containerManager.startContainer(startRequest);
-
+ StartContainerRequest scRequest =
+ StartContainerRequest.newInstance(containerLaunchContext,
+ createContainerToken(cId,
+ DUMMY_RM_IDENTIFIER, context.getNodeId(), user,
+ context.getContainerTokenSecretManager()));
+ List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+ list.add(scRequest);
+ StartContainersRequest allRequests =
+ StartContainersRequest.newInstance(list);
+ containerManager.startContainers(allRequests);
+
int timeoutSecs = 0;
while (!processStartFile.exists() && timeoutSecs++ < 20) {
Thread.sleep(1000);
@@ -304,18 +332,18 @@ public class TestContainerManager extend
Assert.assertTrue("Process is not alive!",
DefaultContainerExecutor.containerIsAlive(pid));
- StopContainerRequest stopRequest = recordFactory.newRecordInstance(StopContainerRequest.class);
- stopRequest.setContainerId(cId);
- containerManager.stopContainer(stopRequest);
-
+ List<ContainerId> containerIds = new ArrayList<ContainerId>();
+ containerIds.add(cId);
+ StopContainersRequest stopRequest =
+ StopContainersRequest.newInstance(containerIds);
+ containerManager.stopContainers(stopRequest);
BaseContainerManagerTest.waitForContainerState(containerManager, cId,
ContainerState.COMPLETE);
- GetContainerStatusRequest gcsRequest =
- recordFactory.newRecordInstance(GetContainerStatusRequest.class);
- gcsRequest.setContainerId(cId);
+ GetContainerStatusesRequest gcsRequest =
+ GetContainerStatusesRequest.newInstance(containerIds);
ContainerStatus containerStatus =
- containerManager.getContainerStatus(gcsRequest).getStatus();
+ containerManager.getContainerStatuses(gcsRequest).getContainerStatuses().get(0);
int expectedExitCode = Shell.WINDOWS ? ExitCode.FORCE_KILLED.getExitCode() :
ExitCode.TERMINATED.getExitCode();
Assert.assertEquals(expectedExitCode, containerStatus.getExitStatus());
@@ -324,7 +352,7 @@ public class TestContainerManager extend
Assert.assertFalse("Process is still alive!",
DefaultContainerExecutor.containerIsAlive(pid));
}
-
+
private void testContainerLaunchAndExit(int exitCode) throws IOException,
InterruptedException, YarnException {
@@ -334,7 +362,7 @@ public class TestContainerManager extend
new File(tmpDir, "start_file.txt").getAbsoluteFile();
// ////// Construct the Container-id
- ContainerId cId = createContainerId();
+ ContainerId cId = createContainerId(0);
if (Shell.WINDOWS) {
fileWriter.println("@echo Hello World!> " + processStartFile);
@@ -375,21 +403,26 @@ public class TestContainerManager extend
List<String> commands = Arrays.asList(Shell.getRunScriptCommand(scriptFile));
containerLaunchContext.setCommands(commands);
- StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class);
- startRequest.setContainerLaunchContext(containerLaunchContext);
- startRequest.setContainerToken(createContainerToken(cId,
- DUMMY_RM_IDENTIFIER, context.getNodeId(), user,
- context.getContainerTokenSecretManager()));
- containerManager.startContainer(startRequest);
+ StartContainerRequest scRequest =
+ StartContainerRequest.newInstance(
+ containerLaunchContext,
+ createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(),
+ user, context.getContainerTokenSecretManager()));
+ List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+ list.add(scRequest);
+ StartContainersRequest allRequests =
+ StartContainersRequest.newInstance(list);
+ containerManager.startContainers(allRequests);
BaseContainerManagerTest.waitForContainerState(containerManager, cId,
ContainerState.COMPLETE);
- GetContainerStatusRequest gcsRequest =
- recordFactory.newRecordInstance(GetContainerStatusRequest.class);
- gcsRequest.setContainerId(cId);
+ List<ContainerId> containerIds = new ArrayList<ContainerId>();
+ containerIds.add(cId);
+ GetContainerStatusesRequest gcsRequest =
+ GetContainerStatusesRequest.newInstance(containerIds);
ContainerStatus containerStatus =
- containerManager.getContainerStatus(gcsRequest).getStatus();
+ containerManager.getContainerStatuses(gcsRequest).getContainerStatuses().get(0);
// Verify exit status matches exit state of script
Assert.assertEquals(exitCode,
@@ -438,7 +471,7 @@ public class TestContainerManager extend
fileWriter.close();
// ////// Construct the Container-id
- ContainerId cId = createContainerId();
+ ContainerId cId = createContainerId(0);
ApplicationId appId = cId.getApplicationAttemptId().getApplicationId();
// ////// Construct the container-spec.
@@ -459,11 +492,17 @@ public class TestContainerManager extend
new HashMap<String, LocalResource>();
localResources.put(destinationFile, rsrc_alpha);
containerLaunchContext.setLocalResources(localResources);
- StartContainerRequest request = recordFactory.newRecordInstance(StartContainerRequest.class);
- request.setContainerLaunchContext(containerLaunchContext);
- request.setContainerToken(createContainerToken(cId, DUMMY_RM_IDENTIFIER,
- context.getNodeId(), user, context.getContainerTokenSecretManager()));
- containerManager.startContainer(request);
+
+ StartContainerRequest scRequest =
+ StartContainerRequest.newInstance(
+ containerLaunchContext,
+ createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(),
+ user, context.getContainerTokenSecretManager()));
+ List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+ list.add(scRequest);
+ StartContainersRequest allRequests =
+ StartContainersRequest.newInstance(list);
+ containerManager.startContainers(allRequests);
BaseContainerManagerTest.waitForContainerState(containerManager, cId,
ContainerState.COMPLETE);
@@ -527,29 +566,37 @@ public class TestContainerManager extend
@Test
public void testContainerLaunchFromPreviousRM() throws IOException,
- InterruptedException {
+ InterruptedException, YarnException {
containerManager.start();
ContainerLaunchContext containerLaunchContext =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
- ContainerId cId1 = createContainerId();
- ContainerId cId2 = createContainerId();
+ ContainerId cId1 = createContainerId(0);
+ ContainerId cId2 = createContainerId(0);
containerLaunchContext
.setLocalResources(new HashMap<String, LocalResource>());
// Construct the Container with Invalid RMIdentifier
StartContainerRequest startRequest1 =
- recordFactory.newRecordInstance(StartContainerRequest.class);
- startRequest1.setContainerLaunchContext(containerLaunchContext);
+ StartContainerRequest.newInstance(containerLaunchContext,
+ createContainerToken(cId1,
+ ResourceManagerConstants.RM_INVALID_IDENTIFIER, context.getNodeId(),
+ user, context.getContainerTokenSecretManager()));
+ List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+ list.add(startRequest1);
+ StartContainersRequest allRequests =
+ StartContainersRequest.newInstance(list);
+ containerManager.startContainers(allRequests);
- startRequest1.setContainerToken(createContainerToken(cId1,
- ResourceManagerConstants.RM_INVALID_IDENTIFIER, context.getNodeId(),
- user, context.getContainerTokenSecretManager()));
boolean catchException = false;
try {
- containerManager.startContainer(startRequest1);
- } catch (YarnException e) {
+ StartContainersResponse response = containerManager.startContainers(allRequests);
+ if(response.getFailedRequests().containsKey(cId1)) {
+ throw response.getFailedRequests().get(cId1).deSerialize();
+ }
+ } catch (Throwable e) {
+ e.printStackTrace();
catchException = true;
Assert.assertTrue(e.getMessage().contains(
"Container " + cId1 + " rejected as it is allocated by a previous RM"));
@@ -562,21 +609,143 @@ public class TestContainerManager extend
// Construct the Container with a RMIdentifier within current RM
StartContainerRequest startRequest2 =
- recordFactory.newRecordInstance(StartContainerRequest.class);
- startRequest2.setContainerLaunchContext(containerLaunchContext);
- startRequest2.setContainerToken(createContainerToken(cId2,
- DUMMY_RM_IDENTIFIER, context.getNodeId(), user,
- context.getContainerTokenSecretManager()));
+ StartContainerRequest.newInstance(containerLaunchContext,
+ createContainerToken(cId2,
+ DUMMY_RM_IDENTIFIER, context.getNodeId(), user,
+ context.getContainerTokenSecretManager()));
+ List<StartContainerRequest> list2 = new ArrayList<StartContainerRequest>();
+ list.add(startRequest2);
+ StartContainersRequest allRequests2 =
+ StartContainersRequest.newInstance(list2);
+ containerManager.startContainers(allRequests2);
+
boolean noException = true;
try {
- containerManager.startContainer(startRequest2);
+ containerManager.startContainers(allRequests2);
} catch (YarnException e) {
noException = false;
}
// Verify that startContainer get no YarnException
Assert.assertTrue(noException);
}
-
+
+ @Test
+ public void testMultipleContainersLaunch() throws Exception {
+ containerManager.start();
+
+ List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+ ContainerLaunchContext containerLaunchContext =
+ recordFactory.newRecordInstance(ContainerLaunchContext.class);
+ for (int i = 0; i < 10; i++) {
+ ContainerId cId = createContainerId(i);
+ long identifier = 0;
+ if ((i & 1) == 0)
+ // container with even id fail
+ identifier = ResourceManagerConstants.RM_INVALID_IDENTIFIER;
+ else
+ identifier = DUMMY_RM_IDENTIFIER;
+ Token containerToken =
+ createContainerToken(cId, identifier, context.getNodeId(), user,
+ context.getContainerTokenSecretManager());
+ StartContainerRequest request =
+ StartContainerRequest.newInstance(containerLaunchContext,
+ containerToken);
+ list.add(request);
+ }
+ StartContainersRequest requestList =
+ StartContainersRequest.newInstance(list);
+
+ StartContainersResponse response =
+ containerManager.startContainers(requestList);
+
+ Assert.assertEquals(5, response.getSuccessfullyStartedContainers().size());
+ for (ContainerId id : response.getSuccessfullyStartedContainers()) {
+ // Containers with odd id should succeed.
+ Assert.assertEquals(1, id.getId() & 1);
+ }
+ Assert.assertEquals(5, response.getFailedRequests().size());
+ for (Map.Entry<ContainerId, SerializedException> entry : response
+ .getFailedRequests().entrySet()) {
+ // Containers with even id should fail.
+ Assert.assertEquals(0, entry.getKey().getId() & 1);
+ Assert.assertTrue(entry.getValue().getMessage()
+ .contains(
+ "Container " + entry.getKey() + " rejected as it is allocated by a previous RM"));
+ }
+ }
+
+ @Test
+ public void testMultipleContainersStopAndGetStatus() throws Exception {
+ containerManager.start();
+ List<StartContainerRequest> startRequest =
+ new ArrayList<StartContainerRequest>();
+ ContainerLaunchContext containerLaunchContext =
+ recordFactory.newRecordInstance(ContainerLaunchContext.class);
+
+ List<ContainerId> containerIds = new ArrayList<ContainerId>();
+ for (int i = 0; i < 10; i++) {
+ ContainerId cId = createContainerId(i);
+ String user = null;
+ if ((i & 1) == 0) {
+ // container with even id fail
+ user = "Fail";
+ } else {
+ user = "Pass";
+ }
+ Token containerToken =
+ createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(),
+ user, context.getContainerTokenSecretManager());
+ StartContainerRequest request =
+ StartContainerRequest.newInstance(containerLaunchContext,
+ containerToken);
+ startRequest.add(request);
+ containerIds.add(cId);
+ }
+ // start containers
+ StartContainersRequest requestList =
+ StartContainersRequest.newInstance(startRequest);
+ containerManager.startContainers(requestList);
+
+ // Get container statuses
+ GetContainerStatusesRequest statusRequest =
+ GetContainerStatusesRequest.newInstance(containerIds);
+ GetContainerStatusesResponse statusResponse =
+ containerManager.getContainerStatuses(statusRequest);
+ Assert.assertEquals(5, statusResponse.getContainerStatuses().size());
+ for (ContainerStatus status : statusResponse.getContainerStatuses()) {
+ // Containers with odd id should succeed
+ Assert.assertEquals(1, status.getContainerId().getId() & 1);
+ }
+ Assert.assertEquals(5, statusResponse.getFailedRequests().size());
+ for (Map.Entry<ContainerId, SerializedException> entry : statusResponse
+ .getFailedRequests().entrySet()) {
+ // Containers with even id should fail.
+ Assert.assertEquals(0, entry.getKey().getId() & 1);
+ Assert.assertTrue(entry.getValue().getMessage()
+ .contains("Reject this container"));
+ }
+
+ // stop containers
+ StopContainersRequest stopRequest =
+ StopContainersRequest.newInstance(containerIds);
+ StopContainersResponse stopResponse =
+ containerManager.stopContainers(stopRequest);
+ Assert.assertEquals(5, stopResponse.getSuccessfullyStoppedContainers()
+ .size());
+ for (ContainerId id : stopResponse.getSuccessfullyStoppedContainers()) {
+ // Containers with odd id should succeed.
+ Assert.assertEquals(1, id.getId() & 1);
+ }
+ Assert.assertEquals(5, stopResponse.getFailedRequests().size());
+ for (Map.Entry<ContainerId, SerializedException> entry : stopResponse
+ .getFailedRequests().entrySet()) {
+ // Containers with even id should fail.
+ Assert.assertEquals(0, entry.getKey().getId() & 1);
+ Assert.assertTrue(entry.getValue().getMessage()
+ .contains("Reject this container"));
+ }
+ }
+
public static Token createContainerToken(ContainerId cId, long rmIdentifier,
NodeId nodeId, String user,
NMContainerTokenSecretManager containerTokenSecretManager)
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java Mon Aug 12 21:25:49 2013
@@ -29,6 +29,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import junit.framework.Assert;
@@ -265,9 +266,12 @@ public class TestApplication {
AuxServicesEventType.APPLICATION_STOP, wa.appId)));
wa.appResourcesCleanedup();
- for ( Container container : wa.containers) {
+ for (Container container : wa.containers) {
+ ContainerTokenIdentifier identifier =
+ wa.getContainerTokenIdentifier(container.getContainerId());
+ waitForContainerTokenToExpire(identifier);
Assert.assertTrue(wa.context.getContainerTokenSecretManager()
- .isValidStartContainerRequest(container.getContainerId()));
+ .isValidStartContainerRequest(identifier));
}
assertEquals(ApplicationState.FINISHED, wa.app.getApplicationState());
@@ -277,6 +281,18 @@ public class TestApplication {
}
}
+ protected ContainerTokenIdentifier waitForContainerTokenToExpire(
+ ContainerTokenIdentifier identifier) {
+ int attempts = 5;
+ while (System.currentTimeMillis() < identifier.getExpiryTimeStamp()
+ && attempts-- > 0) {
+ try {
+ Thread.sleep(1000);
+ } catch (Exception e) {}
+ }
+ return identifier;
+ }
+
@Test
@SuppressWarnings("unchecked")
public void testAppFinishedOnCompletedContainers() {
@@ -306,8 +322,11 @@ public class TestApplication {
wa.appResourcesCleanedup();
for ( Container container : wa.containers) {
+ ContainerTokenIdentifier identifier =
+ wa.getContainerTokenIdentifier(container.getContainerId());
+ waitForContainerTokenToExpire(identifier);
Assert.assertTrue(wa.context.getContainerTokenSecretManager()
- .isValidStartContainerRequest(container.getContainerId()));
+ .isValidStartContainerRequest(identifier));
}
assertEquals(ApplicationState.FINISHED, wa.app.getApplicationState());
} finally {
@@ -440,7 +459,8 @@ public class TestApplication {
final String user;
final List<Container> containers;
final Context context;
-
+ final Map<ContainerId, ContainerTokenIdentifier> containerTokenIdentifierMap;
+
final ApplicationId appId;
final Application app;
@@ -448,6 +468,8 @@ public class TestApplication {
Configuration conf = new Configuration();
dispatcher = new DrainDispatcher();
+ containerTokenIdentifierMap =
+ new HashMap<ContainerId, ContainerTokenIdentifier>();
dispatcher.init(conf);
localizerBus = mock(EventHandler.class);
@@ -486,11 +508,15 @@ public class TestApplication {
Container container = createMockedContainer(this.appId, i);
containers.add(container);
long currentTime = System.currentTimeMillis();
+ ContainerTokenIdentifier identifier =
+ new ContainerTokenIdentifier(container.getContainerId(), "", "",
+ null, currentTime + 2000, masterKey.getKeyId(), currentTime);
+ containerTokenIdentifierMap
+ .put(identifier.getContainerID(), identifier);
context.getContainerTokenSecretManager().startContainerSuccessful(
- new ContainerTokenIdentifier(container.getContainerId(), "",
- "", null, currentTime + 1000, masterKey.getKeyId(), currentTime));
+ identifier);
Assert.assertFalse(context.getContainerTokenSecretManager()
- .isValidStartContainerRequest(container.getContainerId()));
+ .isValidStartContainerRequest(identifier));
}
dispatcher.start();
@@ -542,6 +568,11 @@ public class TestApplication {
ApplicationEventType.APPLICATION_RESOURCES_CLEANEDUP));
drainDispatcherEvents();
}
+
+ public ContainerTokenIdentifier getContainerTokenIdentifier(
+ ContainerId containerId) {
+ return this.containerTokenIdentifierMap.get(containerId);
+ }
}
private Container createMockedContainer(ApplicationId appId, int containerId) {
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java Mon Aug 12 21:25:49 2013
@@ -18,7 +18,10 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
@@ -40,11 +43,16 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
@@ -58,10 +66,13 @@ import org.apache.hadoop.yarn.event.Drai
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
@@ -124,6 +135,7 @@ public class TestContainer {
// all resources should be localized
assertEquals(ContainerState.LOCALIZED, wc.c.getContainerState());
+ assertNotNull(wc.c.getLocalizedResources());
for (Entry<Path, List<String>> loc : wc.c.getLocalizedResources()
.entrySet()) {
assertEquals(localPaths.remove(loc.getKey()), loc.getValue());
@@ -161,6 +173,7 @@ public class TestContainer {
wc.containerKilledOnRequest();
assertEquals(ContainerState.EXITED_WITH_FAILURE,
wc.c.getContainerState());
+ assertNull(wc.c.getLocalizedResources());
verifyCleanupCall(wc);
}
finally {
@@ -183,6 +196,7 @@ public class TestContainer {
wc.containerFailed(ExitCode.FORCE_KILLED.getExitCode());
assertEquals(ContainerState.EXITED_WITH_FAILURE,
wc.c.getContainerState());
+ assertNull(wc.c.getLocalizedResources());
verifyCleanupCall(wc);
}
finally {
@@ -205,7 +219,7 @@ public class TestContainer {
wc.containerSuccessful();
assertEquals(ContainerState.EXITED_WITH_SUCCESS,
wc.c.getContainerState());
-
+ assertNull(wc.c.getLocalizedResources());
verifyCleanupCall(wc);
}
finally {
@@ -228,10 +242,12 @@ public class TestContainer {
wc.containerSuccessful();
wc.containerResourcesCleanup();
assertEquals(ContainerState.DONE, wc.c.getContainerState());
+ assertNull(wc.c.getLocalizedResources());
// Now in DONE, issue INIT
wc.initContainer();
// Verify still in DONE
assertEquals(ContainerState.DONE, wc.c.getContainerState());
+ assertNull(wc.c.getLocalizedResources());
verifyCleanupCall(wc);
}
finally {
@@ -240,6 +256,34 @@ public class TestContainer {
}
}
}
+
+ @Test
+ @SuppressWarnings("unchecked")
+ // mocked generic
+ public void testLocalizationFailureAtDone() throws Exception {
+ WrappedContainer wc = null;
+ try {
+ wc = new WrappedContainer(6, 314159265358979L, 4344, "yak");
+ wc.initContainer();
+ wc.localizeResources();
+ wc.launchContainer();
+ reset(wc.localizerBus);
+ wc.containerSuccessful();
+ wc.containerResourcesCleanup();
+ assertEquals(ContainerState.DONE, wc.c.getContainerState());
+ assertNull(wc.c.getLocalizedResources());
+ // Now in DONE, issue RESOURCE_FAILED as done by LocalizeRunner
+ wc.resourceFailedContainer();
+ // Verify still in DONE
+ assertEquals(ContainerState.DONE, wc.c.getContainerState());
+ assertNull(wc.c.getLocalizedResources());
+ verifyCleanupCall(wc);
+ } finally {
+ if (wc != null) {
+ wc.finished();
+ }
+ }
+ }
@Test
@SuppressWarnings("unchecked") // mocked generic
@@ -252,7 +296,9 @@ public class TestContainer {
wc.launchContainer();
reset(wc.localizerBus);
wc.killContainer();
- assertEquals(ContainerState.KILLING, wc.c.getContainerState());
+ assertEquals(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
+ wc.c.getContainerState());
+ assertNull(wc.c.getLocalizedResources());
wc.containerKilledOnRequest();
verifyCleanupCall(wc);
@@ -271,8 +317,30 @@ public class TestContainer {
wc.initContainer();
wc.failLocalizeResources(wc.getLocalResourceCount());
assertEquals(ContainerState.LOCALIZATION_FAILED, wc.c.getContainerState());
+ assertNull(wc.c.getLocalizedResources());
wc.killContainer();
assertEquals(ContainerState.LOCALIZATION_FAILED, wc.c.getContainerState());
+ assertNull(wc.c.getLocalizedResources());
+ verifyCleanupCall(wc);
+ } finally {
+ if (wc != null) {
+ wc.finished();
+ }
+ }
+ }
+
+ @Test
+ public void testKillOnLocalized() throws Exception {
+ WrappedContainer wc = null;
+ try {
+ wc = new WrappedContainer(17, 314159265358979L, 4344, "yak");
+ wc.initContainer();
+ wc.localizeResources();
+ assertEquals(ContainerState.LOCALIZED, wc.c.getContainerState());
+ wc.killContainer();
+ assertEquals(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
+ wc.c.getContainerState());
+ assertNull(wc.c.getLocalizedResources());
verifyCleanupCall(wc);
} finally {
if (wc != null) {
@@ -293,8 +361,10 @@ public class TestContainer {
}
wc.failLocalizeResources(failCount);
assertEquals(ContainerState.LOCALIZATION_FAILED, wc.c.getContainerState());
+ assertNull(wc.c.getLocalizedResources());
wc.localizeResourcesFromInvalidState(failCount);
assertEquals(ContainerState.LOCALIZATION_FAILED, wc.c.getContainerState());
+ assertNull(wc.c.getLocalizedResources());
verifyCleanupCall(wc);
Assert.assertTrue(wc.getDiagnostics().contains(FAKE_LOCALIZATION_ERROR));
} finally {
@@ -316,8 +386,10 @@ public class TestContainer {
String key2 = lRsrcKeys.next();
wc.failLocalizeSpecificResource(key1);
assertEquals(ContainerState.LOCALIZATION_FAILED, wc.c.getContainerState());
+ assertNull(wc.c.getLocalizedResources());
wc.failLocalizeSpecificResource(key2);
assertEquals(ContainerState.LOCALIZATION_FAILED, wc.c.getContainerState());
+ assertNull(wc.c.getLocalizedResources());
verifyCleanupCall(wc);
} finally {
if (wc != null) {
@@ -337,8 +409,10 @@ public class TestContainer {
String key1 = lRsrcKeys.next();
wc.killContainer();
assertEquals(ContainerState.KILLING, wc.c.getContainerState());
+ assertNull(wc.c.getLocalizedResources());
wc.failLocalizeSpecificResource(key1);
assertEquals(ContainerState.KILLING, wc.c.getContainerState());
+ assertNull(wc.c.getLocalizedResources());
verifyCleanupCall(wc);
} finally {
if (wc != null) {
@@ -398,9 +472,13 @@ public class TestContainer {
wc.initContainer();
wc.localizeResources();
wc.killContainer();
- assertEquals(ContainerState.KILLING, wc.c.getContainerState());
+ assertEquals(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
+ wc.c.getContainerState());
+ assertNull(wc.c.getLocalizedResources());
wc.launchContainer();
- assertEquals(ContainerState.KILLING, wc.c.getContainerState());
+ assertEquals(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
+ wc.c.getContainerState());
+ assertNull(wc.c.getLocalizedResources());
wc.containerKilledOnRequest();
verifyCleanupCall(wc);
} finally {
@@ -537,6 +615,7 @@ public class TestContainer {
final EventHandler<AuxServicesEvent> auxBus;
final EventHandler<ApplicationEvent> appBus;
final EventHandler<LogHandlerEvent> LogBus;
+ final ContainersLauncher launcher;
final ContainerLaunchContext ctxt;
final ContainerId cId;
@@ -549,6 +628,7 @@ public class TestContainer {
this(appId, timestamp, id, user, true, false);
}
+ @SuppressWarnings("rawtypes")
WrappedContainer(int appId, long timestamp, int id, String user,
boolean withLocalRes, boolean withServiceData) throws IOException {
dispatcher = new DrainDispatcher();
@@ -567,6 +647,22 @@ public class TestContainer {
dispatcher.register(ApplicationEventType.class, appBus);
dispatcher.register(LogHandlerEventType.class, LogBus);
+ Context context = mock(Context.class);
+ when(context.getApplications()).thenReturn(
+ new ConcurrentHashMap<ApplicationId, Application>());
+ launcher = new ContainersLauncher(context, dispatcher, null, null);
+ // create a mock ExecutorService, which will not really launch
+ // ContainerLaunch at all.
+ launcher.containerLauncher = mock(ExecutorService.class);
+ Future future = mock(Future.class);
+ when(launcher.containerLauncher.submit
+ (any(Callable.class))).thenReturn(future);
+ when(future.isDone()).thenReturn(false);
+ when(future.cancel(false)).thenReturn(true);
+ launcher.init(new Configuration());
+ launcher.start();
+ dispatcher.register(ContainersLauncherEventType.class, launcher);
+
ctxt = mock(ContainerLaunchContext.class);
org.apache.hadoop.yarn.api.records.Container mockContainer =
mock(org.apache.hadoop.yarn.api.records.Container.class);
@@ -608,6 +704,13 @@ public class TestContainer {
when(ctxt.getServiceData()).thenReturn(serviceData);
c = new ContainerImpl(conf, dispatcher, ctxt, null, metrics, identifier);
+ dispatcher.register(ContainerEventType.class,
+ new EventHandler<ContainerEvent>() {
+ @Override
+ public void handle(ContainerEvent event) {
+ c.handle(event);
+ }
+ });
dispatcher.start();
}
@@ -624,6 +727,11 @@ public class TestContainer {
drainDispatcherEvents();
}
+ public void resourceFailedContainer() {
+ c.handle(new ContainerEvent(cId, ContainerEventType.RESOURCE_FAILED));
+ drainDispatcherEvents();
+ }
+
// Localize resources
// Skip some resources so as to consider them failed
public Map<Path, List<String>> doLocalizeResources(