Posted to mapreduce-commits@hadoop.apache.org by ma...@apache.org on 2011/09/14 09:26:38 UTC

svn commit: r1170459 [1/2] - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/...

Author: mahadev
Date: Wed Sep 14 07:26:37 2011
New Revision: 1170459

URL: http://svn.apache.org/viewvc?rev=1170459&view=rev
Log:
MAPREDUCE-2899. Replace major parts of ApplicationSubmissionContext with a ContainerLaunchContext (Arun Murthy via mahadev)
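
For context, the hunks below move the ApplicationMaster's resource requirement, local resources, environment, launch commands and security tokens off ApplicationSubmissionContext and onto a ContainerLaunchContext attached via setAMContainerSpec(). The following is a minimal client-side sketch of the new submission flow, based only on the setters visible in this patch; the class name, the use of RecordFactoryProvider, and the concrete memory/user/queue values are illustrative assumptions, not part of this commit:

    // Sketch only: building a submission context after MAPREDUCE-2899.
    // Concrete values and the wrapper class are illustrative assumptions.
    import java.nio.ByteBuffer;
    import java.util.List;
    import java.util.Map;

    import org.apache.hadoop.yarn.api.records.ApplicationId;
    import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
    import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
    import org.apache.hadoop.yarn.api.records.LocalResource;
    import org.apache.hadoop.yarn.api.records.Resource;
    import org.apache.hadoop.yarn.factories.RecordFactory;
    import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;

    public class SubmissionSketch {
      public ApplicationSubmissionContext buildContext(
          ApplicationId appId, List<String> amCommand,
          Map<String, LocalResource> localResources,
          Map<String, String> environment, ByteBuffer securityTokens) {
        RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);

        // The AM's resource requirement now lives on the launch context.
        Resource capability = recordFactory.newRecordInstance(Resource.class);
        capability.setMemory(1536);                       // illustrative value

        // Everything needed to launch the AM container goes here ...
        ContainerLaunchContext amContainer =
            recordFactory.newRecordInstance(ContainerLaunchContext.class);
        amContainer.setResource(capability);
        amContainer.setLocalResources(localResources);
        amContainer.setEnvironment(environment);          // renamed from setEnv()
        amContainer.setCommands(amCommand);
        amContainer.setContainerTokens(securityTokens);

        // ... while the submission context keeps only application-level metadata.
        ApplicationSubmissionContext appContext =
            recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
        appContext.setApplicationId(appId);
        appContext.setUser("nobody");                     // illustrative value
        appContext.setQueue("default");                   // illustrative value
        appContext.setApplicationName("sketch");
        appContext.setAMContainerSpec(amContainer);
        return appContext;
      }
    }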

Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerSubmitEvent.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsBlock.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerTokenSecretManager.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1170459&r1=1170458&r2=1170459&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Wed Sep 14 07:26:37 2011
@@ -289,6 +289,9 @@ Release 0.23.0 - Unreleased
     MAPREDUCE-2676. MR-279: JobHistory Job page needs reformatted. (Robert Evans via 
     mahadev)
 
+    MAPREDUCE-2899. Replace major parts of ApplicationSubmissionContext with a 
+    ContainerLaunchContext (Arun Murthy via mahadev)
+
   OPTIMIZATIONS
 
     MAPREDUCE-2026. Make JobTracker.getJobCounters() and

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1170459&r1=1170458&r2=1170459&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Wed Sep 14 07:26:37 2011
@@ -579,13 +579,12 @@ public abstract class TaskAttemptImpl im
           + remoteJobConfPath.toUri().toASCIIString());
       // //////////// End of JobConf setup
 
-      
       // Setup DistributedCache
-      setupDistributedCache(remoteFS, conf, localResources, environment);
+      MRApps.setupDistributedCache(conf, localResources, environment);
 
       // Set local-resources and environment
       container.setLocalResources(localResources);
-      container.setEnv(environment);
+      container.setEnvironment(environment);
       
       // Setup up tokens
       Credentials taskCredentials = new Credentials();
@@ -618,7 +617,7 @@ public abstract class TaskAttemptImpl im
           ShuffleHandler.serializeServiceData(jobToken));
       container.setServiceData(serviceData);
 
-      MRApps.addToClassPath(container.getEnv(), getInitialClasspath());
+      MRApps.addToClassPath(container.getEnvironment(), getInitialClasspath());
     } catch (IOException e) {
       throw new YarnException(e);
     }
@@ -645,7 +644,7 @@ public abstract class TaskAttemptImpl im
         taskAttemptListener.getAddress(), remoteTask, javaHome,
         workDir.toString(), containerLogDir, childTmpDir, jvmID));
 
-    MapReduceChildJVM.setVMEnv(container.getEnv(), classPaths,
+    MapReduceChildJVM.setVMEnv(container.getEnvironment(), classPaths,
         workDir.toString(), containerLogDir, nmLdLibraryPath, remoteTask,
         localizedApplicationTokensFile);
 
@@ -656,116 +655,6 @@ public abstract class TaskAttemptImpl im
     return container;
   }
 
-  private static long[] parseTimeStamps(String[] strs) {
-    if (null == strs) {
-      return null;
-    }
-    long[] result = new long[strs.length];
-    for(int i=0; i < strs.length; ++i) {
-      result[i] = Long.parseLong(strs[i]);
-    }
-    return result;
-  }
-
-  private void setupDistributedCache(FileSystem remoteFS, 
-      Configuration conf, 
-      Map<String, LocalResource> localResources,
-      Map<String, String> env) 
-  throws IOException {
-    
-    // Cache archives
-    parseDistributedCacheArtifacts(remoteFS, localResources, env, 
-        LocalResourceType.ARCHIVE, 
-        DistributedCache.getCacheArchives(conf), 
-        parseTimeStamps(DistributedCache.getArchiveTimestamps(conf)), 
-        getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES), 
-        DistributedCache.getArchiveVisibilities(conf), 
-        DistributedCache.getArchiveClassPaths(conf));
-    
-    // Cache files
-    parseDistributedCacheArtifacts(remoteFS, 
-        localResources, env, 
-        LocalResourceType.FILE, 
-        DistributedCache.getCacheFiles(conf),
-        parseTimeStamps(DistributedCache.getFileTimestamps(conf)),
-        getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES),
-        DistributedCache.getFileVisibilities(conf),
-        DistributedCache.getFileClassPaths(conf));
-  }
-
-  // TODO - Move this to MR!
-  // Use TaskDistributedCacheManager.CacheFiles.makeCacheFiles(URI[], 
-  // long[], boolean[], Path[], FileType)
-  private void parseDistributedCacheArtifacts(
-      FileSystem remoteFS, 
-      Map<String, LocalResource> localResources,
-      Map<String, String> env,
-      LocalResourceType type,
-      URI[] uris, long[] timestamps, long[] sizes, boolean visibilities[], 
-      Path[] pathsToPutOnClasspath) throws IOException {
-
-    if (uris != null) {
-      // Sanity check
-      if ((uris.length != timestamps.length) || (uris.length != sizes.length) ||
-          (uris.length != visibilities.length)) {
-        throw new IllegalArgumentException("Invalid specification for " +
-        		"distributed-cache artifacts of type " + type + " :" +
-        		" #uris=" + uris.length +
-        		" #timestamps=" + timestamps.length +
-        		" #visibilities=" + visibilities.length
-        		);
-      }
-      
-      Map<String, Path> classPaths = new HashMap<String, Path>();
-      if (pathsToPutOnClasspath != null) {
-        for (Path p : pathsToPutOnClasspath) {
-          p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
-              remoteFS.getWorkingDirectory()));
-          classPaths.put(p.toUri().getPath().toString(), p);
-        }
-      }
-      for (int i = 0; i < uris.length; ++i) {
-        URI u = uris[i];
-        Path p = new Path(u);
-        p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
-            remoteFS.getWorkingDirectory()));
-        // Add URI fragment or just the filename
-        Path name = new Path((null == u.getFragment())
-          ? p.getName()
-          : u.getFragment());
-        if (name.isAbsolute()) {
-          throw new IllegalArgumentException("Resource name must be relative");
-        }
-        String linkName = name.toUri().getPath();
-        localResources.put(
-            linkName,
-            BuilderUtils.newLocalResource(
-                p.toUri(), type, 
-                visibilities[i]
-                  ? LocalResourceVisibility.PUBLIC
-                  : LocalResourceVisibility.PRIVATE,
-                sizes[i], timestamps[i])
-        );
-        if (classPaths.containsKey(u.getPath())) {
-          MRApps.addToClassPath(env, linkName);
-        }
-      }
-    }
-  }
-  
-  // TODO - Move this to MR!
-  private static long[] getFileSizes(Configuration conf, String key) {
-    String[] strs = conf.getStrings(key);
-    if (strs == null) {
-      return null;
-    }
-    long[] result = new long[strs.length];
-    for(int i=0; i < strs.length; ++i) {
-      result[i] = Long.parseLong(strs[i]);
-    }
-    return result;
-  }
-  
   @Override
   public ContainerId getAssignedContainerID() {
     readLock.lock();

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java?rev=1170459&r1=1170458&r2=1170459&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java Wed Sep 14 07:26:37 2011
@@ -25,14 +25,20 @@ import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.net.URI;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.mapreduce.v2.MRConstants;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
@@ -42,12 +48,18 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.util.Apps;
+import org.apache.hadoop.yarn.util.BuilderUtils;
 
 /**
  * Helper class for MR applications
  */
+@Private
+@Unstable
 public class MRApps extends Apps {
   public static final String JOB = "job";
   public static final String TASK = "task";
@@ -232,4 +244,121 @@ public class MRApps extends Apps {
         jobId.toString() + Path.SEPARATOR + MRConstants.JOB_CONF_FILE);
     return jobFile.toString();
   }
+  
+
+
+  private static long[] parseTimeStamps(String[] strs) {
+    if (null == strs) {
+      return null;
+    }
+    long[] result = new long[strs.length];
+    for(int i=0; i < strs.length; ++i) {
+      result[i] = Long.parseLong(strs[i]);
+    }
+    return result;
+  }
+
+  public static void setupDistributedCache( 
+      Configuration conf, 
+      Map<String, LocalResource> localResources,
+      Map<String, String> env) 
+  throws IOException {
+    
+    // Cache archives
+    parseDistributedCacheArtifacts(conf, localResources, env, 
+        LocalResourceType.ARCHIVE, 
+        DistributedCache.getCacheArchives(conf), 
+        parseTimeStamps(DistributedCache.getArchiveTimestamps(conf)), 
+        getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES), 
+        DistributedCache.getArchiveVisibilities(conf), 
+        DistributedCache.getArchiveClassPaths(conf));
+    
+    // Cache files
+    parseDistributedCacheArtifacts(conf, 
+        localResources, env, 
+        LocalResourceType.FILE, 
+        DistributedCache.getCacheFiles(conf),
+        parseTimeStamps(DistributedCache.getFileTimestamps(conf)),
+        getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES),
+        DistributedCache.getFileVisibilities(conf),
+        DistributedCache.getFileClassPaths(conf));
+  }
+
+  // TODO - Move this to MR!
+  // Use TaskDistributedCacheManager.CacheFiles.makeCacheFiles(URI[], 
+  // long[], boolean[], Path[], FileType)
+  private static void parseDistributedCacheArtifacts(
+      Configuration conf,
+      Map<String, LocalResource> localResources,
+      Map<String, String> env,
+      LocalResourceType type,
+      URI[] uris, long[] timestamps, long[] sizes, boolean visibilities[], 
+      Path[] pathsToPutOnClasspath) throws IOException {
+
+    if (uris != null) {
+      // Sanity check
+      if ((uris.length != timestamps.length) || (uris.length != sizes.length) ||
+          (uris.length != visibilities.length)) {
+        throw new IllegalArgumentException("Invalid specification for " +
+            "distributed-cache artifacts of type " + type + " :" +
+            " #uris=" + uris.length +
+            " #timestamps=" + timestamps.length +
+            " #visibilities=" + visibilities.length
+            );
+      }
+      
+      Map<String, Path> classPaths = new HashMap<String, Path>();
+      if (pathsToPutOnClasspath != null) {
+        for (Path p : pathsToPutOnClasspath) {
+          FileSystem remoteFS = p.getFileSystem(conf);
+          p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
+              remoteFS.getWorkingDirectory()));
+          classPaths.put(p.toUri().getPath().toString(), p);
+        }
+      }
+      for (int i = 0; i < uris.length; ++i) {
+        URI u = uris[i];
+        Path p = new Path(u);
+        FileSystem remoteFS = p.getFileSystem(conf);
+        p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
+            remoteFS.getWorkingDirectory()));
+        // Add URI fragment or just the filename
+        Path name = new Path((null == u.getFragment())
+          ? p.getName()
+          : u.getFragment());
+        if (name.isAbsolute()) {
+          throw new IllegalArgumentException("Resource name must be relative");
+        }
+        String linkName = name.toUri().getPath();
+        localResources.put(
+            linkName,
+            BuilderUtils.newLocalResource(
+                p.toUri(), type, 
+                visibilities[i]
+                  ? LocalResourceVisibility.PUBLIC
+                  : LocalResourceVisibility.PRIVATE,
+                sizes[i], timestamps[i])
+        );
+        if (classPaths.containsKey(u.getPath())) {
+          MRApps.addToClassPath(env, linkName);
+        }
+      }
+    }
+  }
+  
+  // TODO - Move this to MR!
+  private static long[] getFileSizes(Configuration conf, String key) {
+    String[] strs = conf.getStrings(key);
+    if (strs == null) {
+      return null;
+    }
+    long[] result = new long[strs.length];
+    for(int i=0; i < strs.length; ++i) {
+      result[i] = Long.parseLong(strs[i]);
+    }
+    return result;
+  }
+  
+
+
 }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java?rev=1170459&r1=1170458&r2=1170459&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java Wed Sep 14 07:26:37 2011
@@ -19,7 +19,6 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
-import java.net.URI;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -33,7 +32,6 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -55,7 +53,6 @@ import org.apache.hadoop.mapreduce.TaskR
 import org.apache.hadoop.mapreduce.TaskTrackerInfo;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.TypeConverter;
-import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
 import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.mapreduce.v2.MRConstants;
@@ -72,6 +69,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationState;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
@@ -237,7 +235,6 @@ public class YARNRunner implements Clien
     // Construct necessary information to start the MR AM
     ApplicationSubmissionContext appContext = 
       createApplicationSubmissionContext(conf, jobSubmitDir, ts);
-    setupDistributedCache(conf, appContext);
     
     // XXX Remove
     in.close();
@@ -273,16 +270,18 @@ public class YARNRunner implements Clien
   public ApplicationSubmissionContext createApplicationSubmissionContext(
       Configuration jobConf,
       String jobSubmitDir, Credentials ts) throws IOException {
-    ApplicationSubmissionContext appContext =
-        recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
     ApplicationId applicationId = resMgrDelegate.getApplicationId();
-    appContext.setApplicationId(applicationId);
+    
+    // Setup resource requirements
     Resource capability = recordFactory.newRecordInstance(Resource.class);
     capability.setMemory(conf.getInt(MRJobConfig.MR_AM_VMEM_MB,
         MRJobConfig.DEFAULT_MR_AM_VMEM_MB));
     LOG.info("AppMaster capability = " + capability);
-    appContext.setMasterCapability(capability);
 
+    // Setup LocalResources
+    Map<String, LocalResource> localResources =
+        new HashMap<String, LocalResource>();
+    
     Path jobConfPath = new Path(jobSubmitDir, MRConstants.JOB_CONF_FILE);
     
     URL yarnUrlForJobSubmitDir = ConverterUtils
@@ -292,14 +291,11 @@ public class YARNRunner implements Clien
     LOG.debug("Creating setup context, jobSubmitDir url is "
         + yarnUrlForJobSubmitDir);
 
-    appContext.setResource(MRConstants.JOB_SUBMIT_DIR,
-        yarnUrlForJobSubmitDir);
-
-    appContext.setResourceTodo(MRConstants.JOB_CONF_FILE,
+    localResources.put(MRConstants.JOB_CONF_FILE,
         createApplicationResource(defaultFileContext,
             jobConfPath));
     if (jobConf.get(MRJobConfig.JAR) != null) {
-      appContext.setResourceTodo(MRConstants.JOB_JAR,
+      localResources.put(MRConstants.JOB_JAR,
           createApplicationResource(defaultFileContext,
               new Path(jobSubmitDir, MRConstants.JOB_JAR)));
     } else {
@@ -312,30 +308,21 @@ public class YARNRunner implements Clien
     // TODO gross hack
     for (String s : new String[] { "job.split", "job.splitmetainfo",
         MRConstants.APPLICATION_TOKENS_FILE }) {
-      appContext.setResourceTodo(
+      localResources.put(
           MRConstants.JOB_SUBMIT_DIR + "/" + s,
-          createApplicationResource(defaultFileContext, new Path(jobSubmitDir, s)));
-    }
-
-    // TODO: Only if security is on.
-    List<String> fsTokens = new ArrayList<String>();
-    for (Token<? extends TokenIdentifier> token : ts.getAllTokens()) {
-      fsTokens.add(token.encodeToUrlString());
+          createApplicationResource(defaultFileContext, 
+              new Path(jobSubmitDir, s)));
     }
     
-    // TODO - Remove this!
-    appContext.addAllFsTokens(fsTokens);
-    DataOutputBuffer dob = new DataOutputBuffer();
-    ts.writeTokenStorageToStream(dob);
-    appContext.setFsTokensTodo(ByteBuffer.wrap(dob.getData(), 0, dob.getLength()));
+    // Setup security tokens
+    ByteBuffer securityTokens = null;
+    if (UserGroupInformation.isSecurityEnabled()) {
+      DataOutputBuffer dob = new DataOutputBuffer();
+      ts.writeTokenStorageToStream(dob);
+      securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+    }
 
-    // Add queue information
-    appContext.setQueue(jobConf.get(JobContext.QUEUE_NAME, JobConf.DEFAULT_QUEUE_NAME));
-    
-    // Add job name
-    appContext.setApplicationName(jobConf.get(JobContext.JOB_NAME, "N/A"));
-    
-    // Add the command line
+    // Setup the command to run the AM
     String javaHome = "$JAVA_HOME";
     Vector<CharSequence> vargs = new Vector<CharSequence>(8);
     vargs.add(javaHome + "/bin/java");
@@ -346,13 +333,6 @@ public class YARNRunner implements Clien
     vargs.add(conf.get(MRJobConfig.MR_AM_COMMAND_OPTS,
         MRJobConfig.DEFAULT_MR_AM_COMMAND_OPTS));
 
-    // Add { job jar, MR app jar } to classpath.
-    Map<String, String> environment = new HashMap<String, String>();
-    MRApps.setInitialClasspath(environment);
-    MRApps.addToClassPath(environment, MRConstants.JOB_JAR);
-    MRApps.addToClassPath(environment,
-        MRConstants.YARN_MAPREDUCE_APP_JAR_PATH);
-    appContext.addAllEnvironment(environment);
     vargs.add("org.apache.hadoop.mapreduce.v2.app.MRAppMaster");
     vargs.add(String.valueOf(applicationId.getClusterTimestamp()));
     vargs.add(String.valueOf(applicationId.getId()));
@@ -370,140 +350,43 @@ public class YARNRunner implements Clien
 
     LOG.info("Command to launch container for ApplicationMaster is : "
         + mergedCommand);
+    
+    // Setup the environment - Add { job jar, MR app jar } to classpath.
+    Map<String, String> environment = new HashMap<String, String>();
+    MRApps.setInitialClasspath(environment);
+    MRApps.addToClassPath(environment, MRConstants.JOB_JAR);
+    MRApps.addToClassPath(environment,
+        MRConstants.YARN_MAPREDUCE_APP_JAR_PATH);
 
-    appContext.addAllCommands(vargsFinal);
-    // TODO: RM should get this from RPC.
-    appContext.setUser(UserGroupInformation.getCurrentUser().getShortUserName());
-    return appContext;
-  }
+    // Parse distributed cache
+    MRApps.setupDistributedCache(jobConf, localResources, environment);
 
-  /**
-   *    * TODO: Copied for now from TaskAttemptImpl.java ... fixme
-   * @param strs
-   * @return
-   */
-  private static long[] parseTimeStamps(String[] strs) {
-    if (null == strs) {
-      return null;
-    }
-    long[] result = new long[strs.length];
-    for(int i=0; i < strs.length; ++i) {
-      result[i] = Long.parseLong(strs[i]);
-    }
-    return result;
-  }
+    // Setup ContainerLaunchContext for AM container
+    ContainerLaunchContext amContainer =
+        recordFactory.newRecordInstance(ContainerLaunchContext.class);
+    amContainer.setResource(capability);             // Resource (mem) required
+    amContainer.setLocalResources(localResources);   // Local resources
+    amContainer.setEnvironment(environment);         // Environment
+    amContainer.setCommands(vargsFinal);             // Command for AM
+    amContainer.setContainerTokens(securityTokens);  // Security tokens
 
-  /**
-   * TODO: Copied for now from TaskAttemptImpl.java ... fixme
-   * 
-   * TODO: This is currently needed in YarnRunner as user code like setupJob,
-   * cleanupJob may need access to dist-cache. Once we separate distcache for
-   * maps, reduces, setup etc, this can include only a subset of artificats.
-   * This is also needed for uberAM case where we run everything inside AM.
-   */
-  private void setupDistributedCache(Configuration conf, 
-      ApplicationSubmissionContext container) throws IOException {
-    
-    // Cache archives
-    parseDistributedCacheArtifacts(conf, container, LocalResourceType.ARCHIVE, 
-        DistributedCache.getCacheArchives(conf), 
-        parseTimeStamps(DistributedCache.getArchiveTimestamps(conf)), 
-        getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES), 
-        DistributedCache.getArchiveVisibilities(conf), 
-        DistributedCache.getArchiveClassPaths(conf));
-    
-    // Cache files
-    parseDistributedCacheArtifacts(conf, container, LocalResourceType.FILE, 
-        DistributedCache.getCacheFiles(conf),
-        parseTimeStamps(DistributedCache.getFileTimestamps(conf)),
-        getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES),
-        DistributedCache.getFileVisibilities(conf),
-        DistributedCache.getFileClassPaths(conf));
-  }
-
-  // TODO - Move this to MR!
-  // Use TaskDistributedCacheManager.CacheFiles.makeCacheFiles(URI[], long[], boolean[], Path[], FileType)
-  private void parseDistributedCacheArtifacts(Configuration conf,
-      ApplicationSubmissionContext container, LocalResourceType type,
-      URI[] uris, long[] timestamps, long[] sizes, boolean visibilities[], 
-      Path[] pathsToPutOnClasspath) throws IOException {
-
-    if (uris != null) {
-      // Sanity check
-      if ((uris.length != timestamps.length) || (uris.length != sizes.length) ||
-          (uris.length != visibilities.length)) {
-        throw new IllegalArgumentException("Invalid specification for " +
-            "distributed-cache artifacts of type " + type + " :" +
-            " #uris=" + uris.length +
-            " #timestamps=" + timestamps.length +
-            " #visibilities=" + visibilities.length
-            );
-      }
-      
-      Map<String, Path> classPaths = new HashMap<String, Path>();
-      if (pathsToPutOnClasspath != null) {
-        for (Path p : pathsToPutOnClasspath) {
-          FileSystem fs = p.getFileSystem(conf);
-          p = p.makeQualified(fs.getUri(), fs.getWorkingDirectory());
-          classPaths.put(p.toUri().getPath().toString(), p);
-        }
-      }
-      for (int i = 0; i < uris.length; ++i) {
-        URI u = uris[i];
-        Path p = new Path(u);
-        FileSystem fs = p.getFileSystem(conf);
-        p = fs.resolvePath(
-            p.makeQualified(fs.getUri(), fs.getWorkingDirectory()));
-        // Add URI fragment or just the filename
-        Path name = new Path((null == u.getFragment())
-          ? p.getName()
-          : u.getFragment());
-        if (name.isAbsolute()) {
-          throw new IllegalArgumentException("Resource name must be relative");
-        }
-        String linkName = name.toUri().getPath();
-        container.setResourceTodo(
-            linkName,
-            createLocalResource(
-                p.toUri(), type, 
-                visibilities[i]
-                  ? LocalResourceVisibility.PUBLIC
-                  : LocalResourceVisibility.PRIVATE,
-                sizes[i], timestamps[i])
-        );
-        if (classPaths.containsKey(u.getPath())) {
-          Map<String, String> environment = container.getAllEnvironment();
-          MRApps.addToClassPath(environment, linkName);
-        }
-      }
-    }
-  }
+    // Set up the ApplicationSubmissionContext
+    ApplicationSubmissionContext appContext =
+        recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
+    appContext.setApplicationId(applicationId);                // ApplicationId
+    appContext.setUser(                                        // User name
+        UserGroupInformation.getCurrentUser().getShortUserName());
+    appContext.setQueue(                                       // Queue name
+        jobConf.get(JobContext.QUEUE_NAME,     
+        YarnConfiguration.DEFAULT_QUEUE_NAME));
+    appContext.setApplicationName(                             // Job name
+        jobConf.get(JobContext.JOB_NAME, 
+        YarnConfiguration.DEFAULT_APPLICATION_NAME));              
+    appContext.setAMContainerSpec(amContainer);         // AM Container 
 
-  // TODO - Move this to MR!
-  private static long[] getFileSizes(Configuration conf, String key) {
-    String[] strs = conf.getStrings(key);
-    if (strs == null) {
-      return null;
-    }
-    long[] result = new long[strs.length];
-    for(int i=0; i < strs.length; ++i) {
-      result[i] = Long.parseLong(strs[i]);
-    }
-    return result;
-  }
-  
-  private LocalResource createLocalResource(URI uri, 
-      LocalResourceType type, LocalResourceVisibility visibility, 
-      long size, long timestamp) throws IOException {
-    LocalResource resource = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(LocalResource.class);
-    resource.setResource(ConverterUtils.getYarnUrlFromURI(uri));
-    resource.setType(type);
-    resource.setVisibility(visibility);
-    resource.setSize(size);
-    resource.setTimestamp(timestamp);
-    return resource;
+    return appContext;
   }
-  
+
   @Override
   public void setJobPriority(JobID arg0, String arg1) throws IOException,
       InterruptedException {

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java?rev=1170459&r1=1170458&r2=1170459&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java Wed Sep 14 07:26:37 2011
@@ -18,14 +18,8 @@
 
 package org.apache.hadoop.yarn.api.records;
 
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Stable;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.ClientRMProtocol;
 
 /**
@@ -36,26 +30,17 @@ import org.apache.hadoop.yarn.api.Client
  * <p>It includes details such as:
  *   <ul>
  *     <li>{@link ApplicationId} of the application.</li>
- *     <li>
- *       {@link Resource} necessary to run the <code>ApplicationMaster</code>.
- *     </li>
  *     <li>Application user.</li>
  *     <li>Application name.</li>
  *     <li>{@link Priority} of the application.</li>
- *     <li>Security tokens (if security is enabled).</li>
- *     <li>
- *       {@link LocalResource} necessary for running the 
- *       <code>ApplicationMaster</code> container such
- *       as binaries, jar, shared-objects, side-files etc. 
- *     </li>
  *     <li>
- *       Environment variables for the launched <code>ApplicationMaster</code> 
- *       process.
+ *       {@link ContainerLaunchContext} of the container in which the 
+ *       <code>ApplicationMaster</code> is executed.
  *     </li>
- *     <li>Command to launch the <code>ApplicationMaster</code>.</li>
  *   </ul>
  * </p>
  * 
+ * @see ContainerLaunchContext
  * @see ClientRMProtocol#submitApplication(org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest)
  */
 @Public
@@ -143,198 +128,25 @@ public interface ApplicationSubmissionCo
   public void setUser(String user);
   
   /**
-   * Get the <code>Resource</code> required to run the 
-   * <code>ApplicationMaster</code>.
-   * @return <code>Resource</code> required to run the 
-   *         <code>ApplicationMaster</code>
-   */
-  @Public
-  @Stable
-  public Resource getMasterCapability();
-  
-  /**
-   * Set <code>Resource</code> required to run the 
-   * <code>ApplicationMaster</code>.
-   * @param masterCapability <code>Resource</code> required to run the 
-   *                         <code>ApplicationMaster</code>
-   */
-  @Public
-  @Stable
-  public void setMasterCapability(Resource masterCapability);
-  
-  @Private
-  @Unstable
-  public Map<String, URL> getAllResources();
-  
-  @Private
-  @Unstable
-  public URL getResource(String key);
-  
-  @Private
-  @Unstable
-  public void addAllResources(Map<String, URL> resources);
-
-  @Private
-  @Unstable
-  public void setResource(String key, URL url);
-
-  @Private
-  @Unstable
-  public void removeResource(String key);
-
-  @Private
-  @Unstable
-  public void clearResources();
-
-  /**
-   * Get all the <code>LocalResource</code> required to run the 
-   * <code>ApplicationMaster</code>.
-   * @return <code>LocalResource</code> required to run the 
-   *         <code>ApplicationMaster</code>
-   */
-  @Public
-  @Stable
-  public Map<String, LocalResource> getAllResourcesTodo();
-  
-  @Private
-  @Unstable
-  public LocalResource getResourceTodo(String key);
-  
-  /**
-   * Add all the <code>LocalResource</code> required to run the 
-   * <code>ApplicationMaster</code>.
-   * @param resources all <code>LocalResource</code> required to run the 
-   *                      <code>ApplicationMaster</code>
-   */
-  @Public
-  @Stable
-  public void addAllResourcesTodo(Map<String, LocalResource> resources);
-
-  @Private
-  @Unstable
-  public void setResourceTodo(String key, LocalResource localResource);
-
-  @Private
-  @Unstable
-  public void removeResourceTodo(String key);
-
-  @Private
-  @Unstable
-  public void clearResourcesTodo();
-
-  @Private
-  @Unstable
-  public List<String> getFsTokenList();
-  
-  @Private
-  @Unstable
-  public String getFsToken(int index);
-  
-  @Private
-  @Unstable
-  public int getFsTokenCount();
-  
-  @Private
-  @Unstable
-  public void addAllFsTokens(List<String> fsTokens);
-
-  @Private
-  @Unstable
-  public void addFsToken(String fsToken);
-
-  @Private
-  @Unstable
-  public void removeFsToken(int index);
-
-  @Private
-  @Unstable
-  public void clearFsTokens();
-
-  /**
-   * Get <em>file-system tokens</em> for the <code>ApplicationMaster</code>.
-   * @return file-system tokens for the <code>ApplicationMaster</code>
-   */
-  @Public
-  @Stable
-  public ByteBuffer getFsTokensTodo();
-  
-  /**
-   * Set <em>file-system tokens</em> for the <code>ApplicationMaster</code>.
-   * @param fsTokens file-system tokens for the <code>ApplicationMaster</code>
+   * Get the <code>ContainerLaunchContext</code> to describe the 
+   * <code>Container</code> with which the <code>ApplicationMaster</code> is
+   * launched.
+   * @return <code>ContainerLaunchContext</code> for the 
+   *         <code>ApplicationMaster</code> container
    */
   @Public
   @Stable
-  public void setFsTokensTodo(ByteBuffer fsTokens);
-
-  /**
-   * Get the <em>environment variables</em> for the 
-   * <code>ApplicationMaster</code>.
-   * @return environment variables for the <code>ApplicationMaster</code>
-   */
-  @Public
-  @Stable
-  public Map<String, String> getAllEnvironment();
-  
-  @Private
-  @Unstable
-  public String getEnvironment(String key);
+  public ContainerLaunchContext getAMContainerSpec();
   
   /**
-   * Add all of the <em>environment variables</em> for the 
-   * <code>ApplicationMaster</code>.
-   * @param environment environment variables for the 
-   *                    <code>ApplicationMaster</code>
+   * Set the <code>ContainerLaunchContext</code> to describe the 
+   * <code>Container</code> with which the <code>ApplicationMaster</code> is
+   * launched.
+   * @param amContainer <code>ContainerLaunchContext</code> for the 
+   *                    <code>ApplicationMaster</code> container
    */
   @Public
   @Stable
-  public void addAllEnvironment(Map<String, String> environment);
+  public void setAMContainerSpec(ContainerLaunchContext amContainer);
 
-  @Private
-  @Unstable
-  public void setEnvironment(String key, String env);
-
-  @Private
-  @Unstable
-  public void removeEnvironment(String key);
-
-  @Private
-  @Unstable
-  public void clearEnvironment();
-
-  /**
-   * Get the <em>commands</em> to launch the <code>ApplicationMaster</code>.
-   * @return commands to launch the <code>ApplicationMaster</code>
-   */
-  @Public
-  @Stable
-  public List<String> getCommandList();
-  
-  @Private
-  @Unstable
-  public String getCommand(int index);
-  
-  @Private
-  @Unstable
-  public int getCommandCount();
-  
-  /**
-   * Add all of the <em>commands</em> to launch the 
-   * <code>ApplicationMaster</code>.
-   * @param commands commands to launch the <code>ApplicationMaster</code>
-   */
-  @Public
-  @Stable
-  public void addAllCommands(List<String> commands);
-  
-  @Private
-  @Unstable
-  public void addCommand(String command);
-  
-  @Private
-  @Unstable
-  public void removeCommand(int index);
-  
-  @Private
-  @Unstable
-  public void clearCommands();
 }
\ No newline at end of file

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java?rev=1170459&r1=1170458&r2=1170459&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java Wed Sep 14 07:26:37 2011
@@ -156,7 +156,7 @@ public interface ContainerLaunchContext 
    */
   @Public
   @Stable
-  Map<String, String> getEnv();
+  Map<String, String> getEnvironment();
     
   /**
    * Add <em>environment variables</em> for the container.
@@ -164,7 +164,7 @@ public interface ContainerLaunchContext 
    */
   @Public
   @Stable
-  void setEnv(Map<String, String> environment);
+  void setEnvironment(Map<String, String> environment);
 
   /**
    * Get the list of <em>commands</em> for launching the container.

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java?rev=1170459&r1=1170458&r2=1170459&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java Wed Sep 14 07:26:37 2011
@@ -18,56 +18,35 @@
 
 package org.apache.hadoop.yarn.api.records.impl.pb;
 
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.ProtoBase;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProtoOrBuilder;
-import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto;
-import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
-import org.apache.hadoop.yarn.proto.YarnProtos.StringLocalResourceMapProto;
-import org.apache.hadoop.yarn.proto.YarnProtos.StringStringMapProto;
-import org.apache.hadoop.yarn.proto.YarnProtos.StringURLMapProto;
-import org.apache.hadoop.yarn.proto.YarnProtos.URLProto;
-
-
     
-public class ApplicationSubmissionContextPBImpl extends ProtoBase<ApplicationSubmissionContextProto> implements ApplicationSubmissionContext {
-  ApplicationSubmissionContextProto proto = ApplicationSubmissionContextProto.getDefaultInstance();
+public class ApplicationSubmissionContextPBImpl 
+extends ProtoBase<ApplicationSubmissionContextProto> 
+implements ApplicationSubmissionContext {
+  ApplicationSubmissionContextProto proto = 
+      ApplicationSubmissionContextProto.getDefaultInstance();
   ApplicationSubmissionContextProto.Builder builder = null;
   boolean viaProto = false;
   
   private ApplicationId applicationId = null;
-  private Resource masterCapability = null;
-  private Map<String, URL> resources = null;
-  private Map<String, LocalResource> resourcesTodo = null;
-  private List<String> fsTokenList = null;
-  private ByteBuffer fsTokenTodo = null;
-  private Map<String, String> environment = null;
-  private List<String> commandList = null;
   private Priority priority = null;
-  
-  
+  private ContainerLaunchContext amContainer = null;
   
   public ApplicationSubmissionContextPBImpl() {
     builder = ApplicationSubmissionContextProto.newBuilder();
   }
 
-  public ApplicationSubmissionContextPBImpl(ApplicationSubmissionContextProto proto) {
+  public ApplicationSubmissionContextPBImpl(
+      ApplicationSubmissionContextProto proto) {
     this.proto = proto;
     viaProto = true;
   }
@@ -83,30 +62,12 @@ public class ApplicationSubmissionContex
     if (this.applicationId != null) {
       builder.setApplicationId(convertToProtoFormat(this.applicationId));
     }
-    if (this.masterCapability != null) {
-      builder.setMasterCapability(convertToProtoFormat(this.masterCapability));
-    }
-    if (this.resources != null) {
-      addResourcesToProto();
-    }
-    if (this.resourcesTodo != null) {
-      addResourcesTodoToProto();
-    }
-    if (this.fsTokenList != null) {
-      addFsTokenListToProto();
-    }
-    if (this.fsTokenTodo != null) {
-      builder.setFsTokensTodo(convertToProtoFormat(this.fsTokenTodo));
-    }
-    if (this.environment != null) {
-      addEnvironmentToProto();
-    }
-    if (this.commandList != null) {
-      addCommandsToProto();
-    }
     if (this.priority != null) {
       builder.setPriority(convertToProtoFormat(this.priority));
     }
+    if (this.amContainer != null) {
+      builder.setAmContainerSpec(convertToProtoFormat(this.amContainer));
+    }
   }
 
   private void mergeLocalToProto() {
@@ -145,6 +106,7 @@ public class ApplicationSubmissionContex
       builder.clearPriority();
     this.priority = priority;
   }
+  
   @Override
   public ApplicationId getApplicationId() {
     ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
@@ -165,6 +127,7 @@ public class ApplicationSubmissionContex
       builder.clearApplicationId();
     this.applicationId = applicationId;
   }
+  
   @Override
   public String getApplicationName() {
     ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
@@ -183,403 +146,7 @@ public class ApplicationSubmissionContex
     }
     builder.setApplicationName((applicationName));
   }
-  @Override
-  public Resource getMasterCapability() {
-    ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
-    if (this.masterCapability != null) {
-      return masterCapability;
-    } // Else via proto
-    if (!p.hasMasterCapability()) {
-      return null;
-    }
-    masterCapability = convertFromProtoFormat(p.getMasterCapability());
-    return this.masterCapability;
-  }
-
-  @Override
-  public void setMasterCapability(Resource masterCapability) {
-    maybeInitBuilder();
-    if (masterCapability == null)
-      builder.clearMasterCapability();
-    this.masterCapability = masterCapability;
-  }
-  @Override
-  public Map<String, URL> getAllResources() {
-    initResources();
-    return this.resources;
-  }
-  @Override
-  public URL getResource(String key) {
-    initResources();
-    return this.resources.get(key);
-  }
-  
-  private void initResources() {
-    if (this.resources != null) {
-      return;
-    }
-    ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
-    List<StringURLMapProto> mapAsList = p.getResourcesList();
-    this.resources = new HashMap<String, URL>();
-    
-    for (StringURLMapProto c : mapAsList) {
-      this.resources.put(c.getKey(), convertFromProtoFormat(c.getValue()));
-    }
-  }
-  
-  @Override
-  public void addAllResources(final Map<String, URL> resources) {
-    if (resources == null)
-      return;
-    initResources();
-    this.resources.putAll(resources);
-  }
-  
-  private void addResourcesToProto() {
-    maybeInitBuilder();
-    builder.clearResources();
-    if (this.resources == null)
-      return;
-    Iterable<StringURLMapProto> iterable = new Iterable<StringURLMapProto>() {
-      
-      @Override
-      public Iterator<StringURLMapProto> iterator() {
-        return new Iterator<StringURLMapProto>() {
-          
-          Iterator<String> keyIter = resources.keySet().iterator();
-          
-          @Override
-          public void remove() {
-            throw new UnsupportedOperationException();
-          }
-          
-          @Override
-          public StringURLMapProto next() {
-            String key = keyIter.next();
-            return StringURLMapProto.newBuilder().setKey(key).setValue(convertToProtoFormat(resources.get(key))).build();
-          }
-          
-          @Override
-          public boolean hasNext() {
-            return keyIter.hasNext();
-          }
-        };
-      }
-    };
-    builder.addAllResources(iterable);
-  }
-  @Override
-  public void setResource(String key, URL val) {
-    initResources();
-    this.resources.put(key, val);
-  }
-  @Override
-  public void removeResource(String key) {
-    initResources();
-    this.resources.remove(key);
-  }
-  @Override
-  public void clearResources() {
-    initResources();
-    this.resources.clear();
-  }
-  @Override
-  public Map<String, LocalResource> getAllResourcesTodo() {
-    initResourcesTodo();
-    return this.resourcesTodo;
-  }
-  @Override
-  public LocalResource getResourceTodo(String key) {
-    initResourcesTodo();
-    return this.resourcesTodo.get(key);
-  }
-  
-  private void initResourcesTodo() {
-    if (this.resourcesTodo != null) {
-      return;
-    }
-    ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
-    List<StringLocalResourceMapProto> mapAsList = p.getResourcesTodoList();
-    this.resourcesTodo = new HashMap<String, LocalResource>();
-    
-    for (StringLocalResourceMapProto c : mapAsList) {
-      this.resourcesTodo.put(c.getKey(), convertFromProtoFormat(c.getValue()));
-    }
-  }
-  
-  @Override
-  public void addAllResourcesTodo(final Map<String, LocalResource> resourcesTodo) {
-    if (resourcesTodo == null) 
-      return;
-    initResourcesTodo();
-    this.resourcesTodo.putAll(resourcesTodo);
-  }
-  
-  private void addResourcesTodoToProto() {
-    maybeInitBuilder();
-    builder.clearResourcesTodo();
-    if (resourcesTodo == null)
-      return;
-    Iterable<StringLocalResourceMapProto> iterable = new Iterable<StringLocalResourceMapProto>() {
-      
-      @Override
-      public Iterator<StringLocalResourceMapProto> iterator() {
-        return new Iterator<StringLocalResourceMapProto>() {
-          
-          Iterator<String> keyIter = resourcesTodo.keySet().iterator();
-          
-          @Override
-          public void remove() {
-            throw new UnsupportedOperationException();
-          }
-          
-          @Override
-          public StringLocalResourceMapProto next() {
-            String key = keyIter.next();
-            return StringLocalResourceMapProto.newBuilder().setKey(key).setValue(convertToProtoFormat(resourcesTodo.get(key))).build();
-          }
-          
-          @Override
-          public boolean hasNext() {
-            return keyIter.hasNext();
-          }
-        };
-      }
-    };
-    builder.addAllResourcesTodo(iterable);
-  }
-  @Override
-  public void setResourceTodo(String key, LocalResource val) {
-    initResourcesTodo();
-    this.resourcesTodo.put(key, val);
-  }
-  @Override
-  public void removeResourceTodo(String key) {
-    initResourcesTodo();
-    this.resourcesTodo.remove(key);
-  }
-  @Override
-  public void clearResourcesTodo() {
-    initResourcesTodo();
-    this.resourcesTodo.clear();
-  }
-  @Override
-  public List<String> getFsTokenList() {
-    initFsTokenList();
-    return this.fsTokenList;
-  }
-  @Override
-  public String getFsToken(int index) {
-    initFsTokenList();
-    return this.fsTokenList.get(index);
-  }
-  @Override
-  public int getFsTokenCount() {
-    initFsTokenList();
-    return this.fsTokenList.size();
-  }
-  
-  private void initFsTokenList() {
-    if (this.fsTokenList != null) {
-      return;
-    }
-    ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
-    List<String> list = p.getFsTokensList();
-    this.fsTokenList = new ArrayList<String>();
-
-    for (String c : list) {
-      this.fsTokenList.add(c);
-    }
-  }
-  
-  @Override
-  public void addAllFsTokens(final List<String> fsTokens) {
-    if (fsTokens == null) 
-      return;
-    initFsTokenList();
-    this.fsTokenList.addAll(fsTokens);
-  }
-  
-  private void addFsTokenListToProto() {
-    maybeInitBuilder();
-    builder.clearFsTokens();
-    builder.addAllFsTokens(this.fsTokenList);
-  }
-
-  @Override
-  public void addFsToken(String fsTokens) {
-    initFsTokenList();
-    this.fsTokenList.add(fsTokens);
-  }
-  @Override
-  public void removeFsToken(int index) {
-    initFsTokenList();
-    this.fsTokenList.remove(index);
-  }
-  @Override
-  public void clearFsTokens() {
-    initFsTokenList();
-    this.fsTokenList.clear();
-  }
-  @Override
-  public ByteBuffer getFsTokensTodo() {
-    ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
-    if (this.fsTokenTodo != null) {
-      return this.fsTokenTodo;
-    }
-    if (!p.hasFsTokensTodo()) {
-      return null;
-    }
-    this.fsTokenTodo = convertFromProtoFormat(p.getFsTokensTodo());
-    return this.fsTokenTodo;
-  }
-
-  @Override
-  public void setFsTokensTodo(ByteBuffer fsTokensTodo) {
-    maybeInitBuilder();
-    if (fsTokensTodo == null) 
-      builder.clearFsTokensTodo();
-    this.fsTokenTodo = fsTokensTodo;
-  }
-  @Override
-  public Map<String, String> getAllEnvironment() {
-    initEnvironment();
-    return this.environment;
-  }
-  @Override
-  public String getEnvironment(String key) {
-    initEnvironment();
-    return this.environment.get(key);
-  }
-  
-  private void initEnvironment() {
-    if (this.environment != null) {
-      return;
-    }
-    ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
-    List<StringStringMapProto> mapAsList = p.getEnvironmentList();
-    this.environment = new HashMap<String, String>();
-    
-    for (StringStringMapProto c : mapAsList) {
-      this.environment.put(c.getKey(), c.getValue());
-    }
-  }
-  
-  @Override
-  public void addAllEnvironment(Map<String, String> environment) {
-    if (environment == null)
-      return;
-    initEnvironment();
-    this.environment.putAll(environment);
-  }
-  
-  private void addEnvironmentToProto() {
-    maybeInitBuilder();
-    builder.clearEnvironment();
-    if (environment == null)
-      return;
-    Iterable<StringStringMapProto> iterable = new Iterable<StringStringMapProto>() {
-      
-      @Override
-      public Iterator<StringStringMapProto> iterator() {
-        return new Iterator<StringStringMapProto>() {
-          
-          Iterator<String> keyIter = environment.keySet().iterator();
-          
-          @Override
-          public void remove() {
-            throw new UnsupportedOperationException();
-          }
-          
-          @Override
-          public StringStringMapProto next() {
-            String key = keyIter.next();
-            return StringStringMapProto.newBuilder().setKey(key).setValue((environment.get(key))).build();
-          }
-          
-          @Override
-          public boolean hasNext() {
-            return keyIter.hasNext();
-          }
-        };
-      }
-    };
-    builder.addAllEnvironment(iterable);
-  }
-  @Override
-  public void setEnvironment(String key, String val) {
-    initEnvironment();
-    this.environment.put(key, val);
-  }
-  @Override
-  public void removeEnvironment(String key) {
-    initEnvironment();
-    this.environment.remove(key);
-  }
-  @Override
-  public void clearEnvironment() {
-    initEnvironment();
-    this.environment.clear();
-  }
-  @Override
-  public List<String> getCommandList() {
-    initCommandList();
-    return this.commandList;
-  }
-  @Override
-  public String getCommand(int index) {
-    initCommandList();
-    return this.commandList.get(index);
-  }
-  @Override
-  public int getCommandCount() {
-    initCommandList();
-    return this.commandList.size();
-  }
-  
-  private void initCommandList() {
-    if (this.commandList != null) {
-      return;
-    }
-    ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
-    List<String> list = p.getCommandList();
-    this.commandList = new ArrayList<String>();
 
-    for (String c : list) {
-      this.commandList.add(c);
-    }
-  }
-  
-  @Override
-  public void addAllCommands(final List<String> command) {
-    if (command == null)
-      return;
-    initCommandList();
-    this.commandList.addAll(command);
-  }
-  
-  private void addCommandsToProto() {
-    maybeInitBuilder();
-    builder.clearCommand();
-    if (this.commandList == null) 
-      return;
-    builder.addAllCommand(this.commandList);
-  }
-  @Override
-  public void addCommand(String command) {
-    initCommandList();
-    this.commandList.add(command);
-  }
-  @Override
-  public void removeCommand(int index) {
-    initCommandList();
-    this.commandList.remove(index);
-  }
-  @Override
-  public void clearCommands() {
-    initCommandList();
-    this.commandList.clear();
-  }
   @Override
   public String getQueue() {
     ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
@@ -598,6 +165,7 @@ public class ApplicationSubmissionContex
     }
     builder.setQueue((queue));
   }
+  
   @Override
   public String getUser() {
     ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
@@ -617,6 +185,28 @@ public class ApplicationSubmissionContex
     builder.setUser((user));
   }
 
+  @Override
+  public ContainerLaunchContext getAMContainerSpec() {
+    ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
+    if (this.amContainer != null) {
+      return amContainer;
+    } // Else via proto
+    if (!p.hasAmContainerSpec()) {
+      return null;
+    }
+    amContainer = convertFromProtoFormat(p.getAmContainerSpec());
+    return amContainer;
+  }
+
+  @Override
+  public void setAMContainerSpec(ContainerLaunchContext amContainer) {
+    maybeInitBuilder();
+    if (amContainer == null) {
+      builder.clearAmContainerSpec();
+    }
+    this.amContainer = amContainer;
+  }
+
   private PriorityPBImpl convertFromProtoFormat(PriorityProto p) {
     return new PriorityPBImpl(p);
   }
@@ -633,28 +223,12 @@ public class ApplicationSubmissionContex
     return ((ApplicationIdPBImpl)t).getProto();
   }
 
-  private ResourcePBImpl convertFromProtoFormat(ResourceProto p) {
-    return new ResourcePBImpl(p);
+  private ContainerLaunchContextPBImpl convertFromProtoFormat(
+      ContainerLaunchContextProto p) {
+    return new ContainerLaunchContextPBImpl(p);
   }
 
-  private ResourceProto convertToProtoFormat(Resource t) {
-    return ((ResourcePBImpl)t).getProto();
+  private ContainerLaunchContextProto convertToProtoFormat(ContainerLaunchContext t) {
+    return ((ContainerLaunchContextPBImpl)t).getProto();
   }
-
-  private URLPBImpl convertFromProtoFormat(URLProto p) {
-    return new URLPBImpl(p);
-  }
-
-  private URLProto convertToProtoFormat(URL t) {
-    return ((URLPBImpl)t).getProto();
-  }
-
-  private LocalResourcePBImpl convertFromProtoFormat(LocalResourceProto p) {
-    return new LocalResourcePBImpl(p);
-  }
-
-  private LocalResourceProto convertToProtoFormat(LocalResource t) {
-    return ((LocalResourcePBImpl)t).getProto();
-  }
-
 }  
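
[Editor's note] With the above, ApplicationSubmissionContextPBImpl no longer carries its own resource, environment, command and token plumbing; the AM launch details now live in a nested ContainerLaunchContext exposed through getAMContainerSpec()/setAMContainerSpec(). A minimal sketch of the client-side pattern this enables (illustrative only, not part of the commit; it assumes the standard Records factory and a hypothetical AM command line):

    import java.util.Collections;
    import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
    import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
    import org.apache.hadoop.yarn.util.Records;

    public class AmSpecSketch {
      public static ApplicationSubmissionContext buildContext() {
        // Launch details (commands, environment, resources, tokens) now hang
        // off the ContainerLaunchContext rather than the submission context.
        ContainerLaunchContext amContainer =
            Records.newRecord(ContainerLaunchContext.class);
        amContainer.setCommands(
            Collections.singletonList("$JAVA_HOME/bin/java MyAppMaster")); // hypothetical command

        ApplicationSubmissionContext context =
            Records.newRecord(ApplicationSubmissionContext.class);
        context.setAMContainerSpec(amContainer); // accessor added by this patch
        return context;
      }
    }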

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java?rev=1170459&r1=1170458&r2=1170459&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java Wed Sep 14 07:26:37 2011
@@ -39,8 +39,6 @@ import org.apache.hadoop.yarn.proto.Yarn
 import org.apache.hadoop.yarn.proto.YarnProtos.StringLocalResourceMapProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.StringStringMapProto;
 
-
-    
 public class ContainerLaunchContextPBImpl 
 extends ProtoBase<ContainerLaunchContextProto> 
 implements ContainerLaunchContext {
@@ -54,10 +52,9 @@ implements ContainerLaunchContext {
   private Map<String, LocalResource> localResources = null;
   private ByteBuffer containerTokens = null;
   private Map<String, ByteBuffer> serviceData = null;
-  private Map<String, String> env = null;
+  private Map<String, String> environment = null;
   private List<String> commands = null;
   
-  
   public ContainerLaunchContextPBImpl() {
     builder = ContainerLaunchContextProto.newBuilder();
   }
@@ -94,7 +91,7 @@ implements ContainerLaunchContext {
     if (this.serviceData != null) {
       addServiceDataToProto();
     }
-    if (this.env != null) {
+    if (this.environment != null) {
       addEnvToProto();
     }
     if (this.commands != null) {
@@ -364,37 +361,37 @@ implements ContainerLaunchContext {
   }
   
   @Override
-  public Map<String, String> getEnv() {
+  public Map<String, String> getEnvironment() {
     initEnv();
-    return this.env;
+    return this.environment;
   }
   
   private void initEnv() {
-    if (this.env != null) {
+    if (this.environment != null) {
       return;
     }
     ContainerLaunchContextProtoOrBuilder p = viaProto ? proto : builder;
-    List<StringStringMapProto> list = p.getEnvList();
-    this.env = new HashMap<String, String>();
+    List<StringStringMapProto> list = p.getEnvironmentList();
+    this.environment = new HashMap<String, String>();
 
     for (StringStringMapProto c : list) {
-      this.env.put(c.getKey(), c.getValue());
+      this.environment.put(c.getKey(), c.getValue());
     }
   }
   
   @Override
-  public void setEnv(final Map<String, String> env) {
+  public void setEnvironment(final Map<String, String> env) {
     if (env == null)
       return;
     initEnv();
-    this.env.clear();
-    this.env.putAll(env);
+    this.environment.clear();
+    this.environment.putAll(env);
   }
   
   private void addEnvToProto() {
     maybeInitBuilder();
-    builder.clearEnv();
-    if (env == null)
+    builder.clearEnvironment();
+    if (environment == null)
       return;
     Iterable<StringStringMapProto> iterable = 
         new Iterable<StringStringMapProto>() {
@@ -403,7 +400,7 @@ implements ContainerLaunchContext {
       public Iterator<StringStringMapProto> iterator() {
         return new Iterator<StringStringMapProto>() {
           
-          Iterator<String> keyIter = env.keySet().iterator();
+          Iterator<String> keyIter = environment.keySet().iterator();
           
           @Override
           public void remove() {
@@ -414,7 +411,7 @@ implements ContainerLaunchContext {
           public StringStringMapProto next() {
             String key = keyIter.next();
             return StringStringMapProto.newBuilder().setKey(key).setValue(
-                (env.get(key))).build();
+                (environment.get(key))).build();
           }
           
           @Override
@@ -424,7 +421,7 @@ implements ContainerLaunchContext {
         };
       }
     };
-    builder.addAllEnv(iterable);
+    builder.addAllEnvironment(iterable);
   }
 
   private ResourcePBImpl convertFromProtoFormat(ResourceProto p) {
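
[Editor's note] The getEnv()/setEnv() pair is renamed to getEnvironment()/setEnvironment() here, and the callers below (ContainerLaunch, AMLauncher) rely on the getter handing back the live, lazily-initialized map, so in-place puts are reflected when the record is later converted to proto. A hedged illustration (editor's sketch, not part of the commit):

    import java.util.Map;
    import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
    import org.apache.hadoop.yarn.util.Records;

    public class EnvRenameSketch {
      public static void main(String[] args) {
        ContainerLaunchContext ctx =
            Records.newRecord(ContainerLaunchContext.class);
        // Previously ctx.getEnv(); the returned map is mutable and backs the record.
        Map<String, String> env = ctx.getEnvironment();
        env.put("HADOOP_CONF_DIR", "./conf");
        // The put above is visible on later reads and on proto serialization.
        assert "./conf".equals(ctx.getEnvironment().get("HADOOP_CONF_DIR"));
      }
    }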

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto?rev=1170459&r1=1170458&r2=1170459&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto Wed Sep 14 07:26:37 2011
@@ -188,17 +188,11 @@ message AMResponseProto {
 ////////////////////////////////////////////////////////////////////////
 message ApplicationSubmissionContextProto {
   optional ApplicationIdProto application_id = 1;
-  optional string application_name = 2;
-  optional ResourceProto master_capability = 3;
-  repeated StringURLMapProto resources = 4;
-  repeated StringLocalResourceMapProto resources_todo = 5;
-  repeated string fs_tokens = 6;
-  optional bytes fs_tokens_todo = 7;
-  repeated StringStringMapProto environment = 8;
-  repeated string command = 9;
-  optional string queue = 10;
-  optional PriorityProto priority = 11;
-  optional string user = 12;
+  optional string application_name = 2 [default = "N/A"];
+  optional string user = 3; 
+  optional string queue = 4 [default = "default"];
+  optional PriorityProto priority = 5;
+  optional ContainerLaunchContextProto am_container_spec = 6;
 }
 
 message YarnClusterMetricsProto {
@@ -242,7 +236,7 @@ message ContainerLaunchContextProto {
   repeated StringLocalResourceMapProto localResources = 4;
   optional bytes container_tokens = 5;
   repeated StringBytesMapProto service_data = 6;
-  repeated StringStringMapProto env = 7;
+  repeated StringStringMapProto environment = 7;
   repeated string command = 8;
 }
 

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java?rev=1170459&r1=1170458&r2=1170459&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java Wed Sep 14 07:26:37 2011
@@ -219,6 +219,12 @@ public class YarnConfiguration extends C
     RM_PREFIX + "max-completed-applications";
   public static final int DEFAULT_RM_MAX_COMPLETED_APPLICATIONS = 10000;
   
+  /** Default application name */
+  public static final String DEFAULT_APPLICATION_NAME = "N/A";
+
+  /** Default queue name */
+  public static final String DEFAULT_QUEUE_NAME = "default";
+  
   ////////////////////////////////
   // Node Manager Configs
   ////////////////////////////////

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java?rev=1170459&r1=1170458&r2=1170459&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java Wed Sep 14 07:26:37 2011
@@ -89,7 +89,7 @@ public class ContainerLaunch implements 
     final Map<Path,String> localResources = container.getLocalizedResources();
     String containerIdStr = ConverterUtils.toString(container.getContainerID());
     final String user = launchContext.getUser();
-    final Map<String,String> env = launchContext.getEnv();
+    final Map<String,String> env = launchContext.getEnvironment();
     final List<String> command = launchContext.getCommands();
     int ret = -1;
 
@@ -109,7 +109,7 @@ public class ContainerLaunch implements 
       }
       launchContext.setCommands(newCmds);
 
-      Map<String, String> envs = launchContext.getEnv();
+      Map<String, String> envs = launchContext.getEnvironment();
       Map<String, String> newEnvs = new HashMap<String, String>(envs.size());
       for (Entry<String, String> entry : envs.entrySet()) {
         newEnvs.put(
@@ -118,7 +118,7 @@ public class ContainerLaunch implements 
                 ApplicationConstants.LOG_DIR_EXPANSION_VAR,
                 containerLogDir.toUri().getPath()));
       }
-      launchContext.setEnv(newEnvs);
+      launchContext.setEnvironment(newEnvs);
       // /////////////////////////// End of variable expansion
 
       FileContext lfs = FileContext.getLocalFSFileContext();

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java?rev=1170459&r1=1170458&r2=1170459&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java Wed Sep 14 07:26:37 2011
@@ -71,7 +71,6 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.service.AbstractService;
@@ -90,7 +89,6 @@ public class ClientRMService extends Abs
   final private AtomicInteger applicationCounter = new AtomicInteger(0);
   final private YarnScheduler scheduler;
   final private RMContext rmContext;
-  private final AMLivelinessMonitor amLivelinessMonitor;
   private final RMAppManager rmAppManager;
 
   private String clientServiceBindAddress;
@@ -106,7 +104,6 @@ public class ClientRMService extends Abs
     super(ClientRMService.class.getName());
     this.scheduler = scheduler;
     this.rmContext = rmContext;
-    this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor();
     this.rmAppManager = rmAppManager;
   }
   
@@ -195,15 +192,18 @@ public class ClientRMService extends Abs
       SubmitApplicationRequest request) throws YarnRemoteException {
     ApplicationSubmissionContext submissionContext = request
         .getApplicationSubmissionContext();
-    ApplicationId applicationId = null;
-    String user = null;
+    ApplicationId applicationId = submissionContext.getApplicationId();
+    String user = submissionContext.getUser();
     try {
       user = UserGroupInformation.getCurrentUser().getShortUserName();
-      applicationId = submissionContext.getApplicationId();
       if (rmContext.getRMApps().get(applicationId) != null) {
         throw new IOException("Application with id " + applicationId
             + " is already present! Cannot add a duplicate!");
       }
+      
+      // Safety 
+      submissionContext.setUser(user);
+      
       // This needs to be synchronous as the client can query 
       // immediately following the submission to get the application status.
       // So call handle directly and do not send an event.
@@ -226,6 +226,7 @@ public class ClientRMService extends Abs
     return response;
   }
 
+  @SuppressWarnings("unchecked")
   @Override
   public FinishApplicationResponse finishApplication(
       FinishApplicationRequest request) throws YarnRemoteException {

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java?rev=1170459&r1=1170458&r2=1170459&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java Wed Sep 14 07:26:37 2011
@@ -210,7 +210,9 @@ public class RMAppManager implements Eve
     }
   }
 
-  protected synchronized void submitApplication(ApplicationSubmissionContext submissionContext) {
+  @SuppressWarnings("unchecked")
+  protected synchronized void submitApplication(
+      ApplicationSubmissionContext submissionContext) {
     ApplicationId applicationId = submissionContext.getApplicationId();
     RMApp application = null;
     try {
@@ -224,27 +226,37 @@ public class RMAppManager implements Eve
         clientTokenStr = clientToken.encodeToUrlString();
         LOG.debug("Sending client token as " + clientTokenStr);
       }
-      submissionContext.setQueue(submissionContext.getQueue() == null
-          ? "default" : submissionContext.getQueue());
-      submissionContext.setApplicationName(submissionContext
-          .getApplicationName() == null ? "N/A" : submissionContext
-          .getApplicationName());
+      
+      // Sanity checks
+      if (submissionContext.getQueue() == null) {
+        submissionContext.setQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
+      }
+      if (submissionContext.getApplicationName() == null) {
+        submissionContext.setApplicationName(
+            YarnConfiguration.DEFAULT_APPLICATION_NAME);
+      }
+
+      // Store application for recovery
       ApplicationStore appStore = rmContext.getApplicationsStore()
           .createApplicationStore(submissionContext.getApplicationId(),
           submissionContext);
+      
+      // Create RMApp
       application = new RMAppImpl(applicationId, rmContext,
           this.conf, submissionContext.getApplicationName(), user,
           submissionContext.getQueue(), submissionContext, clientTokenStr,
-          appStore, rmContext.getAMLivelinessMonitor(), this.scheduler,
+          appStore, this.scheduler,
           this.masterService);
 
-      if (rmContext.getRMApps().putIfAbsent(applicationId, application) != null) {
+      if (rmContext.getRMApps().putIfAbsent(applicationId, application) != 
+          null) {
         LOG.info("Application with id " + applicationId + 
             " is already present! Cannot add a duplicate!");
-        // don't send event through dispatcher as it will be handled by app already
-        // present with this id.
+        // don't send event through dispatcher as it will be handled by app 
+        // already present with this id.
         application.handle(new RMAppRejectedEvent(applicationId,
-            "Application with this id is already present! Cannot add a duplicate!"));
+            "Application with this id is already present! " +
+            "Cannot add a duplicate!"));
       } else {
         this.rmContext.getDispatcher().getEventHandler().handle(
             new RMAppEvent(applicationId, RMAppEventType.START));

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerSubmitEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerSubmitEvent.java?rev=1170459&r1=1170458&r2=1170459&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerSubmitEvent.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerSubmitEvent.java Wed Sep 14 07:26:37 2011
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 
 public class RMAppManagerSubmitEvent extends RMAppManagerEvent {

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java?rev=1170459&r1=1170458&r2=1170459&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java Wed Sep 14 07:26:37 2011
@@ -23,7 +23,6 @@ import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.security.PrivilegedAction;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -120,7 +119,8 @@ public class AMLauncher implements Runna
         + " for AM " + application.getAppAttemptId());  
     ContainerLaunchContext launchContext =
         createAMContainerLaunchContext(applicationContext, masterContainerID);
-    StartContainerRequest request = recordFactory.newRecordInstance(StartContainerRequest.class);
+    StartContainerRequest request = 
+        recordFactory.newRecordInstance(StartContainerRequest.class);
     request.setContainerLaunchContext(launchContext);
     containerMgrProxy.startContainer(request);
     LOG.info("Done launching container " + application.getMasterContainer() 
@@ -130,7 +130,8 @@ public class AMLauncher implements Runna
   private void cleanup() throws IOException {
     connect();
     ContainerId containerId = application.getMasterContainer().getId();
-    StopContainerRequest stopRequest = recordFactory.newRecordInstance(StopContainerRequest.class);
+    StopContainerRequest stopRequest = 
+        recordFactory.newRecordInstance(StopContainerRequest.class);
     stopRequest.setContainerId(containerId);
     containerMgrProxy.stopContainer(stopRequest);
   }
@@ -145,7 +146,7 @@ public class AMLauncher implements Runna
     final YarnRPC rpc = YarnRPC.create(conf); // TODO: Don't create again and again.
 
     UserGroupInformation currentUser =
-        UserGroupInformation.createRemoteUser("TODO"); // TODO
+        UserGroupInformation.createRemoteUser("yarn"); // TODO
     if (UserGroupInformation.isSecurityEnabled()) {
       ContainerToken containerToken = container.getContainerToken();
       Token<ContainerTokenIdentifier> token =
@@ -170,8 +171,8 @@ public class AMLauncher implements Runna
       ContainerId containerID) throws IOException {
 
     // Construct the actual Container
-    ContainerLaunchContext container = recordFactory.newRecordInstance(ContainerLaunchContext.class);
-    container.setCommands(applicationMasterContext.getCommandList());
+    ContainerLaunchContext container = 
+        applicationMasterContext.getAMContainerSpec();
     StringBuilder mergedCommand = new StringBuilder();
     String failCount = Integer.toString(application.getAppAttemptId()
         .getAttemptId());
@@ -189,34 +190,28 @@ public class AMLauncher implements Runna
    
     LOG.info("Command to launch container " + 
         containerID + " : " + mergedCommand);
-    Map<String, String> environment = 
-        applicationMasterContext.getAllEnvironment();
-    environment.putAll(setupTokensInEnv(applicationMasterContext));
-    container.setEnv(environment);
-
-    // Construct the actual Container
+    
+    // Finalize the container
     container.setContainerId(containerID);
     container.setUser(applicationMasterContext.getUser());
-    container.setResource(applicationMasterContext.getMasterCapability());
-    container.setLocalResources(applicationMasterContext.getAllResourcesTodo());
-    container.setContainerTokens(applicationMasterContext.getFsTokensTodo());
+    setupTokensAndEnv(container);
+    
     return container;
   }
 
-  private Map<String, String> setupTokensInEnv(
-      ApplicationSubmissionContext asc)
+  private void setupTokensAndEnv(
+      ContainerLaunchContext container)
       throws IOException {
-    Map<String, String> env =
-      new HashMap<String, String>();
+    Map<String, String> environment = container.getEnvironment();
     if (UserGroupInformation.isSecurityEnabled()) {
       // TODO: Security enabled/disabled info should come from RM.
 
       Credentials credentials = new Credentials();
 
       DataInputByteBuffer dibb = new DataInputByteBuffer();
-      if (asc.getFsTokensTodo() != null) {
+      if (container.getContainerTokens() != null) {
         // TODO: Don't do this kind of checks everywhere.
-        dibb.reset(asc.getFsTokensTodo());
+        dibb.reset(container.getContainerTokens());
         credentials.readTokenStorageStream(dibb);
       }
 
@@ -236,14 +231,16 @@ public class AMLauncher implements Runna
       token.setService(new Text(resolvedAddr));
       String appMasterTokenEncoded = token.encodeToUrlString();
       LOG.debug("Putting appMaster token in env : " + appMasterTokenEncoded);
-      env.put(ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME,
+      environment.put(
+          ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME,
           appMasterTokenEncoded);
 
       // Add the RM token
       credentials.addToken(new Text(resolvedAddr), token);
       DataOutputBuffer dob = new DataOutputBuffer();
       credentials.writeTokenStorageToStream(dob);
-      asc.setFsTokensTodo(ByteBuffer.wrap(dob.getData(), 0, dob.getLength()));
+      container.setContainerTokens(
+          ByteBuffer.wrap(dob.getData(), 0, dob.getLength()));
 
       ApplicationTokenIdentifier identifier = new ApplicationTokenIdentifier(
           application.getAppAttemptId().getApplicationId());
@@ -252,9 +249,10 @@ public class AMLauncher implements Runna
       String encoded =
           Base64.encodeBase64URLSafeString(clientSecretKey.getEncoded());
       LOG.debug("The encoded client secret-key to be put in env : " + encoded);
-      env.put(ApplicationConstants.APPLICATION_CLIENT_SECRET_ENV_NAME, encoded);
+      environment.put(
+          ApplicationConstants.APPLICATION_CLIENT_SECRET_ENV_NAME, 
+          encoded);
     }
-    return env;
   }
   
   @SuppressWarnings("unchecked")
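
[Editor's note] Net effect in AMLauncher: the AM container spec arrives fully formed from the client, and the RM only finalizes it, merging RM-minted tokens into the container's token buffer instead of into the submission context. A hedged sketch of the serialization round-trip that setupTokensAndEnv performs with the Credentials APIs used above (illustrative only):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import org.apache.hadoop.io.DataInputByteBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hadoop.security.Credentials;

    public class TokenBufferSketch {
      // Serialize credentials into the ByteBuffer shape expected by
      // ContainerLaunchContext.setContainerTokens(...).
      public static ByteBuffer toTokenBuffer(Credentials credentials)
          throws IOException {
        DataOutputBuffer dob = new DataOutputBuffer();
        credentials.writeTokenStorageToStream(dob);
        return ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
      }

      // Read them back, as setupTokensAndEnv does before adding the RM token.
      public static Credentials fromTokenBuffer(ByteBuffer buffer)
          throws IOException {
        Credentials credentials = new Credentials();
        DataInputByteBuffer dibb = new DataInputByteBuffer();
        dibb.reset(buffer);
        credentials.readTokenStorageStream(dibb);
        return credentials;
      }
    }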

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1170459&r1=1170458&r2=1170459&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Wed Sep 14 07:26:37 2011
@@ -86,7 +86,6 @@ public class RMAppImpl implements RMApp 
   // Mutable fields
   private long startTime;
   private long finishTime;
-  private AMLivelinessMonitor amLivelinessMonitor;
   private RMAppAttempt currentAttempt;
 
   private static final FinalTransition FINAL_TRANSITION = new FinalTransition();
@@ -163,7 +162,7 @@ public class RMAppImpl implements RMApp 
   public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
       Configuration config, String name, String user, String queue,
       ApplicationSubmissionContext submissionContext, String clientTokenStr,
-      ApplicationStore appStore, AMLivelinessMonitor amLivelinessMonitor,
+      ApplicationStore appStore, 
       YarnScheduler scheduler, ApplicationMasterService masterService) {
 
     this.applicationId = applicationId;
@@ -176,7 +175,6 @@ public class RMAppImpl implements RMApp 
     this.submissionContext = submissionContext;
     this.clientTokenStr = clientTokenStr;
     this.appStore = appStore;
-    this.amLivelinessMonitor = amLivelinessMonitor;
     this.scheduler = scheduler;
     this.masterService = masterService;
     this.startTime = System.currentTimeMillis();
@@ -380,6 +378,7 @@ public class RMAppImpl implements RMApp 
     }
   }
 
+  @SuppressWarnings("unchecked")
   private void createNewAttempt() {
     ApplicationAttemptId appAttemptId = Records
         .newRecord(ApplicationAttemptId.class);
@@ -434,6 +433,7 @@ public class RMAppImpl implements RMApp 
       return nodes;
     }
 
+    @SuppressWarnings("unchecked")
     public void transition(RMAppImpl app, RMAppEvent event) {
       Set<NodeId> nodes = getNodesOnWhichAttemptRan(app);
       for (NodeId nodeId : nodes) {