Posted to commits@tez.apache.org by ac...@apache.org on 2013/03/15 22:26:48 UTC

svn commit: r1457129 [24/38] - in /incubator/tez: ./ tez-ampool/ tez-ampool/src/ tez-ampool/src/main/ tez-ampool/src/main/bin/ tez-ampool/src/main/conf/ tez-ampool/src/main/java/ tez-ampool/src/main/java/org/ tez-ampool/src/main/java/org/apache/ tez-am...

Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/recover/RecoveryService.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/recover/RecoveryService.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/recover/RecoveryService.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/recover/RecoveryService.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,617 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.recover;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.Phase;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.ControlledClock;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEventDiagnosticsUpdate;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEventType;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventFailRequest;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventKillRequest;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventStartedRemotely;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventStatusUpdate;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventStatusUpdate.TaskAttemptStatus;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskEventTAUpdate;
+import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerEventContainerCompleted;
+import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerEventType;
+import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerTALaunchRequestEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerEventTAEnded;
+import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorEventType;
+import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorLaunchRequestEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.RMCommunicatorContainerDeAllocateRequestEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.RMCommunicatorEventType;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventAssignTA;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventLaunched;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventCompleted;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventType;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventLaunchRequest;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerState;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventTASucceeded;
+import org.apache.hadoop.mapreduce.v2.app2.taskclean.TaskCleaner;
+import org.apache.hadoop.mapreduce.v2.app2.taskclean.TaskCleanupEvent;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.service.CompositeService;
+import org.apache.hadoop.yarn.service.Service;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+/*
+ * Recovers the completed tasks from the previous life of the Application Master.
+ * The completed tasks are read from the history file written by the previous attempt.
+ * The recovery service intercepts and replays the events for completed tasks.
+ * While recovery is in progress, the scheduling of new tasks is delayed by
+ * buffering the task schedule events.
+ * The recovery service controls the clock while recovery is in progress.
+ */
+
+//TODO:
+//task cleanup for all non completed tasks
+public class RecoveryService extends CompositeService implements Recovery {
+
+  private static final Log LOG = LogFactory.getLog(RecoveryService.class);
+
+  private final ApplicationAttemptId applicationAttemptId;
+  private final OutputCommitter committer;
+  private final Dispatcher dispatcher;
+  private final ControlledClock clock;
+  private final AppContext appContext;
+
+  private JobInfo jobInfo = null;
+  private final Map<TaskId, TaskInfo> completedTasks =
+    new HashMap<TaskId, TaskInfo>();
+
+  private final List<TaskEvent> pendingTaskScheduleEvents =
+    new ArrayList<TaskEvent>();
+  private Map<ContainerId, ContainerInfo> containerInfo =
+      new HashMap<ContainerId, ContainerInfo>();
+  private Map<TaskAttemptId, ContainerId> attemptToContainerMap =
+      new HashMap<TaskAttemptId, ContainerId>();
+
+  private volatile boolean recoveryMode = false;
+
+  public RecoveryService(AppContext appContext, OutputCommitter committer) {
+    super("RecoveringDispatcher");
+    this.appContext = appContext;
+    this.applicationAttemptId = appContext.getApplicationAttemptId();
+    this.committer = committer;
+    this.dispatcher = createRecoveryDispatcher();
+    this.clock = new ControlledClock(appContext.getClock());
+    addService((Service) dispatcher);
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    super.init(conf);
+    // parse the history file
+    try {
+      parse();
+    } catch (Exception e) {
+      LOG.warn("Could not parse the old history file. Aborting recovery. "
+          + "Starting afresh.", e);
+    }
+    if (completedTasks.size() > 0) {
+      recoveryMode = true;
+      LOG.info("SETTING THE RECOVERY MODE TO TRUE. NO OF COMPLETED TASKS "
+          + "TO RECOVER " + completedTasks.size());
+      LOG.info("Job launch time " + jobInfo.getLaunchTime());
+      clock.setTime(jobInfo.getLaunchTime());
+    }
+  }
+
+  @Override
+  public Dispatcher getDispatcher() {
+    return dispatcher;
+  }
+
+  @Override
+  public Clock getClock() {
+    return clock;
+  }
+
+  @Override
+  public Map<TaskId, TaskInfo> getCompletedTasks() {
+    return completedTasks;
+  }
+
+  @Override
+  public List<AMInfo> getAMInfos() {
+    if (jobInfo == null || jobInfo.getAMInfos() == null) {
+      return new LinkedList<AMInfo>();
+    }
+    List<AMInfo> amInfos = new LinkedList<AMInfo>();
+    for (org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo jhAmInfo : jobInfo
+        .getAMInfos()) {
+      AMInfo amInfo =
+          MRBuilderUtils.newAMInfo(jhAmInfo.getAppAttemptId(),
+              jhAmInfo.getStartTime(), jhAmInfo.getContainerId(),
+              jhAmInfo.getNodeManagerHost(), jhAmInfo.getNodeManagerPort(),
+              jhAmInfo.getNodeManagerHttpPort());
+
+      amInfos.add(amInfo);
+    }
+    return amInfos;
+  }
+
+  private void parse() throws IOException {
+    // TODO: parse history file based on startCount
+    String jobId =
+        TypeConverter.fromYarn(applicationAttemptId.getApplicationId()).toString();
+    String jobhistoryDir = JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(
+        getConfig(), jobId);
+    FSDataInputStream in = null;
+    Path historyFile = null;
+    Path histDirPath = FileContext.getFileContext(getConfig()).makeQualified(
+        new Path(jobhistoryDir));
+    FileContext fc = FileContext.getFileContext(histDirPath.toUri(),
+        getConfig());
+    //read the previous history file
+    historyFile = fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(
+        histDirPath, jobId, (applicationAttemptId.getAttemptId() - 1)));
+    LOG.info("History file is at " + historyFile);
+    in = fc.open(historyFile);
+    JobHistoryParser parser = new JobHistoryParser(in);
+    jobInfo = parser.parse();
+    Exception parseException = parser.getParseException();
+    if (parseException != null) {
+      LOG.info("Got an error parsing job-history file " + historyFile + 
+          ", ignoring incomplete events.", parseException);
+    }
+    Map<org.apache.hadoop.mapreduce.TaskID, TaskInfo> taskInfos = jobInfo
+        .getAllTasks();
+    for (TaskInfo taskInfo : taskInfos.values()) {
+      if (TaskState.SUCCEEDED.toString().equals(taskInfo.getTaskStatus())) {
+        completedTasks
+            .put(TypeConverter.toYarn(taskInfo.getTaskId()), taskInfo);
+        LOG.info("Read from history task "
+            + TypeConverter.toYarn(taskInfo.getTaskId()));
+      }
+    }
+    LOG.info("Read completed tasks from history "
+        + completedTasks.size());
+  }
+  
+  protected Dispatcher createRecoveryDispatcher() {
+    return new RecoveryDispatcher();
+  }
+
+  @SuppressWarnings("rawtypes")
+  class RecoveryDispatcher extends AsyncDispatcher {
+    // Intercept events as they are drained from the queue, so that ordering is preserved.
+    private final EventHandler actualHandler;
+    private final EventHandler handler;
+
+    RecoveryDispatcher() {
+      super();
+      actualHandler = super.getEventHandler();
+      handler = new InterceptingEventHandler(actualHandler);
+    }
+
+    
+    @Override
+    public void dispatch(Event event) {
+      if (recoveryMode) {
+        if (event.getType() == TaskAttemptEventType.TA_STARTED_REMOTELY) {
+          // Handling of these events is split between the intercepted
+          // handle() call and this point, just before dispatch.
+          TaskAttemptInfo attInfo = getTaskAttemptInfo(((TaskAttemptEvent) event)
+              .getTaskAttemptID());
+          LOG.info("Recovered Attempt start time " + attInfo.getStartTime());
+          clock.setTime(attInfo.getStartTime());
+        } else if (event.getType() == TaskAttemptEventType.TA_DONE
+            || event.getType() == TaskAttemptEventType.TA_FAIL_REQUEST
+            || event.getType() == TaskAttemptEventType.TA_KILL_REQUEST) {
+          TaskAttemptInfo attInfo = getTaskAttemptInfo(((TaskAttemptEvent) event)
+              .getTaskAttemptID());
+          LOG.info("Recovered Attempt finish time " + attInfo.getFinishTime());
+          clock.setTime(attInfo.getFinishTime());
+        }
+
+        else if (event.getType() == TaskEventType.T_ATTEMPT_FAILED
+            || event.getType() == TaskEventType.T_ATTEMPT_KILLED
+            || event.getType() == TaskEventType.T_ATTEMPT_SUCCEEDED) {
+          TaskEventTAUpdate tEvent = (TaskEventTAUpdate) event;
+          LOG.info("Recovered Task attempt " + tEvent.getTaskAttemptID());
+          TaskInfo taskInfo = completedTasks.get(tEvent.getTaskAttemptID()
+              .getTaskId());
+          taskInfo.getAllTaskAttempts().remove(
+              TypeConverter.fromYarn(tEvent.getTaskAttemptID()));
+          // remove the task info from completed tasks if all attempts are
+          // recovered
+          if (taskInfo.getAllTaskAttempts().size() == 0) {
+            completedTasks.remove(tEvent.getTaskAttemptID().getTaskId());
+            // checkForRecoveryComplete
+            LOG.info("CompletedTasks() " + completedTasks.size());
+            if (allTasksRecovered()) {
+              if (!allContainersStopped()) {
+                stopRemainingContainers(actualHandler);
+              } else {
+                endRecovery(actualHandler);
+              }
+            }
+          }
+        } else if (event.getType() == AMSchedulerEventType.S_CONTAINER_COMPLETED) {
+          // This is the last event after a container completes. TA_TERMINATED
+          // messages to tasks would have gone out, and been processed before
+          // this. As a result, TASK_CLEAN generated by TA_TERMINATED would
+          // reach the InterceptingEventHandler (and are ignored) before this
+          // event type is dispatched.
+          // At this point, it's safe to remove this container from the
+          // containerInfo map.
+          AMSchedulerEventContainerCompleted cEvent = (AMSchedulerEventContainerCompleted)event;
+          ContainerId containerId = cEvent.getContainerId();
+          LOG.info("In Recovery, Container with id: " + containerId + " completed");
+          containerInfo.remove(containerId);
+
+          // Check if recovery is complete.
+          if (allTasksRecovered() && allContainersStopped()) {
+            endRecovery(actualHandler);
+          }
+          return; // S_CONTAINER_COMPLETED does not need to reach the scheduler.
+        }
+      }
+      realDispatch(event);
+    }
+    
+    public void realDispatch(Event event) {
+      super.dispatch(event);
+    }
+
+    @Override
+    public EventHandler getEventHandler() {
+      return handler;
+    }
+  }
+
+  private TaskAttemptInfo getTaskAttemptInfo(TaskAttemptId id) {
+    TaskInfo taskInfo = completedTasks.get(id.getTaskId());
+    return taskInfo.getAllTaskAttempts().get(TypeConverter.fromYarn(id));
+  }
+
+  protected boolean allContainersStopped() {
+    return containerInfo.size() == 0;
+  }
+
+  protected boolean allTasksRecovered() {
+    return completedTasks.size() == 0;
+  }
+
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  protected void stopRemainingContainers(EventHandler eventHandler) {
+    for (ContainerId containerId : containerInfo.keySet()) {
+      eventHandler.handle(new AMContainerEvent(containerId,
+          AMContainerEventType.C_STOP_REQUEST));
+    }
+  }
+
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  protected void endRecovery(EventHandler eventHandler) {
+    recoveryMode = false;
+    clock.reset();
+    LOG.info("Setting the recovery mode to false. " + "Recovery is complete!");
+
+    // send all pending tasks schedule events
+    for (TaskEvent tEv : pendingTaskScheduleEvents) {
+      eventHandler.handle(tEv);
+    }
+  }
+
+  
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  private class InterceptingEventHandler implements EventHandler {
+    //Intercept events before they're put onto the queue.
+    EventHandler actualHandler;
+
+    InterceptingEventHandler(EventHandler actualHandler) {
+      this.actualHandler = actualHandler;
+    }
+
+    @Override
+    public void handle(Event event) {
+      if (!recoveryMode) {
+        // delegate to the dispatcher one
+        actualHandler.handle(event);
+        return;
+      }
+
+      // Schedule previously finished attempts. Delay new ones till after recovery.
+      else if (event.getType() == TaskEventType.T_SCHEDULE) {
+        TaskEvent taskEvent = (TaskEvent) event;
+        // delay the scheduling of new tasks till previous ones are recovered
+        if (completedTasks.get(taskEvent.getTaskID()) == null) {
+          LOG.debug("Adding to pending task events "
+              + taskEvent.getTaskID());
+          pendingTaskScheduleEvents.add(taskEvent);
+          return;
+        }
+      }
+
+      // Intercept TaskAttempt start request.
+      else if (event.getType() == AMSchedulerEventType.S_TA_LAUNCH_REQUEST) {
+        TaskAttemptId aId = ((AMSchedulerTALaunchRequestEvent) event).getAttemptID();
+        TaskAttemptInfo attInfo = getTaskAttemptInfo(aId);
+        LOG.debug("TA_LAUNCH_REQUEST " + aId);
+        sendAssignedEvent(aId, attInfo, (AMSchedulerTALaunchRequestEvent)event);
+        return;
+      }
+
+      // Container Launch request. Mock and send back launched.
+      else if (event.getType() == NMCommunicatorEventType.CONTAINER_LAUNCH_REQUEST) {
+        ContainerId cId = ((NMCommunicatorLaunchRequestEvent) event)
+            .getContainerId();
+        // Simulate container launch.
+        ContainerInfo cInfo = containerInfo.get(cId);
+        actualHandler.handle(new AMContainerEventLaunched(cId, cInfo
+            .getShufflePort()));
+
+        // Simulate a pull from the TaskAttempt
+        actualHandler.handle(new AMContainerEvent(cId,
+            AMContainerEventType.C_PULL_TA));
+
+        // Inform the TaskAttempt about the assignment.
+        actualHandler.handle(new TaskAttemptEventStartedRemotely(cInfo
+            .getNextAttemptId(), cId, null, cInfo.getShufflePort()));
+        
+        // TaskAttempt doesn't generate any useful event while in RUNNING. Generate events for next state here.
+        TaskAttemptId aId = cInfo.getNextAttemptId();
+        
+        TaskAttemptInfo attInfo = getTaskAttemptInfo(aId);
+        // send the status update event
+        sendStatusUpdateEvent(aId, attInfo);
+
+        TaskAttemptState state = TaskAttemptState.valueOf(attInfo.getTaskStatus());
+        switch (state) {
+        case SUCCEEDED:
+          //recover the task output
+          TaskAttemptContext taskContext = new TaskAttemptContextImpl(getConfig(),
+              attInfo.getAttemptId());
+          try { 
+            TaskType type = taskContext.getTaskAttemptID().getTaskID().getTaskType();
+            int numReducers = taskContext.getConfiguration().getInt(MRJobConfig.NUM_REDUCES, 1); 
+            if(type == TaskType.REDUCE || (type == TaskType.MAP && numReducers <= 0)) {
+              committer.recoverTask(taskContext);
+              LOG.info("Recovered output from task attempt " + attInfo.getAttemptId());
+            } else {
+              LOG.info("Will not try to recover output for "
+                  + taskContext.getTaskAttemptID());
+            }
+          } catch (IOException e) {
+            LOG.error("Caught an exception while trying to recover task "+aId, e);
+            actualHandler.handle(new JobEventDiagnosticsUpdate(
+                aId.getTaskId().getJobId(), "Error in recovering task output " + 
+                e.getMessage()));
+            actualHandler.handle(new JobEvent(aId.getTaskId().getJobId(),
+                JobEventType.INTERNAL_ERROR));
+          }
+          
+          // send the done event
+          LOG.info("Sending done event to recovered attempt " + aId);
+          actualHandler.handle(new TaskAttemptEvent(aId,
+              TaskAttemptEventType.TA_DONE));
+          // TODO thh.unregister happens here. Ensure THH handles it
+          // correctly in case of recovery. 
+          break;
+        case KILLED:
+          LOG.info("Sending kill event to recovered attempt " + aId);
+          actualHandler.handle(new TaskAttemptEventKillRequest(aId, "")); 
+          break;
+        default:
+          LOG.info("Sending fail event to recovered attempt " + aId);
+          actualHandler.handle(new TaskAttemptEventFailRequest(aId, ""));
+          break;
+        }
+        return;
+      } 
+      
+      // Handle Events which may be sent to the scheduler.
+      else if (event.getType() == AMSchedulerEventType.S_TA_ENDED) {
+        // Tell the container to stop.
+        AMSchedulerEventTAEnded sEvent = (AMSchedulerEventTAEnded) event;
+        ContainerId containerId = attemptToContainerMap.get(sEvent
+            .getAttemptID());
+        switch (sEvent.getState()) {
+        case FAILED: 
+        case KILLED:
+          actualHandler.handle(new AMContainerEvent(containerId,
+              AMContainerEventType.C_STOP_REQUEST));
+          return;
+          // TODO chh.unregister happens here. Ensure THH handles it
+          // correctly in case of recovery.
+        case SUCCEEDED:
+          // Inform the container that the task attempt succeeded.
+          // Leaving the event in the map - for TA failure after success.
+          actualHandler.handle(new AMContainerEventTASucceeded(containerId,
+              sEvent.getAttemptID()));
+          return;
+          // TODO tal.unregister happens here. Ensure THH handles it
+          // correctly in case of recovery.
+        default:
+            throw new YarnException("Invalid state " + sEvent.getState());
+        }
+      }
+      
+      // De-allocate containers used by previous attempts immediately.
+      else if (event.getType() == NMCommunicatorEventType.CONTAINER_STOP_REQUEST) {
+        // Ignore. Unless we start relying on a successful NM.stopContainer() call.
+        NMCommunicatorEvent nEvent = (NMCommunicatorEvent)event;
+        ContainerId cId = nEvent.getContainerId();
+        ContainerStatus cs = BuilderUtils.newContainerStatus(cId,
+            ContainerState.COMPLETE, "", 0);
+        actualHandler.handle(new AMContainerEventCompleted(cs));
+        return;
+      }
+      
+      // De-allocate containers used by previous attempts immediately.
+      else if (event.getType() == RMCommunicatorEventType.CONTAINER_DEALLOCATE) {
+        RMCommunicatorContainerDeAllocateRequestEvent dEvent = (RMCommunicatorContainerDeAllocateRequestEvent) event;
+        ContainerId cId = dEvent.getContainerId();
+        // exitStatus not known, diagnostics not known.
+        ContainerStatus cs = BuilderUtils.newContainerStatus(cId,
+            ContainerState.COMPLETE, "", 0);
+        actualHandler.handle(new AMContainerEventCompleted(cs));
+        return;
+      }
+      
+      // Received for FAILED/KILLED tasks after C_COMPLETED.
+      else if (event.getType() == TaskCleaner.EventType.TASK_CLEAN) {
+        TaskAttemptId aId = ((TaskCleanupEvent) event).getAttemptID();
+        LOG.debug("TASK_CLEAN for attemptId: " + aId);
+        return;
+      }
+
+
+      // delegate to the actual handler
+      actualHandler.handle(event);
+    }
+
+    private void sendStatusUpdateEvent(TaskAttemptId yarnAttemptID,
+        TaskAttemptInfo attemptInfo) {
+      LOG.info("Sending status update event to " + yarnAttemptID);
+      TaskAttemptStatus taskAttemptStatus = new TaskAttemptStatus();
+      taskAttemptStatus.id = yarnAttemptID;
+      taskAttemptStatus.progress = 1.0f;
+      taskAttemptStatus.stateString = attemptInfo.getTaskStatus();
+      // taskAttemptStatus.outputSize = attemptInfo.getOutputSize();
+      taskAttemptStatus.phase = Phase.CLEANUP;
+      // Counters from the history file may be null; use them as-is.
+      taskAttemptStatus.counters = attemptInfo.getCounters();
+      actualHandler.handle(new TaskAttemptEventStatusUpdate(
+          taskAttemptStatus.id, taskAttemptStatus));
+    }
+
+    private void sendAssignedEvent(TaskAttemptId yarnAttemptID,
+        TaskAttemptInfo attemptInfo, AMSchedulerTALaunchRequestEvent event) {
+      LOG.info("Sending assigned event to " + yarnAttemptID);
+      ContainerId cId = attemptInfo.getContainerId();
+
+      NodeId nodeId =
+          ConverterUtils.toNodeId(attemptInfo.getHostname() + ":"
+              + attemptInfo.getPort());
+      // Resource/Priority/ApplicationACLs are only needed while launching the
+      // container on an NM, these are already completed tasks, so setting them
+      // to null
+      Container container = BuilderUtils.newContainer(cId, nodeId,
+          attemptInfo.getTrackerName() + ":" + attemptInfo.getHttpPort(),
+          null, null, null);
+      
+      // Track shufflePort, attemptId for container - would normally be done by the scheduler.
+      ContainerInfo cInfo = containerInfo.get(cId);
+      if (cInfo == null) {
+        cInfo = new ContainerInfo(attemptInfo.getShufflePort());
+        containerInfo.put(cId, cInfo);
+      }
+      cInfo.setAttemptId(yarnAttemptID);
+      attemptToContainerMap.put(yarnAttemptID, cId);
+      
+      appContext.getAllNodes().nodeSeen(nodeId);
+      appContext.getAllContainers().addContainerIfNew(container);
+      
+      
+      // Request container launch for new containers.
+      if (appContext.getAllContainers().get(cId).getState() == AMContainerState.ALLOCATED) {
+        TaskId taskId = yarnAttemptID.getTaskId();
+        AMContainerEventLaunchRequest lrEvent = new AMContainerEventLaunchRequest(
+            cId, taskId.getJobId(), taskId.getTaskType(), event.getJobToken(),
+            event.getCredentials(), false, new JobConf(appContext.getJob(
+                taskId.getJobId()).getConf()));
+        actualHandler.handle(lrEvent);
+      }
+      // Assign the task attempt to this container.
+      actualHandler.handle(new AMContainerEventAssignTA(cId, yarnAttemptID,
+          event.getRemoteTaskContext()));
+    }
+    // TODO: Handle container launch request
+  }
+
+  private static class ContainerInfo {
+    int shufflePort;
+    TaskAttemptId nextAttemptId;
+
+    ContainerInfo(int shufflePort) {
+      this.shufflePort = shufflePort;
+    }
+
+    void setAttemptId(TaskAttemptId attemptId) {
+      this.nextAttemptId = attemptId;
+    }
+    
+    int getShufflePort() {
+      return shufflePort;
+    }
+    
+    TaskAttemptId getNextAttemptId() {
+      return nextAttemptId;
+    }
+  }
+
+}
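
The class comment above describes the core mechanism: while recovery is in
progress, new task-schedule events are buffered and only replayed once all
completed tasks have been recovered. Below is a minimal, self-contained sketch
of that interception pattern, using simplified stand-in Event and EventHandler
types rather than the actual YARN interfaces from this commit; it is
illustrative only.

    import java.util.ArrayList;
    import java.util.List;

    // Stand-in types for the sketch; the real code uses the YARN event interfaces.
    interface Event { String getType(); }
    interface EventHandler { void handle(Event event); }

    class BufferingEventHandler implements EventHandler {
      private final EventHandler actualHandler;
      private final List<Event> bufferedScheduleEvents = new ArrayList<Event>();
      private volatile boolean recoveryMode = true;

      BufferingEventHandler(EventHandler actualHandler) {
        this.actualHandler = actualHandler;
      }

      @Override
      public void handle(Event event) {
        // While recovering, hold back new schedule requests; let everything else through.
        if (recoveryMode && "T_SCHEDULE".equals(event.getType())) {
          bufferedScheduleEvents.add(event);
          return;
        }
        actualHandler.handle(event);
      }

      // Called once every completed task has been replayed from the history file.
      void endRecovery() {
        recoveryMode = false;
        for (Event buffered : bufferedScheduleEvents) {
          actualHandler.handle(buffered);
        }
        bufferedScheduleEvents.clear();
      }
    }

The actual InterceptingEventHandler additionally consults completedTasks, so
only attempts of tasks that did not already succeed in the previous attempt are
delayed.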

Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/recover/package-info.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/recover/package-info.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/recover/package-info.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/recover/package-info.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Private
+package org.apache.hadoop.mapreduce.v2.app2.recover;
+import org.apache.hadoop.classification.InterfaceAudience;

Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEvent.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEvent.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEvent.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEvent.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,30 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.rm;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+
+public class AMSchedulerEvent extends AbstractEvent<AMSchedulerEventType> {
+
+  // TODO Not a very useful class...
+  public AMSchedulerEvent(AMSchedulerEventType type) {
+    super(type);
+  }
+}

Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventContainerCompleted.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventContainerCompleted.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventContainerCompleted.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventContainerCompleted.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.mapreduce.v2.app2.rm;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+public class AMSchedulerEventContainerCompleted extends AMSchedulerEvent {
+
+  private final ContainerId containerId;
+  
+  public AMSchedulerEventContainerCompleted(ContainerId containerId) {
+    super(AMSchedulerEventType.S_CONTAINER_COMPLETED);
+    this.containerId = containerId;
+  }
+  
+  public ContainerId getContainerId() {
+    return this.containerId;
+  }
+
+}

Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventContainersAllocated.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventContainersAllocated.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventContainersAllocated.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventContainersAllocated.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,48 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.rm;
+
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+public class AMSchedulerEventContainersAllocated extends AMSchedulerEvent {
+
+  private final List<ContainerId> containerIds;
+  private final boolean headRoomChanged;
+
+  // TODO Maybe distinguish between newly allocated containers and 
+  // existing containers being re-used.
+  // headRoomChanged is a strange API - making an assumption about how the
+  // scheduler will use this info.
+  public AMSchedulerEventContainersAllocated(List<ContainerId> containerIds,
+      boolean headRoomChanged) {
+    super(AMSchedulerEventType.S_CONTAINERS_ALLOCATED);
+    this.containerIds = containerIds;
+    this.headRoomChanged = headRoomChanged;
+  }
+
+  public List<ContainerId> getContainerIds() {
+    return this.containerIds;
+  }
+
+  public boolean didHeadroomChange() {
+    return headRoomChanged;
+  }
+}

Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventNodeBlacklisted.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventNodeBlacklisted.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventNodeBlacklisted.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventNodeBlacklisted.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.rm;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+
+public class AMSchedulerEventNodeBlacklisted extends AMSchedulerEvent {
+
+  private final NodeId nodeId;
+
+  public AMSchedulerEventNodeBlacklisted(NodeId nodeId) {
+    super(AMSchedulerEventType.S_NODE_BLACKLISTED);
+    this.nodeId = nodeId;
+  }
+
+  public NodeId getNodeId() {
+    return this.nodeId;
+  }
+}

Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventTAEnded.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventTAEnded.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventTAEnded.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventTAEnded.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.rm;
+
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+public class AMSchedulerEventTAEnded extends AMSchedulerEvent {
+
+  private final TaskAttemptId attemptId;
+  private final ContainerId containerId;
+  private TaskAttemptState state;
+
+  public AMSchedulerEventTAEnded(TaskAttemptId attemptId,
+      ContainerId containerId, TaskAttemptState state) {
+    super(AMSchedulerEventType.S_TA_ENDED);
+    this.attemptId = attemptId;
+    this.containerId = containerId;
+    this.state = state;
+  }
+
+  public TaskAttemptId getAttemptID() {
+    return this.attemptId;
+  }
+
+  public ContainerId getUsedContainerId() {
+    return this.containerId;
+  }
+
+  public TaskAttemptState getState() {
+    return this.state;
+  }
+}

Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventType.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventType.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventType.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventType.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,38 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.rm;
+
+public enum AMSchedulerEventType {
+  //Producer: TaskAttempt
+  S_TA_LAUNCH_REQUEST,
+  S_TA_ENDED, // Annotated with FAILED/KILLED/SUCCEEDED.
+
+  //Producer: RMCommunicator
+  S_CONTAINERS_ALLOCATED,
+
+  //Producer: Container. (Maybe RMCommunicator)
+  S_CONTAINER_COMPLETED,
+
+  //Producer: Node
+  S_NODE_BLACKLISTED,
+  S_NODE_UNHEALTHY,
+  S_NODE_HEALTHY
+  // The scheduler should have a way of knowing about unusable nodes. Acting on
+  // this information to change requests etc is scheduler specific.
+}
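
The comments in this enum identify the producer of each event type. The sketch
below shows, hypothetically, how a scheduler implementation might dispatch on
these types; everything except the enum constants and AMSchedulerEvent is
illustrative and not part of the commit.

    package org.apache.hadoop.mapreduce.v2.app2.rm;

    // Illustrative skeleton only; class name and comments are hypothetical.
    public class AMSchedulerSketch {

      public void handle(AMSchedulerEvent event) {
        switch (event.getType()) {
        case S_TA_LAUNCH_REQUEST:
          // From a TaskAttempt: request (or reuse) a container for the attempt.
          break;
        case S_TA_ENDED:
          // From a TaskAttempt: release or reuse the container, depending on
          // whether the attempt FAILED, was KILLED, or SUCCEEDED.
          break;
        case S_CONTAINERS_ALLOCATED:
          // From the RMCommunicator: match new containers to pending requests.
          break;
        case S_CONTAINER_COMPLETED:
          // From a Container: drop bookkeeping for the completed container.
          break;
        case S_NODE_BLACKLISTED:
        case S_NODE_UNHEALTHY:
        case S_NODE_HEALTHY:
          // From a Node: adjust outstanding requests for the affected node.
          break;
        default:
          break;
        }
      }
    }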

Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerTALaunchRequestEvent.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerTALaunchRequestEvent.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerTALaunchRequestEvent.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerTALaunchRequestEvent.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.rm;
+
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttempt;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.mapreduce.task.impl.MRTaskContext;
+
+public class AMSchedulerTALaunchRequestEvent extends AMSchedulerEvent {
+
+  // TODO Get rid of remoteTask from here. Can be forgotten after it has been assigned.
+  //.... Maybe have the Container talk to the TaskAttempt to pull in the remote task.
+  
+  private final TaskAttemptId attemptId;
+  private final boolean rescheduled;
+  private final Resource capability;
+  private final MRTaskContext remoteTaskContext;
+  private final TaskAttempt taskAttempt;
+  private final Credentials credentials;
+  private Token<JobTokenIdentifier> jobToken;
+  private final String[] hosts;
+  private final String[] racks;
+  
+  
+  public AMSchedulerTALaunchRequestEvent(TaskAttemptId attemptId,
+      boolean rescheduled, Resource capability,
+      MRTaskContext remoteTaskContext, TaskAttempt ta,
+      Credentials credentials, Token<JobTokenIdentifier> jobToken,
+      String[] hosts, String[] racks) {
+    super(AMSchedulerEventType.S_TA_LAUNCH_REQUEST);
+    this.attemptId = attemptId;
+    this.rescheduled = rescheduled;
+    this.capability = capability;
+    this.remoteTaskContext = remoteTaskContext;
+    this.taskAttempt = ta;
+    this.credentials = credentials;
+    this.jobToken = jobToken;
+    this.hosts = hosts;
+    this.racks = racks;
+  }
+
+  public TaskAttemptId getAttemptID() {
+    return this.attemptId;
+  }
+
+  public Resource getCapability() {
+    return capability;
+  }
+
+  public String[] getHosts() {
+    return hosts;
+  }
+  
+  public String[] getRacks() {
+    return racks;
+  }
+  
+  public boolean isRescheduled() {
+    return rescheduled;
+  }
+  
+  public MRTaskContext getRemoteTaskContext() {
+    return remoteTaskContext;
+  }
+  
+  public TaskAttempt getTaskAttempt() {
+    return this.taskAttempt;
+  }
+  
+  public Credentials getCredentials() {
+    return this.credentials;
+  }
+  
+  public Token<JobTokenIdentifier> getJobToken() {
+    return this.jobToken;
+  }
+
+  // Parameter replacement: @taskid@ will not be usable
+  // ProfileTaskRange not available along with ContainerReUse
+
+  /*Requirements to determine a container request.
+   * + Data-local + Rack-local hosts.
+   * + Resource capability
+   * + Env - mapreduce.map.env / mapreduce.reduce.env can change. M/R log level. 
+   * - JobConf and JobJar file - same location.
+   * - Distributed Cache - identical for map / reduce tasks at the moment.
+   * - Credentials, tokens etc are identical.
+   * + Command - dependent on map / reduce java.opts
+   */
+}
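
The comment block at the end of this class lists what is needed to turn a
launch request into a container request: locality hints and a resource
capability, with credentials, job jar and distributed cache being common to
all tasks. A hypothetical consumer reading those fields off the event could
look like the sketch below; the RequestRecord holder and class name are
illustrative, not part of the commit.

    package org.apache.hadoop.mapreduce.v2.app2.rm;

    import org.apache.hadoop.yarn.api.records.Resource;

    // Illustrative only: shows how the locality and capability carried by
    // AMSchedulerTALaunchRequestEvent could feed a container request.
    public class TALaunchRequestSketch {

      // Hypothetical holder for the request-relevant fields.
      static class RequestRecord {
        final String[] hosts;
        final String[] racks;
        final Resource capability;
        final boolean rescheduled;

        RequestRecord(String[] hosts, String[] racks, Resource capability,
            boolean rescheduled) {
          this.hosts = hosts;
          this.racks = racks;
          this.capability = capability;
          this.rescheduled = rescheduled;
        }
      }

      RequestRecord toRequest(AMSchedulerTALaunchRequestEvent event) {
        // Data-local and rack-local hosts plus the resource capability describe
        // the request; per-job items such as credentials do not affect matching.
        return new RequestRecord(event.getHosts(), event.getRacks(),
            event.getCapability(), event.isRescheduled());
      }
    }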

Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/ContainerAllocator.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/ContainerAllocator.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/ContainerAllocator.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/ContainerAllocator.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,33 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.rm;
+
+import org.apache.hadoop.yarn.event.EventHandler;
+
+// TODO XXX Rename to AMScheduler.
+public interface ContainerAllocator extends EventHandler<AMSchedulerEvent>{
+
+//  enum EventType {
+//
+//    CONTAINER_REQ,
+//    CONTAINER_DEALLOCATE,
+//    CONTAINER_FAILED
+//  }
+
+}

Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/ContainerRequestor.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/ContainerRequestor.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/ContainerRequestor.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/ContainerRequestor.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.rm;
+
+import java.util.Map;
+
+import org.apache.hadoop.mapreduce.v2.app2.rm.RMContainerRequestor.ContainerRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.event.EventHandler;
+
+public interface ContainerRequestor extends EventHandler<RMCommunicatorEvent> {
+  public Resource getAvailableResources();
+
+  public void addContainerReq(ContainerRequest req);
+
+  public void decContainerReq(ContainerRequest req);
+  
+  public Map<ApplicationAccessType, String> getApplicationACLs();
+}

Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/NMCommunicatorEvent.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/NMCommunicatorEvent.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/NMCommunicatorEvent.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/NMCommunicatorEvent.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.rm;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerToken;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+public class NMCommunicatorEvent extends AbstractEvent<NMCommunicatorEventType> {
+
+  private final ContainerId containerId;
+  private final NodeId nodeId;
+  private final ContainerToken containerToken;
+
+  public NMCommunicatorEvent(ContainerId containerId, NodeId nodeId,
+      ContainerToken containerToken, NMCommunicatorEventType type) {
+    super(type);
+    this.containerId = containerId;
+    this.nodeId = nodeId;
+    this.containerToken = containerToken;
+  }
+
+  public ContainerId getContainerId() {
+    return this.containerId;
+  }
+
+  public NodeId getNodeId() {
+    return this.nodeId;
+  }
+
+  public ContainerToken getContainerToken() {
+    return this.containerToken;
+  }
+  
+  @Override
+  public String toString() {
+    return super.toString() + " for container " + containerId + ", nodeId: "
+        + nodeId;
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result
+        + ((containerId == null) ? 0 : containerId.hashCode());
+    result = prime * result
+        + ((containerToken == null) ? 0 : containerToken.hashCode());
+    result = prime * result + ((nodeId == null) ? 0 : nodeId.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    NMCommunicatorEvent other = (NMCommunicatorEvent) obj;
+    if (containerId == null) {
+      if (other.containerId != null)
+        return false;
+    } else if (!containerId.equals(other.containerId))
+      return false;
+    if (containerToken == null) {
+      if (other.containerToken != null)
+        return false;
+    } else if (!containerToken.equals(other.containerToken))
+      return false;
+    if (nodeId == null) {
+      if (other.nodeId != null)
+        return false;
+    } else if (!nodeId.equals(other.nodeId))
+      return false;
+    return true;
+  }
+}

Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/NMCommunicatorEventType.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/NMCommunicatorEventType.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/NMCommunicatorEventType.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/NMCommunicatorEventType.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.rm;
+
+// TODO - Re-use the events in ContainerLauncher..
+public enum NMCommunicatorEventType {
+  CONTAINER_LAUNCH_REQUEST,
+  CONTAINER_STOP_REQUEST
+}

Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/NMCommunicatorLaunchRequestEvent.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/NMCommunicatorLaunchRequestEvent.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/NMCommunicatorLaunchRequestEvent.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/NMCommunicatorLaunchRequestEvent.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.rm;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+
+public class NMCommunicatorLaunchRequestEvent extends NMCommunicatorEvent {
+
+  private final ContainerLaunchContext clc;
+
+  public NMCommunicatorLaunchRequestEvent(ContainerLaunchContext clc,
+      Container container) {
+    super(clc.getContainerId(), container.getNodeId(), container
+        .getContainerToken(), NMCommunicatorEventType.CONTAINER_LAUNCH_REQUEST);
+    this.clc = clc;
+  }
+
+  public ContainerLaunchContext getContainerLaunchContext() {
+    return this.clc;
+  }
+
+}

Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/NMCommunicatorStopRequestEvent.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/NMCommunicatorStopRequestEvent.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/NMCommunicatorStopRequestEvent.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/NMCommunicatorStopRequestEvent.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.rm;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerToken;
+import org.apache.hadoop.yarn.api.records.NodeId;
+
+public class NMCommunicatorStopRequestEvent extends NMCommunicatorEvent {
+
+  public NMCommunicatorStopRequestEvent(ContainerId containerId, NodeId nodeId,
+      ContainerToken containerToken) {
+    super(containerId, nodeId, containerToken,
+        NMCommunicatorEventType.CONTAINER_STOP_REQUEST);
+  }
+
+}
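
For illustration only: a minimal sketch of how the launch and stop request events above might be constructed and handed to a dispatcher. The ExampleContainerRequests class and the EventHandler parameter are assumptions made for this sketch; the commit itself does not show the call sites.

    package org.apache.hadoop.mapreduce.v2.app2.rm;

    import org.apache.hadoop.yarn.api.records.Container;
    import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
    import org.apache.hadoop.yarn.event.EventHandler;

    public class ExampleContainerRequests {

      // CONTAINER_LAUNCH_REQUEST: needs the launch context plus the allocated container.
      public static void requestLaunch(EventHandler<NMCommunicatorEvent> handler,
          ContainerLaunchContext clc, Container container) {
        handler.handle(new NMCommunicatorLaunchRequestEvent(clc, container));
      }

      // CONTAINER_STOP_REQUEST: only needs the container id, its node and its token.
      public static void requestStop(EventHandler<NMCommunicatorEvent> handler,
          Container container) {
        handler.handle(new NMCommunicatorStopRequestEvent(
            container.getId(), container.getNodeId(), container.getContainerToken()));
      }
    }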

Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicator.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicator.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicator.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicator.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,315 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.rm;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.PrivilegedAction;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.client.ClientService;
+import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.app2.job.impl.NotRunningJob;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * Registers/unregisters to RM and sends heartbeats to RM.
+ */
+public abstract class RMCommunicator extends AbstractService  {
+  private static final Log LOG = LogFactory.getLog(RMCommunicator.class);
+  private int rmPollInterval; // millis
+  protected ApplicationId applicationId;
+  protected ApplicationAttemptId applicationAttemptId;
+  private AtomicBoolean stopped;
+  protected Thread allocatorThread;
+  @SuppressWarnings("rawtypes")
+  protected EventHandler eventHandler;
+  protected AMRMProtocol scheduler;
+  private final ClientService clientService;
+  protected int lastResponseID;
+  private Resource minContainerCapability;
+  private Resource maxContainerCapability;
+  protected Map<ApplicationAccessType, String> applicationACLs;
+
+  protected final AppContext context;
+  private Job job = null;
+  private JobId jobId;
+  // Has a signal (SIGTERM etc) been issued?
+  protected volatile boolean isSignalled = false;
+
+  public RMCommunicator(ClientService clientService, AppContext context) {
+    super(RMCommunicator.class.getSimpleName());
+    this.clientService = clientService;
+    this.context = context;
+    this.eventHandler = context.getEventHandler();
+    this.applicationId = context.getApplicationID();
+    this.applicationAttemptId = context.getApplicationAttemptId();
+    this.stopped = new AtomicBoolean(false);
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    super.init(conf);
+    rmPollInterval =
+        conf.getInt(MRJobConfig.MR_AM_TO_RM_HEARTBEAT_INTERVAL_MS,
+            MRJobConfig.DEFAULT_MR_AM_TO_RM_HEARTBEAT_INTERVAL_MS);
+  }
+
+  @Override
+  public void start() {
+    scheduler = createSchedulerProxy();
+    register();
+    startAllocatorThread();
+    JobID id = TypeConverter.fromYarn(this.applicationId);
+    jobId = TypeConverter.toYarn(id);
+    job = getJob();
+    super.start();
+  }
+
+  protected AppContext getContext() {
+    return context;
+  }
+
+  protected Job getJob() {
+    if (job == null) {
+      job = context.getJob(jobId);
+    }
+    if (job instanceof NotRunningJob) {
+      job = null;
+    }
+    return job;
+  }
+
+  /**
+   * Get the appProgress. Can be used only after this component is started.
+   * @return the appProgress.
+   */
+  protected float getApplicationProgress() {
+    // For now just a single job. In future when we have a DAG, we need an
+    // aggregate progress.
+    Job j = getJob();
+    return (float) ((j != null) ? j.getProgress() : 0.0);
+  }
+
+  // TODO EVENTUALLY. Get rid of this, when RMCommunicator and RMContainerRequestor
+  // are collapsed.
+  public Map<ApplicationAccessType, String> getApplicationAcls() {
+    return this.applicationACLs;
+  }
+
+  // TODO (After 3902): Get rid of the dependencies on the ClientService.
+  protected void register() {
+    //Register
+    InetSocketAddress serviceAddr = clientService.getBindAddress();
+    try {
+      RegisterApplicationMasterRequest request = Records
+          .newRecord(RegisterApplicationMasterRequest.class);
+      request.setApplicationAttemptId(applicationAttemptId);
+      request.setHost(serviceAddr.getHostName());
+      request.setRpcPort(serviceAddr.getPort());
+      request.setTrackingUrl(serviceAddr.getHostName() + ":" + clientService.getHttpPort());
+      RegisterApplicationMasterResponse response =
+        scheduler.registerApplicationMaster(request);
+      minContainerCapability = response.getMinimumResourceCapability();
+      maxContainerCapability = response.getMaximumResourceCapability();
+      this.context.getClusterInfo().setMinContainerCapability(
+          minContainerCapability);
+      this.context.getClusterInfo().setMaxContainerCapability(
+          maxContainerCapability);
+      this.applicationACLs = response.getApplicationACLs();
+      LOG.info("minContainerCapability: " + minContainerCapability.getMemory());
+      LOG.info("maxContainerCapability: " + maxContainerCapability.getMemory());
+    } catch (Exception are) {
+      LOG.error("Exception while registering", are);
+      throw new YarnException(are);
+    }
+  }
+
+  protected void unregister() {
+    try {
+      Job j = getJob();
+      StringBuffer sb = new StringBuffer();
+      String historyUrl = "";
+      FinalApplicationStatus finishState = FinalApplicationStatus.UNDEFINED;
+      if (j == null) {
+        finishState = FinalApplicationStatus.FAILED;
+        sb.append("Application failed due to unknown reason."
+            + " No job submitted to this MR AM");
+      } else {
+        if (j.getState() == JobState.SUCCEEDED) {
+          finishState = FinalApplicationStatus.SUCCEEDED;
+        } else if (j.getState() == JobState.KILLED
+            || (j.getState() == JobState.RUNNING && isSignalled)) {
+          finishState = FinalApplicationStatus.KILLED;
+        } else if (j.getState() == JobState.FAILED
+            || j.getState() == JobState.ERROR) {
+          finishState = FinalApplicationStatus.FAILED;
+        }
+        for (String s : j.getDiagnostics()) {
+          sb.append(s).append("\n");
+        }
+        LOG.info("Setting job diagnostics to " + sb.toString());
+
+        historyUrl = JobHistoryUtils.getHistoryUrl(getConfig(),
+            context.getApplicationID());
+        LOG.info("History url is " + historyUrl);
+      }
+
+      FinishApplicationMasterRequest request = Records
+          .newRecord(FinishApplicationMasterRequest.class);
+      request.setAppAttemptId(this.applicationAttemptId);
+      request.setFinishApplicationStatus(finishState);
+      request.setDiagnostics(sb.toString());
+      request.setTrackingUrl(historyUrl);
+      scheduler.finishApplicationMaster(request);
+    } catch(Exception are) {
+      LOG.error("Exception while unregistering ", are);
+    }
+  }
+
+  protected Resource getMinContainerCapability() {
+    return minContainerCapability;
+  }
+
+  protected Resource getMaxContainerCapability() {
+    return maxContainerCapability;
+  }
+
+  @Override
+  public void stop() {
+    if (stopped.getAndSet(true)) {
+      // return if already stopped
+      return;
+    }
+    allocatorThread.interrupt();
+    try {
+      allocatorThread.join();
+    } catch (InterruptedException ie) {
+      LOG.warn("InterruptedException while stopping", ie);
+    }
+    unregister();
+    super.stop();
+  }
+
+  protected void startAllocatorThread() {
+    allocatorThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
+          try {
+            Thread.sleep(rmPollInterval);
+            try {
+              heartbeat();
+            } catch (YarnException e) {
+              LOG.error("Error communicating with RM: " + e.getMessage() , e);
+              return;
+            } catch (Exception e) {
+              LOG.error("ERROR IN CONTACTING RM. ", e);
+              // TODO: for other exceptions
+            }
+          } catch (InterruptedException e) {
+            LOG.warn("Allocated thread interrupted. Returning.");
+            return;
+          }
+        }
+      }
+    });
+    allocatorThread.setName("RMCommunicator");
+    allocatorThread.start();
+  }
+
+  protected AMRMProtocol createSchedulerProxy() {
+    final Configuration conf = getConfig();
+    final YarnRPC rpc = YarnRPC.create(conf);
+    final InetSocketAddress serviceAddr = conf.getSocketAddr(
+        YarnConfiguration.RM_SCHEDULER_ADDRESS,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
+
+    UserGroupInformation currentUser;
+    try {
+      currentUser = UserGroupInformation.getCurrentUser();
+    } catch (IOException e) {
+      throw new YarnException(e);
+    }
+
+    if (UserGroupInformation.isSecurityEnabled()) {
+      String tokenURLEncodedStr = System.getenv().get(
+          ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME);
+      Token<? extends TokenIdentifier> token = new Token<TokenIdentifier>();
+
+      try {
+        token.decodeFromUrlString(tokenURLEncodedStr);
+      } catch (IOException e) {
+        throw new YarnException(e);
+      }
+
+      SecurityUtil.setTokenService(token, serviceAddr);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("AppMasterToken is " + token);
+      }
+      currentUser.addToken(token);
+    }
+
+    return currentUser.doAs(new PrivilegedAction<AMRMProtocol>() {
+      @Override
+      public AMRMProtocol run() {
+        return (AMRMProtocol) rpc.getProxy(AMRMProtocol.class,
+            serviceAddr, conf);
+      }
+    });
+  }
+
+  protected abstract void heartbeat() throws Exception;
+
+  public void setSignalled(boolean isSignalled) {
+    this.isSignalled = isSignalled;
+    LOG.info("RMCommunicator notified that iSignalled was : " + isSignalled);
+  }
+}
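
RMCommunicator is abstract, so the register/unregister and heartbeat-loop plumbing above is only exercised through a concrete subclass that supplies heartbeat(). The sketch below is a minimal, illustrative subclass (the class name and the empty heartbeat body are assumptions, not part of this commit); a real allocator would typically drive its AMRMProtocol allocate call from heartbeat().

    package org.apache.hadoop.mapreduce.v2.app2.rm;

    import org.apache.hadoop.mapreduce.v2.app2.AppContext;
    import org.apache.hadoop.mapreduce.v2.app2.client.ClientService;

    // Illustrative no-op subclass: registers with the RM on start(), heartbeats
    // every rmPollInterval milliseconds, and unregisters on stop().
    public class NoOpRMCommunicator extends RMCommunicator {

      public NoOpRMCommunicator(ClientService clientService, AppContext context) {
        super(clientService, context);
      }

      @Override
      protected void heartbeat() throws Exception {
        // A real implementation would call AMRMProtocol#allocate here and route
        // new/completed containers to the interested event handlers.
      }
    }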

Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicatorContainerDeAllocateRequestEvent.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicatorContainerDeAllocateRequestEvent.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicatorContainerDeAllocateRequestEvent.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicatorContainerDeAllocateRequestEvent.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.rm;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+public class RMCommunicatorContainerDeAllocateRequestEvent extends
+    RMCommunicatorEvent {
+
+  private final ContainerId containerId;
+  
+  public RMCommunicatorContainerDeAllocateRequestEvent(ContainerId containerId) {
+    super(RMCommunicatorEventType.CONTAINER_DEALLOCATE);
+    this.containerId = containerId;
+  }
+  
+  public ContainerId getContainerId() {
+    return this.containerId;
+  }
+
+}
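
A short, illustrative sketch of how this event might be raised (the call site and the ExampleDeallocate class are assumptions; only the event class is part of this commit). Whichever service handles RMCommunicatorEventType would presumably release the container back to the RM on a subsequent heartbeat.

    package org.apache.hadoop.mapreduce.v2.app2.rm;

    import org.apache.hadoop.yarn.api.records.ContainerId;
    import org.apache.hadoop.yarn.event.EventHandler;

    public final class ExampleDeallocate {

      // Assumed wiring: 'handler' is whatever is registered for
      // RMCommunicatorEventType on the AM's central dispatcher.
      public static void deallocate(EventHandler<RMCommunicatorEvent> handler,
          ContainerId containerId) {
        handler.handle(new RMCommunicatorContainerDeAllocateRequestEvent(containerId));
      }
    }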

Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicatorEvent.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicatorEvent.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicatorEvent.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicatorEvent.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.rm;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+public class RMCommunicatorEvent extends AbstractEvent<RMCommunicatorEventType> {
+
+  public RMCommunicatorEvent(RMCommunicatorEventType type) {
+    super(type);
+  }
+
+}

Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicatorEventType.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicatorEventType.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicatorEventType.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicatorEventType.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.rm;
+
+public enum RMCommunicatorEventType {
+  CONTAINER_DEALLOCATE,
+}