Posted to mapreduce-commits@hadoop.apache.org by ma...@apache.org on 2011/04/08 04:54:57 UTC

svn commit: r1090092 [1/2] - in /hadoop/mapreduce/branches/MR-279: mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/ mr-client/hadoop-ma...

Author: mahadev
Date: Fri Apr  8 02:54:56 2011
New Revision: 1090092

URL: http://svn.apache.org/viewvc?rev=1090092&view=rev
Log:
MAPREDUCE-2405: Implement uber-AppMaster (in-cluster LocalJobRunner for MRv2). (Greg Roelofs via mahadev)
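
For context, the uber-decision this patch introduces is driven by a handful of
job-conf knobs read in JobImpl's InitTransition (see the diff below). A
minimal client-side sketch, using only the MRJobConfig constants that appear
in this patch (the literal key strings and exact semantics are not shown in
this excerpt and may differ):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.MRJobConfig;

    Configuration conf = new Configuration();
    // Master switch for uberization; the patch defaults it to true.
    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, true);
    // "Smallness" thresholds; the patch defaults to 9 maps, 1 reduce, and
    // one DFS block (dfs.block.size, falling back to 64 MB) of input.
    conf.setInt(MRJobConfig.JOB_UBERTASK_MAXMAPS, 9);
    conf.setInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 1);
    conf.setLong(MRJobConfig.JOB_UBERTASK_MAXBYTES, 64L * 1024 * 1024);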

Added:
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java
Modified:
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/avro/MRClientProtocol.genavro
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/JobState.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithHistoryService.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java?rev=1090092&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java Fri Apr  8 02:54:56 2011
@@ -0,0 +1,432 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapred;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.net.URI;
+import java.util.HashSet;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSError;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.MRAppMasterConstants;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.service.AbstractService;
+
+/**
+ * Runs the container task locally in a thread.
+ * Since all (sub)tasks share the same local directory, they must be executed
+ * sequentially in order to avoid creating/deleting the same files/dirs.
+ */
+public class LocalContainerLauncher extends AbstractService implements
+    ContainerLauncher {
+
+  private static final File curDir = new File(".");
+  private static final Log LOG = LogFactory.getLog(LocalContainerLauncher.class);
+
+  private FileContext curFC = null;
+  private final HashSet<File> localizedFiles;
+  private final AppContext context;
+  private final TaskUmbilicalProtocol umbilical;
+  private Thread eventHandlingThread;
+  private BlockingQueue<ContainerLauncherEvent> eventQueue =
+      new LinkedBlockingQueue<ContainerLauncherEvent>();
+
+  public LocalContainerLauncher(AppContext context,
+                                TaskUmbilicalProtocol umbilical) {
+    super(LocalContainerLauncher.class.getName());
+    this.context = context;
+    this.umbilical = umbilical;
+        // umbilical:  MRAppMaster creates it (taskAttemptListener) and
+        // passes it to us.  (TODO/FIXME:  pointless to use RPC to talk to
+        // self; should create LocalTaskAttemptListener or similar:
+        // implement umbilical protocol but skip the RPC stuff)
+
+    try {
+      curFC = FileContext.getFileContext(curDir.toURI());
+    } catch (UnsupportedFileSystemException ufse) {
+      LOG.error("Local filesystem " + curDir.toURI().toString()
+                + " is unsupported?? (should never happen)");
+    }
+
+    // Save the list of files/dirs that are supposed to be present so we can
+    // delete any extras created by one task before starting the next one.
+    // Note that there's no protection against deleted or renamed localized
+    // files; users who do that get what they deserve (and will have to
+    // disable uberization in order to run correctly).
+    File[] curLocalFiles = curDir.listFiles();
+    localizedFiles = new HashSet<File>(curLocalFiles.length);
+    for (File file : curLocalFiles) {
+      localizedFiles.add(file);
+    }
+
+    // Relocalization note/future FIXME (per chrisdo, 20110315):  At moment,
+    // full localization info is in AppSubmissionContext passed from client to
+    // RM and then to NM for AM-container launch:  no difference between AM-
+    // localization and MapTask- or ReduceTask-localization, so can assume all
+    // OK.  Longer-term, will need to override uber-AM container-localization
+    // request ("needed resources") with union of regular-AM-resources + task-
+    // resources (and, if maps and reduces ever differ, then union of all three
+    // types), OR will need localizer service/API that uber-AM can request
+    // after running (e.g., "localizeForTask()" or "localizeForMapTask()").
+  }
+
+  public void start() {
+    eventHandlingThread = new Thread(new SubtaskRunner(), "uber-SubtaskRunner");
+    eventHandlingThread.start();
+    super.start();
+  }
+
+  public void stop() {
+    eventHandlingThread.interrupt();
+    super.stop();
+  }
+
+  @Override
+  public void handle(ContainerLauncherEvent event) {
+    try {
+      eventQueue.put(event);
+    } catch (InterruptedException e) {
+      throw new YarnException(e);  // FIXME? YarnException is "for runtime exceptions only"
+    }
+  }
+
+
+  /*
+   * Uber-AM lifecycle/ordering ("normal" case):
+   *
+   * - [somebody] sends TA_ASSIGNED
+   *   - handled by ContainerAssignedTransition (TaskAttemptImpl.java)
+   *     - creates "remoteTask" for us == real Task
+   *     - sends CONTAINER_REMOTE_LAUNCH
+   *     - TA: UNASSIGNED -> ASSIGNED
+   * - CONTAINER_REMOTE_LAUNCH handled by LocalContainerLauncher (us!)
+   *   - sucks "remoteTask" out of TaskAttemptImpl via getRemoteTask()
+   *   - sends TA_CONTAINER_LAUNCHED
+   *     [[ elsewhere...
+   *       - TA_CONTAINER_LAUNCHED handled by LaunchedContainerTransition
+   *         - registers "remoteTask" with TaskAttemptListener (== umbilical)
+   *         - NUKES "remoteTask"
+   *         - sends T_ATTEMPT_LAUNCHED (Task: SCHEDULED -> RUNNING)
+   *         - TA: ASSIGNED -> RUNNING
+   *     ]]
+   *   - runs Task (runSubMap() or runSubReduce())
+   *     - TA can safely send TA_UPDATE since in RUNNING state
+   *       [modulo possible TA-state-machine race noted below:  CHECK (TODO)]
+   */
+  private class SubtaskRunner implements Runnable {
+
+    private boolean doneWithMaps = false;
+    private int finishedSubMaps = 0;
+
+    SubtaskRunner() {
+    }
+
+    @Override
+    public void run() {
+      ContainerLauncherEvent event = null;
+
+      // _must_ either run subtasks sequentially or accept expense of new JVMs
+      // (i.e., fork()), else will get weird failures when maps try to create/
+      // write same dirname or filename:  no chdir() in Java
+      while (!Thread.currentThread().isInterrupted()) {
+        try {
+          event = eventQueue.take();
+        } catch (InterruptedException e) {  // mostly via T_KILL? JOB_KILL?
+          LOG.error("Returning, interrupted : " + e);
+          return;
+        }
+
+        LOG.info("Processing the event " + event.toString());
+
+        if (event.getType() == EventType.CONTAINER_REMOTE_LAUNCH) {
+
+          ContainerRemoteLaunchEvent launchEv =
+              (ContainerRemoteLaunchEvent)event;
+          TaskAttemptId attemptID = launchEv.getTaskAttemptID();
+          //FIXME:  can attemptID ever be null?  (only if retrieved over
+          //        umbilical?)
+
+          Job job = context.getAllJobs().get(attemptID.getTaskId().getJobId());
+          int numMapTasks = job.getTotalMaps();
+          int numReduceTasks = job.getTotalReduces();
+
+          // YARN (tracking) Task:
+          org.apache.hadoop.mapreduce.v2.app.job.Task ytask =
+              job.getTask(attemptID.getTaskId());
+          // classic mapred Task:
+          org.apache.hadoop.mapred.Task remoteTask = launchEv.getRemoteTask();
+
+          // after "launching," send launched event to task attempt to move
+          // state from ASSIGNED to RUNNING (also nukes "remoteTask", so must
+          // do getRemoteTask() call first)
+          context.getEventHandler().handle(
+              new TaskAttemptEvent(attemptID,
+                  TaskAttemptEventType.TA_CONTAINER_LAUNCHED));
+          //FIXME:  race condition here?  or do we have the same kind of
+          //  lock on the TA handler => MapTask can't send TA_UPDATE before
+          //  TA_CONTAINER_LAUNCHED moves the TA to RUNNING state?
+          //  (probably the latter)
+
+          if (numMapTasks == 0) {
+            doneWithMaps = true;
+          }
+
+          try {
+            runSubtask(remoteTask, ytask.getType(), attemptID, numMapTasks,
+                       (numReduceTasks > 0));
+          } catch (RuntimeException re) {
+            // this is our signal that the subtask failed in some way, so
+            // simulate a failed JVM/container and send a container-completed
+            // event to task attempt (i.e., move state machine from RUNNING
+            // to FAIL_CONTAINER_CLEANUP [and ultimately to FAILED])
+            context.getEventHandler().handle(new TaskAttemptEvent(attemptID,
+                TaskAttemptEventType.TA_CONTAINER_COMPLETED));
+          } catch (IOException ioe) {
+            // if umbilical itself barfs (in error-handler of runSubMap()),
+            // we're pretty much hosed, so do what YarnChild main() does
+            // (i.e., exit clumsily--but can never happen, so no worries!)
+            LOG.fatal("oopsie...  this can never happen: "
+                + StringUtils.stringifyException(ioe));
+            System.exit(-1);
+          }
+
+        } else if (event.getType() == EventType.CONTAINER_REMOTE_CLEANUP) {
+
+          // no container to kill, so just send "cleaned" event to task attempt
+          // to move us from SUCCESS_CONTAINER_CLEANUP to SUCCEEDED state
+          // (or {FAIL|KILL}_CONTAINER_CLEANUP to {FAIL|KILL}_TASK_CLEANUP)
+          context.getEventHandler().handle(
+              new TaskAttemptEvent(event.getTaskAttemptID(),
+                  TaskAttemptEventType.TA_CONTAINER_CLEANED));
+
+        } else {
+          LOG.warn("Ignoring unexpected event " + event.toString());
+        }
+
+      }
+    }
+
+    private void runSubtask(org.apache.hadoop.mapred.Task task,
+                            final TaskType taskType,
+                            TaskAttemptId attemptID,
+                            final int numMapTasks,
+                            boolean renameOutputs)
+    throws RuntimeException, IOException {
+      org.apache.hadoop.mapred.TaskAttemptID classicAttemptID =
+          TypeConverter.fromYarn(attemptID);
+
+      try {
+        JobConf conf = new JobConf(getConfig());
+
+        // META-FIXME: do we want the extra sanity-checking (doneWithMaps,
+        // etc.), or just assume/hope the state machine(s) and uber-AM work
+        // as expected?
+        if (taskType == TaskType.MAP) {
+          if (doneWithMaps) {
+            LOG.error("CONTAINER_REMOTE_LAUNCH contains a map task ("
+                      + attemptID + "), but should be finished with maps");
+            // throw new RuntimeException()  (FIXME: what's appropriate here?)
+          }
+
+          MapTask map = (MapTask)task;
+
+          //CODE-REVIEWER QUESTION:  why not task.getConf() or map.getConf()
+          //  instead of conf?  do we need Task's localizeConfiguration()
+          //  run on this first?
+          map.run(conf, umbilical);
+
+          if (renameOutputs) {
+            renameMapOutputForReduce(conf, attemptID, map.getMapOutputFile());
+          }
+          relocalize();
+
+          if (++finishedSubMaps == numMapTasks) {
+            doneWithMaps = true;
+          }
+
+        } else /* TaskType.REDUCE */ {
+
+          if (!doneWithMaps) {
+            //check if event-queue is empty?  the whole idea of counting
+            //  maps vs. checking the event queue is a tad wacky...but it
+            //  could enforce ordering (assuming no "lost events") at
+            //  LocalMRAppMaster  [CURRENT BUG(?):  doesn't send reduce
+            //  event until maps are all done]
+            LOG.error("CONTAINER_REMOTE_LAUNCH contains a reduce task ("
+                      + attemptID + "), but not yet finished with maps");
+            // throw new RuntimeException()  (FIXME) // or push reduce event back onto end of queue? (probably former)
+          }
+
+          ReduceTask reduce = (ReduceTask)task;
+
+          // a.k.a. "mapreduce.jobtracker.address" in LocalJobRunner:
+          conf.set(MRConfig.MASTER_ADDRESS, "local");  // bypass shuffle
+
+          reduce.run(conf, umbilical);
+          //relocalize();  // needed only if more than one reducer supported (is MAPREDUCE-434 fixed yet?)
+        }
+
+      } catch (FSError e) {
+        LOG.fatal("FSError from child", e);
+        // umbilical:  MRAppMaster creates (taskAttemptListener), passes to us
+        umbilical.fsError(classicAttemptID, e.getMessage());
+        throw new RuntimeException();
+
+      } catch (Exception exception) {
+        LOG.warn("Exception running local (uberized) 'child' : "
+            + StringUtils.stringifyException(exception));
+        try {
+          if (task != null) {
+            // do cleanup for the task
+//          if (childUGI == null) { // no need to jump into doAs block
+              task.taskCleanup(umbilical);
+//          } else {
+//            final Task taskFinal = task;
+//            childUGI.doAs(new PrivilegedExceptionAction<Object>() {
+//              @Override
+//              public Object run() throws Exception {
+//                taskFinal.taskCleanup(umbilical);
+//                return null;
+//              }
+//            });
+//          }
+          }
+        } catch (Exception e) {
+          LOG.info("Exception cleaning up: "
+              + StringUtils.stringifyException(e));
+        }
+        // Report back any failures, for diagnostic purposes
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        exception.printStackTrace(new PrintStream(baos));
+//      if (classicAttemptID != null) {
+          umbilical.reportDiagnosticInfo(classicAttemptID, baos.toString());
+//      }
+        throw new RuntimeException();
+
+      } catch (Throwable throwable) {
+        LOG.fatal("Error running local (uberized) 'child' : "
+            + StringUtils.stringifyException(throwable));
+//      if (classicAttemptID != null) {
+          Throwable tCause = throwable.getCause();
+          String cause = (tCause == null)
+              ? throwable.getMessage()
+              : StringUtils.stringifyException(tCause);
+          umbilical.fatalError(classicAttemptID, cause);
+//      }
+        throw new RuntimeException();
+
+      } finally {
+/*
+FIXME:  do we need to do any of this stuff?  (guessing not since not in own JVM)
+        RPC.stopProxy(umbilical);
+        DefaultMetricsSystem.shutdown();
+        // Shutting down log4j of the child-vm...
+        // This assumes that on return from Task.run()
+        // there is no more logging done.
+        LogManager.shutdown();
+ */
+      }
+    }
+
+
+/* FIXME:  may not need renameMapOutputForReduce() anymore?  TEST!
+
+${local.dir}/usercache/$user/appcache/$appId/$contId/ == $cwd for tasks;
+contains task.sh script, which, when executed, creates symlinks and sets up env
+ "$local.dir"/usercache/$user/appcache/$appId/$contId/file.out
+ "$local.dir"/usercache/$user/appcache/$appId/$contId/file.out.idx (?)
+ "$local.dir"/usercache/$user/appcache/$appId/output/$taskId/ is where file.out* is moved after MapTask done
+
+   OHO!  no further need for this at all?  $taskId is unique per subtask
+   now => should work fine to leave alone.  TODO:  test with teragen or
+   similar
+ */
+
+    /**
+     * Within the _local_ filesystem (not HDFS), all activity takes place within
+     * a single subdir (${local.dir}/usercache/$user/appcache/$appId/$contId/),
+     * and all sub-MapTasks create the same filename ("file.out").  Rename that
+     * to something unique (e.g., "map_0.out") to avoid collisions.
+     *
+     * Longer-term, we'll modify [something] to use TaskAttemptID-based
+     * filenames instead of "file.out". (All of this is entirely internal,
+     * so there are no particular compatibility issues.)
+     */
+    private void renameMapOutputForReduce(JobConf conf, TaskAttemptId mapId,
+                                          MapOutputFile subMapOutputFile)
+    throws IOException {
+      FileSystem localFs = FileSystem.getLocal(conf);
+      // move map output to reduce input
+      Path mapOut = subMapOutputFile.getOutputFile();
+      Path reduceIn = subMapOutputFile.getInputFileForWrite(
+          TypeConverter.fromYarn(mapId).getTaskID(), localFs.getLength(mapOut));
+      if (!localFs.mkdirs(reduceIn.getParent())) {
+        throw new IOException("Mkdirs failed to create "
+            + reduceIn.getParent().toString());
+      }
+      if (!localFs.rename(mapOut, reduceIn)) {
+        throw new IOException("Couldn't rename " + mapOut);
+      }
+    }
+
+    /**
+     * Also within the local filesystem, we need to restore the initial state
+     * of the directory as much as possible.  Compare current contents against
+     * the saved original state and nuke everything that doesn't belong, with
+     * the exception of the renamed map outputs (see above).
+     * FIXME:  do we really need to worry about renamed map outputs, or are
+     * they already moved to the output dir on commit?  if the latter, fix
+     * this comment
+     *
+     * Any jobs that go out of their way to rename or delete things from the
+     * local directory are considered broken and deserve what they get...
+     */
+    private void relocalize() {
+      for (File file : curDir.listFiles()) {
+        if (!localizedFiles.contains(file)) {
+          // found one that wasn't there before:  delete it
+          boolean deleted = false;
+          try {
+            if (curFC != null) {
+              // this is recursive, unlike File delete():
+              deleted = curFC.delete(new Path(file.getName()), true);
+            }
+          } catch (Exception e) {
+            deleted = false;
+          }
+          if (!deleted) {
+            LOG.warn("Unable to delete unexpected local file/dir "
+                + curLocalFiles[j].getName() + ": insufficient permissions?");
+          }
+        }
+      }
+    }
+
+  } // end SubtaskRunner
+
+}
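
For orientation, in uber mode this launcher replaces the RPC-based
ContainerLauncherImpl inside the AM process itself. A condensed sketch of the
selection logic, as wired up in the MRAppMaster diff further below:

    // Chosen once per job, after the synchronous JOB_INIT transition has
    // set the uber flag (see MRAppMaster.createContainerLauncher() below):
    ContainerLauncher launcher = job.isUber()
        ? new LocalContainerLauncher(context,
              (TaskUmbilicalProtocol) taskAttemptListener)  // in-process
        : new ContainerLauncherImpl(context);               // via NodeManagers
    dispatcher.register(ContainerLauncher.EventType.class, launcher);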

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1090092&r1=1090091&r2=1090092&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Fri Apr  8 02:54:56 2011
@@ -31,7 +31,9 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.LocalContainerLauncher;
 import org.apache.hadoop.mapred.TaskAttemptListenerImpl;
+import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
@@ -53,6 +55,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl;
+import org.apache.hadoop.mapreduce.v2.app.local.LocalContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator;
@@ -111,6 +114,8 @@ public class MRAppMaster extends Composi
   private JobTokenSecretManager jobTokenSecretManager =
       new JobTokenSecretManager();
 
+  private Job job;
+
   public MRAppMaster(ApplicationId applicationId) {
     this(applicationId, null);
   }
@@ -139,26 +144,19 @@ public class MRAppMaster extends Composi
     //service to do the task cleanup
     taskCleaner = createTaskCleaner(context);
     addIfService(taskCleaner);
-    //service to launch allocated containers via NodeManager
-    containerLauncher = createContainerLauncher(context);
-    addIfService(containerLauncher);
-    
+
     //service to handle requests from JobClient
     clientService = createClientService(context);
     addIfService(clientService);
 
-    //service to allocate containers from RM
-    containerAllocator = createContainerAllocator(clientService, context);
-    addIfService(containerAllocator);
-
     //service to log job history events
     EventHandler<JobHistoryEvent> historyService = 
-      createJobHistoryHandler(conf);
+        createJobHistoryHandler(conf);
+
+    JobEventDispatcher synchronousJobEventDispatcher = new JobEventDispatcher();
 
     //register the event dispatchers
-    dispatcher.register(ContainerAllocator.EventType.class, containerAllocator);
-    dispatcher.register(ContainerLauncher.EventType.class, containerLauncher);
-    dispatcher.register(JobEventType.class, new JobEventDispatcher());
+    dispatcher.register(JobEventType.class, synchronousJobEventDispatcher);
     dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
     dispatcher.register(TaskAttemptEventType.class, 
         new TaskAttemptEventDispatcher());
@@ -178,7 +176,118 @@ public class MRAppMaster extends Composi
     }
 
     super.init(conf);
-  }
+
+    //---- start of what used to be startJobs() code:
+
+    Configuration config = getConfig();
+
+    job = createJob(config);
+
+    /** create a job event for job initialization */
+    JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);
+    /** send init to the job (this does NOT trigger job execution) */
+    synchronousJobEventDispatcher.handle(initJobEvent);
+
+    // JobImpl's InitTransition is done (call above is synchronous), so the
+    // "uber-decision" (MR-1220) has been made.  Query job and switch to
+    // ubermode if appropriate (by registering different container-allocator
+    // and container-launcher services/event-handlers).
+
+    if (job.isUber()) {
+      LOG.info("MRAppMaster uberizing job " + job.getID()
+               + " in local container (\"uber-AM\").");
+    } else {
+      LOG.info("MRAppMaster launching normal, non-uberized, multi-container "
+               + "job " + job.getID() + ".");
+    }
+
+    // service to allocate containers from RM (if non-uber) or to fake it (uber)
+    containerAllocator =
+        createContainerAllocator(clientService, context, job.isUber());
+    addIfService(containerAllocator);
+    dispatcher.register(ContainerAllocator.EventType.class, containerAllocator);
+    if (containerAllocator instanceof Service) {
+      ((Service) containerAllocator).init(config);
+    }
+
+    // corresponding service to launch allocated containers via NodeManager
+    containerLauncher = createContainerLauncher(context, job.isUber());
+    addIfService(containerLauncher);
+    dispatcher.register(ContainerLauncher.EventType.class, containerLauncher);
+    if (containerLauncher instanceof Service) {
+      ((Service) containerLauncher).init(config);
+    }
+
+  } // end of init()
+
+  /** Create and initialize (but don't start) a single job. */
+  public Job createJob(Configuration conf) {
+    Credentials fsTokens = new Credentials();
+
+    if (UserGroupInformation.isSecurityEnabled()) {
+      // Read the file-system tokens from the localized tokens-file.
+      try {
+        Path jobSubmitDir =
+            FileContext.getLocalFSFileContext().makeQualified(
+                new Path(new File(YARNApplicationConstants.JOB_SUBMIT_DIR)
+                    .getAbsolutePath()));
+        Path jobTokenFile =
+            new Path(jobSubmitDir, YarnConfiguration.APPLICATION_TOKENS_FILE);
+        fsTokens.addAll(Credentials.readTokenStorageFile(jobTokenFile, conf));
+        LOG.info("jobSubmitDir=" + jobSubmitDir + " jobTokenFile="
+            + jobTokenFile);
+
+        UserGroupInformation currentUser =
+            UserGroupInformation.getCurrentUser();
+        for (Token<? extends TokenIdentifier> tk : fsTokens.getAllTokens()) {
+          LOG.info(" --- DEBUG: Token of kind " + tk.getKind()
+              + "in current ugi in the AppMaster for service "
+              + tk.getService());
+          currentUser.addToken(tk); // For use by AppMaster itself.
+        }
+      } catch (IOException e) {
+        throw new YarnException(e);
+      }
+    }
+
+    // create single job
+    Job newJob = new JobImpl(appID, conf, dispatcher.getEventHandler(),
+                      taskAttemptListener, jobTokenSecretManager, fsTokens);
+    ((RunningAppContext) context).jobs.put(newJob.getID(), newJob);
+
+    dispatcher.register(JobFinishEvent.Type.class,
+        new EventHandler<JobFinishEvent>() {
+          @Override
+          public void handle(JobFinishEvent event) {
+            // job has finished
+            // this is the only job, so shut down the AppMaster
+            // note in a workflow scenario, this may lead to creation of a new
+            // job (FIXME?)
+
+            // TODO:  currently we just wait for some time so clients can
+            // learn the final states.  Will be removed once the RM comes up.
+            try {
+              Thread.sleep(15000);
+            } catch (InterruptedException e) {
+              e.printStackTrace();
+            }
+            LOG.info("Calling stop for all the services");
+            try {
+              stop();
+            } catch (Throwable t) {
+              LOG.warn("Graceful stop failed ", t);
+            }
+            //TODO: this is required because rpc server does not shut down
+            // in spite of calling server.stop().
+            //Bring the process down by force.
+            //Not needed after HADOOP-7140
+            LOG.info("Exiting MR AppMaster..GoodBye!");
+            System.exit(0);
+          }
+        });
+
+    return newJob;
+  } // end createJob()
 
   static class NullSpeculatorEventHandler
       implements EventHandler<SpeculatorEvent> {
@@ -245,14 +354,20 @@ public class MRAppMaster extends Composi
     return new TaskCleanerImpl(context);
   }
 
-  protected ContainerLauncher createContainerLauncher(AppContext context) {
-    return new ContainerLauncherImpl(context);
-  }
-  
-  protected ContainerAllocator createContainerAllocator(ClientService
-      clientService, AppContext context) {
+  protected ContainerAllocator createContainerAllocator(
+      ClientService clientService, AppContext context, boolean isLocal) {
     //return new StaticContainerAllocator(context);
-    return new RMContainerAllocator(clientService, context);
+    return isLocal
+        ? new LocalContainerAllocator(clientService, context)
+        : new RMContainerAllocator(clientService, context);
+  }
+
+  protected ContainerLauncher createContainerLauncher(AppContext context,
+                                                      boolean isLocal) {
+    return isLocal
+        ? new LocalContainerLauncher(context,
+            (TaskUmbilicalProtocol) taskAttemptListener)
+        : new ContainerLauncherImpl(context);
   }
 
   //TODO:should have an interface for MRClientService
@@ -292,7 +407,7 @@ public class MRAppMaster extends Composi
   class RunningAppContext implements AppContext {
 
     private Map<JobId, Job> jobs = new ConcurrentHashMap<JobId, Job>();
-   
+
     @Override
     public ApplicationId getApplicationID() {
       return appID;
@@ -330,80 +445,17 @@ public class MRAppMaster extends Composi
   /**
    * This can be overridden to instantiate multiple jobs and create a 
    * workflow.
+   *
+   * TODO:  Rework the design to actually support this.  Currently much of the
+   * job stuff has been moved to init() above to support uberization (MR-1220).
+   * In a typical workflow, one presumably would want to uberize only a subset
+   * of the jobs (the "small" ones), which is awkward with the current design.
    */
   protected void startJobs() {
-
-    Configuration config = getConfig();
-
-    Credentials fsTokens = new Credentials();
-
-    if (UserGroupInformation.isSecurityEnabled()) {
-      // Read the file-system tokens from the localized tokens-file.
-      try {
-        Path jobSubmitDir =
-            FileContext.getLocalFSFileContext().makeQualified(
-                new Path(new File(YARNApplicationConstants.JOB_SUBMIT_DIR)
-                    .getAbsolutePath()));
-        Path jobTokenFile =
-            new Path(jobSubmitDir, YarnConfiguration.APPLICATION_TOKENS_FILE);
-        fsTokens.addAll(Credentials.readTokenStorageFile(jobTokenFile, config));
-        LOG.info("jobSubmitDir=" + jobSubmitDir + " jobTokenFile="
-            + jobTokenFile);
-
-        UserGroupInformation currentUser =
-            UserGroupInformation.getCurrentUser();
-        for (Token<? extends TokenIdentifier> tk : fsTokens.getAllTokens()) {
-          LOG.info(" --- DEBUG: Token of kind " + tk.getKind()
-              + "in current ugi in the AppMaster for service "
-              + tk.getService());
-          currentUser.addToken(tk); // For use by AppMaster itself.
-        }
-      } catch (IOException e) {
-        throw new YarnException(e);
-      }
-    }
-
-    //create single job
-    Job job =
-        new JobImpl(appID, config, dispatcher.getEventHandler(),
-            taskAttemptListener, jobTokenSecretManager, fsTokens);
-    ((RunningAppContext) context).jobs.put(job.getID(), job);
-
-    dispatcher.register(JobFinishEvent.Type.class,
-        new EventHandler<JobFinishEvent>() {
-          @Override
-          public void handle(JobFinishEvent event) {
-            // job has finished
-            // this is the only job, so shutdown the Appmaster
-            // note in a workflow scenario, this may lead to creation of a new
-            // job
-
-            // TODO:currently just wait for sometime so clients can know the
-            // final states. Will be removed once RM come on.
-            try {
-              Thread.sleep(15000);
-            } catch (InterruptedException e) {
-              e.printStackTrace();
-            }
-            LOG.info("Calling stop for all the services");
-            try {
-              stop();
-            } catch (Throwable t) {
-              LOG.warn("Graceful stop failed ", t);
-            }
-            //TODO: this is required because rpc server does not shutdown
-            //inspite of calling server.stop().
-            //Bring the process down by force.
-            //Not needed after HADOOP-7140
-            LOG.info("Exiting MR AppMaster..GoodBye!");
-            System.exit(0);
-          }
-        });
-
-    /** create a job event for job intialization **/
-    JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);
-    /** send init on the job. this triggers the job execution.**/
-    dispatcher.getEventHandler().handle(initJobEvent);
+    /** create a job-start event to get this ball rolling */
+    JobEvent startJobEvent = new JobEvent(job.getID(), JobEventType.JOB_START);
+    /** send the job-start event. this triggers the job execution. */
+    dispatcher.getEventHandler().handle(startJobEvent);
   }
 
   private class JobEventDispatcher implements EventHandler<JobEvent> {
@@ -411,7 +463,8 @@ public class MRAppMaster extends Composi
     public void handle(JobEvent event) {
       ((EventHandler<JobEvent>)context.getJob(event.getJobId())).handle(event);
     }
-  }  
+  }
+
   private class TaskEventDispatcher implements EventHandler<TaskEvent> {
     @Override
     public void handle(TaskEvent event) {
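
Net effect of the MRAppMaster changes above, condensed (event and method names
as in this patch): the AM now creates the job and drives JOB_INIT
synchronously during init(), so the uber-decision is known before the
allocator/launcher services are chosen; startJobs() merely fires JOB_START.

    // in init():
    job = createJob(config);
    synchronousJobEventDispatcher.handle(
        new JobEvent(job.getID(), JobEventType.JOB_INIT));   // NEW -> INITED
    containerAllocator =
        createContainerAllocator(clientService, context, job.isUber());
    containerLauncher = createContainerLauncher(context, job.isUber());

    // in startJobs():
    dispatcher.getEventHandler().handle(
        new JobEvent(job.getID(), JobEventType.JOB_START));  // INITED -> RUNNING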

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java?rev=1090092&r1=1090091&r2=1090092&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java Fri Apr  8 02:54:56 2011
@@ -48,6 +48,7 @@ public interface Job {
   int getTotalReduces();
   int getCompletedMaps();
   int getCompletedReduces();
+  boolean isUber();
 
   TaskAttemptCompletionEvent[]
       getTaskAttemptCompletionEvents(int fromEventId, int maxEvents);

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java?rev=1090092&r1=1090091&r2=1090092&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java Fri Apr  8 02:54:56 2011
@@ -28,11 +28,12 @@ public enum JobEventType {
 
   //Producer:MRAppMaster
   JOB_INIT,
+  JOB_START,
 
   //Producer:Task
   JOB_TASK_COMPLETED,
   JOB_MAP_TASK_RESCHEDULED,
-  JOB_TASK_ATTEMPT_COMPLETED_EVENT,
+  JOB_TASK_ATTEMPT_COMPLETED_EVENT,  // why "_EVENT" only on this one?
 
   //Producer:Any component
   JOB_DIAGNOSTIC_UPDATE,

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1090092&r1=1090091&r2=1090092&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Fri Apr  8 02:54:56 2011
@@ -53,6 +53,8 @@ import org.apache.hadoop.mapreduce.jobhi
 import org.apache.hadoop.mapreduce.jobhistory.JobInitedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobUnsuccessfulCompletionEvent;
+import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
+import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
 import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
@@ -111,10 +113,10 @@ public class JobImpl implements org.apac
 
   private static final Log LOG = LogFactory.getLog(JobImpl.class);
   public static final 
-    float DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART = 0.05f;
+      float DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART = 0.05f;
 
-  //The maximum percentage of fetch failures allowed for a map 
-  private static final double MAX_ALLOWED_FETCH_FAILURES_PERCENT = 0.5;
+  //The maximum fraction of fetch failures allowed for a map
+  private static final double MAX_ALLOWED_FETCH_FAILURES_FRACTION = 0.5;
 
   // Maximum no. of fetch-failure notifications after which map task is failed
   private static final int MAX_FETCH_FAILURES_NOTIFICATIONS = 3;
@@ -153,12 +155,14 @@ public class JobImpl implements org.apac
   private final Map<TaskAttemptId, Integer> fetchFailuresMapping = 
     new HashMap<TaskAttemptId, Integer>();
 
-  private static final TaskAttemptCompletedEventTransition TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION =
-      new TaskAttemptCompletedEventTransition();
-  private static final DiagnosticsUpdateTransition DIAGNOSTIC_UPDATE_TRANSITION =
-    new DiagnosticsUpdateTransition();
-  private static final InternalErrorTransition INTERNAL_ERROR_TRANSITION =
-      new InternalErrorTransition();
+  private static final DiagnosticsUpdateTransition
+      DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition();
+  private static final InternalErrorTransition
+      INTERNAL_ERROR_TRANSITION = new InternalErrorTransition();
+  private static final TaskAttemptCompletedEventTransition
+      TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION =
+          new TaskAttemptCompletedEventTransition();
+
   protected static final
     StateMachineFactory<JobImpl, JobState, JobEventType, JobEvent> 
        stateMachineFactory
@@ -170,31 +174,45 @@ public class JobImpl implements org.apac
               JobEventType.JOB_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
           .addTransition
-             (JobState.NEW,
-              EnumSet.of(JobState.RUNNING, JobState.FAILED),
+              (JobState.NEW,
+              EnumSet.of(JobState.INITED, JobState.FAILED),
               JobEventType.JOB_INIT,
               new InitTransition())
           .addTransition(JobState.NEW, JobState.KILLED,
-                         JobEventType.JOB_KILL, new KillNewJobTransition())
-          .addTransition(
-              JobState.NEW,
-              JobState.ERROR, JobEventType.INTERNAL_ERROR, 
-                INTERNAL_ERROR_TRANSITION)
+              JobEventType.JOB_KILL,
+              new KillNewJobTransition())
+          .addTransition(JobState.NEW, JobState.ERROR,
+              JobEventType.INTERNAL_ERROR,
+              INTERNAL_ERROR_TRANSITION)
+
+          // Transitions from INITED state
+          .addTransition(JobState.INITED, JobState.INITED,
+              JobEventType.JOB_DIAGNOSTIC_UPDATE,
+              DIAGNOSTIC_UPDATE_TRANSITION)
+          .addTransition(JobState.INITED, JobState.RUNNING,
+              JobEventType.JOB_START,
+              new StartTransition())
+          .addTransition(JobState.INITED, JobState.KILLED,
+              JobEventType.JOB_KILL,
+              new KillInitedJobTransition())
+          .addTransition(JobState.INITED, JobState.ERROR,
+              JobEventType.INTERNAL_ERROR,
+              INTERNAL_ERROR_TRANSITION)
 
           // Transitions from RUNNING state
           .addTransition(JobState.RUNNING, JobState.RUNNING,
-             JobEventType.JOB_TASK_ATTEMPT_COMPLETED_EVENT, 
-             TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION)
+              JobEventType.JOB_TASK_ATTEMPT_COMPLETED_EVENT,
+              TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION)
           .addTransition
-             (JobState.RUNNING,
+              (JobState.RUNNING,
               EnumSet.of(JobState.RUNNING, JobState.SUCCEEDED, JobState.FAILED),
               JobEventType.JOB_TASK_COMPLETED,
               new TaskCompletedTransition())
           .addTransition(JobState.RUNNING, JobState.KILL_WAIT,
-                         JobEventType.JOB_KILL, new KillTasksTransition())
+              JobEventType.JOB_KILL, new KillTasksTransition())
           .addTransition(JobState.RUNNING, JobState.RUNNING,
-                         JobEventType.JOB_MAP_TASK_RESCHEDULED, 
-                         new MapTaskRescheduledTransition())
+              JobEventType.JOB_MAP_TASK_RESCHEDULED,
+              new MapTaskRescheduledTransition())
           .addTransition(JobState.RUNNING, JobState.RUNNING,
               JobEventType.JOB_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
@@ -203,12 +221,12 @@ public class JobImpl implements org.apac
               new TaskAttemptFetchFailureTransition())
           .addTransition(
               JobState.RUNNING,
-              JobState.ERROR, JobEventType.INTERNAL_ERROR, 
+              JobState.ERROR, JobEventType.INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
 
           // Transitions from KILL_WAIT state.
           .addTransition
-             (JobState.KILL_WAIT,
+              (JobState.KILL_WAIT,
               EnumSet.of(JobState.KILL_WAIT, JobState.KILLED),
               JobEventType.JOB_TASK_COMPLETED,
               new KillWaitTaskCompletedTransition())
@@ -220,7 +238,7 @@ public class JobImpl implements org.apac
               DIAGNOSTIC_UPDATE_TRANSITION)
           .addTransition(
               JobState.KILL_WAIT,
-              JobState.ERROR, JobEventType.INTERNAL_ERROR, 
+              JobState.ERROR, JobEventType.INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
           // Ignore-able events
           .addTransition(JobState.KILL_WAIT, JobState.KILL_WAIT,
@@ -234,7 +252,7 @@ public class JobImpl implements org.apac
               DIAGNOSTIC_UPDATE_TRANSITION)
           .addTransition(
               JobState.SUCCEEDED,
-              JobState.ERROR, JobEventType.INTERNAL_ERROR, 
+              JobState.ERROR, JobEventType.INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
           // Ignore-able events
           .addTransition(JobState.SUCCEEDED, JobState.SUCCEEDED,
@@ -247,7 +265,7 @@ public class JobImpl implements org.apac
               DIAGNOSTIC_UPDATE_TRANSITION)
           .addTransition(
               JobState.FAILED,
-              JobState.ERROR, JobEventType.INTERNAL_ERROR, 
+              JobState.ERROR, JobEventType.INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
           // Ignore-able events
           .addTransition(JobState.FAILED, JobState.FAILED,
@@ -260,7 +278,7 @@ public class JobImpl implements org.apac
               DIAGNOSTIC_UPDATE_TRANSITION)
           .addTransition(
               JobState.KILLED,
-              JobState.ERROR, JobEventType.INTERNAL_ERROR, 
+              JobState.ERROR, JobEventType.INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
           // Ignore-able events
           .addTransition(JobState.KILLED, JobState.KILLED,
@@ -300,6 +318,7 @@ public class JobImpl implements org.apac
   private float setupProgress;
   private float cleanupProgress;
   private boolean reducesScheduled;
+  private boolean isUber = false;
 
   private Credentials fsTokens;
   private Token<JobTokenIdentifier> jobToken;
@@ -375,6 +394,11 @@ public class JobImpl implements org.apac
   }
 
   @Override
+  public boolean isUber() {
+    return isUber;
+  }
+
+  @Override
   public Counters getCounters() {
     Counters counters = newCounters();
     // TODO: compute job level counters
@@ -602,16 +626,25 @@ public class JobImpl implements org.apac
 
   @Override
   public int getTotalMaps() {
-    return mapTasks.size();
+    return mapTasks.size();  //FIXME: why indirection? return numMapTasks...
+                             // unless race?  how soon can this get called?
   }
 
   @Override
   public int getTotalReduces() {
-    return reduceTasks.size();
+    return reduceTasks.size();  //FIXME: why indirection? return numReduceTasks
   }
 
   public static class InitTransition 
-         implements MultipleArcTransition<JobImpl, JobEvent, JobState> {
+      implements MultipleArcTransition<JobImpl, JobEvent, JobState> {
+
+    /**
+     * Note that this transition method is called directly (and synchronously)
+     * by MRAppMaster's init() method (i.e., no RPC, no thread-switching;
+     * just plain sequential call within AM context), so we can trigger
+     * modifications in AM state from here (at least, if the AM is written
+     * that way; the MR version is).
+     */
     @Override
     public JobState transition(JobImpl job, JobEvent event) {
       job.startTime = System.currentTimeMillis();
@@ -644,28 +677,96 @@ public class JobImpl implements org.apac
               "test", constructJobACLs(job.conf), 
               job.conf.get(MRJobConfig.QUEUE_NAME,"test"));
         job.eventHandler.handle(new JobHistoryEvent(job.jobId, jse));
-        TaskSplitMetaInfo[] taskSplitMetaInfo = createSplits(job,
-            job.jobId);
+
+        TaskSplitMetaInfo[] taskSplitMetaInfo = createSplits(job, job.jobId);
         job.numMapTasks = taskSplitMetaInfo.length;
-        job.numReduceTasks = 
-          job.conf.getInt(MRJobConfig.NUM_REDUCES, 0);
+        job.numReduceTasks = job.conf.getInt(MRJobConfig.NUM_REDUCES, 0);
 
         checkTaskLimits();
 
+        long inputLength = 0;
+        for (int i = 0; i < job.numMapTasks; ++i) {
+          inputLength += taskSplitMetaInfo[i].getInputDataLength();
+        }
+
+//FIXME:  need new memory criterion for uber-decision (oops, too late here;
+//        until AM-resizing is supported, must depend on the job client to
+//        pass fat-slot needs)
+        // these are no longer "system" settings, necessarily; user may override
+        int sysMaxMaps = job.conf.getInt(MRJobConfig.JOB_UBERTASK_MAXMAPS, 9);
+        int sysMaxReduces =
+            job.conf.getInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 1);
+        long sysMaxBytes = job.conf.getLong(MRJobConfig.JOB_UBERTASK_MAXBYTES,
+            job.conf.getLong("dfs.block.size", 64*1024*1024));  //FIXME: this is wrong; get FS from [File?]InputFormat and default block size from that
+        //long sysMemSizeForUberSlot = JobTracker.getMemSizeForReduceSlot(); // FIXME [could use default AM-container memory size...]
+
+        boolean uberEnabled =
+            job.conf.getBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, true);
+        boolean smallNumMapTasks = (job.numMapTasks <= sysMaxMaps);
+        boolean smallNumReduceTasks = (job.numReduceTasks <= sysMaxReduces);
+        boolean smallInput = (inputLength <= sysMaxBytes);
+        boolean smallMemory = true;  //FIXME (see above)
+            // ignoring overhead due to UberTask and statics as negligible here:
+//  FIXME   && (Math.max(memoryPerMap, memoryPerReduce) <= sysMemSizeForUberSlot
+//              || sysMemSizeForUberSlot == JobConf.DISABLED_MEMORY_LIMIT)
+        boolean notChainJob = !isChainJob(job.conf);
+
+
+        // User has overall veto power over uberization, or user can modify
+        // limits (overriding system settings and potentially shooting
+        // themselves in the head).  Note that ChainMapper/Reducer are
+        // fundamentally incompatible with MR-1220; they employ a blocking
+        // queue between the maps/reduces and thus require parallel execution,
+        // while "uber-AM" (MR AM + LocalContainerLauncher) loops over tasks
+        // and thus requires sequential execution.
+        job.isUber = uberEnabled && smallNumMapTasks && smallNumReduceTasks
+            && smallInput && smallMemory && notChainJob;
+
+        if (job.isUber) {
+          LOG.info("Uberizing job " + job.jobId + ": " + job.numMapTasks + "m+"
+              + job.numReduceTasks + "r tasks (" + inputLength
+              + " input bytes) will run sequentially on single node.");
+              //TODO: also note which node?
+
+          // make sure reduces are scheduled only after all maps are complete
+          job.conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART,
+                            1.0f);
+          // uber-subtask attempts all get launched on the same node; if one
+          // fails, we should probably retry elsewhere, i.e., move the entire
+          // uber-AM:  ergo, limit attempts to 1 (or at most 2?  probably not)
+          job.conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1);
+          job.conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 1);
+
+          // disable speculation:  makes no sense to speculate an entire job
+//        canSpeculateMaps = canSpeculateReduces = false;
+//            [TODO:  in the old version, this ultimately came from
+//            conf.getMapSpeculativeExecution() and
+//            conf.getReduceSpeculativeExecution()]
+        } else {
+          StringBuilder msg = new StringBuilder();
+          msg.append("Not uberizing ").append(job.jobId).append(" because:");
+          if (!uberEnabled)
+            msg.append(" not enabled;");
+          if (!smallNumMapTasks)
+            msg.append(" too many maps;");
+          if (!smallNumReduceTasks)
+            msg.append(" too many reduces;");
+          if (!smallInput)
+            msg.append(" too much input;");
+          if (!smallMemory)
+            msg.append(" too much RAM;");
+          if (!notChainJob)
+            msg.append(" chainjob");
+          LOG.info(msg.toString());
+        }
+
         job.taskAttemptCompletionEvents =
-          new ArrayList<TaskAttemptCompletionEvent>(
-              job.numMapTasks + job.numReduceTasks
-              + 10);
-
-        createMapTasks(job, taskSplitMetaInfo);
-        job.allowedMapFailuresPercent = job.conf.getInt(
-            MRJobConfig.MAP_FAILURES_MAX_PERCENT, 
-            0);
+            new ArrayList<TaskAttemptCompletionEvent>(
+                job.numMapTasks + job.numReduceTasks + 10);
 
-        createReduceTasks(job);
-        job.allowedReduceFailuresPercent = job.conf.getInt(
-            MRJobConfig.REDUCE_FAILURES_MAXPERCENT, 
-            0);
+        job.allowedMapFailuresPercent =
+            job.conf.getInt(MRJobConfig.MAP_FAILURES_MAX_PERCENT, 0);
+        job.allowedReduceFailuresPercent =
+            job.conf.getInt(MRJobConfig.REDUCE_FAILURES_MAXPERCENT, 0);
 
         // Calculate the minimum number of maps to be complete before 
         // we should start scheduling reduces
@@ -676,21 +777,16 @@ public class JobImpl implements org.apac
                         DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART) * 
                         job.numMapTasks));
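Concretely (the 0.05 default is an assumption standing in for
DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART):

    // minimal sketch: reduces become schedulable after ceil(f * numMaps)
    float slowStartFraction = 0.05f;   // assumed default
    int numMapTasks = 100;
    int completedMapsForReduceSlowstart =
        (int) Math.ceil(slowStartFraction * numMapTasks);   // => 5
    // for an uberized job the fraction was forced to 1.0f above, so every
    // map must finish before the lone reduce starts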
 
-          //do the setup
+        // do the setup
         job.committer.setupJob(job.jobContext);
         job.setupProgress = 1.0f;
 
-          //schedule the maps
-        job.scheduleTasks(job.mapTasks);
-        JobInitedEvent jie =
-          new JobInitedEvent(TypeConverter.fromYarn(job.jobId),
-               job.startTime,
-               job.numMapTasks, job.numReduceTasks,
-               false, 0, 0,
-               JobState.NEW.toString());
-        job.eventHandler.handle(new JobHistoryEvent(job.jobId, jie));
+        // create the Tasks but don't start them yet
+        createMapTasks(job, inputLength, taskSplitMetaInfo);
+        createReduceTasks(job);
+
+        return JobState.INITED;
 
-        return JobState.RUNNING;
       } catch (Exception e) {
         LOG.warn("Job init failed", e);
         job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
@@ -709,9 +805,9 @@ public class JobImpl implements org.apac
 
       job.remoteJobSubmitDir =
           FileSystem.get(job.conf).makeQualified(
-              new Path(job.conf
-                  .get(YARNApplicationConstants.APPS_STAGING_DIR_KEY),
-                  job.oldJobId.toString()));
+              new Path(
+                  job.conf.get(YARNApplicationConstants.APPS_STAGING_DIR_KEY),
+                  oldJobIDString));
       job.remoteJobConfFile =
           new Path(job.remoteJobSubmitDir,
               YARNApplicationConstants.JOB_CONF_FILE);
@@ -742,14 +838,42 @@ public class JobImpl implements org.apac
           new Path(job.remoteJobSubmitDir,
               YarnConfiguration.APPLICATION_TOKENS_FILE);
       tokenStorage.writeTokenStorageFile(remoteJobTokenFile, job.conf);
-      LOG.info("Written back the job-token file on the remote file system:"
+      LOG.info("Writing back the job-token file on the remote file system:"
           + remoteJobTokenFile.toString());
     }
 
-    private void createMapTasks(JobImpl job, TaskSplitMetaInfo[] splits) {
-      long inputLength = 0;
-      for(int i=0; i < job.numMapTasks; ++i) {
-        inputLength += splits[i].getInputDataLength();
+    /**
+     * ChainMapper and ChainReducer must execute in parallel, so they're not
+     * compatible with uberization/LocalContainerLauncher (100% sequential).
+     */
+    boolean isChainJob(Configuration conf) {
+      boolean isChainJob = false;
+      try {
+        String mapClassName = conf.get(MRJobConfig.MAP_CLASS_ATTR);
+        if (mapClassName != null) {
+          Class<?> mapClass = Class.forName(mapClassName);
+          if (ChainMapper.class.isAssignableFrom(mapClass))
+            isChainJob = true;
+        }
+      } catch (ClassNotFoundException cnfe) {
+        // don't care; assume it's not derived from ChainMapper
+      }
+      try {
+        String reduceClassName = conf.get(MRJobConfig.REDUCE_CLASS_ATTR);
+        if (reduceClassName != null) {
+          Class<?> reduceClass = Class.forName(reduceClassName);
+          if (ChainReducer.class.isAssignableFrom(reduceClass))
+            isChainJob = true;
+        }
+      } catch (ClassNotFoundException cnfe) {
+        // don't care; assume it's not derived from ChainReducer
+      }
+      return isChainJob;
+    }
+
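Since the direction of isAssignableFrom is easy to get backwards, here is a
self-contained check of the idiom isChainJob() relies on (class names are
hypothetical):

    class Base {}
    class Derived extends Base {}

    public class AssignableSketch {
      public static void main(String[] args) throws Exception {
        // mirrors Class.forName(conf.get(...)) followed by a supertype test
        Class<?> mapClass = Class.forName("Derived");
        System.out.println(Base.class.isAssignableFrom(mapClass));    // true
        System.out.println(mapClass.isAssignableFrom(Base.class));    // false
        System.out.println(Base.class.isAssignableFrom(Base.class));  // true
      }
    }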
+    private void createMapTasks(JobImpl job, long inputLength,
+                                TaskSplitMetaInfo[] splits) {
+      for (int i=0; i < job.numMapTasks; ++i) {
         TaskImpl task =
             new MapTaskImpl(job.jobId, i,
                 job.eventHandler, 
@@ -774,8 +898,8 @@ public class JobImpl implements org.apac
                 job.fsTokens.getAllTokens());
         job.addTask(task);
       }
-      LOG.info("Number of reduces for job " + job.
-          jobId + " = " + job.numReduceTasks);
+      LOG.info("Number of reduces for job " + job.jobId + " = "
+          + job.numReduceTasks);
     }
 
     protected TaskSplitMetaInfo[] createSplits(JobImpl job, JobId jobId) {
@@ -798,6 +922,26 @@ public class JobImpl implements org.apac
     private void checkTaskLimits() {
       // no code, for now
     }
+  } // end of InitTransition
+
+  public static class StartTransition
+  implements SingleArcTransition<JobImpl, JobEvent> {
+    /**
+     * This transition executes in the event-dispatcher thread, though it's
+     * triggered in MRAppMaster's startJobs() method.
+     */
+    @Override
+    public void transition(JobImpl job, JobEvent event) {
+      job.startTime = System.currentTimeMillis();
+      job.scheduleTasks(job.mapTasks);  // schedule (i.e., start) the maps
+      JobInitedEvent jie =
+        new JobInitedEvent(TypeConverter.fromYarn(job.jobId),
+             job.startTime,
+             job.numMapTasks, job.numReduceTasks,
+             job.isUber, 0, 0,  // FIXME: lose the latter two args again
+                                // (old-style uber junk; needs to go along
+                                // with the other 98% of old-style uber junk)
+             JobState.NEW.toString());
+      job.eventHandler.handle(new JobHistoryEvent(job.jobId, jie));
+    }
   }
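The net effect of splitting task creation (InitTransition) from task
scheduling (StartTransition) is a new INITED state between NEW and RUNNING. A
hedged sketch of the corresponding state-machine arcs, with event names
assumed from JobEventType and signatures per the YARN state-machine factory:

    // NEW --(JOB_INIT)--> INITED --(JOB_START)--> RUNNING
    .addTransition(JobState.NEW,
        EnumSet.of(JobState.INITED, JobState.FAILED),  // init can also fail
        JobEventType.JOB_INIT, new InitTransition())
    .addTransition(JobState.INITED, JobState.RUNNING,
        JobEventType.JOB_START, new StartTransition())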
 
   private void abortJob(
@@ -817,23 +961,31 @@ public class JobImpl implements org.apac
     eventHandler.handle(new JobHistoryEvent(jobId, unsuccessfulJobEvent));
   }
 
-  private static class KillNewJobTransition 
-    implements SingleArcTransition<JobImpl, JobEvent> {
+  // Task-start has been moved out of InitTransition, so this arc simply
+  // hardcodes 0 for both map and reduce finished tasks.
+  private static class KillNewJobTransition
+  implements SingleArcTransition<JobImpl, JobEvent> {
     @Override
     public void transition(JobImpl job, JobEvent event) {
       JobUnsuccessfulCompletionEvent failedEvent =
           new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(job.jobId),
-              job.finishTime,
-              job.succeededMapTaskCount,
-              job.numReduceTasks, //TODO finishedReduceTasks
+              job.finishTime, 0, 0,
               org.apache.hadoop.mapreduce.JobStatus.State.FAILED.toString()); //TODO correct state
       job.eventHandler.handle(new JobHistoryEvent(job.jobId, failedEvent));
+      job.finished();
+    }
+  }
 
+  private static class KillInitedJobTransition
+  implements SingleArcTransition<JobImpl, JobEvent> {
+    @Override
+    public void transition(JobImpl job, JobEvent event) {
+      job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.KILLED);
       job.finished();
     }
   }
 
-  private static class KillTasksTransition 
+  private static class KillTasksTransition
       implements SingleArcTransition<JobImpl, JobEvent> {
     @Override
     public void transition(JobImpl job, JobEvent event) {
@@ -893,11 +1045,9 @@ public class JobImpl implements org.apac
         
         float failureRate = (float) fetchFailures / runningReduceTasks;
         // declare faulty if fetch-failures >= max-allowed-failures
-        boolean isMapFaulty = (failureRate >= MAX_ALLOWED_FETCH_FAILURES_PERCENT) 
-                              ? true
-                              : false;
-        if (fetchFailures >= MAX_FETCH_FAILURES_NOTIFICATIONS
-            && isMapFaulty) {
+        boolean isMapFaulty =
+            (failureRate >= MAX_ALLOWED_FETCH_FAILURES_FRACTION);
+        if (fetchFailures >= MAX_FETCH_FAILURES_NOTIFICATIONS && isMapFaulty) {
           LOG.info("Too many fetch-failures for output of task attempt: " + 
               mapId + " ... raising fetch failure to map");
           job.eventHandler.handle(new TaskAttemptEvent(mapId, 
@@ -914,7 +1064,7 @@ public class JobImpl implements org.apac
     @Override
     public JobState transition(JobImpl job, JobEvent event) {
       job.completedTaskCount++;
-      LOG.info("No of completed Tasks:" + job.completedTaskCount);
+      LOG.info("Num completed Tasks: " + job.completedTaskCount);
       JobTaskEvent taskEvent = (JobTaskEvent) event;
       Task task = job.tasks.get(taskEvent.getTaskID());
       if (taskEvent.getState() == TaskState.SUCCEEDED) {
@@ -970,7 +1120,7 @@ public class JobImpl implements org.apac
         new JobFinishedEvent(TypeConverter.fromYarn(job.jobId),
           job.finishTime,
           job.succeededMapTaskCount, job.numReduceTasks, job.failedMapTaskCount,
-          job.numReduceTasks, //TODO replace finsihedReduceTasks
+          job.numReduceTasks, //TODO replace finishedReduceTasks
           TypeConverter.fromYarn(job.getCounters()), //TODO replace with MapCounter
           TypeConverter.fromYarn(job.getCounters()), // TODO reduceCounters
           TypeConverter.fromYarn(job.getCounters()));

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1090092&r1=1090091&r2=1090092&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Fri Apr  8 02:54:56 2011
@@ -419,7 +419,7 @@ public abstract class TaskAttemptImpl im
       memory = conf.getInt(MRJobConfig.REDUCE_MEMORY_MB, 1024);
     }
     
-    return 1024;
+    return 1024; //FIXME:  why not "return memory;" ?
   }
 
   private static LocalResource getLocalResource(FileContext fc, Path file, 
@@ -443,7 +443,7 @@ public abstract class TaskAttemptImpl im
       FileContext remoteFS = FileContext.getFileContext(conf);
       
       Path localizedJobConf = new Path(YARNApplicationConstants.JOB_CONF_FILE);
-      remoteTask.setJobFile(localizedJobConf.toString()); // Screwed!!!!!!
+      remoteTask.setJobFile(localizedJobConf.toString()); // Screwed!!!!!! (presumably this means "FIXME"...)
       URL jobConfFileOnRemoteFS = ConverterUtils.getYarnUrlFromPath(localizedJobConf);
       LOG.info("The job-conf file on the remote FS is " + jobConfFileOnRemoteFS);
       
@@ -842,6 +842,7 @@ public abstract class TaskAttemptImpl im
       taskAttempt.containerID = cEvent.getContainerID();
       taskAttempt.containerMgrAddress = cEvent.getContainerManagerAddress();
       taskAttempt.containerToken = cEvent.getContainerToken();
+      // this is a _real_ Task (classic Hadoop mapred flavor):
       taskAttempt.remoteTask = taskAttempt.createRemoteTask();
       taskAttempt.jvmID = new WrappedJvmID(
           taskAttempt.remoteTask.getTaskID().getJobID(), 
@@ -858,7 +859,7 @@ public abstract class TaskAttemptImpl im
           return taskAttempt.getContainer();
         }
         @Override
-        public Task getRemoteTask() {
+        public Task getRemoteTask() {  // classic mapred Task, not YARN version
           return taskAttempt.remoteTask;
         }
       });

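The FIXME added at the "return 1024" line above flags what looks like a real
bug: "memory" is computed from MAP_MEMORY_MB / REDUCE_MEMORY_MB and then
discarded. A hedged sketch of the presumed intent (the method shape is
reconstructed from the visible fragment, not this revision's code):

    private static int getMemoryRequired(Configuration conf, TaskType type) {
      int memory = 1024;
      if (type == TaskType.MAP) {
        memory = conf.getInt(MRJobConfig.MAP_MEMORY_MB, 1024);
      } else if (type == TaskType.REDUCE) {
        memory = conf.getInt(MRJobConfig.REDUCE_MEMORY_MB, 1024);
      }
      return memory;  // rather than the hardcoded 1024
    }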
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java?rev=1090092&r1=1090091&r2=1090092&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java Fri Apr  8 02:54:56 2011
@@ -189,7 +189,8 @@ public class ContainerLauncherImpl exten
           startRequest.setContainerLaunchContext(containerLaunchContext);
           proxy.startContainer(startRequest);
 
-          // after launching send launched event to taskattempt
+          // after launching, send launched event to task attempt to move
+          // it from ASSIGNED to RUNNING state
           context.getEventHandler().handle(
               new TaskAttemptEvent(launchEv.getTaskAttemptID(),
                   TaskAttemptEventType.TA_CONTAINER_LAUNCHED));
@@ -201,9 +202,9 @@ public class ContainerLauncherImpl exten
         }
 
         break;
-      case CONTAINER_REMOTE_CLEANUP:
 
-        // We will have to remove the launch event if it is still in eventQueue
+      case CONTAINER_REMOTE_CLEANUP:
+        // We will have to remove the launch event (or was "cleanup" meant
+        // here? FIXME) if it is still in eventQueue
         // and not yet processed
         if (eventQueue.contains(event)) {
           eventQueue.remove(event); // TODO: Any synchro needed?
@@ -228,10 +229,10 @@ public class ContainerLauncherImpl exten
                 e);
           }
 
-            // after killing send killed event to taskattempt
-            context.getEventHandler().handle(
-                new TaskAttemptEvent(event.getTaskAttemptID(),
-                    TaskAttemptEventType.TA_CONTAINER_CLEANED));
+          // after killing, send killed event to taskattempt
+          context.getEventHandler().handle(
+              new TaskAttemptEvent(event.getTaskAttemptID(),
+                  TaskAttemptEventType.TA_CONTAINER_CLEANED));
         }
         break;
       }

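A condensed view of the race the CONTAINER_REMOTE_CLEANUP branch handles
(assuming eventQueue is a BlockingQueue; killRemoteContainer() is a
hypothetical stand-in for the kill RPC in the elided lines, and whether
TA_CONTAINER_CLEANED also fires on the dequeue path is not visible in this
hunk):

    if (eventQueue.contains(event)) {
      // the launch request was never processed; dequeue it so the
      // container is never started
      eventQueue.remove(event);
    } else {
      killRemoteContainer();  // hypothetical wrapper around the kill RPC
      // after killing, move the attempt out of its container:
      context.getEventHandler().handle(
          new TaskAttemptEvent(event.getTaskAttemptID(),
              TaskAttemptEventType.TA_CONTAINER_CLEANED));
    }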
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java?rev=1090092&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java Fri Apr  8 02:54:56 2011
@@ -0,0 +1,77 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.local;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
+import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+
+/**
+ * Allocates containers locally. Doesn't allocate a real container;
+ * instead sends an allocated event for all requests.
+ */
+public class LocalContainerAllocator extends RMCommunicator
+    implements ContainerAllocator {
+
+  private static final Log LOG =
+      LogFactory.getLog(LocalContainerAllocator.class);
+
+  private final EventHandler eventHandler;
+  private final ApplicationId appID;
+  private AtomicInteger containerCount = new AtomicInteger();
+
+  private final RecordFactory recordFactory =
+      RecordFactoryProvider.getRecordFactory(null);
+
+  public LocalContainerAllocator(ClientService clientService,
+                                 AppContext context) {
+    super(clientService, context);
+    this.eventHandler = context.getEventHandler();
+    this.appID = context.getApplicationID();
+  }
+
+  @Override
+  public void handle(ContainerAllocatorEvent event) {
+    if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ) {
+      LOG.info("Processing the event " + event.toString());
+      ContainerId cID = recordFactory.newRecordInstance(ContainerId.class);
+      cID.setAppId(appID);
+      // use negative ids to denote that these are local (TODO: need a
+      // better way?); incrementAndGet so the first id is -1, not 0
+      cID.setId((-1) * containerCount.incrementAndGet());
+      // send the container-assigned event to task attempt
+      eventHandler.handle(new TaskAttemptContainerAssignedEvent(
+          event.getAttemptID(), cID,
+          "localhost",//put the AppMaster hostname (TODO)
+          null));
+    }
+  }
+
+}

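A hedged sketch of how the AM can swap this allocator in for the real one
when the job has been uberized (the isUber accessor and the
RMContainerAllocator constructor arguments are assumptions; the actual wiring
lives in MRAppMaster, modified in this same commit):

    // both extend RMCommunicator and implement ContainerAllocator, so the
    // rest of the AM doesn't care which one is running
    RMCommunicator allocator = job.isUber()
        ? new LocalContainerAllocator(clientService, context) // in-JVM "containers"
        : new RMContainerAllocator(clientService, context);   // real RM containers
    allocator.init(conf);
    allocator.start();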
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java?rev=1090092&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java Fri Apr  8 02:54:56 2011
@@ -0,0 +1,225 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.rm;
+
+import java.io.IOException;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityInfo;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.records.AMResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationMaster;
+import org.apache.hadoop.yarn.api.records.ApplicationState;
+import org.apache.hadoop.yarn.api.records.ApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.SchedulerSecurityInfo;
+import org.apache.hadoop.yarn.service.AbstractService;
+
+/**
+ * Registers/unregisters to RM and sends heartbeats to RM.
+ */
+public class RMCommunicator extends AbstractService  {
+  private static final Log LOG = LogFactory.getLog(RMCommunicator.class);
+  private static int rmPollInterval; // millis
+  protected ApplicationId applicationId;
+  private volatile boolean stopped;
+  protected Thread allocatorThread;
+  protected EventHandler eventHandler;
+  private ApplicationMaster applicationMaster;
+  protected AMRMProtocol scheduler;
+  private final ClientService clientService;
+  private int lastResponseID;
+
+  private final RecordFactory recordFactory =
+      RecordFactoryProvider.getRecordFactory(null);
+
+  public RMCommunicator(ClientService clientService, AppContext context) {
+    super("RMCommunicator");
+    this.clientService = clientService;
+    this.eventHandler = context.getEventHandler();
+    this.applicationId = context.getApplicationID();
+    this.applicationMaster =
+        recordFactory.newRecordInstance(ApplicationMaster.class);
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    super.init(conf);
+    // heartbeat at one-third of the AM expiry interval, i.e., ~3 polls per
+    // expiry window (default 10000 ms => ~3333 ms poll interval)
+    rmPollInterval =
+        conf.getInt(YarnConfiguration.AM_EXPIRY_INTERVAL, 10000) / 3;
+  }
+
+  @Override
+  public void start() {
+    scheduler = createSchedulerProxy();
+    //LOG.info("Scheduler is " + scheduler);
+    register();
+    startAllocatorThread();
+    super.start();
+  }
+
+  protected void register() {
+    //Register
+    applicationMaster.setApplicationId(applicationId);
+    applicationMaster.setHost(
+        clientService.getBindAddress().getAddress().getHostAddress());
+    applicationMaster.setRpcPort(clientService.getBindAddress().getPort());
+    applicationMaster.setState(ApplicationState.RUNNING);
+    applicationMaster.setHttpPort(clientService.getHttpPort());
+    applicationMaster.setStatus(
+        recordFactory.newRecordInstance(ApplicationStatus.class));
+    applicationMaster.getStatus().setApplicationId(applicationId);
+    applicationMaster.getStatus().setProgress(0.0f);
+    try {
+      RegisterApplicationMasterRequest request =
+        recordFactory.newRecordInstance(RegisterApplicationMasterRequest.class);
+      request.setApplicationMaster(applicationMaster);
+      scheduler.registerApplicationMaster(request);
+    } catch (Exception are) {
+      LOG.info("Exception while registering", are);
+      throw new YarnException(are);
+    }
+  }
+
+  protected void unregister() {
+    try {
+      applicationMaster.setState(ApplicationState.COMPLETED);
+      FinishApplicationMasterRequest request =
+          recordFactory.newRecordInstance(FinishApplicationMasterRequest.class);
+      request.setApplicationMaster(applicationMaster);
+      scheduler.finishApplicationMaster(request);
+    } catch(Exception are) {
+      LOG.info("Exception while unregistering ", are);
+    }
+  }
+
+  @Override
+  public void stop() {
+    stopped = true;
+    allocatorThread.interrupt();
+    try {
+      allocatorThread.join();
+    } catch (InterruptedException ie) {
+      LOG.info("InterruptedException while stopping", ie);
+    }
+    unregister();
+    super.stop();
+  }
+
+  protected void startAllocatorThread() {
+    allocatorThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        while (!stopped && !Thread.currentThread().isInterrupted()) {
+          try {
+            Thread.sleep(rmPollInterval);
+            try {
+              heartbeat();
+            } catch (Exception e) {
+              LOG.error("ERROR IN CONTACTING RM. ", e);
+            }
+          } catch (InterruptedException e) {
+            LOG.info("Allocated thread interrupted. Returning.");
+            return;
+          }
+        }
+      }
+    });
+    allocatorThread.start();
+  }
+
+  protected AMRMProtocol createSchedulerProxy() {
+    final YarnRPC rpc = YarnRPC.create(getConfig());
+    final Configuration conf = new Configuration(getConfig());
+    final String serviceAddr = conf.get(
+        YarnConfiguration.SCHEDULER_ADDRESS,
+        YarnConfiguration.DEFAULT_SCHEDULER_BIND_ADDRESS);
+
+    UserGroupInformation currentUser;
+    try {
+      currentUser = UserGroupInformation.getCurrentUser();
+    } catch (IOException e) {
+      throw new YarnException(e);
+    }
+
+    if (UserGroupInformation.isSecurityEnabled()) {
+      conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_INFO_CLASS_NAME,
+          SchedulerSecurityInfo.class, SecurityInfo.class);
+
+      String tokenURLEncodedStr = System.getenv().get(
+          YarnConfiguration.APPLICATION_MASTER_TOKEN_ENV_NAME);
+      LOG.debug("AppMasterToken is " + tokenURLEncodedStr);
+      Token<? extends TokenIdentifier> token = new Token<TokenIdentifier>();
+
+      try {
+        token.decodeFromUrlString(tokenURLEncodedStr);
+      } catch (IOException e) {
+        throw new YarnException(e);
+      }
+
+      currentUser.addToken(token);
+    }
+
+    return currentUser.doAs(new PrivilegedAction<AMRMProtocol>() {
+      @Override
+      public AMRMProtocol run() {
+        return (AMRMProtocol) rpc.getProxy(AMRMProtocol.class,
+            NetUtils.createSocketAddr(serviceAddr), conf);
+      }
+    });
+  }
+
+  protected synchronized void heartbeat() throws Exception {
+    ApplicationStatus status =
+        recordFactory.newRecordInstance(ApplicationStatus.class);
+    status.setApplicationId(applicationId);
+    status.setResponseId(lastResponseID);
+
+    AllocateRequest allocateRequest =
+        recordFactory.newRecordInstance(AllocateRequest.class);
+    allocateRequest.setApplicationStatus(status);
+    allocateRequest.addAllAsks(new ArrayList<ResourceRequest>());
+    allocateRequest.addAllReleases(new ArrayList<Container>());
+    AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
+    AMResponse response = allocateResponse.getAMResponse();
+    // the response is unused here; in this base class the heartbeat is
+    // purely a liveness ping to the RM
+  }
+
+}
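Putting the pieces together, a sketch of the service lifecycle from the
caller's point of view, grounded only in the methods shown above:

    RMCommunicator rmComm = new RMCommunicator(clientService, context);
    rmComm.init(conf);   // derives rmPollInterval from AM_EXPIRY_INTERVAL / 3
    rmComm.start();      // creates the AMRMProtocol proxy, registers with
                         // the RM, and starts the heartbeat thread
    // ... heartbeat() now fires every rmPollInterval ms ...
    rmComm.stop();       // interrupts/joins the heartbeat thread, then
                         // unregisters (ApplicationState.COMPLETED)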