You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/04/19 01:54:28 UTC

svn commit: r1469642 [10/36] - in /incubator/tez/branches/TEZ-1: ./ example_jobs/ example_jobs/sampleInput/ example_jobs/wc_mr_6m_1r/ example_jobs/wc_mrr_6m_3r_3r/ ljr_helper/ tez-common/ tez-common/src/ tez-common/src/main/ tez-common/src/main/java/ t...

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,556 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.dag.app;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.authorize.PolicyProvider;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.tez.common.ContainerTask;
+import org.apache.tez.common.TezTaskStatus;
+import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.dag.Task;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventDiagnosticsUpdate;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputConsumable;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatus;
+import org.apache.tez.dag.app.rm.container.AMContainerImpl;
+import org.apache.tez.dag.app.rm.container.AMContainerTask;
+import org.apache.tez.dag.app.security.authorize.MRAMPolicyProvider;
+import org.apache.tez.engine.common.security.JobTokenSecretManager;
+import org.apache.tez.engine.records.OutputContext;
+import org.apache.tez.engine.records.TezDependentTaskCompletionEvent;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+import org.apache.tez.engine.records.TezTaskDependencyCompletionEventsUpdate;
+import org.apache.tez.mapreduce.hadoop.ContainerContext;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.mapreduce.hadoop.TezTaskUmbilicalProtocol;
+import org.apache.tez.mapreduce.hadoop.records.ProceedToCompletionResponse;
+
+@SuppressWarnings("unchecked")
+public class TaskAttemptListenerImpTezDag extends AbstractService implements
+    TezTaskUmbilicalProtocol, TaskAttemptListener {
+
+  private static final ContainerTask TASK_FOR_INVALID_JVM = new ContainerTask(
+      null, true);
+  
+  private static ProceedToCompletionResponse COMPLETION_RESPONSE_NO_WAIT =
+      new ProceedToCompletionResponse(true, true);
+
+  private static final Log LOG = LogFactory
+      .getLog(TaskAttemptListenerImpTezDag.class);
+
+  private final AppContext context;
+
+  protected final TaskHeartbeatHandler taskHeartbeatHandler;
+  protected final ContainerHeartbeatHandler containerHeartbeatHandler;
+  private final JobTokenSecretManager jobTokenSecretManager;
+  private InetSocketAddress address;
+  private Server server;
+
+
+  // TODO Use this to figure out whether an incoming ping is valid.
+  private ConcurrentMap<TezTaskAttemptID, ContainerId> attemptToContainerIdMap =
+      new ConcurrentHashMap<TezTaskAttemptID, ContainerId>();
+  
+  private Set<ContainerId> registeredContainers = Collections
+      .newSetFromMap(new ConcurrentHashMap<ContainerId, Boolean>());
+  
+  public TaskAttemptListenerImpTezDag(AppContext context,
+      TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh,
+      JobTokenSecretManager jobTokenSecretManager) {
+    super(TaskAttemptListenerImpTezDag.class.getName());
+    this.context = context;
+    this.jobTokenSecretManager = jobTokenSecretManager;
+    this.taskHeartbeatHandler = thh;
+    this.containerHeartbeatHandler = chh;
+  }
+
+  @Override
+  public void start() {
+    startRpcServer();
+    super.start();
+  }
+
+  protected void startRpcServer() {
+    Configuration conf = getConfig();
+    try {
+      server = new RPC.Builder(conf)
+          .setProtocol(TezTaskUmbilicalProtocol.class)
+          .setBindAddress("0.0.0.0")
+          .setPort(0)
+          .setInstance(this)
+          .setNumHandlers(
+              conf.getInt(MRJobConfig.MR_AM_TASK_LISTENER_THREAD_COUNT,
+                  MRJobConfig.DEFAULT_MR_AM_TASK_LISTENER_THREAD_COUNT))
+          .setSecretManager(jobTokenSecretManager).build();
+      
+      // Enable service authorization?
+      if (conf.getBoolean(
+          CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, 
+          false)) {
+        refreshServiceAcls(conf, new MRAMPolicyProvider());
+      }
+
+      server.start();
+      this.address = NetUtils.getConnectAddress(server);
+    } catch (IOException e) {
+      throw new YarnException(e);
+    }
+  }
+
+  void refreshServiceAcls(Configuration configuration, 
+      PolicyProvider policyProvider) {
+    this.server.refreshServiceAcl(configuration, policyProvider);
+  }
+
+  @Override
+  public void stop() {
+    stopRpcServer();
+    super.stop();
+  }
+
+  protected void stopRpcServer() {
+    if (server != null) {
+      server.stop();
+    }
+  }
+
+  public InetSocketAddress getAddress() {
+    return address;
+  }
+  
+  @Override
+  public long getProtocolVersion(String protocol, long clientVersion)
+      throws IOException {
+    return versionID;
+  }
+
+  @Override
+  public ProtocolSignature getProtocolSignature(String protocol,
+      long clientVersion, int clientMethodsHash) throws IOException {
+    return ProtocolSignature.getProtocolSignature(this, protocol,
+        clientVersion, clientMethodsHash);
+  }
+
+  @Override
+  public TezTaskDependencyCompletionEventsUpdate getDependentTasksCompletionEvents(
+      int fromEventIdx, int maxEvents,
+      TezTaskAttemptID taskAttemptID) {
+    
+    LOG.info("Dependency Completion Events request from " + taskAttemptID
+        + ". fromEventID " + fromEventIdx + " maxEvents " + maxEvents);
+
+    // TODO: shouldReset is never used. See TT. Ask for Removal.
+    boolean shouldReset = false;
+    TezDependentTaskCompletionEvent[] events = 
+        context.getDAG().
+            getVertex(taskAttemptID.getTaskID().getVertexID()).
+                getTaskAttemptCompletionEvents(fromEventIdx, maxEvents);
+
+    taskHeartbeatHandler.progressing(taskAttemptID);
+    pingContainerHeartbeatHandler(taskAttemptID);
+
+    // No filters for now. Only required events stored in a vertex.
+
+    return new TezTaskDependencyCompletionEventsUpdate(events,shouldReset);
+  }
+
+  @Override
+  public ContainerTask getTask(ContainerContext containerContext)
+      throws IOException {
+
+    ContainerTask task = null;
+
+    if (containerContext == null || containerContext.getContainerId() == null) {
+      LOG.info("Invalid task request with an empty containerContext or containerId");
+      task = TASK_FOR_INVALID_JVM;
+    } else {
+      ContainerId containerId = containerContext.getContainerId();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Container with id: " + containerId + " asked for a task");
+      }
+      if (!registeredContainers.contains(containerId)) {
+        LOG.info("Container with id: " + containerId
+            + " is invalid and will be killed");
+        task = TASK_FOR_INVALID_JVM;
+      } else {
+        pingContainerHeartbeatHandler(containerId);
+        AMContainerTask taskContext = pullTaskAttemptContext(containerId);
+        if (taskContext.shouldDie()) {
+          LOG.info("No more tasks for container with id : " + containerId
+              + ". Asking it to die");
+          task = TASK_FOR_INVALID_JVM; // i.e. ask the child to die.
+        } else {
+          if (taskContext.getTask() == null) {
+            LOG.info("No task currently assigned to Container with id: "
+                + containerId);
+          } else {
+            task = new ContainerTask(taskContext.getTask(), false);
+            context.getEventHandler().handle(
+                new TaskAttemptEventStartedRemotely(taskContext.getTask()
+                    .getTaskAttemptId(), containerId, context
+                    .getApplicationACLs(), context.getAllContainers()
+                    .get(containerId).getShufflePort()));
+            LOG.info("Container with id: " + containerId + " given task: "
+                + taskContext.getTask().getTaskAttemptId());
+          }
+        }
+      }
+    }
+    LOG.info("DEBUG: getTask returning task: " + task);
+    return task;
+  }
+
+  @Override
+  public boolean statusUpdate(TezTaskAttemptID taskAttemptId,
+      TezTaskStatus taskStatus) throws IOException, InterruptedException {
+    LOG.info("DEBUG: " + "Status update from: " + taskAttemptId);
+    taskHeartbeatHandler.progressing(taskAttemptId);
+    pingContainerHeartbeatHandler(taskAttemptId);
+    TaskAttemptStatus taskAttemptStatus = new TaskAttemptStatus();
+    taskAttemptStatus.id = taskAttemptId;
+    // Task sends the updated progress to the TT.
+    taskAttemptStatus.progress = taskStatus.getProgress();
+    LOG.info("DEBUG: " + "Progress of TaskAttempt " + taskAttemptId + " is : "
+        + taskStatus.getProgress());
+
+    // Task sends the updated state-string to the TT.
+    taskAttemptStatus.stateString = taskStatus.getStateString();
+
+    // Set the output-size when map-task finishes. Set by the task itself.
+    // outputSize is never used.
+    taskAttemptStatus.outputSize = taskStatus.getLocalOutputSize();
+
+    // TODO Phase
+    // Task sends the updated phase to the TT.
+    //taskAttemptStatus.phase = MRxTypeConverters.toYarn(taskStatus.getPhase());
+
+    // TODO MRXAM3 - AVoid the 10 layers of convresion.
+    // Counters are updated by the task. Convert counters into new format as
+    // that is the primary storage format inside the AM to avoid multiple
+    // conversions and unnecessary heap usage.
+    taskAttemptStatus.counters = taskStatus.getCounters();
+    
+
+    // Map Finish time set by the task (map only)
+    // TODO CLEANMRXAM - maybe differentiate between map / reduce / types
+    if (taskStatus.getMapFinishTime() != 0) {
+      taskAttemptStatus.mapFinishTime = taskStatus.getMapFinishTime();
+    }
+
+    // Shuffle Finish time set by the task (reduce only).
+    if (taskStatus.getShuffleFinishTime() != 0) {
+      taskAttemptStatus.shuffleFinishTime = taskStatus.getShuffleFinishTime();
+    }
+
+    // Sort finish time set by the task (reduce only).
+    if (taskStatus.getSortFinishTime() != 0) {
+      taskAttemptStatus.sortFinishTime = taskStatus.getSortFinishTime();
+    }
+
+    // Not Setting the task state. Used by speculation - will be set in
+    // TaskAttemptImpl
+    // taskAttemptStatus.taskState =
+    // TypeConverter.toYarn(taskStatus.getRunState());
+
+    // set the fetch failures
+    if (taskStatus.getFailedDependencies() != null
+        && taskStatus.getFailedDependencies().size() > 0) {
+      LOG.warn("Failed dependencies are not handled at the moment." +
+      		" The job is likely to fail / hang");
+      taskAttemptStatus.fetchFailedMaps = new ArrayList<TezTaskAttemptID>();
+      for (TezTaskAttemptID failedAttemptId : taskStatus
+          .getFailedDependencies()) {
+        taskAttemptStatus.fetchFailedMaps.add(failedAttemptId);
+      }
+    }
+
+    // Task sends the information about the nextRecordRange to the TT
+
+    // TODO: The following are not needed here, but needed to be set somewhere
+    // inside AppMaster.
+    // taskStatus.getRunState(); // Set by the TT/JT. Transform into a state
+    // TODO
+    // taskStatus.getStartTime(); // Used to be set by the TaskTracker. This
+    // should be set by getTask().
+    // taskStatus.getFinishTime(); // Used to be set by TT/JT. Should be set
+    // when task finishes
+    // // This was used by TT to do counter updates only once every minute. So
+    // this
+    // // isn't ever changed by the Task itself.
+    // taskStatus.getIncludeCounters();
+
+    context.getEventHandler().handle(
+        new TaskAttemptEventStatusUpdate(taskAttemptStatus.id,
+            taskAttemptStatus));
+    return true;
+  }
+
+  @Override
+  public void reportDiagnosticInfo(TezTaskAttemptID taskAttemptId, String trace)
+      throws IOException {
+    LOG.info("Diagnostics report from " + taskAttemptId.toString() + ": "
+        + trace);
+
+    taskHeartbeatHandler.progressing(taskAttemptId);
+    pingContainerHeartbeatHandler(taskAttemptId);
+
+    // This is mainly used for cases where we want to propagate exception traces
+    // of tasks that fail.
+
+    // This call exists as a hadoop mapreduce legacy wherein all changes in
+    // counters/progress/phase/output-size are reported through statusUpdate()
+    // call but not diagnosticInformation.
+    context.getEventHandler().handle(
+        new TaskAttemptEventDiagnosticsUpdate(taskAttemptId, trace));
+
+  }
+
+  @Override
+  public boolean ping(TezTaskAttemptID taskAttemptId) throws IOException {
+    LOG.info("Ping from " + taskAttemptId.toString());
+    taskHeartbeatHandler.pinged(taskAttemptId);
+    pingContainerHeartbeatHandler(taskAttemptId);
+    return true;
+  }
+
+  @Override
+  public void done(TezTaskAttemptID taskAttemptId) throws IOException {
+    LOG.info("Done acknowledgement from " + taskAttemptId.toString());
+
+    taskHeartbeatHandler.progressing(taskAttemptId);
+    pingContainerHeartbeatHandler(taskAttemptId);
+
+    context.getEventHandler().handle(
+        new TaskAttemptEvent(taskAttemptId, TaskAttemptEventType.TA_DONE));
+
+  }
+
+  /**
+   * TaskAttempt is reporting that it is in commit_pending and it is waiting for
+   * the commit Response
+   * 
+   * <br/>
+   * Commit it a two-phased protocol. First the attempt informs the
+   * ApplicationMaster that it is
+   * {@link #commitPending(TaskAttemptID, TaskStatus)}. Then it repeatedly polls
+   * the ApplicationMaster whether it {@link #canCommit(TaskAttemptID)} This is
+   * a legacy from the centralized commit protocol handling by the JobTracker.
+   */
+  @Override
+  public void commitPending(TezTaskAttemptID taskAttemptId, TezTaskStatus taskStatus)
+      throws IOException, InterruptedException {
+    LOG.info("Commit-pending state update from " + taskAttemptId.toString());
+    // An attempt is asking if it can commit its output. This can be decided
+    // only by the task which is managing the multiple attempts. So redirect the
+    // request there.
+    taskHeartbeatHandler.progressing(taskAttemptId);
+    pingContainerHeartbeatHandler(taskAttemptId);
+    //Ignorable TaskStatus? - since a task will send a LastStatusUpdate
+    context.getEventHandler().handle(
+        new TaskAttemptEvent(
+            taskAttemptId, 
+            TaskAttemptEventType.TA_COMMIT_PENDING)
+        );
+  }
+
+  /**
+   * Child checking whether it can commit.
+   * 
+   * <br/>
+   * Commit is a two-phased protocol. First the attempt informs the
+   * ApplicationMaster that it is
+   * {@link #commitPending(TaskAttemptID, TaskStatus)}. Then it repeatedly polls
+   * the ApplicationMaster whether it {@link #canCommit(TaskAttemptID)} This is
+   * a legacy from the centralized commit protocol handling by the JobTracker.
+   */
+  @Override
+  public boolean canCommit(TezTaskAttemptID taskAttemptId) throws IOException {
+    LOG.info("Commit go/no-go request from " + taskAttemptId.toString());
+    // An attempt is asking if it can commit its output. This can be decided
+    // only by the task which is managing the multiple attempts. So redirect the
+    // request there.
+    taskHeartbeatHandler.progressing(taskAttemptId);
+    pingContainerHeartbeatHandler(taskAttemptId);
+
+    DAG job = context.getDAG();
+    Task task = 
+        job.getVertex(taskAttemptId.getTaskID().getVertexID()).
+            getTask(taskAttemptId.getTaskID());
+    return task.canCommit(taskAttemptId);
+  }
+
+  @Override
+  public void shuffleError(TezTaskAttemptID taskId, String message)
+      throws IOException {
+    // TODO: This isn't really used in any MR code. Ask for removal.
+  }
+
+  @Override
+  public void fsError(TezTaskAttemptID taskAttemptId, String message)
+      throws IOException {
+    // This happens only in Child.
+    LOG.fatal("Task: " + taskAttemptId + " - failed due to FSError: " + message);
+    reportDiagnosticInfo(taskAttemptId, "FSError: " + message);
+
+    context.getEventHandler().handle(
+        new TaskAttemptEvent(taskAttemptId, TaskAttemptEventType.TA_FAILED));
+  }
+
+  @Override
+  public void fatalError(TezTaskAttemptID taskAttemptId, String message)
+      throws IOException {
+    // This happens only in Child and in the Task.
+    LOG.fatal("Task: " + taskAttemptId + " - exited : " + message);
+    reportDiagnosticInfo(taskAttemptId, "Error: " + message);
+
+    context.getEventHandler().handle(
+        new TaskAttemptEvent(taskAttemptId, TaskAttemptEventType.TA_FAILED));
+  }
+
+  @Override
+  public void outputReady(TezTaskAttemptID taskAttemptId,
+      OutputContext outputContext) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("AttemptId: " + taskAttemptId + " reported output context: "
+          + outputContext);
+    }
+    context.getEventHandler().handle(
+        new TaskAttemptEventOutputConsumable(taskAttemptId, outputContext));
+  }
+
+  @Override
+  public ProceedToCompletionResponse
+      proceedToCompletion(TezTaskAttemptID taskAttemptId) throws IOException {
+    
+    // The async nature of the processing combined with the 1 second interval
+    // between polls (MRTask) implies tasks end up wasting upto 1 second doing
+    // nothing. Similarly for CA_COMMIT.
+    
+    DAG job = context.getDAG();
+    Task task = 
+        job.getVertex(taskAttemptId.getTaskID().getVertexID()).
+            getTask(taskAttemptId.getTaskID());
+    
+    // TODO In-Memory Shuffle
+    /*
+    if (task.needsWaitAfterOutputConsumable()) {
+      TezTaskAttemptID outputReadyAttempt = task.getOutputConsumableAttempt();
+      if (outputReadyAttempt != null) {
+        if (!outputReadyAttempt.equals(taskAttemptId)) {
+          LOG.info("Telling taksAttemptId: "
+              + taskAttemptId
+              + " to die, since the outputReady atempt for this task is different: "
+              + outputReadyAttempt);
+          return new ProceedToCompletionResponse(true, true);
+        }
+      }
+      boolean reducesDone = true;
+      for (Task rTask : job.getTasks(TaskType.REDUCE).values()) {
+        if (rTask.getState() != TaskState.SUCCEEDED) {
+          // TODO EVENTUALLY - could let the map tasks exit after reduces are
+          // done with the shuffle phase, instead of waiting for the reduces to
+          // complete.
+          reducesDone = false;
+          break;
+        }
+      }
+      if (reducesDone) {
+        return new ProceedToCompletionResponse(false, true);
+      } else {
+        return new ProceedToCompletionResponse(false, false);
+      }
+    } else {
+      return COMPLETION_RESPONSE_NO_WAIT;
+    }
+    */
+    return COMPLETION_RESPONSE_NO_WAIT;
+  }
+  
+  
+  
+  // TODO EVENTUALLY remove all mrv2 ids.
+  @Override
+  public void unregisterTaskAttempt(TezTaskAttemptID attemptId) {
+    attemptToContainerIdMap.remove(attemptId);
+  }
+
+  public AMContainerTask pullTaskAttemptContext(ContainerId containerId) {
+    AMContainerImpl container = (AMContainerImpl) context.getAllContainers()
+        .get(containerId);
+    return container.pullTaskContext();
+  }
+
+  @Override
+  public void registerRunningContainer(ContainerId containerId) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("ContainerId: " + containerId
+          + " registered with TaskAttemptListener");
+    }
+    registeredContainers.add(containerId);
+  }
+
+  @Override
+  public void registerTaskAttempt(TezTaskAttemptID attemptId,
+      ContainerId containerId) {
+    attemptToContainerIdMap.put(attemptId, containerId);
+  }
+
+  @Override
+  public void unregisterRunningContainer(ContainerId containerId) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Unregistering Container from TaskAttemptListener: "
+          + containerId);
+    }
+    registeredContainers.remove(containerId);
+  }
+  
+  private void pingContainerHeartbeatHandler(ContainerId containerId) {
+    containerHeartbeatHandler.pinged(containerId);
+  }
+  
+  private void pingContainerHeartbeatHandler(TezTaskAttemptID taskAttemptId) {
+    ContainerId containerId = attemptToContainerIdMap.get(taskAttemptId);
+    if (containerId != null) {
+      containerHeartbeatHandler.pinged(containerId);
+    } else {
+      LOG.warn("Handling communication from attempt: " + taskAttemptId
+          + ", ContainerId not known for this attempt");
+    }
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/TaskHeartbeatHandler.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/TaskHeartbeatHandler.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/TaskHeartbeatHandler.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/TaskHeartbeatHandler.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,68 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app;
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventDiagnosticsUpdate;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+
+
+/**
+ * This class keeps track of tasks that have already been launched. It
+ * determines if a task is alive and running or marks a task as dead if it does
+ * not hear from it for a long time.
+ * 
+ */
+@SuppressWarnings({"unchecked"})
+public class TaskHeartbeatHandler extends HeartbeatHandlerBase<TezTaskAttemptID> {
+
+  public TaskHeartbeatHandler(AppContext context, int numThreads) {
+    super(context, numThreads, "TaskHeartbeatHandler");
+  }
+
+  @Override
+  protected int getConfiguredTimeout(Configuration conf) {
+    return conf.getInt(MRJobConfig.TASK_TIMEOUT, 5 * 60 * 1000);
+  }
+
+  @Override
+  protected int getConfiguredTimeoutCheckInterval(Configuration conf) {
+    return conf.getInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 30 * 1000);
+  }
+
+  @Override
+  protected boolean hasTimedOut(
+      HeartbeatHandlerBase.ReportTime report,
+      long currentTime) {
+    return (timeOut > 0) && (currentTime > report.getLastPing() + timeOut);
+  }
+
+  @Override
+  protected void handleTimeOut(TezTaskAttemptID attemptId) {
+    eventHandler.handle(new TaskAttemptEventDiagnosticsUpdate(attemptId,
+        "AttemptID:" + attemptId.toString()
+        + " Timed out after " + timeOut / 1000 + " secs"));
+    eventHandler.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_TIMED_OUT));
+  }
+}
\ No newline at end of file

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/TaskHeartbeatHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/client/ClientService.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/client/ClientService.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/client/ClientService.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/client/ClientService.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,29 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.client;
+
+import java.net.InetSocketAddress;
+
+// TODONOTES - RPC service for clients
+public interface ClientService {
+
+  InetSocketAddress getBindAddress();
+
+  int getHttpPort();
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/client/ClientService.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/client/impl/TezClientService.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/client/impl/TezClientService.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/client/impl/TezClientService.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/client/impl/TezClientService.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,43 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.client.impl;
+
+import java.net.InetSocketAddress;
+
+import org.apache.tez.dag.app.client.ClientService;
+
+public class TezClientService implements ClientService {
+
+  // TODO remove dummy client service
+  private final InetSocketAddress dummySocketAddress =
+      new InetSocketAddress(0);
+
+  @Override
+  public InetSocketAddress getBindAddress() {
+    // TODO Auto-generated method stub
+    return dummySocketAddress;
+  }
+
+  @Override
+  public int getHttpPort() {
+    // TODO Auto-generated method stub
+    return 0;
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/client/impl/TezClientService.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/client/package-info.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/client/package-info.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/client/package-info.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/client/package-info.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Private
+package org.apache.tez.dag.app.client;
+import org.apache.hadoop.classification.InterfaceAudience;

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/client/package-info.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,83 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobACL;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.VertexLocationHint;
+import org.apache.tez.engine.records.TezDAGID;
+import org.apache.tez.engine.records.TezVertexID;
+
+/**
+ * Main interface to interact with the job. Provides only getters. 
+ */
+public interface DAG {
+
+  TezDAGID getID();
+  String getName();
+  DAGState getState();
+  DAGReport getReport();
+
+  /**
+   * Get all the counters of this DAG. This includes job-counters aggregated
+   * together with the counters of each task. This creates a clone of the
+   * Counters, so use this judiciously.  
+   * @return job-counters and aggregate task-counters
+   */
+  TezCounters getAllCounters();
+
+  Map<TezVertexID,Vertex> getVertices();
+  Vertex getVertex(TezVertexID vertexId);
+  List<String> getDiagnostics();
+  int getTotalVertices();
+  int getCompletedVertices();
+  float getProgress();
+  boolean isUber();
+  String getUserName();
+  String getQueueName();
+  
+  Configuration getConf();
+  
+  /**
+   * @return a path to where the config file for this job is located.
+   */
+  Path getConfFile();
+  
+  /**
+   * @return the ACLs for this job for each type of JobACL given. 
+   */
+  Map<JobACL, AccessControlList> getJobACLs();
+
+  /**
+   * @return information for MR AppMasters (previously failed and current)
+   */
+  // TODO Recovery
+  //List<AMInfo> getAMInfos();
+  
+  boolean checkAccess(UserGroupInformation callerUGI, JobACL jobOperation);
+
+  VertexLocationHint getVertexLocationHint(TezVertexID vertexId);
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGReport.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGReport.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGReport.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGReport.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,60 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.tez.dag.app.dag;
+
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.dag.api.records.AMInfo;
+
+public abstract class DAGReport {
+  public abstract ApplicationId getDAGId();
+  public abstract DAGState getDAGState();
+  public abstract float getMapProgress();
+  public abstract float getReduceProgress();
+  public abstract float getCleanupProgress();
+  public abstract float getSetupProgress();
+  public abstract long getSubmitTime();
+  public abstract long getStartTime();
+  public abstract long getFinishTime();
+  public abstract String getUser();
+  public abstract String getJobName();
+  public abstract String getTrackingUrl();
+  public abstract String getDiagnostics();
+  public abstract String getJobFile();
+  public abstract List<AMInfo> getAMInfos();
+  public abstract boolean isUber();
+
+  public abstract void setDAGId(ApplicationId dagId);
+  public abstract void setJobState(DAGState dagState);
+  public abstract void setMapProgress(float progress);
+  public abstract void setReduceProgress(float progress);
+  public abstract void setCleanupProgress(float progress);
+  public abstract void setSetupProgress(float progress);
+  public abstract void setSubmitTime(long submitTime);
+  public abstract void setStartTime(long startTime);
+  public abstract void setFinishTime(long finishTime);
+  public abstract void setUser(String user);
+  public abstract void setJobName(String jobName);
+  public abstract void setTrackingUrl(String trackingUrl);
+  public abstract void setDiagnostics(String diagnostics);
+  public abstract void setJobFile(String jobFile);
+  public abstract void setAMInfos(List<AMInfo> amInfos);
+  public abstract void setIsUber(boolean isUber);
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGReport.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,29 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag;
+
+import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
+
+public interface DAGScheduler {
+  
+  public void vertexCompleted(Vertex vertex);
+  
+  public void scheduleTask(DAGEventSchedulerUpdate event);
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGState.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGState.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGState.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGState.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,29 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.tez.dag.app.dag;
+
+public enum DAGState {
+  NEW,
+  INITED,
+  RUNNING,
+  SUCCEEDED,
+  FAILED,
+  KILLED,
+  ERROR, 
+  KILL_WAIT
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGState.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/JobStateInternal.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/JobStateInternal.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/JobStateInternal.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/JobStateInternal.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag;
+
+public enum JobStateInternal {
+  NEW,
+  INITED,
+  RUNNING,
+  SUCCEEDED,
+  FAILED,
+  KILL_WAIT,
+  KILLED,
+  ERROR
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/JobStateInternal.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,74 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag;
+
+import java.util.Map;
+
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.records.TaskReport;
+import org.apache.tez.dag.api.records.TaskState;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+import org.apache.tez.engine.records.TezTaskID;
+
+/**
+ * Read only view of Task.
+ */
+public interface Task {
+  TezTaskID getTaskId();
+  TaskReport getReport();
+  TaskState getState();
+  TezCounters getCounters();
+  float getProgress();
+  Map<TezTaskAttemptID, TaskAttempt> getAttempts();
+  TaskAttempt getAttempt(TezTaskAttemptID attemptID);
+
+  /** Has Task reached the final state or not.
+   */
+  boolean isFinished();
+
+  /**
+   * Can the output of the taskAttempt be committed. Note that once the task
+   * gives a go for a commit, further canCommit requests from any other attempts
+   * should return false.
+   * 
+   * @param taskAttemptID
+   * @return whether the attempt's output can be committed or not.
+   */
+  boolean canCommit(TezTaskAttemptID taskAttemptID);
+
+  
+  /**
+   * Do the running tasks need to stick around after they're done processing and
+   * generating output. Required for tasks which have custom output handling
+   * such as in-memory shuffle.
+   * 
+   * @return whether the task needs to stick around.
+   */
+  boolean needsWaitAfterOutputConsumable();
+  
+  /**
+   * Get the attempt id which has reported in as output ready. null if not
+   * applicable.
+   * 
+   * @return
+   */
+  TezTaskAttemptID getOutputConsumableAttempt();
+  
+  public Vertex getVertex();
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,111 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.records.TaskAttemptReport;
+import org.apache.tez.dag.api.records.TaskAttemptState;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+
+/**
+ * Read only view of TaskAttempt.
+ */
+public interface TaskAttempt {
+  TezTaskAttemptID getID();
+  TaskAttemptReport getReport();
+  List<String> getDiagnostics();
+  TezCounters getCounters();
+  float getProgress();
+  TaskAttemptState getState();
+
+  /** 
+   * Has attempt reached the final state or not.
+   * @return true if it has finished, else false
+   */
+  boolean isFinished();
+
+  /**
+   * @return the container ID if a container is assigned, otherwise null.
+   */
+  ContainerId getAssignedContainerID();
+
+  /**
+   * @return container mgr address if a container is assigned, otherwise null.
+   */
+  String getAssignedContainerMgrAddress();
+  
+  /**
+   * @return node's id if a container is assigned, otherwise null.
+   */
+  NodeId getNodeId();
+  
+  /**
+   * @return node's http address if a container is assigned, otherwise null.
+   */
+  String getNodeHttpAddress();
+  
+  /**
+   * @return node's rack name if a container is assigned, otherwise null.
+   */
+  String getNodeRackName();
+
+  /** 
+   * @return time at which container is launched. If container is not launched
+   * yet, returns 0.
+   */
+  long getLaunchTime();
+
+  /** 
+   * @return attempt's finish time. If attempt is not finished
+   *  yet, returns 0.
+   */
+  long getFinishTime();
+  
+  /**
+   * @return The attempt's input ready time. If
+   * attempt's input is not ready yet, returns 0.
+   */
+  long getInputReadyTime();
+
+  /**
+   * @return The attempt's output ready time. If attempt's output is not 
+   * ready yet, returns 0.
+   */
+  long getOutputReadyTime();
+
+  // TODO TEZDAG - remove all references to ShufflePort
+  /**
+   * @return the port shuffle is on.
+   */
+  public int getShufflePort();
+  
+  public Task getTask();
+  
+  public boolean getIsRescheduled();
+
+  public Map<String, LocalResource> getLocalResources();
+
+  public Map<String, String> getEnvironment();
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttemptStateInternal.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttemptStateInternal.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttemptStateInternal.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttemptStateInternal.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,39 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+
+/**
+* TaskAttemptImpl internal state machine states.
+*
+*/
+@Private
+public enum TaskAttemptStateInternal {
+  NEW,
+  START_WAIT,
+  RUNNING,
+  OUTPUT_CONSUMABLE, 
+  COMMIT_PENDING,
+  KILL_IN_PROGRESS, 
+  FAIL_IN_PROGRESS,
+  KILLED,
+  FAILED,
+  SUCCEEDED
+}
\ No newline at end of file

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttemptStateInternal.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskStateInternal.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskStateInternal.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskStateInternal.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskStateInternal.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag;
+
+public enum TaskStateInternal {
+  NEW, SCHEDULED, RUNNING, SUCCEEDED, FAILED, KILL_WAIT, KILLED
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskStateInternal.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,78 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.common.InputSpec;
+import org.apache.tez.common.OutputSpec;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.engine.records.TezDependentTaskCompletionEvent;
+import org.apache.tez.engine.records.TezTaskID;
+import org.apache.tez.engine.records.TezVertexID;
+
+
+/**
+ * Main interface to interact with the job. Provides only getters. 
+ */
+public interface Vertex extends Comparable<Vertex> {
+
+  TezVertexID getVertexId();
+  int getDistanceFromRoot();
+  String getName();
+  VertexState getState();
+  Configuration getConf();
+
+  /**
+   * Get all the counters of this vertex. 
+   * @return aggregate task-counters
+   */
+  TezCounters getAllCounters();
+
+  Map<TezTaskID, Task> getTasks();
+  Task getTask(TezTaskID taskID);
+  List<String> getDiagnostics();
+  int getTotalTasks();
+  int getCompletedTasks();
+  float getProgress();
+  
+  TezDependentTaskCompletionEvent[]
+      getTaskAttemptCompletionEvents(int fromEventId, int maxEvents);
+  
+  void setInputVertices(Map<Vertex, EdgeProperty> inVertices);
+  void setOutputVertices(Map<Vertex, EdgeProperty> outVertices);
+
+  Map<Vertex, EdgeProperty> getInputVertices();
+  Map<Vertex, EdgeProperty> getOutputVertices();
+  
+  List<InputSpec> getInputSpecList();
+  List<OutputSpec> getOutputSpecList();
+
+  int getInputVerticesCount();
+  int getOutputVerticesCount();
+  void scheduleTasks(Collection<TezTaskID> taskIDs);
+  Resource getTaskResource();
+
+  public DAG getDAG();
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexScheduler.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexScheduler.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexScheduler.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexScheduler.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,27 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag;
+
+import org.apache.tez.engine.records.TezTaskAttemptID;
+
+public interface VertexScheduler {
+  void onVertexStarted();
+  void onVertexCompleted();
+  void onSourceTaskCompleted(TezTaskAttemptID attemptId);
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexScheduler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexState.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexState.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexState.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexState.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.dag.app.dag;
+
+public enum VertexState {
+  NEW,
+  INITED,
+  RUNNING,
+  SUCCEEDED,
+  FAILED,
+  KILLED,
+  ERROR,
+  KILL_WAIT,
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexState.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEvent.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEvent.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEvent.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEvent.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,41 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tez.engine.records.TezDAGID;
+
+/**
+ * This class encapsulates job related events.
+ *
+ */
+public class DAGEvent extends AbstractEvent<DAGEventType> {
+
+  private TezDAGID dagId;
+
+  public DAGEvent(TezDAGID dagId, DAGEventType type) {
+    super(type);
+    this.dagId = dagId;
+  }
+
+  public TezDAGID getDAGId() {
+    return dagId;
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEvent.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventCounterUpdate.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventCounterUpdate.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventCounterUpdate.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventCounterUpdate.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.event;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.tez.engine.records.TezDAGID;
+
+public class DAGEventCounterUpdate extends DAGEvent {
+
+  List<CounterIncrementalUpdate> counterUpdates = null;
+  
+  public DAGEventCounterUpdate(TezDAGID dagId) {
+    super(dagId, DAGEventType.DAG_COUNTER_UPDATE);
+    counterUpdates = new ArrayList<DAGEventCounterUpdate.CounterIncrementalUpdate>();
+  }
+
+  public void addCounterUpdate(Enum<?> key, long incrValue) {
+    counterUpdates.add(new CounterIncrementalUpdate(key, incrValue));
+  }
+  
+  public List<CounterIncrementalUpdate> getCounterUpdates() {
+    return counterUpdates;
+  }
+  
+  public static class CounterIncrementalUpdate {
+    Enum<?> key;
+    long incrValue;
+    
+    public CounterIncrementalUpdate(Enum<?> key, long incrValue) {
+      this.key = key;
+      this.incrValue = incrValue;
+    }
+    
+    public Enum<?> getCounterKey() {
+      return key;
+    }
+
+    public long getIncrementValue() {
+      return incrValue;
+    }
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventCounterUpdate.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventDiagnosticsUpdate.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventDiagnosticsUpdate.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventDiagnosticsUpdate.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventDiagnosticsUpdate.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,35 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.engine.records.TezDAGID;
+
+public class DAGEventDiagnosticsUpdate extends DAGEvent {
+
+  private String diagnosticUpdate;
+
+  public DAGEventDiagnosticsUpdate(TezDAGID dagId, String diagnostic) {
+    super(dagId, DAGEventType.DAG_DIAGNOSTIC_UPDATE);
+    this.diagnosticUpdate = diagnostic;
+  }
+
+  public String getDiagnosticUpdate() {
+    return this.diagnosticUpdate;
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventDiagnosticsUpdate.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdate.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdate.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdate.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdate.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,46 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.dag.app.dag.TaskAttempt;
+
+public class DAGEventSchedulerUpdate extends DAGEvent {
+  
+  public enum UpdateType {
+    TA_SCHEDULE
+  }
+  
+  private final TaskAttempt attempt;
+  private final UpdateType updateType;
+  
+  public DAGEventSchedulerUpdate(UpdateType updateType, TaskAttempt attempt) {
+    super(attempt.getID().getTaskID().getVertexID().getDAGId(), 
+          DAGEventType.DAG_SCHEDULER_UPDATE);
+    this.attempt = attempt;
+    this.updateType = updateType;
+  }
+  
+  public UpdateType getUpdateType() {
+    return updateType;
+  }
+  
+  public TaskAttempt getAttempt() {
+    return attempt;
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdate.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,53 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.event;
+
+/**
+ * Event types handled by Job.
+ */
+public enum DAGEventType {
+
+  //Producer:Client
+  DAG_KILL,
+
+  //Producer:MRAppMaster
+  DAG_INIT,
+  DAG_START,
+
+  //Producer:Task
+  /*
+  JOB_TASK_COMPLETED,
+  JOB_MAP_TASK_RESCHEDULED,
+  JOB_TASK_ATTEMPT_COMPLETED,
+  */
+  
+  //Producer: Vertex
+  DAG_VERTEX_INITED,
+  DAG_VERTEX_STARTED,
+  DAG_VERTEX_COMPLETED,
+  DAG_SCHEDULER_UPDATE,
+  
+  //Producer:Job
+  DAG_COMPLETED,
+
+  //Producer:Any component
+  DAG_DIAGNOSTIC_UPDATE,
+  INTERNAL_ERROR,
+  DAG_COUNTER_UPDATE,  
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventVertexCompleted.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventVertexCompleted.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventVertexCompleted.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventVertexCompleted.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,42 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.dag.app.dag.VertexState;
+import org.apache.tez.engine.records.TezVertexID;
+
+public class DAGEventVertexCompleted extends DAGEvent {
+
+  private TezVertexID vertexId;
+  private VertexState vertexState;
+
+  public DAGEventVertexCompleted(TezVertexID vertexId, VertexState vertexState) {
+    super(vertexId.getDAGId(), DAGEventType.DAG_VERTEX_COMPLETED);
+    this.vertexId = vertexId;
+    this.vertexState = vertexState;
+  }
+
+  public TezVertexID getVertexId() {
+    return vertexId;
+  }
+
+  public VertexState getVertexState() {
+    return vertexState;
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventVertexCompleted.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGFinishEvent.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGFinishEvent.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGFinishEvent.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGFinishEvent.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,41 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tez.engine.records.TezDAGID;
+
+public class DAGFinishEvent extends AbstractEvent<DAGFinishEvent.Type> {
+
+  public enum Type {
+    STATE_CHANGED
+  }
+
+  private TezDAGID dagId;
+
+  public DAGFinishEvent(TezDAGID dagId) {
+    super(Type.STATE_CHANGED);
+    this.dagId = dagId;
+  }
+
+  public TezDAGID getDagId() {
+    return dagId;
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGFinishEvent.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DiagnosableEvent.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DiagnosableEvent.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DiagnosableEvent.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DiagnosableEvent.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.event;
+
+public interface DiagnosableEvent {
+
+  public String getDiagnosticInfo();
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DiagnosableEvent.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEvent.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEvent.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEvent.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEvent.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,45 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+
+/**
+ * This class encapsulates task attempt related events.
+ *
+ */
+public class TaskAttemptEvent extends AbstractEvent<TaskAttemptEventType> {
+
+  private TezTaskAttemptID attemptID;
+  
+  /**
+   * Create a new TaskAttemptEvent.
+   * @param id the id of the task attempt
+   * @param type the type of event that happened.
+   */
+  public TaskAttemptEvent(TezTaskAttemptID id, TaskAttemptEventType type) {
+    super(type);
+    this.attemptID = id;
+  }
+
+  public TezTaskAttemptID getTaskAttemptID() {
+    return attemptID;
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEvent.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminated.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminated.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminated.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminated.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.engine.records.TezTaskAttemptID;
+
+public class TaskAttemptEventContainerTerminated extends TaskAttemptEvent
+    implements DiagnosableEvent {
+
+  private final String message;
+
+  public TaskAttemptEventContainerTerminated(TezTaskAttemptID id, String message) {
+    super(id, TaskAttemptEventType.TA_CONTAINER_TERMINATED);
+    this.message = message;
+  }
+
+  @Override
+  public String getDiagnosticInfo() {
+    return message;
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminated.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminating.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminating.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminating.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminating.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.engine.records.TezTaskAttemptID;
+
+public class TaskAttemptEventContainerTerminating extends TaskAttemptEvent
+    implements DiagnosableEvent {
+
+  private final String message;
+
+  public TaskAttemptEventContainerTerminating(TezTaskAttemptID id,
+      String diagMessage) {
+    super(id, TaskAttemptEventType.TA_CONTAINER_TERMINATING);
+    this.message = diagMessage;
+  }
+
+  @Override
+  public String getDiagnosticInfo() {
+    return this.message;
+  }
+
+}
\ No newline at end of file

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminating.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventDiagnosticsUpdate.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventDiagnosticsUpdate.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventDiagnosticsUpdate.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventDiagnosticsUpdate.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,36 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.engine.records.TezTaskAttemptID;
+
+public class TaskAttemptEventDiagnosticsUpdate extends TaskAttemptEvent {
+
+  private String diagnosticInfo;
+
+  public TaskAttemptEventDiagnosticsUpdate(TezTaskAttemptID attemptID,
+      String diagnosticInfo) {
+    super(attemptID, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE);
+    this.diagnosticInfo = diagnosticInfo;
+  }
+
+  public String getDiagnosticInfo() {
+    return diagnosticInfo;
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventDiagnosticsUpdate.java
------------------------------------------------------------------------------
    svn:eol-style = native