You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/04/19 01:54:28 UTC

svn commit: r1469642 [15/36] - in /incubator/tez/branches/TEZ-1: ./ example_jobs/ example_jobs/sampleInput/ example_jobs/wc_mr_6m_1r/ example_jobs/wc_mrr_6m_3r_3r/ ljr_helper/ tez-common/ tez-common/src/ tez-common/src/main/ tez-common/src/main/java/ t...

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,1320 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.MRVertexOutputCommitter;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
+import org.apache.hadoop.yarn.state.MultipleArcTransition;
+import org.apache.hadoop.yarn.state.SingleArcTransition;
+import org.apache.hadoop.yarn.state.StateMachine;
+import org.apache.hadoop.yarn.state.StateMachineFactory;
+import org.apache.tez.common.InputSpec;
+import org.apache.tez.common.OutputSpec;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.DAGConfiguration;
+import org.apache.tez.dag.api.EdgeProperty.ConnectionPattern;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.VertexLocationHint;
+import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+import org.apache.tez.dag.api.client.VertexStatus;
+import org.apache.tez.dag.api.impl.NullVertexOutputCommitter;
+import org.apache.tez.dag.api.impl.VertexContext;
+import org.apache.tez.dag.api.impl.VertexOutputCommitter;
+import org.apache.tez.dag.api.records.TaskState;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.TaskHeartbeatHandler;
+import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.dag.Task;
+import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.VertexScheduler;
+import org.apache.tez.dag.app.dag.VertexState;
+import org.apache.tez.dag.app.dag.event.DAGEventVertexCompleted;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
+import org.apache.tez.dag.app.dag.event.TaskEvent;
+import org.apache.tez.dag.app.dag.event.TaskEventType;
+import org.apache.tez.dag.app.dag.event.VertexEvent;
+import org.apache.tez.dag.app.dag.event.VertexEventSourceTaskAttemptCompleted;
+import org.apache.tez.dag.app.dag.event.VertexEventSourceVertexStarted;
+import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptCompleted;
+import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptFetchFailure;
+import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted;
+import org.apache.tez.dag.app.dag.event.VertexEventType;
+import org.apache.tez.engine.common.security.JobTokenIdentifier;
+import org.apache.tez.engine.common.security.JobTokenSecretManager;
+import org.apache.tez.engine.records.TezDAGID;
+import org.apache.tez.engine.records.TezDependentTaskCompletionEvent;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+import org.apache.tez.engine.records.TezTaskID;
+import org.apache.tez.engine.records.TezVertexID;
+
+
+/** Implementation of Vertex interface. Maintains the state machines of Vertex.
+ * The read and write calls use ReadWriteLock for concurrency.
+ */
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
+  EventHandler<VertexEvent>, VertexContext {
+
+  private static final TezDependentTaskCompletionEvent[]
+    EMPTY_TASK_ATTEMPT_COMPLETION_EVENTS = new TezDependentTaskCompletionEvent[0];
+
+  private static final Log LOG = LogFactory.getLog(VertexImpl.class);
+
+  //The maximum fraction of fetch failures allowed for a map
+  private static final double MAX_ALLOWED_FETCH_FAILURES_FRACTION = 0.5;
+
+  // Maximum no. of fetch-failure notifications after which map task is failed
+  private static final int MAX_FETCH_FAILURES_NOTIFICATIONS = 3;
+
+  //final fields
+  private final Clock clock;
+
+  // TODO: Recovery
+  //private final Map<TaskId, TaskInfo> completedTasksFromPreviousRun;
+
+  private final Lock readLock;
+  private final Lock writeLock;
+  private final TaskAttemptListener taskAttemptListener;
+  private final TaskHeartbeatHandler taskHeartbeatHandler;
+  private final Object tasksSyncHandle = new Object();
+
+  private final EventHandler eventHandler;
+  // TODO Metrics
+  //private final MRAppMetrics metrics;
+  private final AppContext appContext;
+  
+  private boolean lazyTasksCopyNeeded = false;
+  volatile Map<TezTaskID, Task> tasks = new LinkedHashMap<TezTaskID, Task>();
+  private Object fullCountersLock = new Object();
+  private TezCounters fullCounters = null;
+  private Resource taskResource;
+
+  public DAGConfiguration conf;
+
+  //fields initialized in init
+
+  private int numStartedSourceVertices = 0;
+  private int distanceFromRoot = 0;
+  
+  private List<TezDependentTaskCompletionEvent> sourceTaskAttemptCompletionEvents;
+  private final List<String> diagnostics = new ArrayList<String>();
+
+  //task/attempt related datastructures
+  private final Map<TezTaskID, Integer> successSourceAttemptCompletionEventNoMap =
+    new HashMap<TezTaskID, Integer>();
+  private final Map<TezTaskAttemptID, Integer> fetchFailuresMapping =
+    new HashMap<TezTaskAttemptID, Integer>();
+
+  List<InputSpec> inputSpecList;
+  List<OutputSpec> outputSpecList;
+  
+  private static final InternalErrorTransition
+      INTERNAL_ERROR_TRANSITION = new InternalErrorTransition();
+  private static final TaskAttemptCompletedEventTransition
+      TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION =
+          new TaskAttemptCompletedEventTransition();
+  private static final SourceTaskAttemptCompletedEventTransition
+      SOURCE_TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION =
+          new SourceTaskAttemptCompletedEventTransition();
+
+  protected static final
+    StateMachineFactory<VertexImpl, VertexState, VertexEventType, VertexEvent>
+       stateMachineFactory
+     = new StateMachineFactory<VertexImpl, VertexState, VertexEventType, VertexEvent>
+              (VertexState.NEW)
+
+          // Transitions from NEW state
+          .addTransition
+              (VertexState.NEW,
+              EnumSet.of(VertexState.INITED, VertexState.FAILED),
+              VertexEventType.V_INIT,
+              new InitTransition())
+          .addTransition(VertexState.NEW, VertexState.KILLED,
+              VertexEventType.V_KILL,
+              new KillNewVertexTransition())
+          .addTransition(VertexState.NEW, VertexState.ERROR,
+              VertexEventType.INTERNAL_ERROR,
+              INTERNAL_ERROR_TRANSITION)
+
+          // Transitions from INITED state
+          .addTransition(VertexState.INITED, VertexState.INITED,
+              VertexEventType.V_SOURCE_VERTEX_STARTED,
+              new SourceVertexStartedTransition())
+          .addTransition(VertexState.INITED, VertexState.RUNNING, 
+              VertexEventType.V_START,
+              new StartTransition())
+
+          .addTransition(VertexState.INITED, VertexState.KILLED,
+              VertexEventType.V_KILL,
+              new KillInitedVertexTransition())
+          .addTransition(VertexState.INITED, VertexState.ERROR,
+              VertexEventType.INTERNAL_ERROR,
+              INTERNAL_ERROR_TRANSITION)
+
+          // Transitions from RUNNING state
+          .addTransition(VertexState.RUNNING, VertexState.RUNNING,
+              VertexEventType.V_TASK_ATTEMPT_COMPLETED,
+              TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION)
+          .addTransition(VertexState.RUNNING, VertexState.RUNNING,
+              VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
+              SOURCE_TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION)
+          .addTransition
+              (VertexState.RUNNING,
+              EnumSet.of(VertexState.RUNNING, VertexState.SUCCEEDED, VertexState.FAILED),
+              VertexEventType.V_TASK_COMPLETED,
+              new TaskCompletedTransition())
+          .addTransition
+              (VertexState.RUNNING,
+              EnumSet.of(VertexState.RUNNING, VertexState.SUCCEEDED, VertexState.FAILED),
+              VertexEventType.V_COMPLETED,
+              new VertexNoTasksCompletedTransition())
+          .addTransition(VertexState.RUNNING, VertexState.KILL_WAIT,
+              VertexEventType.V_KILL, new KillTasksTransition())
+          .addTransition(VertexState.RUNNING, VertexState.RUNNING,
+              VertexEventType.V_TASK_RESCHEDULED,
+              new TaskRescheduledTransition())
+          .addTransition(VertexState.RUNNING, VertexState.RUNNING,
+              VertexEventType.V_TASK_ATTEMPT_FETCH_FAILURE,
+              new TaskAttemptFetchFailureTransition())
+          .addTransition(
+              VertexState.RUNNING,
+              VertexState.ERROR, VertexEventType.INTERNAL_ERROR,
+              INTERNAL_ERROR_TRANSITION)
+
+          // Transitions from KILL_WAIT state.
+          .addTransition
+              (VertexState.KILL_WAIT,
+              EnumSet.of(VertexState.KILL_WAIT, VertexState.KILLED),
+              VertexEventType.V_TASK_COMPLETED,
+              new KillWaitTaskCompletedTransition())
+          .addTransition(VertexState.KILL_WAIT, VertexState.KILL_WAIT,
+              VertexEventType.V_TASK_ATTEMPT_COMPLETED,
+              TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION)
+          .addTransition(VertexState.KILL_WAIT, VertexState.KILL_WAIT,
+              VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
+              SOURCE_TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION)
+          .addTransition(
+              VertexState.KILL_WAIT,
+              VertexState.ERROR, VertexEventType.INTERNAL_ERROR,
+              INTERNAL_ERROR_TRANSITION)
+          // Ignore-able events
+          .addTransition(VertexState.KILL_WAIT, VertexState.KILL_WAIT,
+              EnumSet.of(VertexEventType.V_KILL,
+                  VertexEventType.V_TASK_RESCHEDULED,
+                  VertexEventType.V_TASK_ATTEMPT_FETCH_FAILURE))
+
+          // Transitions from SUCCEEDED state
+          .addTransition(
+              VertexState.SUCCEEDED,
+              VertexState.ERROR, VertexEventType.INTERNAL_ERROR,
+              INTERNAL_ERROR_TRANSITION)
+          // Ignore-able events
+          .addTransition(VertexState.SUCCEEDED, VertexState.SUCCEEDED,
+              EnumSet.of(VertexEventType.V_KILL,
+                  VertexEventType.V_TASK_ATTEMPT_FETCH_FAILURE,
+                  VertexEventType.V_TASK_ATTEMPT_COMPLETED,
+                  VertexEventType.V_TASK_COMPLETED))
+
+          // Transitions from FAILED state
+          .addTransition(
+              VertexState.FAILED,
+              VertexState.ERROR, VertexEventType.INTERNAL_ERROR,
+              INTERNAL_ERROR_TRANSITION)
+          // Ignore-able events
+          .addTransition(VertexState.FAILED, VertexState.FAILED,
+              EnumSet.of(VertexEventType.V_KILL,
+                  VertexEventType.V_TASK_ATTEMPT_FETCH_FAILURE,
+                  VertexEventType.V_TASK_ATTEMPT_COMPLETED,
+                  VertexEventType.V_TASK_COMPLETED))
+
+          // Transitions from KILLED state
+          .addTransition(
+              VertexState.KILLED,
+              VertexState.ERROR, VertexEventType.INTERNAL_ERROR,
+              INTERNAL_ERROR_TRANSITION)
+          // Ignore-able events
+          .addTransition(VertexState.KILLED, VertexState.KILLED,
+              EnumSet.of(VertexEventType.V_KILL,
+                  VertexEventType.V_TASK_ATTEMPT_FETCH_FAILURE,
+                  VertexEventType.V_TASK_ATTEMPT_COMPLETED,
+                  VertexEventType.V_TASK_COMPLETED))
+
+          // No transitions from INTERNAL_ERROR state. Ignore all.
+          .addTransition(
+              VertexState.ERROR,
+              VertexState.ERROR,
+              EnumSet.of(VertexEventType.V_INIT,
+                  VertexEventType.V_KILL,
+                  VertexEventType.V_TASK_COMPLETED,
+                  VertexEventType.V_TASK_ATTEMPT_COMPLETED,
+                  VertexEventType.V_TASK_RESCHEDULED,
+                  VertexEventType.V_DIAGNOSTIC_UPDATE,
+                  VertexEventType.V_TASK_ATTEMPT_FETCH_FAILURE,
+                  VertexEventType.INTERNAL_ERROR))
+          // create the topology tables
+          .installTopology();
+
+  private final StateMachine<VertexState, VertexEventType, VertexEvent> stateMachine;
+
+  //changing fields while the vertex is running
+  private int numTasks;
+  private int completedTaskCount = 0;
+  private int succeededTaskCount = 0;
+  private int failedTaskCount = 0;
+  private int killedTaskCount = 0;
+
+  private long initTime;
+  private long startTime;
+  private long finishTime;
+  private float progress;
+
+  private Credentials fsTokens;
+  private Token<JobTokenIdentifier> jobToken;
+  private JobTokenSecretManager jobTokenSecretManager;
+
+  private final TezVertexID vertexId;
+
+  private final String vertexName;
+  private final String processorName;
+
+  private Map<Vertex, EdgeProperty> sourceVertices;
+  private int sourcePhysicalEdges = 0;
+  private Map<Vertex, EdgeProperty> targetVertices;
+  
+  private VertexScheduler vertexScheduler;
+
+  private VertexOutputCommitter committer;
+  private AtomicBoolean committed = new AtomicBoolean(false);
+  private VertexLocationHint vertexLocationHint;
+  private Map<String, LocalResource> localResources;
+  private Map<String, String> environment;
+
+  public VertexImpl(TezVertexID vertexId, String vertexName,
+      DAGConfiguration conf, EventHandler eventHandler,
+      TaskAttemptListener taskAttemptListener,
+      JobTokenSecretManager jobTokenSecretManager,
+      Token<JobTokenIdentifier> jobToken,
+      Credentials fsTokenCredentials, Clock clock,
+      // TODO: Recovery
+      //Map<TaskId, TaskInfo> completedTasksFromPreviousRun,
+      // TODO Metrics
+      //MRAppMetrics metrics,
+      TaskHeartbeatHandler thh,
+      AppContext appContext, VertexLocationHint vertexLocationHint) {
+    this.vertexId = vertexId;
+    this.vertexName = vertexName;
+    this.conf = conf;
+    //this.metrics = metrics;
+    this.clock = clock;
+    // TODO: Recovery
+    //this.completedTasksFromPreviousRun = completedTasksFromPreviousRun;
+    this.appContext = appContext;
+
+    this.taskAttemptListener = taskAttemptListener;
+    this.taskHeartbeatHandler = thh;
+    this.eventHandler = eventHandler;
+    ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+    this.readLock = readWriteLock.readLock();
+    this.writeLock = readWriteLock.writeLock();
+
+    this.fsTokens = fsTokenCredentials;
+    this.jobTokenSecretManager = jobTokenSecretManager;
+    this.jobToken = jobToken;
+    this.committer = new NullVertexOutputCommitter();
+    this.vertexLocationHint = vertexLocationHint;
+    
+    this.taskResource = conf.getVertexResource(getName());
+    this.processorName = conf.getVertexTaskModuleClassName(getName());
+    this.localResources = conf.getVertexLocalResources(getName());
+    this.environment = conf.getVertexEnv(getName());
+
+    // This "this leak" is okay because the retained pointer is in an
+    //  instance variable.
+    stateMachine = stateMachineFactory.make(this);
+  }
+
+  protected StateMachine<VertexState, VertexEventType, VertexEvent> getStateMachine() {
+    return stateMachine;
+  }
+
+  @Override
+  public TezVertexID getVertexId() {
+    return vertexId;
+  }
+  
+  @Override
+  public int getDistanceFromRoot() {
+    return distanceFromRoot;
+  }
+
+  @Override
+  public DAGConfiguration getConf() {
+    // FIXME this should be renamed as it is giving global DAG conf
+    // we need a function to give user-land configuration for this vertex
+    return conf;
+  }
+
+  @Override
+  public String getName() {
+    return vertexName;
+  }
+
+  EventHandler getEventHandler() {
+    return this.eventHandler;
+  }
+
+  @Override
+  public Task getTask(TezTaskID taskID) {
+    readLock.lock();
+    try {
+      return tasks.get(taskID);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @Override
+  public int getTotalTasks() {
+    return numTasks;
+  }
+
+  @Override
+  public int getCompletedTasks() {
+    readLock.lock();
+    try {
+      return succeededTaskCount + failedTaskCount + killedTaskCount;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @Override
+  public TezCounters getAllCounters() {
+
+    readLock.lock();
+
+    try {
+      VertexState state = getInternalState();
+      if (state == VertexState.ERROR || state == VertexState.FAILED
+          || state == VertexState.KILLED || state == VertexState.SUCCEEDED) {
+        this.mayBeConstructFinalFullCounters();
+        return fullCounters;
+      }
+
+      TezCounters counters = new TezCounters();
+      return incrTaskCounters(counters, tasks.values());
+
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  public static TezCounters incrTaskCounters(
+      TezCounters counters, Collection<Task> tasks) {
+    for (Task task : tasks) {
+      counters.incrAllCounters(task.getCounters());
+    }
+    return counters;
+  }
+
+  @Override
+  public TezDependentTaskCompletionEvent[] getTaskAttemptCompletionEvents(
+      int fromEventId, int maxEvents) {
+    TezDependentTaskCompletionEvent[] events = EMPTY_TASK_ATTEMPT_COMPLETION_EVENTS;
+    readLock.lock();
+    try {
+      if (sourceTaskAttemptCompletionEvents.size() > fromEventId) {
+        int actualMax = Math.min(maxEvents,
+            (sourceTaskAttemptCompletionEvents.size() - fromEventId));
+        events = sourceTaskAttemptCompletionEvents.subList(fromEventId,
+            actualMax + fromEventId).toArray(events);
+      }
+      return events;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @Override
+  public List<String> getDiagnostics() {
+    readLock.lock();
+    try {
+      return diagnostics;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @Override
+  public float getProgress() {
+    this.readLock.lock();
+    try {
+      computeProgress();
+      return progress;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  private void computeProgress() {
+    this.readLock.lock();
+    try {
+      float progress = 0f;
+      for (Task task : this.tasks.values()) {
+        progress += (task.isFinished() ? 1f : task.getProgress());
+      }
+      if (this.numTasks != 0) {
+        progress /= this.numTasks;
+      }
+      this.progress = progress;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  @Override
+  public Map<TezTaskID, Task> getTasks() {
+    synchronized (tasksSyncHandle) {
+      lazyTasksCopyNeeded = true;
+      return Collections.unmodifiableMap(tasks);
+    }
+  }
+
+  @Override
+  public VertexState getState() {
+    readLock.lock();
+    try {
+      return getStateMachine().getCurrentState();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @Override
+  public void scheduleTasks(Collection<TezTaskID> taskIDs) {
+    for (TezTaskID taskID : taskIDs) {
+      eventHandler.handle(new TaskEvent(taskID,
+          TaskEventType.T_SCHEDULE));
+    }
+  }
+
+  @Override
+  /**
+   * The only entry point to change the Vertex.
+   */
+  public void handle(VertexEvent event) {
+    LOG.info("DEBUG: Processing VertexEvent " + event.getVertexId()
+        + " of type " + event.getType() + " while in state "
+        + getInternalState() + ". Event: " + event);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Processing VertexEvent " + event.getVertexId() + " of type "
+          + event.getType() + " while in state " + getInternalState());
+    }
+    try {
+      writeLock.lock();
+      VertexState oldState = getInternalState();
+      try {
+         getStateMachine().doTransition(event.getType(), event);
+      } catch (InvalidStateTransitonException e) {
+        LOG.error("Can't handle this event at current state", e);
+        addDiagnostic("Invalid event " + event.getType() +
+            " on Job " + this.vertexId);
+        eventHandler.handle(new VertexEvent(this.vertexId,
+            VertexEventType.INTERNAL_ERROR));
+      }
+      //notify the eventhandler of state change
+      if (oldState != getInternalState()) {
+        LOG.info(vertexId + " transitioned from " + oldState + " to "
+                 + getInternalState());
+      }
+    }
+
+    finally {
+      writeLock.unlock();
+    }
+  }
+
+  @Private
+  public VertexState getInternalState() {
+    readLock.lock();
+    try {
+     return getStateMachine().getCurrentState();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  //helpful in testing
+  protected void addTask(Task task) {
+    synchronized (tasksSyncHandle) {
+      if (lazyTasksCopyNeeded) {
+        Map<TezTaskID, Task> newTasks = new LinkedHashMap<TezTaskID, Task>();
+        newTasks.putAll(tasks);
+        tasks = newTasks;
+        lazyTasksCopyNeeded = false;
+      }
+    }
+    tasks.put(task.getTaskId(), task);
+    // TODO Metrics
+    //metrics.waitingTask(task);
+  }
+
+  void setFinishTime() {
+    finishTime = clock.getTime();
+  }
+
+  void logJobHistoryVertexInitedEvent() {
+    // TODO JobHistory
+    /*
+    JobSubmittedEvent jse = new JobSubmittedEvent(job.oldJobId,
+        job.conf.get(MRJobConfig.JOB_NAME, "test"),
+      job.conf.get(MRJobConfig.USER_NAME, "mapred"),
+      job.appSubmitTime,
+      job.remoteJobConfFile.toString(),
+      job.jobACLs, job.queueName);
+    job.eventHandler.handle(new JobHistoryEvent(job.jobId, jse));
+    */
+  }
+
+  void logJobHistoryVertexStartedEvent() {
+    // TODO JobHistory
+    /*
+      JobInitedEvent jie =
+        new JobInitedEvent(job.oldJobId,
+             job.startTime,
+             job.numMapTasks, job.numReduceTasks,
+             job.getState().toString(),
+             job.isUber()); //Will transition to state running. Currently in INITED
+      job.eventHandler.handle(new JobHistoryEvent(job.jobId, jie));
+      JobInfoChangeEvent jice = new JobInfoChangeEvent(job.oldJobId,
+          job.appSubmitTime, job.startTime);
+      job.eventHandler.handle(new JobHistoryEvent(job.jobId, jice));
+     */
+
+  }
+
+
+  void logJobHistoryVertexFinishedEvent() {
+    this.setFinishTime();
+    // TODO JobHistory
+    //eventHandler.handle(new JobFinishEvent(jobId));
+  }
+
+  void logJobHistoryVertexAbortedEvent() {
+    // TODO JobHistory
+    /*
+    JobUnsuccessfulCompletionEvent unsuccessfulJobEvent =
+        new JobUnsuccessfulCompletionEvent(oldJobId,
+            finishTime,
+            succeededMapTaskCount,
+            succeededReduceTaskCount,
+            finalState.toString());
+      eventHandler.handle(new JobHistoryEvent(jobId, unsuccessfulJobEvent));
+      */
+  }
+
+  void logJobHistoryUnsuccessfulVertexCompletion() {
+    // TODO JobHistory
+    /*
+    JobUnsuccessfulCompletionEvent failedEvent =
+        new JobUnsuccessfulCompletionEvent(job.oldJobId,
+            job.finishTime, 0, 0,
+            VertexState.KILLED.toString());
+    job.eventHandler.handle(new JobHistoryEvent(job.jobId, failedEvent));
+    */
+  }
+
+  /**
+   * Create the default file System for this job.
+   * @param conf the conf object
+   * @return the default filesystem for this job
+   * @throws IOException
+   */
+  protected FileSystem getFileSystem(Configuration conf) throws IOException {
+    return FileSystem.get(conf);
+  }
+
+  static VertexState checkVertexCompleteSuccess(VertexImpl vertex) {
+    // FIXME this vertex is definitely buggy as completed includes killed/failed
+    // check for vertex success
+    if (vertex.completedTaskCount == vertex.tasks.size()) {
+      if (vertex.failedTaskCount > 0) {
+        try {
+          vertex.committer.abortVertex(VertexStatus.State.FAILED);
+        } catch (IOException e) {
+          LOG.error("Failed to do abort on vertex, name=" + vertex.getName(),
+              e);
+        }
+        vertex.eventHandler.handle(new DAGEventVertexCompleted(vertex
+            .getVertexId(), VertexState.FAILED));
+        return vertex.finished(VertexState.FAILED);
+      } else {
+        try {
+          if (!vertex.committed.getAndSet(true)) {
+            // commit only once
+            vertex.committer.commitVertex();
+          }
+        } catch (IOException e) {
+          LOG.error("Failed to do commit on vertex, name=" + vertex.getName(), e);
+          vertex.eventHandler.handle(new DAGEventVertexCompleted(vertex
+              .getVertexId(), VertexState.FAILED));
+          return vertex.finished(VertexState.FAILED);
+        }
+        vertex.eventHandler.handle(new DAGEventVertexCompleted(vertex
+            .getVertexId(), vertex.getState()));
+        return vertex.finished(VertexState.SUCCEEDED);
+      }
+    }
+    // TODO: what if one of the tasks failed?
+    return null;
+  }
+
+  VertexState finished(VertexState finalState) {
+    if (getInternalState() == VertexState.RUNNING) {
+      // TODO: Metrics
+      // metrics.endRunningJob(this);
+    }
+    if (finishTime == 0) setFinishTime();
+    logJobHistoryVertexFinishedEvent();
+
+    switch (finalState) {
+      case KILLED:
+        // TODO: Metrics
+        //metrics.killedJob(this);
+        break;
+      case FAILED:
+        // TODO: Metrics
+        //metrics.failedJob(this);
+        break;
+      case SUCCEEDED:
+        // TODO: Metrics
+        //metrics.completedJob(this);
+    }
+    return finalState;
+  }
+
+  public static class InitTransition
+      implements MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
+
+    @Override
+    public VertexState transition(VertexImpl vertex, VertexEvent event) {
+      try {
+        //log to job history
+        vertex.logJobHistoryVertexInitedEvent();
+
+        // TODODAGAM
+        // TODO: Splits?
+        vertex.numTasks = vertex.conf.getNumVertexTasks(vertex.getName());
+        /*
+        TaskSplitMetaInfo[] taskSplitMetaInfo = createSplits(job, job.jobId);
+        job.numMapTasks = taskSplitMetaInfo.length;
+        */
+
+        if (vertex.numTasks == 0) {
+          vertex.addDiagnostic("No of tasks for vertex " + vertex.getVertexId());
+        }
+
+        checkTaskLimits();
+
+        // FIXME should depend on source num tasks
+        vertex.sourceTaskAttemptCompletionEvents =
+            new ArrayList<TezDependentTaskCompletionEvent>(vertex.numTasks + 10);
+
+        // create the Tasks but don't start them yet
+        createTasks(vertex);
+        
+        
+
+        // FIXME this only works if all edges are bipartite
+        boolean hasBipartite = false;
+        if (vertex.sourceVertices != null) {
+          for (EdgeProperty edgeProperty : vertex.sourceVertices.values()) {
+            // FIXME The init needs to be in
+            // topo sort order of graph or else source may not be initialized.
+            // Also should not depend on assumption of single-threaded dispatcher
+            if(edgeProperty.getConnectionPattern() == ConnectionPattern.BIPARTITE) {
+              hasBipartite = true;
+              break;
+            }
+          }
+        }
+        
+        if (hasBipartite) {
+          // setup vertex scheduler
+          // TODO this needs to consider data size and perhaps API. 
+          // Currently implicitly BIPARTITE is the only edge type
+          vertex.vertexScheduler = new 
+              BipartiteSlowStartVertexScheduler(vertex,
+                                                0.5f,  // FIXME get from config
+                                                0.8f); // FIXME get from config
+        } else {
+          // schedule all tasks upon vertex start
+          vertex.vertexScheduler = new ImmediateStartVertexScheduler(vertex);
+        }
+
+        // FIXME how do we decide vertex needs a committer?
+        // for now, only for leaf vertices
+        // FIXME make commmitter type configurable per vertex
+        if (vertex.targetVertices.isEmpty()) {
+          vertex.committer = new MRVertexOutputCommitter();
+        }
+        vertex.committer.init(vertex);
+        vertex.committer.setupVertex();
+
+        // TODO: Metrics
+        //vertex.metrics.endPreparingJob(job);
+        vertex.initTime = vertex.clock.getTime();
+        return VertexState.INITED;
+
+      } catch (IOException e) {
+        LOG.warn("Vertex init failed", e);
+        vertex.addDiagnostic("Job init failed : "
+            + StringUtils.stringifyException(e));
+        vertex.abortVertex(VertexStatus.State.FAILED);
+        // TODO: Metrics
+        //job.metrics.endPreparingJob(vertex);
+        return vertex.finished(VertexState.FAILED);
+      }
+    }
+
+
+    private void createTasks(VertexImpl vertex) {
+      // TODO Fixme
+      DAGConfiguration conf = vertex.getConf();
+      boolean useNullLocationHint = true;
+      if (vertex.vertexLocationHint != null
+          && vertex.vertexLocationHint.getTaskLocationHints() != null
+          && vertex.vertexLocationHint.getTaskLocationHints().length ==
+              vertex.numTasks) {
+        useNullLocationHint = false;
+      }
+      for (int i=0; i < vertex.numTasks; ++i) {
+        TaskLocationHint locHint = null;
+        if (!useNullLocationHint) {
+          locHint = vertex.vertexLocationHint.getTaskLocationHints()[i];
+        }
+        TaskImpl task =
+            new TaskImpl(vertex.getVertexId(), i,
+                vertex.eventHandler,
+                null,
+                conf,
+                vertex.taskAttemptListener,
+                null,
+                vertex.jobToken,
+                vertex.fsTokens,
+                vertex.clock,
+                vertex.taskHeartbeatHandler,
+                vertex.appContext,
+                vertex.processorName, false,
+                locHint, vertex.taskResource,
+                vertex.localResources,
+                vertex.environment);
+        vertex.addTask(task);
+        LOG.info("Created task for vertex " + vertex.getVertexId() + ": " +
+            task.getTaskId());
+      }
+
+    }
+
+    // TODO: Splits
+    /*
+    protected TaskSplitMetaInfo[] createSplits(VertexImpl job, JobId jobId) {
+      TaskSplitMetaInfo[] allTaskSplitMetaInfo;
+      try {
+        allTaskSplitMetaInfo = SplitMetaInfoReader.readSplitMetaInfo(
+            job.oldJobId, job.fs,
+            job.conf,
+            job.remoteJobSubmitDir);
+      } catch (IOException e) {
+        throw new YarnException(e);
+      }
+      return allTaskSplitMetaInfo;
+    }
+    */
+
+    /**
+     * If the number of tasks are greater than the configured value
+     * throw an exception that will fail job initialization
+     */
+    private void checkTaskLimits() {
+      // no code, for now
+    }
+  } // end of InitTransition
+
+  // Temporary to maintain topological order while starting vertices. Not useful
+  // since there's not much difference between the INIT and RUNNING states.
+  public static class SourceVertexStartedTransition implements
+      SingleArcTransition<VertexImpl, VertexEvent> {
+
+    @Override
+    public void transition(VertexImpl vertex, VertexEvent event) {
+      VertexEventSourceVertexStarted startEvent = 
+                                      (VertexEventSourceVertexStarted) event;
+      int distanceFromRoot = startEvent.getSourceDistanceFromRoot() + 1;
+      if(vertex.distanceFromRoot < distanceFromRoot) {
+        vertex.distanceFromRoot = distanceFromRoot;
+      }
+      vertex.numStartedSourceVertices++;
+      if (vertex.numStartedSourceVertices == vertex.sourceVertices.size()) {
+        // Consider inlining this.
+        LOG.info("Starting vertex: " + vertex.getVertexId() + 
+                 " with name: " + vertex.getName() + 
+                 " with distanceFromRoot: " + vertex.distanceFromRoot );
+        vertex.eventHandler.handle(new VertexEvent(vertex.vertexId,
+            VertexEventType.V_START));
+      }
+    }
+  }
+
+  public static class StartTransition
+  implements SingleArcTransition<VertexImpl, VertexEvent> {
+    /**
+     * This transition executes in the event-dispatcher thread, though it's
+     * triggered in MRAppMaster's startJobs() method.
+     */
+    @Override
+    public void transition(VertexImpl vertex, VertexEvent event) {
+      vertex.startTime = vertex.clock.getTime();
+      vertex.vertexScheduler.onVertexStarted();
+      vertex.logJobHistoryVertexStartedEvent();
+
+      // TODO: Metrics
+      //job.metrics.runningJob(job);
+
+      // default behavior is to start immediately. so send information about us
+      // starting to downstream vertices. If the connections/structure of this
+      // vertex is not fully defined yet then we could send this event later
+      // when we are ready
+      for (Vertex targetVertex : vertex.targetVertices.keySet()) {
+        vertex.eventHandler.handle(
+            new VertexEventSourceVertexStarted(targetVertex.getVertexId(),
+                                               vertex.distanceFromRoot));
+      }
+
+      // If we have no tasks, just transition to vertex completed
+      if (vertex.numTasks == 0) {
+        vertex.eventHandler.handle(
+            new VertexEvent(vertex.vertexId, VertexEventType.V_COMPLETED));
+      }
+    }
+  }
+
+  private void abortVertex(VertexStatus.State finalState) {
+    //TODO: Committer?    /*
+    try {
+      committer.abortVertex(finalState);
+    } catch (IOException e) {
+      LOG.warn("Could not abort vertex, name=" + getName(), e);
+    }
+
+    if (finishTime == 0) {
+      setFinishTime();
+    }
+    logJobHistoryVertexAbortedEvent();
+  }
+
+  private void mayBeConstructFinalFullCounters() {
+    // Calculating full-counters. This should happen only once for the vertex.
+    synchronized (this.fullCountersLock) {
+      if (this.fullCounters != null) {
+        // Already constructed. Just return.
+        return;
+      }
+      this.constructFinalFullcounters();
+    }
+  }
+
+  @Private
+  public void constructFinalFullcounters() {
+    this.fullCounters = new TezCounters();
+    for (Task t : this.tasks.values()) {
+      TezCounters counters = t.getCounters();
+      this.fullCounters.incrAllCounters(counters);
+    }
+  }
+
+  // Task-start has been moved out of InitTransition, so this arc simply
+  // hardcodes 0 for both map and reduce finished tasks.
+  private static class KillNewVertexTransition
+  implements SingleArcTransition<VertexImpl, VertexEvent> {
+    @Override
+    public void transition(VertexImpl vertex, VertexEvent event) {
+      vertex.setFinishTime();
+      vertex.logJobHistoryUnsuccessfulVertexCompletion();
+      vertex.finished(VertexState.KILLED);
+    }
+  }
+
+  private static class KillInitedVertexTransition
+  implements SingleArcTransition<VertexImpl, VertexEvent> {
+    @Override
+    public void transition(VertexImpl vertex, VertexEvent event) {
+      vertex.abortVertex(VertexStatus.State.KILLED);
+      vertex.addDiagnostic("Job received Kill in INITED state.");
+      vertex.finished(VertexState.KILLED);
+    }
+  }
+
+  private static class KillTasksTransition
+      implements SingleArcTransition<VertexImpl, VertexEvent> {
+    @Override
+    public void transition(VertexImpl vertex, VertexEvent event) {
+      vertex.addDiagnostic("Job received Kill while in RUNNING state.");
+      for (Task task : vertex.tasks.values()) {
+        vertex.eventHandler.handle(
+            new TaskEvent(task.getTaskId(), TaskEventType.T_KILL));
+      }
+      // TODO: Metrics
+      //job.metrics.endRunningJob(job);
+    }
+  }
+
+  /**
+   * Here, the Vertex is being told that one of his source task-attempts
+   * completed.
+   */
+  private static class SourceTaskAttemptCompletedEventTransition implements
+  SingleArcTransition<VertexImpl, VertexEvent> {
+    @Override
+    public void transition(VertexImpl vertex, VertexEvent event) {
+      TezDependentTaskCompletionEvent tce =
+          ((VertexEventSourceTaskAttemptCompleted) event).getCompletionEvent();
+      // Add the TaskAttemptCompletionEvent
+      //eventId is equal to index in the arraylist
+      tce.setEventId(vertex.sourceTaskAttemptCompletionEvents.size());
+      vertex.sourceTaskAttemptCompletionEvents.add(tce);
+      // FIXME this needs to be ordered/grouped by source vertices or else 
+      // my tasks will not know which events are for which vertices' tasks. This 
+      // differentiation was not needed for MR because there was only 1 M stage.
+      // if the tce is sent to the task then a solution could be to add vertex 
+      // name to the tce
+      // need to send vertex name and task index in that vertex
+
+      TezTaskAttemptID attemptId = tce.getTaskAttemptID();
+      TezTaskID taskId = attemptId.getTaskID();
+      //make the previous completion event as obsolete if it exists
+      if (TezDependentTaskCompletionEvent.Status.SUCCEEDED.equals(tce.getStatus())) {
+        Object successEventNo =
+            vertex.successSourceAttemptCompletionEventNoMap.remove(taskId);
+        if (successEventNo != null) {
+          TezDependentTaskCompletionEvent successEvent =
+              vertex.sourceTaskAttemptCompletionEvents.get((Integer) successEventNo);
+          successEvent.setTaskStatus(TezDependentTaskCompletionEvent.Status.OBSOLETE);
+        }
+        vertex.successSourceAttemptCompletionEventNoMap.put(taskId, tce.getEventId());
+      }
+      
+      vertex.vertexScheduler.onSourceTaskCompleted(attemptId);
+    }
+  }
+
+  private static class TaskAttemptCompletedEventTransition implements
+      SingleArcTransition<VertexImpl, VertexEvent> {
+    @Override
+    public void transition(VertexImpl vertex, VertexEvent event) {
+      TezDependentTaskCompletionEvent tce =
+        ((VertexEventTaskAttemptCompleted) event).getCompletionEvent();
+
+      // FIXME this should only be sent for successful events? looks like all 
+      // need to be sent in the existing shuffle code
+      // Notify all target vertices
+      if (vertex.targetVertices != null) {
+        for (Vertex targetVertex : vertex.targetVertices.keySet()) {
+          vertex.eventHandler.handle(
+              new VertexEventSourceTaskAttemptCompleted(
+                  targetVertex.getVertexId(), tce)
+              );
+        }
+      }
+    }
+  }
+
+  private static class TaskAttemptFetchFailureTransition implements
+      SingleArcTransition<VertexImpl, VertexEvent> {
+    @Override
+    public void transition(VertexImpl vertex, VertexEvent event) {
+      VertexEventTaskAttemptFetchFailure fetchfailureEvent =
+          (VertexEventTaskAttemptFetchFailure) event;
+      for (TezTaskAttemptID mapId : fetchfailureEvent.getSources()) {
+        Integer fetchFailures = vertex.fetchFailuresMapping.get(mapId);
+        fetchFailures = (fetchFailures == null) ? 1 : (fetchFailures+1);
+        vertex.fetchFailuresMapping.put(mapId, fetchFailures);
+
+        //get number of running reduces
+        int runningReduceTasks = 0;
+        for (TezTaskID taskId : vertex.tasks.keySet()) {
+          if (TaskState.RUNNING.equals(vertex.tasks.get(taskId).getState())) {
+            runningReduceTasks++;
+          }
+        }
+
+        float failureRate = runningReduceTasks == 0 ? 1.0f :
+          (float) fetchFailures / runningReduceTasks;
+        // declare faulty if fetch-failures >= max-allowed-failures
+        boolean isMapFaulty =
+            (failureRate >= MAX_ALLOWED_FETCH_FAILURES_FRACTION);
+        if (fetchFailures >= MAX_FETCH_FAILURES_NOTIFICATIONS && isMapFaulty) {
+          LOG.info("Too many fetch-failures for output of task attempt: " +
+              mapId + " ... raising fetch failure to source");
+          vertex.eventHandler.handle(new TaskAttemptEvent(mapId,
+              TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURES));
+          vertex.fetchFailuresMapping.remove(mapId);
+        }
+      }
+    }
+  }
+
+  private static class TaskCompletedTransition implements
+      MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
+
+    @Override
+    public VertexState transition(VertexImpl vertex, VertexEvent event) {
+      vertex.completedTaskCount++;//FIXME this is a bug
+      LOG.info("Num completed Tasks: " + vertex.completedTaskCount);
+      VertexEventTaskCompleted taskEvent = (VertexEventTaskCompleted) event;
+      Task task = vertex.tasks.get(taskEvent.getTaskID());
+      if (taskEvent.getState() == TaskState.SUCCEEDED) {
+        taskSucceeded(vertex, task);
+      } else if (taskEvent.getState() == TaskState.FAILED) {
+        taskFailed(vertex, task);
+      } else if (taskEvent.getState() == TaskState.KILLED) {
+        taskKilled(vertex, task);
+      }
+
+      vertex.vertexScheduler.onVertexCompleted();
+      VertexState state = checkVertexForCompletion(vertex);
+      if(state == VertexState.SUCCEEDED) {
+        vertex.vertexScheduler.onVertexCompleted();
+      }
+      return state;
+    }
+
+    protected VertexState checkVertexForCompletion(VertexImpl vertex) {
+      //check for vertex failure
+      if (vertex.failedTaskCount > 1) {
+        vertex.setFinishTime();
+
+        String diagnosticMsg = "Vertex failed as tasks failed. "
+            + "failedTasks:"
+            + vertex.failedTaskCount;
+        LOG.info(diagnosticMsg);
+        vertex.addDiagnostic(diagnosticMsg);
+        vertex.abortVertex(VertexStatus.State.FAILED);
+        vertex.eventHandler.handle(new DAGEventVertexCompleted(vertex
+            .getVertexId(), VertexState.FAILED));
+        return vertex.finished(VertexState.FAILED);
+      }
+
+      VertexState vertexCompleteSuccess =
+          VertexImpl.checkVertexCompleteSuccess(vertex);
+      if (vertexCompleteSuccess != null) {
+        return vertexCompleteSuccess;
+      }
+
+      //return the current state, Vertex not finished yet
+      return vertex.getInternalState();
+    }
+
+    private void taskSucceeded(VertexImpl vertex, Task task) {
+      vertex.succeededTaskCount++;
+      // TODO Metrics
+      // job.metrics.completedTask(task);
+    }
+
+    private void taskFailed(VertexImpl vertex, Task task) {
+      vertex.failedTaskCount++;
+      vertex.addDiagnostic("Task failed " + task.getTaskId());
+      // TODO Metrics
+      //vertex.metrics.failedTask(task);
+    }
+
+    private void taskKilled(VertexImpl vertex, Task task) {
+      vertex.killedTaskCount++;
+      // TODO Metrics
+      //job.metrics.killedTask(task);
+    }
+  }
+
+  // Transition class for handling jobs with no tasks
+  // TODODAGAM - is this allowed for a vertex?
+  static class VertexNoTasksCompletedTransition implements
+  MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
+
+    @Override
+    public VertexState transition(VertexImpl vertex, VertexEvent event) {
+      VertexState vertexCompleteSuccess =
+          VertexImpl.checkVertexCompleteSuccess(vertex);
+      if (vertexCompleteSuccess != null) {
+        return vertexCompleteSuccess;
+      }
+
+      // Return the current state, Job not finished yet
+      return vertex.getInternalState();
+    }
+  }
+
+  private static class TaskRescheduledTransition implements
+      SingleArcTransition<VertexImpl, VertexEvent> {
+    @Override
+    public void transition(VertexImpl vertex, VertexEvent event) {
+      //succeeded map task is restarted back
+      vertex.completedTaskCount--;
+      vertex.succeededTaskCount--;
+    }
+  }
+
+  private static class KillWaitTaskCompletedTransition extends
+      TaskCompletedTransition {
+    @Override
+    protected VertexState checkVertexForCompletion(VertexImpl vertex) {
+      if (vertex.completedTaskCount == vertex.tasks.size()) {
+        vertex.setFinishTime();
+        vertex.abortVertex(VertexStatus.State.KILLED);
+        return vertex.finished(VertexState.KILLED);
+      }
+      //return the current state, Job not finished yet
+      return vertex.getInternalState();
+    }
+  }
+
+  private void addDiagnostic(String diag) {
+    diagnostics.add(diag);
+  }
+
+  private static class InternalErrorTransition implements
+      SingleArcTransition<VertexImpl, VertexEvent> {
+    @Override
+    public void transition(VertexImpl vertex, VertexEvent event) {
+      //TODO Is this JH event required.
+      vertex.setFinishTime();
+      vertex.logJobHistoryUnsuccessfulVertexCompletion();
+      vertex.finished(VertexState.ERROR);
+    }
+  }
+
+  @Override
+  public void setInputVertices(Map<Vertex, EdgeProperty> inVertices) {
+    this.sourceVertices = inVertices;
+  }
+
+  @Override
+  public void setOutputVertices(Map<Vertex, EdgeProperty> outVertices) {
+    this.targetVertices = outVertices;
+  }
+
+  @Override
+  public int compareTo(Vertex other) {
+    return this.vertexId.compareTo(other.getVertexId());
+  }
+
+  @Override
+  public Map<Vertex, EdgeProperty> getInputVertices() {
+    return Collections.unmodifiableMap(this.sourceVertices);
+  }
+
+  @Override
+  public Map<Vertex, EdgeProperty> getOutputVertices() {
+    return Collections.unmodifiableMap(this.targetVertices);
+  }
+
+  @Override
+  public int getInputVerticesCount() {
+    return this.sourceVertices.size();
+  }
+
+  @Override
+  public int getOutputVerticesCount() {
+    return this.targetVertices.size();
+  }
+
+  @Override
+  public TezDAGID getDAGId() {
+    return appContext.getDAGID();
+  }
+
+  public Resource getTaskResource() {
+    return taskResource;
+  }
+
+  @Override
+  public DAG getDAG() {
+    return appContext.getDAG();
+  }
+
+  // TODO Eventually remove synchronization.
+  @Override
+  public synchronized List<InputSpec> getInputSpecList() {
+    inputSpecList = new ArrayList<InputSpec>(
+        this.getInputVerticesCount());
+    for (Vertex srcVertex : this.getInputVertices().keySet()) {
+      InputSpec inputSpec = new InputSpec(srcVertex.getName(),
+          srcVertex.getTotalTasks());
+      LOG.info("DEBUG: For vertex : " + this.getName()
+          + ", Using InputSpec : " + inputSpec);
+      // TODO DAGAM This should be based on the edge type.
+      inputSpecList.add(inputSpec);
+    }
+    return inputSpecList;
+  }
+
+  // TODO Eventually remove synchronization.
+  @Override
+  public synchronized List<OutputSpec> getOutputSpecList() {
+    if (this.outputSpecList == null) {
+      outputSpecList = new ArrayList<OutputSpec>(this.getOutputVerticesCount());
+      for (Vertex targetVertex : this.getOutputVertices().keySet()) {
+        OutputSpec outputSpec = new OutputSpec(targetVertex.getName(),
+            targetVertex.getTotalTasks());
+        LOG.info("DEBUG: For vertex : " + this.getName()
+            + ", Using OutputSpec : " + outputSpec);
+        // TODO DAGAM This should be based on the edge type.
+        outputSpecList.add(outputSpec);
+      }
+    }
+    return outputSpecList;
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/package-info.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/package-info.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/package-info.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/package-info.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Private
+package org.apache.tez.dag.app.dag.impl;
+import org.apache.hadoop.classification.InterfaceAudience;

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/package-info.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/package-info.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/package-info.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/package-info.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/package-info.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Private
+package org.apache.tez.dag.app.dag;
+import org.apache.hadoop.classification.InterfaceAudience;

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/package-info.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncher.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncher.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncher.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncher.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,27 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.launcher;
+
+
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.dag.app.rm.NMCommunicatorEvent;
+
+public interface ContainerLauncher 
+    extends EventHandler<NMCommunicatorEvent> {
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncher.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,424 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.launcher;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedAction;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.mapred.ShuffleHandler;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.ContainerManager;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerToken;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.util.ProtoUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.rm.NMCommunicatorEvent;
+import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent;
+import org.apache.tez.dag.app.rm.container.AMContainerEvent;
+import org.apache.tez.dag.app.rm.container.AMContainerEventLaunchFailed;
+import org.apache.tez.dag.app.rm.container.AMContainerEventLaunched;
+import org.apache.tez.dag.app.rm.container.AMContainerEventStopFailed;
+import org.apache.tez.dag.app.rm.container.AMContainerEventType;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+
+// TODO XXX: See what part of this lifecycle and state management can be simplified.
+// Ideally, no state - only sendStart / sendStop.
+
+// TODO XXX: Review this entire code and clean it up.
+
+/**
+ * This class is responsible for launching of containers.
+ */
+public class ContainerLauncherImpl extends AbstractService implements
+    ContainerLauncher {
+
+  // TODO XXX Ensure the same thread is used to launch / stop the same container. Or - ensure event ordering.
+  static final Log LOG = LogFactory.getLog(ContainerLauncherImpl.class);
+
+  private ConcurrentHashMap<ContainerId, Container> containers = 
+    new ConcurrentHashMap<ContainerId, Container>(); 
+  private AppContext context;
+  protected ThreadPoolExecutor launcherPool;
+  protected static final int INITIAL_POOL_SIZE = 10;
+  private int limitOnPoolSize;
+  private Thread eventHandlingThread;
+  protected BlockingQueue<NMCommunicatorEvent> eventQueue =
+      new LinkedBlockingQueue<NMCommunicatorEvent>();
+  YarnRPC rpc;
+
+  private Container getContainer(NMCommunicatorEvent event) {
+    ContainerId id = event.getContainerId();
+    Container c = containers.get(id);
+    if(c == null) {
+      c = new Container(event.getContainerId(),
+          event.getNodeId().toString(), event.getContainerToken());
+      Container old = containers.putIfAbsent(id, c);
+      if(old != null) {
+        c = old;
+      }
+    }
+    return c;
+  }
+  
+  private void removeContainerIfDone(ContainerId id) {
+    Container c = containers.get(id);
+    if(c != null && c.isCompletelyDone()) {
+      containers.remove(id);
+    }
+  }
+  
+  private static enum ContainerState {
+    PREP, FAILED, RUNNING, DONE, KILLED_BEFORE_LAUNCH
+  }
+
+  private class Container {
+    private ContainerState state;
+    // store enough information to be able to cleanup the container
+    private ContainerId containerID;
+    final private String containerMgrAddress;
+    private ContainerToken containerToken;
+    
+    public Container(ContainerId containerID,
+        String containerMgrAddress, ContainerToken containerToken) {
+      this.state = ContainerState.PREP;
+      this.containerMgrAddress = containerMgrAddress;
+      this.containerID = containerID;
+      this.containerToken = containerToken;
+    }
+    
+    public synchronized boolean isCompletelyDone() {
+      return state == ContainerState.DONE || state == ContainerState.FAILED;
+    }
+    
+    @SuppressWarnings("unchecked")
+    public synchronized void launch(NMCommunicatorLaunchRequestEvent event) {
+      LOG.info("Launching Container with Id: " + event.getContainerId());
+      if(this.state == ContainerState.KILLED_BEFORE_LAUNCH) {
+        state = ContainerState.DONE;
+        sendContainerLaunchFailedMsg(event.getContainerId(), 
+            "Container was killed before it was launched");
+        return;
+      }
+      
+      ContainerManager proxy = null;
+      try {
+
+        proxy = getCMProxy(containerID, containerMgrAddress,
+            containerToken);
+
+        // Construct the actual Container
+        ContainerLaunchContext containerLaunchContext =
+          event.getContainerLaunchContext();
+
+        // Now launch the actual container
+        StartContainerRequest startRequest = Records
+          .newRecord(StartContainerRequest.class);
+        startRequest.setContainer(event.getContainer());
+        startRequest.setContainerLaunchContext(containerLaunchContext);
+        StartContainerResponse response = proxy.startContainer(startRequest);
+
+        ByteBuffer portInfo = response
+          .getServiceResponse(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID);
+        int port = -1;
+        if(portInfo != null) {
+          port = ShuffleHandler.deserializeMetaData(portInfo);
+        }
+        LOG.info("Shuffle port returned by ContainerManager for "
+            + containerID + " : " + port);
+
+        if(port < 0) {
+          this.state = ContainerState.FAILED;
+          throw new IllegalStateException("Invalid shuffle port number "
+              + port + " returned for " + containerID);
+        }
+
+        // after launching, send launched event to task attempt to move
+        // it from ASSIGNED to RUNNING state
+        context.getEventHandler().handle(new AMContainerEventLaunched(containerID, port));
+        this.state = ContainerState.RUNNING;
+      } catch (Throwable t) {
+        String message = "Container launch failed for " + containerID + " : "
+            + StringUtils.stringifyException(t);
+        this.state = ContainerState.FAILED;
+        sendContainerLaunchFailedMsg(containerID, message);
+      } finally {
+        if (proxy != null) {
+          ContainerLauncherImpl.this.rpc.stopProxy(proxy, getConfig());
+        }
+      }
+    }
+    
+    @SuppressWarnings("unchecked")
+    public synchronized void kill() {
+
+      if(isCompletelyDone()) { 
+        return;
+      }
+      if(this.state == ContainerState.PREP) {
+        this.state = ContainerState.KILLED_BEFORE_LAUNCH;
+      } else {
+        LOG.info("Sending a stop request to the NM for ContainerId: "
+            + containerID);
+
+        ContainerManager proxy = null;
+        try {
+          proxy = getCMProxy(this.containerID, this.containerMgrAddress,
+              this.containerToken);
+
+            // kill the remote container if already launched
+            StopContainerRequest stopRequest = Records
+              .newRecord(StopContainerRequest.class);
+            stopRequest.setContainerId(this.containerID);
+            proxy.stopContainer(stopRequest);
+            // If stopContainer returns without an error, assuming the stop made
+            // it over to the NodeManager.
+          context.getEventHandler().handle(
+              new AMContainerEvent(containerID, AMContainerEventType.C_NM_STOP_SENT));
+        } catch (Throwable t) {
+
+          // ignore the cleanup failure
+          String message = "cleanup failed for container "
+            + this.containerID + " : "
+            + StringUtils.stringifyException(t);
+          context.getEventHandler().handle(
+              new AMContainerEventStopFailed(containerID, message));
+          LOG.warn(message);
+          this.state = ContainerState.DONE;
+          return;
+        } finally {
+          if (proxy != null) {
+            ContainerLauncherImpl.this.rpc.stopProxy(proxy, getConfig());
+          }
+        }
+        this.state = ContainerState.DONE;
+      }
+    }
+  }
+
+  public ContainerLauncherImpl(AppContext context) {
+    super(ContainerLauncherImpl.class.getName());
+    this.context = context;
+  }
+
+  @Override
+  public synchronized void init(Configuration config) {
+    Configuration conf = new Configuration(config);
+    conf.setInt(
+        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
+        0);
+    this.limitOnPoolSize = conf.getInt(
+        MRJobConfig.MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT,
+        MRJobConfig.DEFAULT_MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT);
+    LOG.info("Upper limit on the thread pool size is " + this.limitOnPoolSize);
+    this.rpc = createYarnRPC(conf);
+    super.init(conf);
+  }
+  
+  protected YarnRPC createYarnRPC(Configuration conf) {
+    return YarnRPC.create(conf);
+  }
+
+  public void start() {
+
+    ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat(
+        "ContainerLauncher #%d").setDaemon(true).build();
+
+    // Start with a default core-pool size of 10 and change it dynamically.
+    launcherPool = new ThreadPoolExecutor(INITIAL_POOL_SIZE,
+        Integer.MAX_VALUE, 1, TimeUnit.HOURS,
+        new LinkedBlockingQueue<Runnable>(),
+        tf);
+    eventHandlingThread = new Thread() {
+      @Override
+      public void run() {
+        NMCommunicatorEvent event = null;
+        while (!Thread.currentThread().isInterrupted()) {
+          try {
+            event = eventQueue.take();
+          } catch (InterruptedException e) {
+            LOG.error("Returning, interrupted : " + e);
+            return;
+          }
+          int poolSize = launcherPool.getCorePoolSize();
+
+          // See if we need up the pool size only if haven't reached the
+          // maximum limit yet.
+          if (poolSize != limitOnPoolSize) {
+
+            // nodes where containers will run at *this* point of time. This is
+            // *not* the cluster size and doesn't need to be.
+            int numNodes = context.getAllNodes().size();
+            int idealPoolSize = Math.min(limitOnPoolSize, numNodes);
+
+            if (poolSize < idealPoolSize) {
+              // Bump up the pool size to idealPoolSize+INITIAL_POOL_SIZE, the
+              // later is just a buffer so we are not always increasing the
+              // pool-size
+              int newPoolSize = Math.min(limitOnPoolSize, idealPoolSize
+                  + INITIAL_POOL_SIZE);
+              LOG.info("Setting ContainerLauncher pool size to " + newPoolSize
+                  + " as number-of-nodes to talk to is " + numNodes);
+              launcherPool.setCorePoolSize(newPoolSize);
+            }
+          }
+
+          // the events from the queue are handled in parallel
+          // using a thread pool
+          launcherPool.execute(createEventProcessor(event));
+
+          // TODO: Group launching of multiple containers to a single
+          // NodeManager into a single connection
+        }
+      }
+    };
+    eventHandlingThread.setName("ContainerLauncher Event Handler");
+    eventHandlingThread.start();
+    super.start();
+  }
+
+  private void shutdownAllContainers() {
+    for (Container ct : this.containers.values()) {
+      if (ct != null) {
+        ct.kill();
+      }
+    }
+  }
+
+  public void stop() {
+    // shutdown any containers that might be left running
+    shutdownAllContainers();
+    if (eventHandlingThread != null) {
+      eventHandlingThread.interrupt();
+    }
+    if (launcherPool != null) {
+      launcherPool.shutdownNow();
+    }
+    super.stop();
+  }
+
+  protected EventProcessor createEventProcessor(NMCommunicatorEvent event) {
+    return new EventProcessor(event);
+  }
+
+  protected ContainerManager getCMProxy(ContainerId containerID,
+      final String containerManagerBindAddr, ContainerToken containerToken)
+      throws IOException {
+
+    final InetSocketAddress cmAddr =
+        NetUtils.createSocketAddr(containerManagerBindAddr);
+    UserGroupInformation user = UserGroupInformation.getCurrentUser();
+
+    if (UserGroupInformation.isSecurityEnabled()) {
+      Token<ContainerTokenIdentifier> token =
+          ProtoUtils.convertFromProtoFormat(containerToken, cmAddr);
+      // the user in createRemoteUser in this context has to be ContainerID
+      user = UserGroupInformation.createRemoteUser(containerID.toString());
+      user.addToken(token);
+    }
+
+    ContainerManager proxy = user
+        .doAs(new PrivilegedAction<ContainerManager>() {
+          @Override
+          public ContainerManager run() {
+            return (ContainerManager) rpc.getProxy(ContainerManager.class,
+                cmAddr, getConfig());
+          }
+        });
+    return proxy;
+  }
+
+
+  /**
+   * Setup and start the container on remote nodemanager.
+   */
+  class EventProcessor implements Runnable {
+    private NMCommunicatorEvent event;
+
+    EventProcessor(NMCommunicatorEvent event) {
+      this.event = event;
+    }
+
+    @Override
+    public void run() {
+      LOG.info("Processing the event " + event.toString());
+
+      // Load ContainerManager tokens before creating a connection.
+      // TODO: Do it only once per NodeManager.
+      ContainerId containerID = event.getContainerId();
+
+      Container c = getContainer(event);
+      switch(event.getType()) {
+
+      case CONTAINER_LAUNCH_REQUEST:
+        NMCommunicatorLaunchRequestEvent launchEvent
+            = (NMCommunicatorLaunchRequestEvent) event;
+        c.launch(launchEvent);
+        break;
+
+      case CONTAINER_STOP_REQUEST:
+        c.kill();
+        break;
+      }
+      removeContainerIfDone(containerID);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  void sendContainerLaunchFailedMsg(ContainerId containerId,
+      String message) {
+    LOG.error(message);
+    context.getEventHandler().handle(new AMContainerEventLaunchFailed(containerId, message));
+  }
+
+  @Override
+  public void handle(NMCommunicatorEvent event) {
+    try {
+      eventQueue.put(event);
+    } catch (InterruptedException e) {
+      throw new YarnException(e);
+    }
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/package-info.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/package-info.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/package-info.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/package-info.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Private
+package org.apache.tez.dag.app.launcher;
+import org.apache.hadoop.classification.InterfaceAudience;

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/package-info.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/local/LocalContainerRequestor.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/local/LocalContainerRequestor.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/local/LocalContainerRequestor.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/local/LocalContainerRequestor.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.local;
+
+import java.util.ArrayList;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.client.ClientService;
+import org.apache.tez.dag.app.dag.event.DAGEvent;
+import org.apache.tez.dag.app.dag.event.DAGEventType;
+import org.apache.tez.dag.app.rm.ContainerRequestor;
+import org.apache.tez.dag.app.rm.RMCommunicator;
+import org.apache.tez.dag.app.rm.RMCommunicatorEvent;
+import org.apache.tez.dag.app.rm.RMContainerRequestor.ContainerRequest;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+
+/**
+ * For UberAM, the LocalContainerRequestor is responsible for sending keep-alive
+ * heartbeats to the RM, along with sending over job progress. Also provides any
+ * additional information to the rest of the AM - ApplicationACLs etc.
+ */
+public class LocalContainerRequestor extends RMCommunicator implements
+    ContainerRequestor {
+
+  private static final Log LOG =
+      LogFactory.getLog(LocalContainerRequestor.class);
+  
+  private long retrystartTime;
+  private long retryInterval;
+  
+  public LocalContainerRequestor(ClientService clientService, AppContext context) {
+    super(clientService, context);
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    super.init(conf);
+    retryInterval =
+        getConfig().getLong(MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS,
+            MRJobConfig.DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS);
+    // Init startTime to current time. If all goes well, it will be reset after
+    // first attempt to contact RM.
+    retrystartTime = System.currentTimeMillis();
+  }
+  
+  @SuppressWarnings("unchecked")
+  @Override
+  protected void heartbeat() throws Exception {
+    AllocateRequest allocateRequest = BuilderUtils.newAllocateRequest(
+        this.applicationAttemptId, this.lastResponseID, super
+            .getApplicationProgress(), new ArrayList<ResourceRequest>(),
+        new ArrayList<ContainerId>());
+    AllocateResponse response;
+    try {
+      response = scheduler.allocate(allocateRequest);
+      // Reset retry count if no exception occurred.
+      retrystartTime = System.currentTimeMillis();
+    } catch (Exception e) {
+      // This can happen when the connection to the RM has gone down. Keep
+      // re-trying until the retryInterval has expired.
+      if (System.currentTimeMillis() - retrystartTime >= retryInterval) {
+        LOG.error("Could not contact RM after " + retryInterval + " milliseconds.");
+        eventHandler.handle(new DAGEvent(this.getJob().getID(),
+                                         DAGEventType.INTERNAL_ERROR));
+        throw new YarnException("Could not contact RM after " +
+                                retryInterval + " milliseconds.");
+      }
+      // Throw this up to the caller, which may decide to ignore it and
+      // continue to attempt to contact the RM.
+      throw e;
+    }
+    if (response.getReboot()) {
+      LOG.info("Event from RM: shutting down Application Master");
+      // This can happen if the RM has been restarted. If it is in that state,
+      // this application must clean itself up.
+      eventHandler.handle(new DAGEvent(this.getJob().getID(),
+                                       DAGEventType.INTERNAL_ERROR));
+      throw new YarnException("Resource Manager doesn't recognize AttemptId: " +
+                               this.getContext().getApplicationID());
+    }
+  }
+
+  @Override
+  public void handle(RMCommunicatorEvent rawEvent) {
+    switch (rawEvent.getType()) {
+    case CONTAINER_DEALLOCATE:
+      LOG.warn("Unexpected eventType: " + rawEvent.getType() + ", Event: "
+          + rawEvent);
+      break;
+    default:
+      break;
+    }
+  }
+
+  @Override
+  public Resource getAvailableResources() {
+    throw new YarnException("Unexpected call to getAvailableResource");
+  }
+
+  @Override
+  public void addContainerReq(ContainerRequest req) {
+    throw new YarnException("Unexpected call to addContainerReq");
+  }
+
+  @Override
+  public void decContainerReq(ContainerRequest req) {
+    throw new YarnException("Unexpected call to decContainerReq");
+  }
+
+  @Override
+  public Map<ApplicationAccessType, String> getApplicationACLs() {
+    return getApplicationAcls();
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/local/LocalContainerRequestor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/local/package-info.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/local/package-info.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/local/package-info.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/local/package-info.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Private
+package org.apache.tez.dag.app.local;
+import org.apache.hadoop.classification.InterfaceAudience;

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/local/package-info.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/package-info.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/package-info.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/package-info.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/package-info.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Private
+package org.apache.tez.dag.app;
+import org.apache.hadoop.classification.InterfaceAudience;

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/package-info.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMRMClient.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMRMClient.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMRMClient.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMRMClient.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,159 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.rm;
+
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.service.Service;
+
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public interface AMRMClient <T> extends Service {
+
+  /**
+   * Object to represent container request for resources.
+   * Resources may be localized to nodes and racks.
+   * Resources may be assigned priorities.
+   * Can ask for multiple containers of a given type.
+   */
+  public static class ContainerRequest<T> {
+    // TODO define equals()
+    Resource capability;
+    String[] hosts;
+    String[] racks;
+    Priority priority;
+    int containerCount;
+    T cookie;
+        
+    public ContainerRequest(Resource capability, String[] hosts,
+        String[] racks, Priority priority, int containerCount) {
+      this.capability = capability;
+      this.hosts = (hosts != null ? hosts.clone() : null);
+      this.racks = (racks != null ? racks.clone() : null);
+      this.priority = priority;
+      this.containerCount = containerCount;
+    }
+    
+    void setCookie(T cookie) {
+      this.cookie = cookie;
+    }
+    
+    T getCookie() {
+      return cookie;
+    }
+    
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append("Capability[").append(capability).append("]");
+      sb.append("Priority[").append(priority).append("]");
+      sb.append("ContainerCount[").append(containerCount).append("]");
+      return sb.toString();
+    }
+  }
+  
+  /**
+   * Register the application master. This must be called before any 
+   * other interaction
+   * @param appHostName Name of the host on which master is running
+   * @param appHostPort Port master is listening on
+   * @param appTrackingUrl URL at which the master info can be seen
+   * @return <code>RegisterApplicationMasterResponse</code>
+   * @throws YarnRemoteException
+   */
+  public RegisterApplicationMasterResponse 
+               registerApplicationMaster(String appHostName,
+                                         int appHostPort,
+                                         String appTrackingUrl) 
+               throws YarnRemoteException;
+  
+  /**
+   * Request additional containers and receive new container allocations.
+   * Requests made via <code>addContainerRequest</code> are sent to the 
+   * <code>ResourceManager</code>. New containers assigned to the master are 
+   * retrieved. Status of completed containers and node health updates are 
+   * also retrieved.
+   * This also doubles up as a heartbeat to the ResourceManager and must be 
+   * made periodically.
+   * The call may not always return any new allocations of containers.
+   * App should not make concurrent allocate requests. May cause request loss.
+   * @param progressIndicator Indicates progress made by the master
+   * @return the response of the allocate request
+   * @throws YarnRemoteException
+   */
+  public AllocateResponse allocate(float progressIndicator) 
+                           throws YarnRemoteException;
+  
+  /**
+   * Unregister the application master. This must be called in the end.
+   * @param appStatus Success/Failure status of the master
+   * @param appMessage Diagnostics message on failure
+   * @param appTrackingUrl New URL to get master info
+   * @throws YarnRemoteException
+   */
+  public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
+                                           String appMessage,
+                                           String appTrackingUrl) 
+               throws YarnRemoteException;
+  
+  /**
+   * Request containers for resources before calling <code>allocate</code>
+   * @param req Resource request
+   */
+  public void addContainerRequest(ContainerRequest<T> req);
+  
+  /**
+   * Remove previous container request. The previous container request may have 
+   * already been sent to the ResourceManager. So even after the remove request 
+   * the app must be prepared to receive an allocation for the previous request 
+   * even after the remove request
+   * @param req Resource request
+   */
+  public void removeContainerRequest(ContainerRequest<T> req);
+  
+  /**
+   * Release containers assigned by the Resource Manager. If the app cannot use
+   * the container or wants to give up the container then it can release them.
+   * The app needs to make new requests for the released resource capability if
+   * it still needs it. eg. it released non-local resources
+   * @param containerId
+   */
+  public void releaseAssignedContainer(ContainerId containerId);
+  
+  /**
+   * Get the currently available resources in the cluster.
+   * A valid value is available after a call to allocate has been made
+   * @return Currently available resources
+   */
+  public Resource getClusterAvailableResources();
+  
+  /**
+   * Get the current number of nodes in the cluster.
+   * A valid values is available after a call to allocate has been made
+   * @return Current number of nodes in the cluster
+   */
+  public int getClusterNodeCount();
+}