You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2013/08/12 23:26:19 UTC

svn commit: r1513258 [7/10] - in /hadoop/common/branches/YARN-321/hadoop-yarn-project: ./ hadoop-yarn/ hadoop-yarn/conf/ hadoop-yarn/dev-support/ hadoop-yarn/hadoop-yarn-api/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ hadoop-...

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java Mon Aug 12 21:25:49 2013
@@ -19,6 +19,8 @@
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher;
 
 import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.io.BufferedReader;
 import java.io.File;
@@ -28,22 +30,26 @@ import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import junit.framework.Assert;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.Shell.ExitCodeException;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -57,10 +63,16 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
 import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -98,7 +110,7 @@ public class TestContainerLaunch extends
       tempFile = Shell.appendScriptExtension(tmpDir, "temp");
       String timeoutCommand = Shell.WINDOWS ? "@echo \"hello\"" :
         "echo \"hello\"";
-      PrintWriter writer = new PrintWriter(new FileOutputStream(shellFile));    
+      PrintWriter writer = new PrintWriter(new FileOutputStream(shellFile));
       FileUtil.setExecutable(shellFile, true);
       writer.println(timeoutCommand);
       writer.close();
@@ -132,7 +144,7 @@ public class TestContainerLaunch extends
       assertEquals(shexc.getExitCode(), 0);
       assert(shexc.getOutput().contains("hello"));
 
-      symLinkFile = new File(tmpDir, badSymlink);      
+      symLinkFile = new File(tmpDir, badSymlink);
     }
     finally {
       // cleanup
@@ -151,6 +163,173 @@ public class TestContainerLaunch extends
     }
   }
 
+  // test the diagnostics are generated
+  @Test (timeout = 20000)
+  public void testInvalidSymlinkDiagnostics() throws IOException  {
+
+    File shellFile = null;
+    File tempFile = null;
+    String symLink = Shell.WINDOWS ? "test.cmd" :
+      "test";
+    File symLinkFile = null;
+
+    try {
+      shellFile = Shell.appendScriptExtension(tmpDir, "hello");
+      tempFile = Shell.appendScriptExtension(tmpDir, "temp");
+      String timeoutCommand = Shell.WINDOWS ? "@echo \"hello\"" :
+        "echo \"hello\"";
+      PrintWriter writer = new PrintWriter(new FileOutputStream(shellFile));
+      FileUtil.setExecutable(shellFile, true);
+      writer.println(timeoutCommand);
+      writer.close();
+
+      Map<Path, List<String>> resources =
+          new HashMap<Path, List<String>>();
+      //This is an invalid path and should throw exception because of No such file.
+      Path invalidPath = new Path(shellFile.getAbsolutePath()+"randomPath");
+      resources.put(invalidPath, Arrays.asList(symLink));
+      FileOutputStream fos = new FileOutputStream(tempFile);
+
+      Map<String, String> env = new HashMap<String, String>();
+      List<String> commands = new ArrayList<String>();
+      if (Shell.WINDOWS) {
+        commands.add("cmd");
+        commands.add("/c");
+        commands.add("\"" + symLink + "\"");
+      } else {
+        commands.add("/bin/sh ./\\\"" + symLink + "\\\"");
+      }
+      ContainerLaunch.writeLaunchEnv(fos, env, resources, commands);
+      fos.flush();
+      fos.close();
+      FileUtil.setExecutable(tempFile, true);
+
+      Shell.ShellCommandExecutor shexc
+      = new Shell.ShellCommandExecutor(new String[]{tempFile.getAbsolutePath()}, tmpDir);
+      String diagnostics = null;
+      try {
+        shexc.execute();
+        Assert.fail("Should catch exception");
+      } catch(ExitCodeException e){
+        diagnostics = e.getMessage();
+      }
+      Assert.assertNotNull(diagnostics);
+      Assert.assertTrue(shexc.getExitCode() != 0);
+      symLinkFile = new File(tmpDir, symLink);
+    }
+    finally {
+      // cleanup
+      if (shellFile != null
+          && shellFile.exists()) {
+        shellFile.delete();
+      }
+      if (tempFile != null
+          && tempFile.exists()) {
+        tempFile.delete();
+      }
+      if (symLinkFile != null
+          && symLinkFile.exists()) {
+        symLinkFile.delete();
+      }
+    }
+  }
+
+  @Test (timeout = 20000)
+  public void testInvalidEnvSyntaxDiagnostics() throws IOException  {
+
+    File shellFile = null;
+    try {
+      shellFile = Shell.appendScriptExtension(tmpDir, "hello");
+      String timeoutCommand = Shell.WINDOWS ? "@echo \"hello\"" :
+        "echo \"hello\"";
+      PrintWriter writer = new PrintWriter(new FileOutputStream(shellFile));
+      FileUtil.setExecutable(shellFile, true);
+      writer.println(timeoutCommand);
+      writer.close();
+      Map<Path, List<String>> resources =
+          new HashMap<Path, List<String>>();
+      FileOutputStream fos = new FileOutputStream(shellFile);
+
+      Map<String, String> env = new HashMap<String, String>();
+      // invalid env
+      env.put(
+          "APPLICATION_WORKFLOW_CONTEXT", "{\"workflowId\":\"609f91c5cd83\"," +
+          "\"workflowName\":\"\n\ninsert table " +
+          "\npartition (cd_education_status)\nselect cd_demo_sk, cd_gender, " );
+      List<String> commands = new ArrayList<String>();
+      ContainerLaunch.writeLaunchEnv(fos, env, resources, commands);
+      fos.flush();
+      fos.close();
+
+      Shell.ShellCommandExecutor shexc
+      = new Shell.ShellCommandExecutor(new String[]{shellFile.getAbsolutePath()}, tmpDir);
+      String diagnostics = null;
+      try {
+        shexc.execute();
+        Assert.fail("Should catch exception");
+      } catch(ExitCodeException e){
+        diagnostics = e.getMessage();
+      }
+      Assert.assertTrue(diagnostics.contains("command not found"));
+      Assert.assertTrue(shexc.getExitCode() != 0);
+    }
+    finally {
+      // cleanup
+      if (shellFile != null
+          && shellFile.exists()) {
+        shellFile.delete();
+      }
+    }
+  }
+
+  @Test (timeout = 20000)
+  public void testContainerLaunchStdoutAndStderrDiagnostics() throws IOException {
+
+    File shellFile = null;
+    try {
+      shellFile = Shell.appendScriptExtension(tmpDir, "hello");
+      // echo "hello" to stdout and "error" to stderr and exit code with 2;
+      String command = Shell.WINDOWS ? "@echo \"hello\"; @echo \"error\" 1>&2; exit 2;" :
+        "echo \"hello\"; echo \"error\" 1>&2; exit 2;";
+      PrintWriter writer = new PrintWriter(new FileOutputStream(shellFile));
+      FileUtil.setExecutable(shellFile, true);
+      writer.println(command);
+      writer.close();
+      Map<Path, List<String>> resources =
+          new HashMap<Path, List<String>>();
+      FileOutputStream fos = new FileOutputStream(shellFile);
+
+      Map<String, String> env = new HashMap<String, String>();
+      List<String> commands = new ArrayList<String>();
+      commands.add(command);
+      ContainerLaunch.writeLaunchEnv(fos, env, resources, commands);
+      fos.flush();
+      fos.close();
+
+      Shell.ShellCommandExecutor shexc
+      = new Shell.ShellCommandExecutor(new String[]{shellFile.getAbsolutePath()}, tmpDir);
+      String diagnostics = null;
+      try {
+        shexc.execute();
+        Assert.fail("Should catch exception");
+      } catch(ExitCodeException e){
+        diagnostics = e.getMessage();
+      }
+      // test stderr
+      Assert.assertTrue(diagnostics.contains("error"));
+      // test stdout
+      Assert.assertTrue(shexc.getOutput().contains("hello"));
+      Assert.assertTrue(shexc.getExitCode() == 2);
+    }
+    finally {
+      // cleanup
+      if (shellFile != null
+          && shellFile.exists()) {
+        shellFile.delete();
+      }
+    }
+  }
+
   /**
    * See if environment variable is forwarded using sanitizeEnv.
    * @throws Exception
@@ -231,10 +410,14 @@ public class TestContainerLaunch extends
     // set up the rest of the container
     List<String> commands = Arrays.asList(Shell.getRunScriptCommand(scriptFile));
     containerLaunchContext.setCommands(commands);
-    StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class);
-    startRequest.setContainerLaunchContext(containerLaunchContext);
-    startRequest.setContainerToken(createContainerToken(cId));
-    containerManager.startContainer(startRequest);
+    StartContainerRequest scRequest =
+        StartContainerRequest.newInstance(containerLaunchContext,
+          createContainerToken(cId));
+    List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+    list.add(scRequest);
+    StartContainersRequest allRequests =
+        StartContainersRequest.newInstance(list);
+    containerManager.startContainers(allRequests);
 
     int timeoutSecs = 0;
     while (!processStartFile.exists() && timeoutSecs++ < 20) {
@@ -297,18 +480,20 @@ public class TestContainerLaunch extends
     Assert.assertTrue("Process is not alive!",
       DefaultContainerExecutor.containerIsAlive(pid));
 
-    StopContainerRequest stopRequest = recordFactory.newRecordInstance(StopContainerRequest.class);
-    stopRequest.setContainerId(cId);
-    containerManager.stopContainer(stopRequest);
+    // Now test the stop functionality.
+    List<ContainerId> containerIds = new ArrayList<ContainerId>();
+    containerIds.add(cId);
+    StopContainersRequest stopRequest =
+        StopContainersRequest.newInstance(containerIds);
+    containerManager.stopContainers(stopRequest);
 
     BaseContainerManagerTest.waitForContainerState(containerManager, cId,
         ContainerState.COMPLETE);
 
-    GetContainerStatusRequest gcsRequest = 
-        recordFactory.newRecordInstance(GetContainerStatusRequest.class);
-    gcsRequest.setContainerId(cId);
+    GetContainerStatusesRequest gcsRequest =
+        GetContainerStatusesRequest.newInstance(containerIds);
     ContainerStatus containerStatus = 
-        containerManager.getContainerStatus(gcsRequest).getStatus();
+        containerManager.getContainerStatuses(gcsRequest).getContainerStatuses().get(0);
     int expectedExitCode = Shell.WINDOWS ? ExitCode.FORCE_KILLED.getExitCode() :
       ExitCode.TERMINATED.getExitCode();
     Assert.assertEquals(expectedExitCode, containerStatus.getExitStatus());
@@ -376,11 +561,15 @@ public class TestContainerLaunch extends
     List<String> commands = Arrays.asList(Shell.getRunScriptCommand(scriptFile));
     containerLaunchContext.setCommands(commands);
     Token containerToken = createContainerToken(cId);
-    StartContainerRequest startRequest =
-        recordFactory.newRecordInstance(StartContainerRequest.class);
-    startRequest.setContainerLaunchContext(containerLaunchContext);
-    startRequest.setContainerToken(containerToken);
-    containerManager.startContainer(startRequest);
+
+    StartContainerRequest scRequest =
+        StartContainerRequest.newInstance(containerLaunchContext,
+          containerToken);
+    List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+    list.add(scRequest);
+    StartContainersRequest allRequests =
+        StartContainersRequest.newInstance(list);
+    containerManager.startContainers(allRequests);
 
     int timeoutSecs = 0;
     while (!processStartFile.exists() && timeoutSecs++ < 20) {
@@ -391,21 +580,25 @@ public class TestContainerLaunch extends
         processStartFile.exists());
 
     // Now test the stop functionality.
-    StopContainerRequest stopRequest = recordFactory.newRecordInstance(StopContainerRequest.class);
-    stopRequest.setContainerId(cId);
-    containerManager.stopContainer(stopRequest);
+    List<ContainerId> containerIds = new ArrayList<ContainerId>();
+    containerIds.add(cId);
+    StopContainersRequest stopRequest =
+        StopContainersRequest.newInstance(containerIds);
+    containerManager.stopContainers(stopRequest);
 
     BaseContainerManagerTest.waitForContainerState(containerManager, cId,
         ContainerState.COMPLETE);
 
     // container stop sends a sigterm followed by a sigkill
-    GetContainerStatusRequest gcsRequest = 
-        recordFactory.newRecordInstance(GetContainerStatusRequest.class);
-    gcsRequest.setContainerId(cId);
+    GetContainerStatusesRequest gcsRequest =
+        GetContainerStatusesRequest.newInstance(containerIds);
+    
     ContainerStatus containerStatus = 
-        containerManager.getContainerStatus(gcsRequest).getStatus();
-    Assert.assertEquals(ExitCode.FORCE_KILLED.getExitCode(),
-        containerStatus.getExitStatus());
+        containerManager.getContainerStatuses(gcsRequest)
+          .getContainerStatuses().get(0);
+    int expectedExitCode = Shell.WINDOWS ? ExitCode.FORCE_KILLED.getExitCode() :
+      ExitCode.TERMINATED.getExitCode();
+    Assert.assertEquals(expectedExitCode, containerStatus.getExitStatus());
 
     // Now verify the contents of the file.  Script generates a message when it
     // receives a sigterm so we look for that.  We cannot perform this check on
@@ -435,6 +628,32 @@ public class TestContainerLaunch extends
     }
   }
 
+  @SuppressWarnings("rawtypes")
+  @Test
+  public void testCallFailureWithNullLocalizedResources() {
+    Container container = mock(Container.class);
+    when(container.getContainerId()).thenReturn(ContainerId.newInstance(
+        ApplicationAttemptId.newInstance(ApplicationId.newInstance(
+            System.currentTimeMillis(), 1), 1), 1));
+    ContainerLaunchContext clc = mock(ContainerLaunchContext.class);
+    when(clc.getCommands()).thenReturn(Collections.<String>emptyList());
+    when(container.getLaunchContext()).thenReturn(clc);
+    when(container.getLocalizedResources()).thenReturn(null);
+    Dispatcher dispatcher = mock(Dispatcher.class);
+    EventHandler eventHandler = new EventHandler() {
+      public void handle(Event event) {
+        Assert.assertTrue(event instanceof ContainerExitEvent);
+        ContainerExitEvent exitEvent = (ContainerExitEvent) event;
+        Assert.assertEquals(ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
+            exitEvent.getType());
+      }
+    };
+    when(dispatcher.getEventHandler()).thenReturn(eventHandler);
+    ContainerLaunch launch = new ContainerLaunch(context, new Configuration(),
+        dispatcher, exec, null, container, dirsHandler);
+    launch.call();
+  }
+
   protected Token createContainerToken(ContainerId cId) throws InvalidToken {
     Resource r = BuilderUtils.newResource(1024, 1);
     ContainerTokenIdentifier containerTokenIdentifier =

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/MockLocalResourceStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/MockLocalResourceStatus.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/MockLocalResourceStatus.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/MockLocalResourceStatus.java Mon Aug 12 21:25:49 2013
@@ -18,8 +18,8 @@
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
 
 import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.SerializedException;
 import org.apache.hadoop.yarn.api.records.URL;
-import org.apache.hadoop.yarn.server.api.records.SerializedException;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.ResourceStatusType;
 

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java Mon Aug 12 21:25:49 2013
@@ -80,6 +80,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.SerializedException;
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
@@ -118,8 +119,9 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
-import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
@@ -132,31 +134,44 @@ public class TestResourceLocalizationSer
   static final Path basedir =
       new Path("target", TestResourceLocalizationService.class.getName());
   static Server mockServer;
+
+  private Configuration conf;
+  private AbstractFileSystem spylfs;
+  private FileContext lfs;
   
   @BeforeClass
-  public static void setup() {
+  public static void setupClass() {
     mockServer = mock(Server.class);
     doReturn(new InetSocketAddress(123)).when(mockServer).getListenerAddress();
   }
+
+  @Before
+  public void setup() throws IOException {
+    conf = new Configuration();
+    spylfs = spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
+    lfs = FileContext.getFileContext(spylfs, conf);
+    doNothing().when(spylfs).mkdir(
+        isA(Path.class), isA(FsPermission.class), anyBoolean());
+    String logDir = lfs.makeQualified(new Path(basedir, "logdir ")).toString();
+    conf.set(YarnConfiguration.NM_LOG_DIRS, logDir);
+  }
+
+  @After
+  public void cleanup() {
+    conf = null;
+  }
   
   @Test
   public void testLocalizationInit() throws Exception {
-    final Configuration conf = new Configuration();
     conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
     AsyncDispatcher dispatcher = new AsyncDispatcher();
     dispatcher.init(new Configuration());
 
     ContainerExecutor exec = mock(ContainerExecutor.class);
     DeletionService delService = spy(new DeletionService(exec));
-    delService.init(new Configuration());
+    delService.init(conf);
     delService.start();
 
-    AbstractFileSystem spylfs =
-      spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
-    FileContext lfs = FileContext.getFileContext(spylfs, conf);
-    doNothing().when(spylfs).mkdir(
-        isA(Path.class), isA(FsPermission.class), anyBoolean());
-
     List<Path> localDirs = new ArrayList<Path>();
     String[] sDirs = new String[4];
     for (int i = 0; i < 4; ++i) {
@@ -164,6 +179,7 @@ public class TestResourceLocalizationSer
       sDirs[i] = localDirs.get(i).toString();
     }
     conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
+
     LocalDirsHandlerService diskhandler = new LocalDirsHandlerService();
     diskhandler.init(conf);
 
@@ -204,13 +220,6 @@ public class TestResourceLocalizationSer
   @Test
   @SuppressWarnings("unchecked") // mocked generics
   public void testResourceRelease() throws Exception {
-    Configuration conf = new YarnConfiguration();
-    AbstractFileSystem spylfs =
-      spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
-    final FileContext lfs = FileContext.getFileContext(spylfs, conf);
-    doNothing().when(spylfs).mkdir(
-        isA(Path.class), isA(FsPermission.class), anyBoolean());
-
     List<Path> localDirs = new ArrayList<Path>();
     String[] sDirs = new String[4];
     for (int i = 0; i < 4; ++i) {
@@ -218,8 +227,7 @@ public class TestResourceLocalizationSer
       sDirs[i] = localDirs.get(i).toString();
     }
     conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
-    String logDir = lfs.makeQualified(new Path(basedir, "logdir " )).toString();
-    conf.set(YarnConfiguration.NM_LOG_DIRS, logDir);
+
     LocalizerTracker mockLocallilzerTracker = mock(LocalizerTracker.class);
     DrainDispatcher dispatcher = new DrainDispatcher();
     dispatcher.init(conf);
@@ -394,13 +402,6 @@ public class TestResourceLocalizationSer
   @Test( timeout = 10000)
   @SuppressWarnings("unchecked") // mocked generics
   public void testLocalizationHeartbeat() throws Exception {
-    Configuration conf = new YarnConfiguration();
-    AbstractFileSystem spylfs =
-      spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
-    final FileContext lfs = FileContext.getFileContext(spylfs, conf);
-    doNothing().when(spylfs).mkdir(
-        isA(Path.class), isA(FsPermission.class), anyBoolean());
-
     List<Path> localDirs = new ArrayList<Path>();
     String[] sDirs = new String[1];
     // Making sure that we have only one local disk so that it will only be
@@ -413,8 +414,6 @@ public class TestResourceLocalizationSer
     // Adding configuration to make sure there is only one file per
     // directory
     conf.set(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY, "37");
-    String logDir = lfs.makeQualified(new Path(basedir, "logdir " )).toString();
-    conf.set(YarnConfiguration.NM_LOG_DIRS, logDir);
     DrainDispatcher dispatcher = new DrainDispatcher();
     dispatcher.init(conf);
     dispatcher.start();
@@ -589,12 +588,6 @@ public class TestResourceLocalizationSer
   @Test(timeout=20000)
   @SuppressWarnings("unchecked") // mocked generics
   public void testFailedPublicResource() throws Exception {
-    Configuration conf = new YarnConfiguration();
-    AbstractFileSystem spylfs =
-      spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
-    final FileContext lfs = FileContext.getFileContext(spylfs, conf);
-    doNothing().when(spylfs).mkdir(
-        isA(Path.class), isA(FsPermission.class), anyBoolean());
     List<Path> localDirs = new ArrayList<Path>();
     String[] sDirs = new String[4];
     for (int i = 0; i < 4; ++i) {
@@ -602,8 +595,6 @@ public class TestResourceLocalizationSer
       sDirs[i] = localDirs.get(i).toString();
     }
     conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
-    String logDir = lfs.makeQualified(new Path(basedir, "logdir " )).toString();
-    conf.set(YarnConfiguration.NM_LOG_DIRS, logDir);
 
     DrainDispatcher dispatcher = new DrainDispatcher();
     EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
@@ -701,15 +692,6 @@ public class TestResourceLocalizationSer
       String user = "testuser";
       ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
 
-      // mocked Resource Localization Service
-      Configuration conf = new Configuration();
-      AbstractFileSystem spylfs =
-          spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
-      final FileContext lfs = FileContext.getFileContext(spylfs, conf);
-      // We don't want files to be created
-      doNothing().when(spylfs).mkdir(isA(Path.class), isA(FsPermission.class),
-        anyBoolean());
-
       // creating one local directory
       List<Path> localDirs = new ArrayList<Path>();
       String[] sDirs = new String[1];
@@ -718,10 +700,6 @@ public class TestResourceLocalizationSer
         sDirs[i] = localDirs.get(i).toString();
       }
       conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
-      // setting log directory.
-      String logDir =
-          lfs.makeQualified(new Path(basedir, "logdir ")).toString();
-      conf.set(YarnConfiguration.NM_LOG_DIRS, logDir);
 
       LocalDirsHandlerService localDirHandler = new LocalDirsHandlerService();
       localDirHandler.init(conf);
@@ -865,15 +843,6 @@ public class TestResourceLocalizationSer
       String user = "testuser";
       ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
 
-      // mocked Resource Localization Service
-      Configuration conf = new Configuration();
-      AbstractFileSystem spylfs =
-          spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
-      final FileContext lfs = FileContext.getFileContext(spylfs, conf);
-      // We don't want files to be created
-      doNothing().when(spylfs).mkdir(isA(Path.class), isA(FsPermission.class),
-        anyBoolean());
-
       // creating one local directory
       List<Path> localDirs = new ArrayList<Path>();
       String[] sDirs = new String[1];
@@ -882,10 +851,6 @@ public class TestResourceLocalizationSer
         sDirs[i] = localDirs.get(i).toString();
       }
       conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
-      // setting log directory.
-      String logDir =
-          lfs.makeQualified(new Path(basedir, "logdir ")).toString();
-      conf.set(YarnConfiguration.NM_LOG_DIRS, logDir);
 
       LocalDirsHandlerService localDirHandler = new LocalDirsHandlerService();
       localDirHandler.init(conf);
@@ -1013,8 +978,8 @@ public class TestResourceLocalizationSer
       String localizerId, LocalResourceRequest req) {
     LocalizerStatus status = createLocalizerStatus(localizerId);
     LocalResourceStatus resourceStatus = new LocalResourceStatusPBImpl();
-    resourceStatus.setException(YarnServerBuilderUtils
-        .newSerializedException(new YarnException("test")));
+    resourceStatus.setException(SerializedException
+      .newInstance(new YarnException("test")));
     resourceStatus.setStatus(ResourceStatusType.FETCH_FAILURE);
     resourceStatus.setResource(req);
     status.addResourceStatus(resourceStatus);
@@ -1043,16 +1008,6 @@ public class TestResourceLocalizationSer
     DrainDispatcher dispatcher1 = null;
     String user = "testuser";
     try {
-      // Setting up ResourceLocalization service.
-      Configuration conf = new Configuration();
-      dispatcher1 = new DrainDispatcher();
-      AbstractFileSystem spylfs =
-          spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
-      final FileContext lfs = FileContext.getFileContext(spylfs, conf);
-      // We don't want files to be created
-      doNothing().when(spylfs).mkdir(isA(Path.class), isA(FsPermission.class),
-        anyBoolean());
-
       // creating one local directory
       List<Path> localDirs = new ArrayList<Path>();
       String[] sDirs = new String[1];
@@ -1061,13 +1016,10 @@ public class TestResourceLocalizationSer
         sDirs[i] = localDirs.get(i).toString();
       }
       conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
-      // setting log directory.
-      String logDir =
-          lfs.makeQualified(new Path(basedir, "logdir ")).toString();
-      conf.set(YarnConfiguration.NM_LOG_DIRS, logDir);
 
       // Registering event handlers
       EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
+      dispatcher1 = new DrainDispatcher();
       dispatcher1.register(ApplicationEventType.class, applicationBus);
       EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
       dispatcher1.register(ContainerEventType.class, containerBus);

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java Mon Aug 12 21:25:49 2013
@@ -63,6 +63,7 @@ import org.apache.hadoop.fs.permission.F
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -809,14 +810,18 @@ public class TestLogAggregationService e
     commands.add("/bin/bash");
     commands.add(scriptFile.getAbsolutePath());
     containerLaunchContext.setCommands(commands);
-    StartContainerRequest startRequest =
-        recordFactory.newRecordInstance(StartContainerRequest.class);
-    startRequest.setContainerLaunchContext(containerLaunchContext);
-    startRequest.setContainerToken(TestContainerManager.createContainerToken(
-      cId, DUMMY_RM_IDENTIFIER, context.getNodeId(), user,
-      context.getContainerTokenSecretManager()));
-    this.containerManager.startContainer(startRequest);
 
+    StartContainerRequest scRequest =
+        StartContainerRequest.newInstance(containerLaunchContext,
+          TestContainerManager.createContainerToken(
+            cId, DUMMY_RM_IDENTIFIER, context.getNodeId(), user,
+            context.getContainerTokenSecretManager()));
+    List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+    list.add(scRequest);
+    StartContainersRequest allRequests =
+        StartContainersRequest.newInstance(list);
+    this.containerManager.startContainers(allRequests);
+    
     BaseContainerManagerTest.waitForContainerState(this.containerManager,
         cId, ContainerState.COMPLETE);
 

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java Mon Aug 12 21:25:49 2013
@@ -40,8 +40,9 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -227,9 +228,6 @@ public class TestContainersMonitor exten
     commands.add(scriptFile.getAbsolutePath());
     containerLaunchContext.setCommands(commands);
     Resource r = BuilderUtils.newResource(8 * 1024 * 1024, 1);
-    StartContainerRequest startRequest =
-        recordFactory.newRecordInstance(StartContainerRequest.class);
-    startRequest.setContainerLaunchContext(containerLaunchContext);
     ContainerTokenIdentifier containerIdentifier =
         new ContainerTokenIdentifier(cId, context.getNodeId().toString(), user,
           r, System.currentTimeMillis() + 120000, 123, DUMMY_RM_IDENTIFIER);
@@ -237,8 +235,14 @@ public class TestContainersMonitor exten
         BuilderUtils.newContainerToken(context.getNodeId(),
           containerManager.getContext().getContainerTokenSecretManager()
             .createPassword(containerIdentifier), containerIdentifier);
-    startRequest.setContainerToken(containerToken);
-    containerManager.startContainer(startRequest);
+    StartContainerRequest scRequest =
+        StartContainerRequest.newInstance(containerLaunchContext,
+          containerToken);
+    List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+    list.add(scRequest);
+    StartContainersRequest allRequests =
+        StartContainersRequest.newInstance(list);
+    containerManager.startContainers(allRequests);
     
     int timeoutSecs = 0;
     while (!processStartFile.exists() && timeoutSecs++ < 20) {
@@ -260,11 +264,12 @@ public class TestContainersMonitor exten
     BaseContainerManagerTest.waitForContainerState(containerManager, cId,
         ContainerState.COMPLETE, 60);
 
-    GetContainerStatusRequest gcsRequest =
-        recordFactory.newRecordInstance(GetContainerStatusRequest.class);
-    gcsRequest.setContainerId(cId);
+    List<ContainerId> containerIds = new ArrayList<ContainerId>();
+    containerIds.add(cId);
+    GetContainerStatusesRequest gcsRequest =
+        GetContainerStatusesRequest.newInstance(containerIds);
     ContainerStatus containerStatus =
-        containerManager.getContainerStatus(gcsRequest).getStatus();
+        containerManager.getContainerStatuses(gcsRequest).getContainerStatuses().get(0);
     Assert.assertEquals(ExitCode.TERMINATED.getExitCode(),
         containerStatus.getExitStatus());
     String expectedMsgPattern =

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml Mon Aug 12 21:25:49 2013
@@ -19,12 +19,12 @@
   <parent>
     <artifactId>hadoop-yarn-server</artifactId>
     <groupId>org.apache.hadoop</groupId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>2.3.0-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
-  <version>2.2.0-SNAPSHOT</version>
+  <version>2.3.0-SNAPSHOT</version>
   <name>hadoop-yarn-server-resourcemanager</name>
 
   <properties>

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java Mon Aug 12 21:25:49 2013
@@ -33,8 +33,10 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.security.SaslRpcServer;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.PolicyProvider;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
@@ -58,11 +60,16 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
+import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException;
+import org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestException;
+import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
@@ -72,8 +79,6 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.InvalidResourceBlacklistRequestException;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.InvalidResourceRequestException;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
@@ -102,7 +107,6 @@ public class ApplicationMasterService ex
     this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor();
     this.rScheduler = scheduler;
     this.resync.setAMCommand(AMCommand.AM_RESYNC);
-//    this.reboot.containers = new ArrayList<Container>();
     this.rmContext = rmContext;
   }
 
@@ -116,10 +120,16 @@ public class ApplicationMasterService ex
         YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
         YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
 
+    Configuration serverConf = conf;
+    // If the auth is not-simple, enforce it to be token-based.
+    serverConf = new Configuration(conf);
+    serverConf.set(
+        CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+        SaslRpcServer.AuthMethod.TOKEN.toString());
     this.server =
       rpc.getServer(ApplicationMasterProtocol.class, this, masterServiceAddress,
-          conf, this.rmContext.getAMRMTokenSecretManager(),
-          conf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT, 
+          serverConf, this.rmContext.getAMRMTokenSecretManager(),
+          serverConf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT, 
               YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT));
     
     // Enable service authorization?
@@ -141,33 +151,61 @@ public class ApplicationMasterService ex
     return this.bindAddress;
   }
 
-  private void authorizeRequest(ApplicationAttemptId appAttemptID)
-      throws YarnException {
-
-    if (!UserGroupInformation.isSecurityEnabled()) {
-      return;
+  // Obtain the needed AMRMTokenIdentifier from the remote-UGI. RPC layer
+  // currently sets only the required id, but iterate through anyways just to be
+  // sure.
+  private AMRMTokenIdentifier selectAMRMTokenIdentifier(
+      UserGroupInformation remoteUgi) throws IOException {
+    AMRMTokenIdentifier result = null;
+    Set<TokenIdentifier> tokenIds = remoteUgi.getTokenIdentifiers();
+    for (TokenIdentifier tokenId : tokenIds) {
+      if (tokenId instanceof AMRMTokenIdentifier) {
+        result = (AMRMTokenIdentifier) tokenId;
+        break;
+      }
     }
 
-    String appAttemptIDStr = appAttemptID.toString();
+    return result;
+  }
+
+  private ApplicationAttemptId authorizeRequest()
+      throws YarnException {
 
     UserGroupInformation remoteUgi;
     try {
       remoteUgi = UserGroupInformation.getCurrentUser();
     } catch (IOException e) {
-      String msg = "Cannot obtain the user-name for ApplicationAttemptID: "
-          + appAttemptIDStr + ". Got exception: "
-          + StringUtils.stringifyException(e);
+      String msg =
+          "Cannot obtain the user-name for authorizing ApplicationMaster. "
+              + "Got exception: " + StringUtils.stringifyException(e);
       LOG.warn(msg);
       throw RPCUtil.getRemoteException(msg);
     }
 
-    if (!remoteUgi.getUserName().equals(appAttemptIDStr)) {
-      String msg = "Unauthorized request from ApplicationMaster. "
-          + "Expected ApplicationAttemptID: " + remoteUgi.getUserName()
-          + " Found: " + appAttemptIDStr;
-      LOG.warn(msg);
-      throw RPCUtil.getRemoteException(msg);
+    boolean tokenFound = false;
+    String message = "";
+    AMRMTokenIdentifier appTokenIdentifier = null;
+    try {
+      appTokenIdentifier = selectAMRMTokenIdentifier(remoteUgi);
+      if (appTokenIdentifier == null) {
+        tokenFound = false;
+        message = "No AMRMToken found for user " + remoteUgi.getUserName();
+      } else {
+        tokenFound = true;
+      }
+    } catch (IOException e) {
+      tokenFound = false;
+      message =
+          "Got exception while looking for AMRMToken for user "
+              + remoteUgi.getUserName();
     }
+
+    if (!tokenFound) {
+      LOG.warn(message);
+      throw RPCUtil.getRemoteException(message);
+    }
+
+    return appTokenIdentifier.getApplicationAttemptId();
   }
 
   @Override
@@ -175,9 +213,7 @@ public class ApplicationMasterService ex
       RegisterApplicationMasterRequest request) throws YarnException,
       IOException {
 
-    ApplicationAttemptId applicationAttemptId = request
-        .getApplicationAttemptId();
-    authorizeRequest(applicationAttemptId);
+    ApplicationAttemptId applicationAttemptId = authorizeRequest();
 
     ApplicationId appID = applicationAttemptId.getApplicationId();
     AllocateResponse lastResponse = responseMap.get(applicationAttemptId);
@@ -247,9 +283,7 @@ public class ApplicationMasterService ex
       FinishApplicationMasterRequest request) throws YarnException,
       IOException {
 
-    ApplicationAttemptId applicationAttemptId = request
-        .getApplicationAttemptId();
-    authorizeRequest(applicationAttemptId);
+    ApplicationAttemptId applicationAttemptId = authorizeRequest();
 
     AllocateResponse lastResponse = responseMap.get(applicationAttemptId);
     if (lastResponse == null) {
@@ -297,8 +331,7 @@ public class ApplicationMasterService ex
   public AllocateResponse allocate(AllocateRequest request)
       throws YarnException, IOException {
 
-    ApplicationAttemptId appAttemptId = request.getApplicationAttemptId();
-    authorizeRequest(appAttemptId);
+    ApplicationAttemptId appAttemptId = authorizeRequest();
 
     this.amLivelinessMonitor.receivedPing(appAttemptId);
 
@@ -354,7 +387,7 @@ public class ApplicationMasterService ex
       
       // sanity check
       try {
-        SchedulerUtils.validateResourceRequests(ask,
+        RMServerUtils.validateResourceRequests(ask,
             rScheduler.getMaximumResourceCapability());
       } catch (InvalidResourceRequestException e) {
         LOG.warn("Invalid resource ask by application " + appAttemptId, e);
@@ -362,12 +395,19 @@ public class ApplicationMasterService ex
       }
       
       try {
-        SchedulerUtils.validateBlacklistRequest(blacklistRequest);
+        RMServerUtils.validateBlacklistRequest(blacklistRequest);
       }  catch (InvalidResourceBlacklistRequestException e) {
         LOG.warn("Invalid blacklist request by application " + appAttemptId, e);
         throw e;
       }
       
+      try {
+        RMServerUtils.validateContainerReleaseRequest(release, appAttemptId);
+      } catch (InvalidContainerReleaseException e) {
+        LOG.warn("Invalid container release by application " + appAttemptId, e);
+        throw e;
+      }
+      
       // Send new requests to appAttempt.
       Allocation allocation =
           this.rScheduler.allocate(appAttemptId, ask, release, 

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java Mon Aug 12 21:25:49 2013
@@ -75,6 +75,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -218,7 +219,7 @@ public class ClientRMService extends Abs
   
   /**
    * It gives response which includes application report if the application
-   * present otherwise gives response with application report as null.
+   * present otherwise throws ApplicationNotFoundException.
    */
   @Override
   public GetApplicationReportResponse getApplicationReport(
@@ -235,10 +236,10 @@ public class ClientRMService extends Abs
 
     RMApp application = this.rmContext.getRMApps().get(applicationId);
     if (application == null) {
-      // If the RM doesn't have the application, provide the response with
-      // application report as null and let the clients to handle.
-      return recordFactory
-          .newRecordInstance(GetApplicationReportResponse.class);
+      // If the RM doesn't have the application, throw
+      // ApplicationNotFoundException and let client to handle.
+      throw new ApplicationNotFoundException("Application with id '"
+          + applicationId + "' doesn't exist in RM.");
     }
 
     boolean allowAccess = checkAccess(callerUGI, application.getUser(),

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java Mon Aug 12 21:25:49 2013
@@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
@@ -48,7 +49,6 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.InvalidResourceRequestException;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java Mon Aug 12 21:25:49 2013
@@ -57,7 +57,7 @@ public class RMContextImpl implements RM
   private RMStateStore stateStore = null;
   private ContainerAllocationExpirer containerAllocationExpirer;
   private final DelegationTokenRenewer tokenRenewer;
-  private final AMRMTokenSecretManager appTokenSecretManager;
+  private final AMRMTokenSecretManager amRMTokenSecretManager;
   private final RMContainerTokenSecretManager containerTokenSecretManager;
   private final NMTokenSecretManagerInRM nmTokenSecretManager;
   private final ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager;
@@ -68,7 +68,7 @@ public class RMContextImpl implements RM
       AMLivelinessMonitor amLivelinessMonitor,
       AMLivelinessMonitor amFinishingMonitor,
       DelegationTokenRenewer tokenRenewer,
-      AMRMTokenSecretManager appTokenSecretManager,
+      AMRMTokenSecretManager amRMTokenSecretManager,
       RMContainerTokenSecretManager containerTokenSecretManager,
       NMTokenSecretManagerInRM nmTokenSecretManager,
       ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager) {
@@ -78,7 +78,7 @@ public class RMContextImpl implements RM
     this.amLivelinessMonitor = amLivelinessMonitor;
     this.amFinishingMonitor = amFinishingMonitor;
     this.tokenRenewer = tokenRenewer;
-    this.appTokenSecretManager = appTokenSecretManager;
+    this.amRMTokenSecretManager = amRMTokenSecretManager;
     this.containerTokenSecretManager = containerTokenSecretManager;
     this.nmTokenSecretManager = nmTokenSecretManager;
     this.clientToAMTokenSecretManager = clientToAMTokenSecretManager;
@@ -156,7 +156,7 @@ public class RMContextImpl implements RM
 
   @Override
   public AMRMTokenSecretManager getAMRMTokenSecretManager() {
-    return this.appTokenSecretManager;
+    return this.amRMTokenSecretManager;
   }
 
   @Override

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java Mon Aug 12 21:25:49 2013
@@ -22,8 +22,17 @@ import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
 
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException;
+import org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestException;
+import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 
 /**
  * Utility methods to aid serving RM data through the REST and RPC APIs
@@ -55,4 +64,52 @@ public class RMServerUtils {
     }
     return results;
   }
+  
+  /**
+   * Utility method to validate a list resource requests, by insuring that the
+   * requested memory/vcore is non-negative and not greater than max
+   */
+  public static void validateResourceRequests(List<ResourceRequest> ask,
+      Resource maximumResource) throws InvalidResourceRequestException {
+    for (ResourceRequest resReq : ask) {
+      SchedulerUtils.validateResourceRequest(resReq, maximumResource);
+    }
+  }
+
+  /*
+   * @throw <code>InvalidResourceBlacklistRequestException </code> if the
+   * resource is not able to be added to the blacklist.
+   */
+  public static void validateBlacklistRequest(ResourceBlacklistRequest blacklistRequest) 
+  throws InvalidResourceBlacklistRequestException {
+    if (blacklistRequest != null) {
+      List<String> plus = blacklistRequest.getBlacklistAdditions();
+      if (plus != null && plus.contains(ResourceRequest.ANY)) {
+        throw new InvalidResourceBlacklistRequestException(
+            "Cannot add " + ResourceRequest.ANY + " to the blacklist!");
+      }
+    }
+  }
+
+  /**
+   * It will validate to make sure all the containers belong to correct
+   * application attempt id. If not then it will throw
+   * {@link InvalidContainerReleaseException}
+   * @param containerReleaseList containers to be released as requested by
+   * application master.
+   * @param appAttemptId Application attempt Id
+   * @throws InvalidContainerReleaseException 
+   */
+  public static void
+      validateContainerReleaseRequest(List<ContainerId> containerReleaseList,
+          ApplicationAttemptId appAttemptId)
+          throws InvalidContainerReleaseException {
+    for (ContainerId cId : containerReleaseList) {
+      if (!appAttemptId.equals(cId.getApplicationAttemptId())) {
+        throw new InvalidContainerReleaseException("Cannot release container : "
+            + cId.toString() + " not belonging to this application attempt : "
+            + appAttemptId);
+      }
+    }
+  }
 }

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java Mon Aug 12 21:25:49 2013
@@ -253,7 +253,7 @@ public class ResourceTrackerService exte
     RMNode rmNode = this.rmContext.getRMNodes().get(nodeId);
     if (rmNode == null) {
       /* node does not exist */
-      String message = "Node not found rebooting " + remoteNodeStatus.getNodeId();
+      String message = "Node not found resyncing " + remoteNodeStatus.getNodeId();
       LOG.info(message);
       resync.setDiagnosticsMessage(message);
       return resync;

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java Mon Aug 12 21:25:49 2013
@@ -22,6 +22,8 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
@@ -33,11 +35,15 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -46,11 +52,8 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
-import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
@@ -58,6 +61,8 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * The launch of the AM itself.
  */
@@ -69,8 +74,6 @@ public class AMLauncher implements Runna
 
   private final RMAppAttempt application;
   private final Configuration conf;
-  private final RecordFactory recordFactory = 
-      RecordFactoryProvider.getRecordFactory(null);
   private final AMLauncherEventType eventType;
   private final RMContext rmContext;
   private final Container masterContainer;
@@ -103,22 +106,42 @@ public class AMLauncher implements Runna
         + " for AM " + application.getAppAttemptId());  
     ContainerLaunchContext launchContext =
         createAMContainerLaunchContext(applicationContext, masterContainerID);
-    StartContainerRequest request = 
-        recordFactory.newRecordInstance(StartContainerRequest.class);
-    request.setContainerLaunchContext(launchContext);
-    request.setContainerToken(masterContainer.getContainerToken());
-    containerMgrProxy.startContainer(request);
-    LOG.info("Done launching container " + masterContainer
-        + " for AM " + application.getAppAttemptId());
+
+    StartContainerRequest scRequest =
+        StartContainerRequest.newInstance(launchContext,
+          masterContainer.getContainerToken());
+    List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+    list.add(scRequest);
+    StartContainersRequest allRequests =
+        StartContainersRequest.newInstance(list);
+
+    StartContainersResponse response =
+        containerMgrProxy.startContainers(allRequests);
+    if (response.getFailedRequests() != null
+        && response.getFailedRequests().containsKey(masterContainerID)) {
+      Throwable t =
+          response.getFailedRequests().get(masterContainerID).deSerialize();
+      parseAndThrowException(t);
+    } else {
+      LOG.info("Done launching container " + masterContainer + " for AM "
+          + application.getAppAttemptId());
+    }
   }
   
   private void cleanup() throws IOException, YarnException {
     connect();
     ContainerId containerId = masterContainer.getId();
-    StopContainerRequest stopRequest = 
-        recordFactory.newRecordInstance(StopContainerRequest.class);
-    stopRequest.setContainerId(containerId);
-    containerMgrProxy.stopContainer(stopRequest);
+    List<ContainerId> containerIds = new ArrayList<ContainerId>();
+    containerIds.add(containerId);
+    StopContainersRequest stopRequest =
+        StopContainersRequest.newInstance(containerIds);
+    StopContainersResponse response =
+        containerMgrProxy.stopContainers(stopRequest);
+    if (response.getFailedRequests() != null
+        && response.getFailedRequests().containsKey(containerId)) {
+      Throwable t = response.getFailedRequests().get(containerId).deSerialize();
+      parseAndThrowException(t);
+    }
   }
 
   // Protected. For tests.
@@ -193,30 +216,28 @@ public class AMLauncher implements Runna
     environment.put(ApplicationConstants.MAX_APP_ATTEMPTS_ENV,
         String.valueOf(rmContext.getRMApps().get(
             applicationId).getMaxAppAttempts()));
- 
-    if (UserGroupInformation.isSecurityEnabled()) {
-      // TODO: Security enabled/disabled info should come from RM.
-
-      Credentials credentials = new Credentials();
-
-      DataInputByteBuffer dibb = new DataInputByteBuffer();
-      if (container.getTokens() != null) {
-        // TODO: Don't do this kind of checks everywhere.
-        dibb.reset(container.getTokens());
-        credentials.readTokenStorageStream(dibb);
-      }
 
-      // Add application token
-      Token<AMRMTokenIdentifier> amrmToken =
-          application.getAMRMToken();
-      if(amrmToken != null) {
-        credentials.addToken(amrmToken.getService(), amrmToken);
-      }
-      DataOutputBuffer dob = new DataOutputBuffer();
-      credentials.writeTokenStorageToStream(dob);
-      container.setTokens(ByteBuffer.wrap(dob.getData(), 0,
-        dob.getLength()));
+    Credentials credentials = new Credentials();
+    DataInputByteBuffer dibb = new DataInputByteBuffer();
+    if (container.getTokens() != null) {
+      // TODO: Don't do this kind of checks everywhere.
+      dibb.reset(container.getTokens());
+      credentials.readTokenStorageStream(dibb);
     }
+
+    // Add AMRMToken
+    Token<AMRMTokenIdentifier> amrmToken = getAMRMToken();
+    if (amrmToken != null) {
+      credentials.addToken(amrmToken.getService(), amrmToken);
+    }
+    DataOutputBuffer dob = new DataOutputBuffer();
+    credentials.writeTokenStorageToStream(dob);
+    container.setTokens(ByteBuffer.wrap(dob.getData(), 0, dob.getLength()));
+  }
+
+  @VisibleForTesting
+  protected Token<AMRMTokenIdentifier> getAMRMToken() {
+    return application.getAMRMToken();
   }
   
   @SuppressWarnings("unchecked")
@@ -257,4 +278,15 @@ public class AMLauncher implements Runna
       break;
     }
   }
+
+  private void parseAndThrowException(Throwable t) throws YarnException,
+      IOException {
+    if (t instanceof YarnException) {
+      throw (YarnException) t;
+    } else if (t instanceof InvalidToken) {
+      throw (InvalidToken) t;
+    } else {
+      throw (IOException) t;
+    }
+  }
 }

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java Mon Aug 12 21:25:49 2013
@@ -111,70 +111,66 @@ public class FileSystemRMStateStore exte
 
   private void loadRMAppState(RMState rmState) throws Exception {
     try {
-      FileStatus[] childNodes = fs.listStatus(rmAppRoot);
       List<ApplicationAttemptState> attempts =
-                                      new ArrayList<ApplicationAttemptState>();
-      for(FileStatus childNodeStatus : childNodes) {
-        assert childNodeStatus.isFile();
-        String childNodeName = childNodeStatus.getPath().getName();
-        Path childNodePath = getNodePath(rmAppRoot, childNodeName);
-        byte[] childData = readFile(childNodePath, childNodeStatus.getLen());
-        if(childNodeName.startsWith(ApplicationId.appIdStrPrefix)){
-          // application
-          LOG.info("Loading application from node: " + childNodeName);
-          ApplicationId appId = ConverterUtils.toApplicationId(childNodeName);
-          ApplicationStateDataPBImpl appStateData =
-              new ApplicationStateDataPBImpl(
-                                ApplicationStateDataProto.parseFrom(childData));
-          ApplicationState appState = new ApplicationState(
-                               appStateData.getSubmitTime(),
-                               appStateData.getApplicationSubmissionContext(),
-                               appStateData.getUser());
-          // assert child node name is same as actual applicationId
-          assert appId.equals(appState.context.getApplicationId());
-          rmState.appState.put(appId, appState);
-        } else if(childNodeName.startsWith(
-                                ApplicationAttemptId.appAttemptIdStrPrefix)) {
-          // attempt
-          LOG.info("Loading application attempt from node: " + childNodeName);
-          ApplicationAttemptId attemptId =
-                          ConverterUtils.toApplicationAttemptId(childNodeName);
-          ApplicationAttemptStateDataPBImpl attemptStateData =
-              new ApplicationAttemptStateDataPBImpl(
+          new ArrayList<ApplicationAttemptState>();
+
+      for (FileStatus appDir : fs.listStatus(rmAppRoot)) {
+        for (FileStatus childNodeStatus : fs.listStatus(appDir.getPath())) {
+          assert childNodeStatus.isFile();
+          String childNodeName = childNodeStatus.getPath().getName();
+          byte[] childData =
+              readFile(childNodeStatus.getPath(), childNodeStatus.getLen());
+          if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
+            // application
+            LOG.info("Loading application from node: " + childNodeName);
+            ApplicationId appId = ConverterUtils.toApplicationId(childNodeName);
+            ApplicationStateDataPBImpl appStateData =
+                new ApplicationStateDataPBImpl(
+                  ApplicationStateDataProto.parseFrom(childData));
+            ApplicationState appState =
+                new ApplicationState(appStateData.getSubmitTime(),
+                  appStateData.getApplicationSubmissionContext(),
+                  appStateData.getUser());
+            // assert child node name is same as actual applicationId
+            assert appId.equals(appState.context.getApplicationId());
+            rmState.appState.put(appId, appState);
+          } else if (childNodeName
+            .startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
+            // attempt
+            LOG.info("Loading application attempt from node: " + childNodeName);
+            ApplicationAttemptId attemptId =
+                ConverterUtils.toApplicationAttemptId(childNodeName);
+            ApplicationAttemptStateDataPBImpl attemptStateData =
+                new ApplicationAttemptStateDataPBImpl(
                   ApplicationAttemptStateDataProto.parseFrom(childData));
-          Credentials credentials = null;
-          if(attemptStateData.getAppAttemptTokens() != null){
-            credentials = new Credentials();
-            DataInputByteBuffer dibb = new DataInputByteBuffer();
-            dibb.reset(attemptStateData.getAppAttemptTokens());
-            credentials.readTokenStorageStream(dibb);
+            Credentials credentials = null;
+            if (attemptStateData.getAppAttemptTokens() != null) {
+              credentials = new Credentials();
+              DataInputByteBuffer dibb = new DataInputByteBuffer();
+              dibb.reset(attemptStateData.getAppAttemptTokens());
+              credentials.readTokenStorageStream(dibb);
+            }
+            ApplicationAttemptState attemptState =
+                new ApplicationAttemptState(attemptId,
+                  attemptStateData.getMasterContainer(), credentials);
+
+            // assert child node name is same as application attempt id
+            assert attemptId.equals(attemptState.getAttemptId());
+            attempts.add(attemptState);
+          } else {
+            LOG.info("Unknown child node with name: " + childNodeName);
           }
-          ApplicationAttemptState attemptState =
-              new ApplicationAttemptState(attemptId,
-                attemptStateData.getMasterContainer(), credentials);
-
-          // assert child node name is same as application attempt id
-          assert attemptId.equals(attemptState.getAttemptId());
-          attempts.add(attemptState);
-        } else {
-          LOG.info("Unknown child node with name: " + childNodeName);
         }
       }
 
-      // go through all attempts and add them to their apps
-      for(ApplicationAttemptState attemptState : attempts) {
+      // go through all attempts and add them to their apps, Ideally, each
+      // attempt node must have a corresponding app node, because remove
+      // directory operation remove both at the same time
+      for (ApplicationAttemptState attemptState : attempts) {
         ApplicationId appId = attemptState.getAttemptId().getApplicationId();
         ApplicationState appState = rmState.appState.get(appId);
-        if(appState != null) {
-          appState.attempts.put(attemptState.getAttemptId(), attemptState);
-        } else {
-          // the application node may have been removed when the application
-          // completed but the RM might have stopped before it could remove the
-          // application attempt nodes
-          LOG.info("Application node not found for attempt: "
-                    + attemptState.getAttemptId());
-          deleteFile(getNodePath(rmAppRoot, attemptState.getAttemptId().toString()));
-        }
+        assert appState != null;
+        appState.attempts.put(attemptState.getAttemptId(), attemptState);
       }
     } catch (Exception e) {
       LOG.error("Failed to load state.", e);
@@ -188,6 +184,12 @@ public class FileSystemRMStateStore exte
     for(FileStatus childNodeStatus : childNodes) {
       assert childNodeStatus.isFile();
       String childNodeName = childNodeStatus.getPath().getName();
+      if(childNodeName.startsWith(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX)) {
+        rmState.rmSecretManagerState.dtSequenceNumber =
+            Integer.parseInt(childNodeName.split("_")[1]);
+        continue;
+      }
+
       Path childNodePath = getNodePath(rmDTSecretManagerRoot, childNodeName);
       byte[] childData = readFile(childNodePath, childNodeStatus.getLen());
       ByteArrayInputStream is = new ByteArrayInputStream(childData);
@@ -202,10 +204,7 @@ public class FileSystemRMStateStore exte
         long renewDate = fsIn.readLong();
         rmState.rmSecretManagerState.delegationTokenState.put(identifier,
           renewDate);
-      } else if(childNodeName.startsWith(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX)) {
-        rmState.rmSecretManagerState.dtSequenceNumber =
-            Integer.parseInt(childNodeName.split("_")[1]);
-      }else {
+      } else {
         LOG.warn("Unknown file for recovering RMDelegationTokenSecretManager");
       }
       fsIn.close();
@@ -215,7 +214,9 @@ public class FileSystemRMStateStore exte
   @Override
   public synchronized void storeApplicationState(String appId,
       ApplicationStateDataPBImpl appStateDataPB) throws Exception {
-    Path nodeCreatePath = getNodePath(rmAppRoot, appId);
+    Path appDirPath = getAppDir(rmAppRoot, appId);
+    fs.mkdirs(appDirPath);
+    Path nodeCreatePath = getNodePath(appDirPath, appId);
 
     LOG.info("Storing info for app: " + appId + " at: " + nodeCreatePath);
     byte[] appStateData = appStateDataPB.getProto().toByteArray();
@@ -232,7 +233,11 @@ public class FileSystemRMStateStore exte
   @Override
   public synchronized void storeApplicationAttemptState(String attemptId,
       ApplicationAttemptStateDataPBImpl attemptStateDataPB) throws Exception {
-    Path nodeCreatePath = getNodePath(rmAppRoot, attemptId);
+    ApplicationAttemptId appAttemptId =
+        ConverterUtils.toApplicationAttemptId(attemptId);
+    Path appDirPath =
+        getAppDir(rmAppRoot, appAttemptId.getApplicationId().toString());
+    Path nodeCreatePath = getNodePath(appDirPath, attemptId);
     LOG.info("Storing info for attempt: " + attemptId
              + " at: " + nodeCreatePath);
     byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
@@ -250,20 +255,9 @@ public class FileSystemRMStateStore exte
   public synchronized void removeApplicationState(ApplicationState appState)
       throws Exception {
     String appId = appState.getAppId().toString();
-    Path nodeRemovePath = getNodePath(rmAppRoot, appId);
+    Path nodeRemovePath = getAppDir(rmAppRoot, appId);
     LOG.info("Removing info for app: " + appId + " at: " + nodeRemovePath);
     deleteFile(nodeRemovePath);
-    for(ApplicationAttemptId attemptId : appState.attempts.keySet()) {
-      removeApplicationAttemptState(attemptId.toString());
-    }
-  }
-
-  public synchronized void removeApplicationAttemptState(String attemptId)
-      throws Exception {
-    Path nodeRemovePath = getNodePath(rmAppRoot, attemptId);
-    LOG.info("Removing info for attempt: " + attemptId
-             + " at: " + nodeRemovePath);
-    deleteFile(nodeRemovePath);
   }
 
   @Override
@@ -329,6 +323,10 @@ public class FileSystemRMStateStore exte
     deleteFile(nodeCreatePath);
   }
 
+  private Path getAppDir(Path root, String appId) {
+    return getNodePath(root, appId);
+  }
+
   // FileSystem related code
 
   private void deleteFile(Path deletePath) throws Exception {

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java Mon Aug 12 21:25:49 2013
@@ -30,6 +30,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
@@ -43,9 +44,9 @@ import org.apache.hadoop.yarn.event.Disp
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
+import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
-import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppStoredEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -376,11 +377,16 @@ public abstract class RMStateStore {
   protected abstract void removeApplicationState(ApplicationState appState) 
                                                              throws Exception;
 
+  // TODO: This should eventually become cluster-Id + "AM_RM_TOKEN_SERVICE". See
+  // YARN-986 
+  public static final Text AM_RM_TOKEN_SERVICE = new Text(
+    "AM_RM_TOKEN_SERVICE");
+  
   private Credentials getTokensFromAppAttempt(RMAppAttempt appAttempt) {
     Credentials credentials = new Credentials();
     Token<AMRMTokenIdentifier> appToken = appAttempt.getAMRMToken();
     if(appToken != null){
-      credentials.addToken(appToken.getService(), appToken);
+      credentials.addToken(AM_RM_TOKEN_SERVICE, appToken);
     }
     Token<ClientToAMTokenIdentifier> clientToAMToken =
         appAttempt.getClientToAMToken();