You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2013/08/12 23:26:19 UTC

svn commit: r1513258 [6/10] - in /hadoop/common/branches/YARN-321/hadoop-yarn-project: ./ hadoop-yarn/ hadoop-yarn/conf/ hadoop-yarn/dev-support/ hadoop-yarn/hadoop-yarn-api/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ hadoop-...

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java Mon Aug 12 21:25:49 2013
@@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.server.no
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CyclicBarrier;
@@ -31,15 +33,16 @@ import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.exceptions.NMNotYetReadyException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.NMNotYetReadyException;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.junit.After;
@@ -282,15 +285,18 @@ public class TestNodeManagerResync {
         try {
           while (!isStopped && numContainers < 10) {
             ContainerId cId = TestNodeManagerShutdown.createContainerId();
-            StartContainerRequest startRequest =
-                recordFactory.newRecordInstance(StartContainerRequest.class);
-            startRequest.setContainerLaunchContext(containerLaunchContext);
-            startRequest.setContainerToken(null);
+            StartContainerRequest scRequest =
+                StartContainerRequest.newInstance(containerLaunchContext,
+                  null);
+            List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+            list.add(scRequest);
+            StartContainersRequest allRequests =
+                StartContainersRequest.newInstance(list);
             System.out.println("no. of containers to be launched: "
                 + numContainers);
             numContainers++;
             try {
-              getContainerManager().startContainer(startRequest);
+              getContainerManager().startContainers(allRequests);
             } catch (YarnException e) {
               numContainersRejected++;
               Assert.assertTrue(e.getMessage().contains(

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java Mon Aug 12 21:25:49 2013
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.io.PrintWriter;
 import java.net.InetSocketAddress;
 import java.security.PrivilegedAction;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -40,8 +41,9 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -53,7 +55,6 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.URL;
-import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -181,12 +182,6 @@ public class TestNodeManagerShutdown {
     containerLaunchContext.setLocalResources(localResources);
     List<String> commands = Arrays.asList(Shell.getRunScriptCommand(scriptFile));
     containerLaunchContext.setCommands(commands);
-    StartContainerRequest startRequest =
-        recordFactory.newRecordInstance(StartContainerRequest.class);
-    startRequest.setContainerLaunchContext(containerLaunchContext);
-    startRequest
-      .setContainerToken(TestContainerManager.createContainerToken(cId, 0,
-        nodeId, user, nm.getNMContext().getContainerTokenSecretManager()));
     final InetSocketAddress containerManagerBindAddress =
         NetUtils.createSocketAddrForHost("127.0.0.1", 12345);
     UserGroupInformation currentUser = UserGroupInformation
@@ -210,13 +205,22 @@ public class TestNodeManagerShutdown {
               containerManagerBindAddress, conf);
           }
         });
-    containerManager.startContainer(startRequest);
+    StartContainerRequest scRequest =
+        StartContainerRequest.newInstance(containerLaunchContext,
+          TestContainerManager.createContainerToken(cId, 0,
+            nodeId, user, nm.getNMContext().getContainerTokenSecretManager()));
+    List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+    list.add(scRequest);
+    StartContainersRequest allRequests =
+        StartContainersRequest.newInstance(list);
+    containerManager.startContainers(allRequests);
     
-    GetContainerStatusRequest request =
-        recordFactory.newRecordInstance(GetContainerStatusRequest.class);
-        request.setContainerId(cId);
+    List<ContainerId> containerIds = new ArrayList<ContainerId>();
+    containerIds.add(cId);
+    GetContainerStatusesRequest request =
+        GetContainerStatusesRequest.newInstance(containerIds);
     ContainerStatus containerStatus =
-        containerManager.getContainerStatus(request).getStatus();
+        containerManager.getContainerStatuses(request).getContainerStatuses().get(0);
     Assert.assertEquals(ContainerState.RUNNING, containerStatus.getState());
   }
   

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java Mon Aug 12 21:25:49 2013
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.no
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
@@ -34,6 +35,7 @@ import java.util.concurrent.ConcurrentMa
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
@@ -41,10 +43,12 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryProxy;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.service.ServiceOperations;
 import org.apache.hadoop.service.Service.STATE;
+import org.apache.hadoop.service.ServiceOperations;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -53,6 +57,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.RMProxy;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -60,7 +65,6 @@ import org.apache.hadoop.yarn.exceptions
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.ResourceTracker;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
@@ -95,24 +99,40 @@ public class TestNodeStatusUpdater {
   }
 
   static final Log LOG = LogFactory.getLog(TestNodeStatusUpdater.class);
-  static final Path basedir =
-      new Path("target", TestNodeStatusUpdater.class.getName());
+  static final File basedir =
+      new File("target", TestNodeStatusUpdater.class.getName());
+  static final File nmLocalDir = new File(basedir, "nm0");
+  static final File tmpDir = new File(basedir, "tmpDir");
+  static final File remoteLogsDir = new File(basedir, "remotelogs");
+  static final File logsDir = new File(basedir, "logs");
   private static final RecordFactory recordFactory = RecordFactoryProvider
       .getRecordFactory(null);
 
   volatile int heartBeatID = 0;
   volatile Throwable nmStartError = null;
   private final List<NodeId> registeredNodes = new ArrayList<NodeId>();
-  private final Configuration conf = createNMConfig();
+  private boolean triggered = false;
+  private Configuration conf;
   private NodeManager nm;
   private boolean containerStatusBackupSuccessfully = true;
   private List<ContainerStatus> completedContainerStatusList = new ArrayList<ContainerStatus>();
+  private AtomicBoolean assertionFailedInThread = new AtomicBoolean(false);
+
+  @Before
+  public void setUp() {
+    nmLocalDir.mkdirs();
+    tmpDir.mkdirs();
+    logsDir.mkdirs();
+    remoteLogsDir.mkdirs();
+    conf = createNMConfig();
+  }
 
   @After
   public void tearDown() {
     this.registeredNodes.clear();
     heartBeatID = 0;
     ServiceOperations.stop(nm);
+    assertionFailedInThread.set(false);
     DefaultMetricsSystem.shutdown();
   }
 
@@ -274,6 +294,11 @@ public class TestNodeStatusUpdater {
     protected ResourceTracker getRMClient() {
       return resourceTracker;
     }
+
+    @Override
+    protected void stopRMProxy() {
+      return;
+    }
   }
 
   private class MyNodeStatusUpdater2 extends NodeStatusUpdaterImpl {
@@ -290,6 +315,10 @@ public class TestNodeStatusUpdater {
       return resourceTracker;
     }
 
+    @Override
+    protected void stopRMProxy() {
+      return;
+    }
   }
 
   private class MyNodeStatusUpdater3 extends NodeStatusUpdaterImpl {
@@ -307,7 +336,12 @@ public class TestNodeStatusUpdater {
     protected ResourceTracker getRMClient() {
       return resourceTracker;
     }
-    
+
+    @Override
+    protected void stopRMProxy() {
+      return;
+    }
+
     @Override
     protected boolean isTokenKeepAliveEnabled(Configuration conf) {
       return true;
@@ -315,21 +349,16 @@ public class TestNodeStatusUpdater {
   }
 
   private class MyNodeStatusUpdater4 extends NodeStatusUpdaterImpl {
-    public ResourceTracker resourceTracker =
-        new MyResourceTracker(this.context);
+
     private Context context;
-    private long waitStartTime;
     private final long rmStartIntervalMS;
     private final boolean rmNeverStart;
-    private volatile boolean triggered = false;
-    private long durationWhenTriggered = -1;
-
+    public ResourceTracker resourceTracker;
     public MyNodeStatusUpdater4(Context context, Dispatcher dispatcher,
         NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics,
         long rmStartIntervalMS, boolean rmNeverStart) {
       super(context, dispatcher, healthChecker, metrics);
       this.context = context;
-      this.waitStartTime = System.currentTimeMillis();
       this.rmStartIntervalMS = rmStartIntervalMS;
       this.rmNeverStart = rmNeverStart;
     }
@@ -337,25 +366,16 @@ public class TestNodeStatusUpdater {
     @Override
     protected void serviceStart() throws Exception {
       //record the startup time
-      this.waitStartTime = System.currentTimeMillis();
       super.serviceStart();
     }
 
     @Override
-    protected ResourceTracker getRMClient() {
-      if (!triggered) {
-        long t = System.currentTimeMillis();
-        long duration = t - waitStartTime;
-        if (duration <= rmStartIntervalMS
-            || rmNeverStart) {
-          throw new YarnRuntimeException("Faking RM start failure as start " +
-                                  "delay timer has not expired.");
-        } else {
-          //triggering
-          triggered = true;
-          durationWhenTriggered = duration;
-        }
-      }
+    protected ResourceTracker getRMClient() throws IOException {
+      RetryPolicy retryPolicy = RMProxy.createRetryPolicy(conf);
+      resourceTracker =
+          (ResourceTracker) RetryProxy.create(ResourceTracker.class,
+            new MyResourceTracker6(this.context, rmStartIntervalMS,
+              rmNeverStart), retryPolicy);
       return resourceTracker;
     }
 
@@ -363,37 +383,35 @@ public class TestNodeStatusUpdater {
       return triggered;
     }
 
-    private long getWaitStartTime() {
-      return waitStartTime;
-    }
-
-    private long getDurationWhenTriggered() {
-      return durationWhenTriggered;
-    }
-
     @Override
-    public String toString() {
-      return "MyNodeStatusUpdater4{" +
-             "rmNeverStart=" + rmNeverStart +
-             ", triggered=" + triggered +
-             ", duration=" + durationWhenTriggered +
-             ", rmStartIntervalMS=" + rmStartIntervalMS +
-             '}';
+    protected void stopRMProxy() {
+      return;
     }
   }
 
+
+
   private class MyNodeStatusUpdater5 extends NodeStatusUpdaterImpl {
     private ResourceTracker resourceTracker;
+    private Configuration conf;
 
     public MyNodeStatusUpdater5(Context context, Dispatcher dispatcher,
-        NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
+        NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, Configuration conf) {
       super(context, dispatcher, healthChecker, metrics);
       resourceTracker = new MyResourceTracker5();
+      this.conf = conf;
     }
 
     @Override
     protected ResourceTracker getRMClient() {
-      return resourceTracker;
+      RetryPolicy retryPolicy = RMProxy.createRetryPolicy(conf);
+      return (ResourceTracker) RetryProxy.create(ResourceTracker.class,
+        resourceTracker, retryPolicy);
+    }
+
+    @Override
+    protected void stopRMProxy() {
+      return;
     }
   }
 
@@ -408,7 +426,7 @@ public class TestNodeStatusUpdater {
       return this.nodeStatusUpdater;
     }
 
-    protected MyNodeStatusUpdater3 getNodeStatusUpdater() {
+    public MyNodeStatusUpdater3 getNodeStatusUpdater() {
       return this.nodeStatusUpdater;
     }
   }
@@ -417,15 +435,18 @@ public class TestNodeStatusUpdater {
     public boolean isStopped = false;
     private NodeStatusUpdater nodeStatusUpdater;
     private CyclicBarrier syncBarrier;
-    public MyNodeManager2 (CyclicBarrier syncBarrier) {
+    private Configuration conf;
+
+    public MyNodeManager2 (CyclicBarrier syncBarrier, Configuration conf) {
       this.syncBarrier = syncBarrier;
+      this.conf = conf;
     }
     @Override
     protected NodeStatusUpdater createNodeStatusUpdater(Context context,
         Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
       nodeStatusUpdater =
           new MyNodeStatusUpdater5(context, dispatcher, healthChecker,
-                                     metrics);
+                                     metrics, conf);
       return nodeStatusUpdater;
     }
 
@@ -433,6 +454,13 @@ public class TestNodeStatusUpdater {
     protected void serviceStop() throws Exception {
       super.serviceStop();
       isStopped = true;
+      ConcurrentMap<ContainerId, org.apache.hadoop.yarn.server.nodemanager
+      .containermanager.container.Container> containers =
+          getNMContext().getContainers();
+      // ensure that containers are empty
+      if(!containers.isEmpty()) {
+        assertionFailedInThread.set(true);
+      }
       syncBarrier.await(10000, TimeUnit.MILLISECONDS);
     }
   }
@@ -577,7 +605,7 @@ public class TestNodeStatusUpdater {
               .get(4).getState() == ContainerState.RUNNING
               && request.getNodeStatus().getContainersStatuses().get(4)
                   .getContainerId().getId() == 5);
-          throw new YarnRuntimeException("Lost the heartbeat response");
+          throw new java.net.ConnectException("Lost the heartbeat response");
         } else if (heartBeatID == 2) {
           Assert.assertEquals(request.getNodeStatus().getContainersStatuses()
               .size(), 7);
@@ -646,7 +674,63 @@ public class TestNodeStatusUpdater {
     public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
         throws YarnException, IOException {
       heartBeatID++;
-      throw RPCUtil.getRemoteException("NodeHeartbeat exception");
+      throw new java.net.ConnectException(
+          "NodeHeartbeat exception");
+    }
+  }
+
+  private class MyResourceTracker6 implements ResourceTracker {
+
+    private final Context context;
+    private long rmStartIntervalMS;
+    private boolean rmNeverStart;
+    private final long waitStartTime;
+
+    public MyResourceTracker6(Context context, long rmStartIntervalMS,
+        boolean rmNeverStart) {
+      this.context = context;
+      this.rmStartIntervalMS = rmStartIntervalMS;
+      this.rmNeverStart = rmNeverStart;
+      this.waitStartTime = System.currentTimeMillis();
+    }
+
+    @Override
+    public RegisterNodeManagerResponse registerNodeManager(
+        RegisterNodeManagerRequest request) throws YarnException, IOException,
+        IOException {
+      if (System.currentTimeMillis() - waitStartTime <= rmStartIntervalMS
+          || rmNeverStart) {
+        throw new java.net.ConnectException("Faking RM start failure as start "
+            + "delay timer has not expired.");
+      } else {
+        NodeId nodeId = request.getNodeId();
+        Resource resource = request.getResource();
+        LOG.info("Registering " + nodeId.toString());
+        // NOTE: this really should be checking against the config value
+        InetSocketAddress expected = NetUtils.getConnectAddress(
+            conf.getSocketAddr(YarnConfiguration.NM_ADDRESS, null, -1));
+        Assert.assertEquals(NetUtils.getHostPortString(expected),
+            nodeId.toString());
+        Assert.assertEquals(5 * 1024, resource.getMemory());
+        registeredNodes.add(nodeId);
+
+        RegisterNodeManagerResponse response = recordFactory
+            .newRecordInstance(RegisterNodeManagerResponse.class);
+        triggered = true;
+        return response;
+      }
+    }
+
+    @Override
+    public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
+        throws YarnException, IOException {
+      NodeStatus nodeStatus = request.getNodeStatus();
+      nodeStatus.setResponseId(heartBeatID++);
+
+      NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils.
+          newNodeHeartbeatResponse(heartBeatID, NodeAction.NORMAL, null,
+              null, null, null, 1000L);
+      return nhResponse;
     }
   }
 
@@ -658,9 +742,43 @@ public class TestNodeStatusUpdater {
   @After
   public void deleteBaseDir() throws IOException {
     FileContext lfs = FileContext.getLocalFSFileContext();
-    lfs.delete(basedir, true);
+    lfs.delete(new Path(basedir.getPath()), true);
   }
 
+  @Test(timeout = 90000)                                                      
+  public void testRecentlyFinishedContainers() throws Exception {             
+    NodeManager nm = new NodeManager();                                       
+    YarnConfiguration conf = new YarnConfiguration();                         
+    conf.set(                                                                 
+        NodeStatusUpdaterImpl.YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS,
+        "10000");                                                             
+    nm.init(conf);                                                            
+    NodeStatusUpdaterImpl nodeStatusUpdater =                                 
+        (NodeStatusUpdaterImpl) nm.getNodeStatusUpdater();                    
+    ApplicationId appId = ApplicationId.newInstance(0, 0);                    
+    ApplicationAttemptId appAttemptId =                                       
+        ApplicationAttemptId.newInstance(appId, 0);                           
+    ContainerId cId = ContainerId.newInstance(appAttemptId, 0);               
+                                                                              
+                                                                              
+    nodeStatusUpdater.addStoppedContainersToCache(cId);                      
+    Assert.assertTrue(nodeStatusUpdater.isContainerRecentlyStopped(cId));     
+                                                                              
+    long time1 = System.currentTimeMillis();                                  
+    int waitInterval = 15;                                                    
+    while (waitInterval-- > 0                                                 
+        && nodeStatusUpdater.isContainerRecentlyStopped(cId)) {               
+      nodeStatusUpdater.removeVeryOldStoppedContainersFromCache();          
+      Thread.sleep(1000);                                                     
+    }                                                                         
+    long time2 = System.currentTimeMillis();                                  
+    // By this time the container will be removed from cache. need to verify.
+    Assert.assertFalse(nodeStatusUpdater.isContainerRecentlyStopped(cId));    
+    Assert.assertTrue((time2 - time1) >= 10000 && (time2 -time1) <= 250000);  
+  }                                                                           
+                                                                              
+
+  
   @Test
   public void testNMRegistration() throws InterruptedException {
     nm = new NodeManager() {
@@ -843,8 +961,7 @@ public class TestNodeStatusUpdater {
     final long connectionRetryIntervalSecs = 1;
     //Waiting for rmStartIntervalMS, RM will be started
     final long rmStartIntervalMS = 2*1000;
-    YarnConfiguration conf = createNMConfig();
-    conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS,
+    conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_SECS,
         connectionWaitSecs);
     conf.setLong(YarnConfiguration
         .RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS,
@@ -907,8 +1024,6 @@ public class TestNodeStatusUpdater {
     }
     long duration = System.currentTimeMillis() - waitStartTime;
     MyNodeStatusUpdater4 myUpdater = (MyNodeStatusUpdater4) updater;
-    Assert.assertTrue("Updater was never started",
-                      myUpdater.getWaitStartTime()>0);
     Assert.assertTrue("NM started before updater triggered",
                       myUpdater.isTriggered());
     Assert.assertTrue("NM should have connected to RM after "
@@ -1033,23 +1148,32 @@ public class TestNodeStatusUpdater {
 
   @Test(timeout = 200000)
   public void testNodeStatusUpdaterRetryAndNMShutdown() 
-      throws InterruptedException {
+      throws Exception {
     final long connectionWaitSecs = 1;
     final long connectionRetryIntervalSecs = 1;
     YarnConfiguration conf = createNMConfig();
-    conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS,
+    conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_SECS,
         connectionWaitSecs);
     conf.setLong(YarnConfiguration
         .RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS,
         connectionRetryIntervalSecs);
+    conf.setLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS, 5000);
     CyclicBarrier syncBarrier = new CyclicBarrier(2);
-    nm = new MyNodeManager2(syncBarrier);
+    nm = new MyNodeManager2(syncBarrier, conf);
     nm.init(conf);
     nm.start();
+    // start a container
+    ContainerId cId = TestNodeManagerShutdown.createContainerId();
+    FileContext localFS = FileContext.getLocalFSFileContext();
+    TestNodeManagerShutdown.startContainer(nm, cId, localFS, nmLocalDir,
+      new File("start_file.txt"));
+
     try {
       syncBarrier.await(10000, TimeUnit.MILLISECONDS);
     } catch (Exception e) {
     }
+    Assert.assertFalse("Containers not cleaned up when NM stopped",
+      assertionFailedInThread.get());
     Assert.assertTrue(((MyNodeManager2) nm).isStopped);
     Assert.assertTrue("calculate heartBeatCount based on" +
         " connectionWaitSecs and RetryIntervalSecs", heartBeatID == 2);
@@ -1167,15 +1291,13 @@ public class TestNodeStatusUpdater {
 
   private YarnConfiguration createNMConfig() {
     YarnConfiguration conf = new YarnConfiguration();
-    conf.setInt(YarnConfiguration.NM_PMEM_MB, 5*1024); // 5GB
+    conf.setInt(YarnConfiguration.NM_PMEM_MB, 5 * 1024); // 5GB
     conf.set(YarnConfiguration.NM_ADDRESS, "localhost:12345");
     conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, "localhost:12346");
-    conf.set(YarnConfiguration.NM_LOG_DIRS, new Path(basedir, "logs").toUri()
-        .getPath());
-    conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, new Path(basedir,
-        "remotelogs").toUri().getPath());
-    conf.set(YarnConfiguration.NM_LOCAL_DIRS, new Path(basedir, "nm0")
-        .toUri().getPath());
+    conf.set(YarnConfiguration.NM_LOG_DIRS, logsDir.getAbsolutePath());
+    conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+      remoteLogsDir.getAbsolutePath());
+    conf.set(YarnConfiguration.NM_LOCAL_DIRS, nmLocalDir.getAbsolutePath());
     return conf;
   }
   

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/TestPBRecordImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/TestPBRecordImpl.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/TestPBRecordImpl.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/TestPBRecordImpl.java Mon Aug 12 21:25:49 2013
@@ -21,16 +21,21 @@ import static org.junit.Assert.assertEqu
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
+import java.io.IOException;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 
+import junit.framework.Assert;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.SerializedException;
 import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalResourceStatusProto;
@@ -42,7 +47,6 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.ResourceStatusType;
-import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.junit.Test;
 
@@ -80,7 +84,7 @@ public class TestPBRecordImpl {
     e.setStackTrace(new StackTraceElement[] {
         new StackTraceElement("foo", "bar", "baz", 10),
         new StackTraceElement("sbb", "one", "onm", 10) });
-    ret.setException(YarnServerBuilderUtils.newSerializedException(e));
+    ret.setException(SerializedException.newInstance(e));
     return ret;
   }
 
@@ -176,4 +180,33 @@ public class TestPBRecordImpl {
     assertEquals(createResource(), rsrcD.getResourceSpecs().get(0).getResource());
   }
 
+
+  @Test(timeout=10000)
+  public void testSerializedExceptionDeSer() throws Exception{
+    // without cause
+    YarnException yarnEx = new YarnException("Yarn_Exception");
+    SerializedException serEx = SerializedException.newInstance(yarnEx);
+    Throwable throwable = serEx.deSerialize();
+    Assert.assertEquals(yarnEx.getClass(), throwable.getClass());
+    Assert.assertEquals(yarnEx.getMessage(), throwable.getMessage());
+
+    // with cause
+    IOException ioe = new IOException("Test_IOException");
+    RuntimeException runtimeException =
+        new RuntimeException("Test_RuntimeException", ioe);
+    YarnException yarnEx2 =
+        new YarnException("Test_YarnException", runtimeException);
+
+    SerializedException serEx2 = SerializedException.newInstance(yarnEx2);
+    Throwable throwable2 = serEx2.deSerialize();
+    throwable2.printStackTrace();
+    Assert.assertEquals(yarnEx2.getClass(), throwable2.getClass());
+    Assert.assertEquals(yarnEx2.getMessage(), throwable2.getMessage());
+
+    Assert.assertEquals(runtimeException.getClass(), throwable2.getCause().getClass());
+    Assert.assertEquals(runtimeException.getMessage(), throwable2.getCause().getMessage());
+
+    Assert.assertEquals(ioe.getClass(), throwable2.getCause().getCause().getClass());
+    Assert.assertEquals(ioe.getMessage(), throwable2.getCause().getCause().getMessage());
+  }
 }

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java Mon Aug 12 21:25:49 2013
@@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.server.no
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 import junit.framework.Assert;
 
@@ -32,7 +34,7 @@ import org.apache.hadoop.fs.UnsupportedF
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
@@ -118,6 +120,11 @@ public abstract class BaseContainerManag
     };
 
     @Override
+    protected void stopRMProxy() {
+      return;
+    }
+
+    @Override
     protected void startStatusUpdater() {
       return; // Don't start any updating thread.
     }
@@ -184,15 +191,18 @@ public abstract class BaseContainerManag
 
       @Override
         protected void authorizeGetAndStopContainerRequest(ContainerId containerId,
-            Container container, boolean stopRequest) throws YarnException {
+            Container container, boolean stopRequest, NMTokenIdentifier identifier) throws YarnException {
           // do nothing
         }
-      
+      @Override
+      protected void authorizeUser(UserGroupInformation remoteUgi,
+          NMTokenIdentifier nmTokenIdentifier) {
+        // do nothing
+      }
       @Override
         protected void authorizeStartRequest(
             NMTokenIdentifier nmTokenIdentifier,
-            ContainerTokenIdentifier containerTokenIdentifier,
-            UserGroupInformation ugi) throws YarnException {
+            ContainerTokenIdentifier containerTokenIdentifier) throws YarnException {
           // do nothing
         }
       
@@ -233,18 +243,20 @@ public abstract class BaseContainerManag
   public static void waitForContainerState(ContainerManagementProtocol containerManager,
           ContainerId containerID, ContainerState finalState, int timeOutMax)
           throws InterruptedException, YarnException, IOException {
-    GetContainerStatusRequest request =
-        recordFactory.newRecordInstance(GetContainerStatusRequest.class);
-        request.setContainerId(containerID);
-        ContainerStatus containerStatus =
-            containerManager.getContainerStatus(request).getStatus();
-        int timeoutSecs = 0;
+    List<ContainerId> list = new ArrayList<ContainerId>();
+    list.add(containerID);
+    GetContainerStatusesRequest request =
+        GetContainerStatusesRequest.newInstance(list);
+    ContainerStatus containerStatus =
+        containerManager.getContainerStatuses(request).getContainerStatuses()
+          .get(0);
+    int timeoutSecs = 0;
       while (!containerStatus.getState().equals(finalState)
           && timeoutSecs++ < timeOutMax) {
           Thread.sleep(1000);
           LOG.info("Waiting for container to get into state " + finalState
               + ". Current state is " + containerStatus.getState());
-          containerStatus = containerManager.getContainerStatus(request).getStatus();
+          containerStatus = containerManager.getContainerStatuses(request).getContainerStatuses().get(0);
         }
         LOG.info("Container state is " + containerStatus.getState());
         Assert.assertEquals("ContainerState is not correct (timedout)",

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java Mon Aug 12 21:25:49 2013
@@ -24,6 +24,7 @@ import java.io.FileReader;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.net.InetAddress;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -37,9 +38,13 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Shell;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -51,8 +56,10 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.SerializedException;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.exceptions.InvalidContainerException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.security.NMTokenIdentifier;
@@ -62,6 +69,7 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
@@ -87,11 +95,11 @@ public class TestContainerManager extend
     super.setup();
   }
 
-  private ContainerId createContainerId() {
+  private ContainerId createContainerId(int id) {
     ApplicationId appId = ApplicationId.newInstance(0, 0);
     ApplicationAttemptId appAttemptId =
         ApplicationAttemptId.newInstance(appId, 1);
-    ContainerId containerId = ContainerId.newInstance(appAttemptId, 0);
+    ContainerId containerId = ContainerId.newInstance(appAttemptId, id);
     return containerId;
   }
   
@@ -118,6 +126,14 @@ public class TestContainerManager extend
           .getKeyId()));
         return ugi;
       }
+
+      @Override
+      protected void authorizeGetAndStopContainerRequest(ContainerId containerId,
+          Container container, boolean stopRequest, NMTokenIdentifier identifier) throws YarnException {
+        if(container == null || container.getUser().equals("Fail")){
+          throw new YarnException("Reject this container");
+        }
+      }
     };
   }
   
@@ -137,12 +153,17 @@ public class TestContainerManager extend
     // Just do a query for a non-existing container.
     boolean throwsException = false;
     try {
-      GetContainerStatusRequest request = 
-          recordFactory.newRecordInstance(GetContainerStatusRequest.class);
-      ContainerId cId = createContainerId();
-      request.setContainerId(cId);
-      containerManager.getContainerStatus(request);
-    } catch (YarnException e) {
+      List<ContainerId> containerIds = new ArrayList<ContainerId>();
+      ContainerId id =createContainerId(0);
+      containerIds.add(id);
+      GetContainerStatusesRequest request =
+          GetContainerStatusesRequest.newInstance(containerIds);
+      GetContainerStatusesResponse response =
+          containerManager.getContainerStatuses(request);
+      if(response.getFailedRequests().containsKey(id)){
+        throw response.getFailedRequests().get(id).deSerialize();
+      }
+    } catch (Throwable e) {
       throwsException = true;
     }
     Assert.assertTrue(throwsException);
@@ -162,7 +183,7 @@ public class TestContainerManager extend
     fileWriter.close();
 
     // ////// Construct the Container-id
-    ContainerId cId = createContainerId();
+    ContainerId cId = createContainerId(0);
 
     // ////// Construct the container-spec.
     ContainerLaunchContext containerLaunchContext = 
@@ -181,14 +202,17 @@ public class TestContainerManager extend
         new HashMap<String, LocalResource>();
     localResources.put(destinationFile, rsrc_alpha);
     containerLaunchContext.setLocalResources(localResources);
-    StartContainerRequest startRequest = 
-        recordFactory.newRecordInstance(StartContainerRequest.class);
-    startRequest.setContainerLaunchContext(containerLaunchContext);
-    startRequest.setContainerToken(createContainerToken(cId,
-      DUMMY_RM_IDENTIFIER, context.getNodeId(), user,
-      context.getContainerTokenSecretManager()));
 
-    containerManager.startContainer(startRequest);
+    StartContainerRequest scRequest =
+        StartContainerRequest.newInstance(
+          containerLaunchContext,
+          createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(),
+            user, context.getContainerTokenSecretManager()));
+    List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+    list.add(scRequest);
+    StartContainersRequest allRequests =
+        StartContainersRequest.newInstance(list);
+    containerManager.startContainers(allRequests);
 
     BaseContainerManagerTest.waitForContainerState(containerManager, cId,
         ContainerState.COMPLETE);
@@ -236,7 +260,7 @@ public class TestContainerManager extend
         new File(tmpDir, "start_file.txt").getAbsoluteFile();
 
     // ////// Construct the Container-id
-    ContainerId cId = createContainerId();
+    ContainerId cId = createContainerId(0);
 
     if (Shell.WINDOWS) {
       fileWriter.println("@echo Hello World!> " + processStartFile);
@@ -271,13 +295,17 @@ public class TestContainerManager extend
     List<String> commands = Arrays.asList(Shell.getRunScriptCommand(scriptFile));
     containerLaunchContext.setCommands(commands);
 
-    StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class);
-    startRequest.setContainerLaunchContext(containerLaunchContext);
-    startRequest.setContainerToken(createContainerToken(cId,
-      DUMMY_RM_IDENTIFIER, context.getNodeId(), user,
-      context.getContainerTokenSecretManager()));
-    containerManager.startContainer(startRequest);
- 
+    StartContainerRequest scRequest =
+        StartContainerRequest.newInstance(containerLaunchContext,
+          createContainerToken(cId,
+            DUMMY_RM_IDENTIFIER, context.getNodeId(), user,
+            context.getContainerTokenSecretManager()));
+    List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+    list.add(scRequest);
+    StartContainersRequest allRequests =
+        StartContainersRequest.newInstance(list);
+    containerManager.startContainers(allRequests);
+
     int timeoutSecs = 0;
     while (!processStartFile.exists() && timeoutSecs++ < 20) {
       Thread.sleep(1000);
@@ -304,18 +332,18 @@ public class TestContainerManager extend
     Assert.assertTrue("Process is not alive!",
       DefaultContainerExecutor.containerIsAlive(pid));
 
-    StopContainerRequest stopRequest = recordFactory.newRecordInstance(StopContainerRequest.class);
-    stopRequest.setContainerId(cId);
-    containerManager.stopContainer(stopRequest);
-
+    List<ContainerId> containerIds = new ArrayList<ContainerId>();
+    containerIds.add(cId);
+    StopContainersRequest stopRequest =
+        StopContainersRequest.newInstance(containerIds);
+    containerManager.stopContainers(stopRequest);
     BaseContainerManagerTest.waitForContainerState(containerManager, cId,
         ContainerState.COMPLETE);
     
-    GetContainerStatusRequest gcsRequest = 
-        recordFactory.newRecordInstance(GetContainerStatusRequest.class);
-    gcsRequest.setContainerId(cId);
+    GetContainerStatusesRequest gcsRequest =
+        GetContainerStatusesRequest.newInstance(containerIds);
     ContainerStatus containerStatus = 
-        containerManager.getContainerStatus(gcsRequest).getStatus();
+        containerManager.getContainerStatuses(gcsRequest).getContainerStatuses().get(0);
     int expectedExitCode = Shell.WINDOWS ? ExitCode.FORCE_KILLED.getExitCode() :
       ExitCode.TERMINATED.getExitCode();
     Assert.assertEquals(expectedExitCode, containerStatus.getExitStatus());
@@ -324,7 +352,7 @@ public class TestContainerManager extend
     Assert.assertFalse("Process is still alive!",
       DefaultContainerExecutor.containerIsAlive(pid));
   }
-  
+
   private void testContainerLaunchAndExit(int exitCode) throws IOException,
       InterruptedException, YarnException {
 
@@ -334,7 +362,7 @@ public class TestContainerManager extend
 			  new File(tmpDir, "start_file.txt").getAbsoluteFile();
 
 	  // ////// Construct the Container-id
-	  ContainerId cId = createContainerId();
+	  ContainerId cId = createContainerId(0);
 
 	  if (Shell.WINDOWS) {
 	    fileWriter.println("@echo Hello World!> " + processStartFile);
@@ -375,21 +403,26 @@ public class TestContainerManager extend
 	  List<String> commands = Arrays.asList(Shell.getRunScriptCommand(scriptFile));
 	  containerLaunchContext.setCommands(commands);
 
-	  StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class);
-	  startRequest.setContainerLaunchContext(containerLaunchContext);
-    startRequest.setContainerToken(createContainerToken(cId,
-      DUMMY_RM_IDENTIFIER, context.getNodeId(), user,
-      context.getContainerTokenSecretManager()));
-	  containerManager.startContainer(startRequest);
+    StartContainerRequest scRequest =
+        StartContainerRequest.newInstance(
+          containerLaunchContext,
+          createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(),
+            user, context.getContainerTokenSecretManager()));
+    List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+    list.add(scRequest);
+    StartContainersRequest allRequests =
+        StartContainersRequest.newInstance(list);
+    containerManager.startContainers(allRequests);
 
 	  BaseContainerManagerTest.waitForContainerState(containerManager, cId,
 			  ContainerState.COMPLETE);
 
-	  GetContainerStatusRequest gcsRequest = 
-			  recordFactory.newRecordInstance(GetContainerStatusRequest.class);
-	  gcsRequest.setContainerId(cId);
+    List<ContainerId> containerIds = new ArrayList<ContainerId>();
+    containerIds.add(cId);
+    GetContainerStatusesRequest gcsRequest =
+        GetContainerStatusesRequest.newInstance(containerIds);
 	  ContainerStatus containerStatus = 
-			  containerManager.getContainerStatus(gcsRequest).getStatus();
+			  containerManager.getContainerStatuses(gcsRequest).getContainerStatuses().get(0);
 
 	  // Verify exit status matches exit state of script
 	  Assert.assertEquals(exitCode,
@@ -438,7 +471,7 @@ public class TestContainerManager extend
     fileWriter.close();
 
     // ////// Construct the Container-id
-    ContainerId cId = createContainerId();
+    ContainerId cId = createContainerId(0);
     ApplicationId appId = cId.getApplicationAttemptId().getApplicationId();
 
     // ////// Construct the container-spec.
@@ -459,11 +492,17 @@ public class TestContainerManager extend
         new HashMap<String, LocalResource>();
     localResources.put(destinationFile, rsrc_alpha);
     containerLaunchContext.setLocalResources(localResources);
-    StartContainerRequest request = recordFactory.newRecordInstance(StartContainerRequest.class);
-    request.setContainerLaunchContext(containerLaunchContext);
-    request.setContainerToken(createContainerToken(cId, DUMMY_RM_IDENTIFIER,
-      context.getNodeId(), user, context.getContainerTokenSecretManager()));
-    containerManager.startContainer(request);
+
+    StartContainerRequest scRequest =
+        StartContainerRequest.newInstance(
+          containerLaunchContext,
+          createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(),
+            user, context.getContainerTokenSecretManager()));
+    List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+    list.add(scRequest);
+    StartContainersRequest allRequests =
+        StartContainersRequest.newInstance(list);
+    containerManager.startContainers(allRequests);
 
     BaseContainerManagerTest.waitForContainerState(containerManager, cId,
         ContainerState.COMPLETE);
@@ -527,29 +566,37 @@ public class TestContainerManager extend
 
   @Test
   public void testContainerLaunchFromPreviousRM() throws IOException,
-      InterruptedException {
+      InterruptedException, YarnException {
     containerManager.start();
 
     ContainerLaunchContext containerLaunchContext =
         recordFactory.newRecordInstance(ContainerLaunchContext.class);
 
-    ContainerId cId1 = createContainerId();
-    ContainerId cId2 = createContainerId();
+    ContainerId cId1 = createContainerId(0);
+    ContainerId cId2 = createContainerId(0);
     containerLaunchContext
       .setLocalResources(new HashMap<String, LocalResource>());
 
     // Construct the Container with Invalid RMIdentifier
     StartContainerRequest startRequest1 =
-        recordFactory.newRecordInstance(StartContainerRequest.class);
-    startRequest1.setContainerLaunchContext(containerLaunchContext);
+        StartContainerRequest.newInstance(containerLaunchContext,
+          createContainerToken(cId1,
+            ResourceManagerConstants.RM_INVALID_IDENTIFIER, context.getNodeId(),
+            user, context.getContainerTokenSecretManager()));
+    List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+    list.add(startRequest1);
+    StartContainersRequest allRequests =
+        StartContainersRequest.newInstance(list);
+    containerManager.startContainers(allRequests);
     
-    startRequest1.setContainerToken(createContainerToken(cId1,
-      ResourceManagerConstants.RM_INVALID_IDENTIFIER, context.getNodeId(),
-      user, context.getContainerTokenSecretManager()));
     boolean catchException = false;
     try {
-      containerManager.startContainer(startRequest1);
-    } catch (YarnException e) {
+      StartContainersResponse response = containerManager.startContainers(allRequests);
+      if(response.getFailedRequests().containsKey(cId1)) {
+        throw response.getFailedRequests().get(cId1).deSerialize();
+      }
+    } catch (Throwable e) {
+      e.printStackTrace();
       catchException = true;
       Assert.assertTrue(e.getMessage().contains(
         "Container " + cId1 + " rejected as it is allocated by a previous RM"));
@@ -562,21 +609,143 @@ public class TestContainerManager extend
 
     // Construct the Container with a RMIdentifier within current RM
     StartContainerRequest startRequest2 =
-        recordFactory.newRecordInstance(StartContainerRequest.class);
-    startRequest2.setContainerLaunchContext(containerLaunchContext);
-    startRequest2.setContainerToken(createContainerToken(cId2,
-      DUMMY_RM_IDENTIFIER, context.getNodeId(), user,
-      context.getContainerTokenSecretManager()));
+        StartContainerRequest.newInstance(containerLaunchContext,
+          createContainerToken(cId2,
+            DUMMY_RM_IDENTIFIER, context.getNodeId(), user,
+            context.getContainerTokenSecretManager()));
+    List<StartContainerRequest> list2 = new ArrayList<StartContainerRequest>();
+    list.add(startRequest2);
+    StartContainersRequest allRequests2 =
+        StartContainersRequest.newInstance(list2);
+    containerManager.startContainers(allRequests2);
+    
     boolean noException = true;
     try {
-      containerManager.startContainer(startRequest2);
+      containerManager.startContainers(allRequests2);
     } catch (YarnException e) {
       noException = false;
     }
     // Verify that startContainer get no YarnException
     Assert.assertTrue(noException);
   }
-  
+
+  @Test
+  public void testMultipleContainersLaunch() throws Exception {
+    containerManager.start();
+
+    List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+    ContainerLaunchContext containerLaunchContext =
+        recordFactory.newRecordInstance(ContainerLaunchContext.class);
+    for (int i = 0; i < 10; i++) {
+      ContainerId cId = createContainerId(i);
+      long identifier = 0;
+      if ((i & 1) == 0)
+        // container with even id fail
+        identifier = ResourceManagerConstants.RM_INVALID_IDENTIFIER;
+      else
+        identifier = DUMMY_RM_IDENTIFIER;
+      Token containerToken =
+          createContainerToken(cId, identifier, context.getNodeId(), user,
+            context.getContainerTokenSecretManager());
+      StartContainerRequest request =
+          StartContainerRequest.newInstance(containerLaunchContext,
+            containerToken);
+      list.add(request);
+    }
+    StartContainersRequest requestList =
+        StartContainersRequest.newInstance(list);
+
+    StartContainersResponse response =
+        containerManager.startContainers(requestList);
+
+    Assert.assertEquals(5, response.getSuccessfullyStartedContainers().size());
+    for (ContainerId id : response.getSuccessfullyStartedContainers()) {
+      // Containers with odd id should succeed.
+      Assert.assertEquals(1, id.getId() & 1);
+    }
+    Assert.assertEquals(5, response.getFailedRequests().size());
+    for (Map.Entry<ContainerId, SerializedException> entry : response
+      .getFailedRequests().entrySet()) {
+      // Containers with even id should fail.
+      Assert.assertEquals(0, entry.getKey().getId() & 1);
+      Assert.assertTrue(entry.getValue().getMessage()
+        .contains(
+          "Container " + entry.getKey() + " rejected as it is allocated by a previous RM"));
+    }
+  }
+
+  @Test
+  public void testMultipleContainersStopAndGetStatus() throws Exception {
+    containerManager.start();
+    List<StartContainerRequest> startRequest =
+        new ArrayList<StartContainerRequest>();
+    ContainerLaunchContext containerLaunchContext =
+        recordFactory.newRecordInstance(ContainerLaunchContext.class);
+
+    List<ContainerId> containerIds = new ArrayList<ContainerId>();
+    for (int i = 0; i < 10; i++) {
+      ContainerId cId = createContainerId(i);
+      String user = null;
+      if ((i & 1) == 0) {
+        // container with even id fail
+        user = "Fail";
+      } else {
+        user = "Pass";
+      }
+      Token containerToken =
+          createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(),
+            user, context.getContainerTokenSecretManager());
+      StartContainerRequest request =
+          StartContainerRequest.newInstance(containerLaunchContext,
+            containerToken);
+      startRequest.add(request);
+      containerIds.add(cId);
+    }
+    // start containers
+    StartContainersRequest requestList =
+        StartContainersRequest.newInstance(startRequest);
+    containerManager.startContainers(requestList);
+
+    // Get container statuses
+    GetContainerStatusesRequest statusRequest =
+        GetContainerStatusesRequest.newInstance(containerIds);
+    GetContainerStatusesResponse statusResponse =
+        containerManager.getContainerStatuses(statusRequest);
+    Assert.assertEquals(5, statusResponse.getContainerStatuses().size());
+    for (ContainerStatus status : statusResponse.getContainerStatuses()) {
+      // Containers with odd id should succeed
+      Assert.assertEquals(1, status.getContainerId().getId() & 1);
+    }
+    Assert.assertEquals(5, statusResponse.getFailedRequests().size());
+    for (Map.Entry<ContainerId, SerializedException> entry : statusResponse
+      .getFailedRequests().entrySet()) {
+      // Containers with even id should fail.
+      Assert.assertEquals(0, entry.getKey().getId() & 1);
+      Assert.assertTrue(entry.getValue().getMessage()
+        .contains("Reject this container"));
+    }
+
+    // stop containers
+    StopContainersRequest stopRequest =
+        StopContainersRequest.newInstance(containerIds);
+    StopContainersResponse stopResponse =
+        containerManager.stopContainers(stopRequest);
+    Assert.assertEquals(5, stopResponse.getSuccessfullyStoppedContainers()
+      .size());
+    for (ContainerId id : stopResponse.getSuccessfullyStoppedContainers()) {
+      // Containers with odd id should succeed.
+      Assert.assertEquals(1, id.getId() & 1);
+    }
+    Assert.assertEquals(5, stopResponse.getFailedRequests().size());
+    for (Map.Entry<ContainerId, SerializedException> entry : stopResponse
+      .getFailedRequests().entrySet()) {
+      // Containers with even id should fail.
+      Assert.assertEquals(0, entry.getKey().getId() & 1);
+      Assert.assertTrue(entry.getValue().getMessage()
+        .contains("Reject this container"));
+    }
+  }
+
   public static Token createContainerToken(ContainerId cId, long rmIdentifier,
       NodeId nodeId, String user,
       NMContainerTokenSecretManager containerTokenSecretManager)

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java Mon Aug 12 21:25:49 2013
@@ -29,6 +29,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import junit.framework.Assert;
 
@@ -265,9 +266,12 @@ public class TestApplication {
               AuxServicesEventType.APPLICATION_STOP, wa.appId)));
 
       wa.appResourcesCleanedup();
-      for ( Container container : wa.containers) {
+      for (Container container : wa.containers) {
+        ContainerTokenIdentifier identifier =
+            wa.getContainerTokenIdentifier(container.getContainerId());
+        waitForContainerTokenToExpire(identifier);
         Assert.assertTrue(wa.context.getContainerTokenSecretManager()
-          .isValidStartContainerRequest(container.getContainerId()));
+          .isValidStartContainerRequest(identifier));
       }
       assertEquals(ApplicationState.FINISHED, wa.app.getApplicationState());
 
@@ -277,6 +281,18 @@ public class TestApplication {
     }
   }
 
+  protected ContainerTokenIdentifier waitForContainerTokenToExpire(
+      ContainerTokenIdentifier identifier) {
+    int attempts = 5;
+    while (System.currentTimeMillis() < identifier.getExpiryTimeStamp()
+        && attempts-- > 0) {
+      try {
+        Thread.sleep(1000);
+      } catch (Exception e) {}
+    }
+    return identifier;
+  }
+
   @Test
   @SuppressWarnings("unchecked")
   public void testAppFinishedOnCompletedContainers() {
@@ -306,8 +322,11 @@ public class TestApplication {
 
       wa.appResourcesCleanedup();
       for ( Container container : wa.containers) {
+        ContainerTokenIdentifier identifier =
+            wa.getContainerTokenIdentifier(container.getContainerId());
+        waitForContainerTokenToExpire(identifier);
         Assert.assertTrue(wa.context.getContainerTokenSecretManager()
-          .isValidStartContainerRequest(container.getContainerId()));
+          .isValidStartContainerRequest(identifier));
       }
       assertEquals(ApplicationState.FINISHED, wa.app.getApplicationState());
     } finally {
@@ -440,7 +459,8 @@ public class TestApplication {
     final String user;
     final List<Container> containers;
     final Context context;
-
+    final Map<ContainerId, ContainerTokenIdentifier> containerTokenIdentifierMap;
+    
     final ApplicationId appId;
     final Application app;
 
@@ -448,6 +468,8 @@ public class TestApplication {
       Configuration conf = new Configuration();
       
       dispatcher = new DrainDispatcher();
+      containerTokenIdentifierMap =
+          new HashMap<ContainerId, ContainerTokenIdentifier>();
       dispatcher.init(conf);
 
       localizerBus = mock(EventHandler.class);
@@ -486,11 +508,15 @@ public class TestApplication {
         Container container = createMockedContainer(this.appId, i);
         containers.add(container);
         long currentTime = System.currentTimeMillis();
+        ContainerTokenIdentifier identifier =
+            new ContainerTokenIdentifier(container.getContainerId(), "", "",
+              null, currentTime + 2000, masterKey.getKeyId(), currentTime);
+        containerTokenIdentifierMap
+          .put(identifier.getContainerID(), identifier);
         context.getContainerTokenSecretManager().startContainerSuccessful(
-          new ContainerTokenIdentifier(container.getContainerId(), "",
-            "", null, currentTime + 1000, masterKey.getKeyId(), currentTime));
+          identifier);
         Assert.assertFalse(context.getContainerTokenSecretManager()
-          .isValidStartContainerRequest(container.getContainerId()));
+          .isValidStartContainerRequest(identifier));
       }
 
       dispatcher.start();
@@ -542,6 +568,11 @@ public class TestApplication {
           ApplicationEventType.APPLICATION_RESOURCES_CLEANEDUP));
       drainDispatcherEvents();
     }
+    
+    public ContainerTokenIdentifier getContainerTokenIdentifier(
+        ContainerId containerId) {
+      return this.containerTokenIdentifierMap.get(containerId);
+    }
   }
 
   private Container createMockedContainer(ApplicationId appId, int containerId) {

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java Mon Aug 12 21:25:49 2013
@@ -18,7 +18,10 @@
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.argThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.reset;
@@ -40,11 +43,16 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 
 import junit.framework.Assert;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.LocalResource;
@@ -58,10 +66,13 @@ import org.apache.hadoop.yarn.event.Drai
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
@@ -124,6 +135,7 @@ public class TestContainer {
 
       // all resources should be localized
       assertEquals(ContainerState.LOCALIZED, wc.c.getContainerState());
+      assertNotNull(wc.c.getLocalizedResources());
       for (Entry<Path, List<String>> loc : wc.c.getLocalizedResources()
           .entrySet()) {
         assertEquals(localPaths.remove(loc.getKey()), loc.getValue());
@@ -161,6 +173,7 @@ public class TestContainer {
       wc.containerKilledOnRequest();
       assertEquals(ContainerState.EXITED_WITH_FAILURE, 
           wc.c.getContainerState());
+      assertNull(wc.c.getLocalizedResources());
       verifyCleanupCall(wc);
     }
     finally {
@@ -183,6 +196,7 @@ public class TestContainer {
       wc.containerFailed(ExitCode.FORCE_KILLED.getExitCode());
       assertEquals(ContainerState.EXITED_WITH_FAILURE, 
           wc.c.getContainerState());
+      assertNull(wc.c.getLocalizedResources());
       verifyCleanupCall(wc);
     }
     finally {
@@ -205,7 +219,7 @@ public class TestContainer {
       wc.containerSuccessful();
       assertEquals(ContainerState.EXITED_WITH_SUCCESS,
           wc.c.getContainerState());
-      
+      assertNull(wc.c.getLocalizedResources());
       verifyCleanupCall(wc);
     }
     finally {
@@ -228,10 +242,12 @@ public class TestContainer {
       wc.containerSuccessful();
       wc.containerResourcesCleanup();
       assertEquals(ContainerState.DONE, wc.c.getContainerState());
+      assertNull(wc.c.getLocalizedResources());
       // Now in DONE, issue INIT
       wc.initContainer();
       // Verify still in DONE
       assertEquals(ContainerState.DONE, wc.c.getContainerState());
+      assertNull(wc.c.getLocalizedResources());
       verifyCleanupCall(wc);
     }
     finally {
@@ -240,6 +256,34 @@ public class TestContainer {
       }
     }
   }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  // mocked generic
+  public void testLocalizationFailureAtDone() throws Exception {
+    WrappedContainer wc = null;
+    try {
+      wc = new WrappedContainer(6, 314159265358979L, 4344, "yak");
+      wc.initContainer();
+      wc.localizeResources();
+      wc.launchContainer();
+      reset(wc.localizerBus);
+      wc.containerSuccessful();
+      wc.containerResourcesCleanup();
+      assertEquals(ContainerState.DONE, wc.c.getContainerState());
+      assertNull(wc.c.getLocalizedResources());
+      // Now in DONE, issue RESOURCE_FAILED as done by LocalizeRunner
+      wc.resourceFailedContainer();
+      // Verify still in DONE
+      assertEquals(ContainerState.DONE, wc.c.getContainerState());
+      assertNull(wc.c.getLocalizedResources());
+      verifyCleanupCall(wc);
+    } finally {
+      if (wc != null) {
+        wc.finished();
+      }
+    }
+  }
   
   @Test
   @SuppressWarnings("unchecked") // mocked generic
@@ -252,7 +296,9 @@ public class TestContainer {
       wc.launchContainer();
       reset(wc.localizerBus);
       wc.killContainer();
-      assertEquals(ContainerState.KILLING, wc.c.getContainerState());
+      assertEquals(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
+          wc.c.getContainerState());
+      assertNull(wc.c.getLocalizedResources());
       wc.containerKilledOnRequest();
       
       verifyCleanupCall(wc);
@@ -271,8 +317,30 @@ public class TestContainer {
       wc.initContainer();
       wc.failLocalizeResources(wc.getLocalResourceCount());
       assertEquals(ContainerState.LOCALIZATION_FAILED, wc.c.getContainerState());
+      assertNull(wc.c.getLocalizedResources());
       wc.killContainer();
       assertEquals(ContainerState.LOCALIZATION_FAILED, wc.c.getContainerState());
+      assertNull(wc.c.getLocalizedResources());
+      verifyCleanupCall(wc);
+    } finally {
+      if (wc != null) {
+        wc.finished();
+      }
+    }
+  }
+
+  @Test
+  public void testKillOnLocalized() throws Exception {
+    WrappedContainer wc = null;
+    try {
+      wc = new WrappedContainer(17, 314159265358979L, 4344, "yak");
+      wc.initContainer();
+      wc.localizeResources();
+      assertEquals(ContainerState.LOCALIZED, wc.c.getContainerState());
+      wc.killContainer();
+      assertEquals(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
+          wc.c.getContainerState());
+      assertNull(wc.c.getLocalizedResources());
       verifyCleanupCall(wc);
     } finally {
       if (wc != null) {
@@ -293,8 +361,10 @@ public class TestContainer {
       }
       wc.failLocalizeResources(failCount);
       assertEquals(ContainerState.LOCALIZATION_FAILED, wc.c.getContainerState());
+      assertNull(wc.c.getLocalizedResources());
       wc.localizeResourcesFromInvalidState(failCount);
       assertEquals(ContainerState.LOCALIZATION_FAILED, wc.c.getContainerState());
+      assertNull(wc.c.getLocalizedResources());
       verifyCleanupCall(wc);
       Assert.assertTrue(wc.getDiagnostics().contains(FAKE_LOCALIZATION_ERROR));
     } finally {
@@ -316,8 +386,10 @@ public class TestContainer {
       String key2 = lRsrcKeys.next();
       wc.failLocalizeSpecificResource(key1);
       assertEquals(ContainerState.LOCALIZATION_FAILED, wc.c.getContainerState());
+      assertNull(wc.c.getLocalizedResources());
       wc.failLocalizeSpecificResource(key2);
       assertEquals(ContainerState.LOCALIZATION_FAILED, wc.c.getContainerState());
+      assertNull(wc.c.getLocalizedResources());
       verifyCleanupCall(wc);
     } finally {
       if (wc != null) {
@@ -337,8 +409,10 @@ public class TestContainer {
       String key1 = lRsrcKeys.next();
       wc.killContainer();
       assertEquals(ContainerState.KILLING, wc.c.getContainerState());
+      assertNull(wc.c.getLocalizedResources());
       wc.failLocalizeSpecificResource(key1);
       assertEquals(ContainerState.KILLING, wc.c.getContainerState());
+      assertNull(wc.c.getLocalizedResources());
       verifyCleanupCall(wc);
     } finally {
       if (wc != null) {
@@ -398,9 +472,13 @@ public class TestContainer {
       wc.initContainer();
       wc.localizeResources();
       wc.killContainer();
-      assertEquals(ContainerState.KILLING, wc.c.getContainerState());
+      assertEquals(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
+          wc.c.getContainerState());
+      assertNull(wc.c.getLocalizedResources());
       wc.launchContainer();
-      assertEquals(ContainerState.KILLING, wc.c.getContainerState());
+      assertEquals(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
+          wc.c.getContainerState());
+      assertNull(wc.c.getLocalizedResources());
       wc.containerKilledOnRequest();
       verifyCleanupCall(wc);
     } finally {
@@ -537,6 +615,7 @@ public class TestContainer {
     final EventHandler<AuxServicesEvent> auxBus;
     final EventHandler<ApplicationEvent> appBus;
     final EventHandler<LogHandlerEvent> LogBus;
+    final ContainersLauncher launcher;
 
     final ContainerLaunchContext ctxt;
     final ContainerId cId;
@@ -549,6 +628,7 @@ public class TestContainer {
       this(appId, timestamp, id, user, true, false);
     }
 
+    @SuppressWarnings("rawtypes")
     WrappedContainer(int appId, long timestamp, int id, String user,
         boolean withLocalRes, boolean withServiceData) throws IOException {
       dispatcher = new DrainDispatcher();
@@ -567,6 +647,22 @@ public class TestContainer {
       dispatcher.register(ApplicationEventType.class, appBus);
       dispatcher.register(LogHandlerEventType.class, LogBus);
 
+      Context context = mock(Context.class);
+      when(context.getApplications()).thenReturn(
+          new ConcurrentHashMap<ApplicationId, Application>());
+      launcher = new ContainersLauncher(context, dispatcher, null, null);
+      // create a mock ExecutorService, which will not really launch
+      // ContainerLaunch at all.
+      launcher.containerLauncher = mock(ExecutorService.class);
+      Future future = mock(Future.class);
+      when(launcher.containerLauncher.submit
+          (any(Callable.class))).thenReturn(future);
+      when(future.isDone()).thenReturn(false);
+      when(future.cancel(false)).thenReturn(true);
+      launcher.init(new Configuration());
+      launcher.start();
+      dispatcher.register(ContainersLauncherEventType.class, launcher);
+
       ctxt = mock(ContainerLaunchContext.class);
       org.apache.hadoop.yarn.api.records.Container mockContainer =
           mock(org.apache.hadoop.yarn.api.records.Container.class);
@@ -608,6 +704,13 @@ public class TestContainer {
       when(ctxt.getServiceData()).thenReturn(serviceData);
 
       c = new ContainerImpl(conf, dispatcher, ctxt, null, metrics, identifier);
+      dispatcher.register(ContainerEventType.class,
+          new EventHandler<ContainerEvent>() {
+            @Override
+            public void handle(ContainerEvent event) {
+                c.handle(event);
+            }
+      });
       dispatcher.start();
     }
 
@@ -624,6 +727,11 @@ public class TestContainer {
       drainDispatcherEvents();
     }
 
+    public void resourceFailedContainer() {
+      c.handle(new ContainerEvent(cId, ContainerEventType.RESOURCE_FAILED));
+      drainDispatcherEvents();
+    }
+
     // Localize resources 
     // Skip some resources so as to consider them failed
     public Map<Path, List<String>> doLocalizeResources(