You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/09/12 00:59:21 UTC

git commit: TEZ-435. Handle out-of-band fatal errors from inputs/outputs. (hitesh)

Updated Branches:
  refs/heads/TEZ-398 8d89485fd -> c86e0e40d


TEZ-435. Handle out-of-band fatal errors from inputs/outputs. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/c86e0e40
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/c86e0e40
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/c86e0e40

Branch: refs/heads/TEZ-398
Commit: c86e0e40ddaba6f3565bc54c74581ee55f9393bd
Parents: 8d89485
Author: Hitesh Shah <hi...@apache.org>
Authored: Wed Sep 11 15:59:06 2013 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Wed Sep 11 15:59:06 2013 -0700

----------------------------------------------------------------------
 .../apache/hadoop/mapred/YarnTezDagChild.java   | 85 +++++++++++++++-----
 .../engine/newapi/impl/TezInputContextImpl.java | 16 ++--
 .../newapi/impl/TezOutputContextImpl.java       | 13 ++-
 .../newapi/impl/TezProcessorContextImpl.java    | 13 ++-
 .../engine/newapi/impl/TezTaskContextImpl.java  | 46 +++++++----
 .../tez/engine/newapi/impl/TezUmbilical.java    |  6 ++
 .../LogicalIOProcessorRuntimeTask.java          | 23 +++---
 .../tez/engine/newruntime/RuntimeTask.java      | 45 +++++++++++
 8 files changed, 187 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c86e0e40/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
index 1106d7c..5f72982 100644
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
+++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
@@ -183,7 +183,20 @@ public class YarnTezDagChild {
                   currentTask.handleEvent(e);
                 } catch (Throwable t) {
                   LOG.warn("Failed to handle event", t);
-                  // TODONEWTEZ
+                  currentTask.setFatalError(t, "Failed to handle event");
+                  TezEvent taskAttemptFailedEvent = new TezEvent(
+                      new TaskAttemptFailedEvent(
+                          StringUtils.stringifyException(t)),
+                      new EventMetaData(EventProducerConsumerType.SYSTEM,
+                          "", "", currentTaskAttemptID));
+                  try {
+                    umbilical.taskAttemptFailed(currentTaskAttemptID,
+                        taskAttemptFailedEvent);
+                  } catch (IOException ioe) {
+                    // TODO Auto-generated catch block
+                    ioe.printStackTrace();
+                    // TODO NEWTEZ System exit?
+                  }
                 }
               }
             } finally {
@@ -304,6 +317,22 @@ public class YarnTezDagChild {
       public void addEvents(Collection<TezEvent> events) {
         eventsToSend.addAll(events);
       }
+
+      @Override
+      public void signalFatalError(TezTaskAttemptID taskAttemptID,
+          String diagnostics,
+          EventMetaData sourceInfo) {
+        TezEvent taskAttemptFailedEvent =
+            new TezEvent(new TaskAttemptFailedEvent(diagnostics),
+                sourceInfo);
+        try {
+          umbilical.taskAttemptFailed(taskAttemptID, taskAttemptFailedEvent);
+        } catch (IOException e) {
+          // TODO Auto-generated catch block
+          e.printStackTrace();
+          // TODONEWTEZ System.exit ?
+        }
+      }
     };
 
     // report non-pid to application master
@@ -320,6 +349,7 @@ public class YarnTezDagChild {
         TezConfiguration.TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX_DEFAULT);
     int taskCount = 0;
     TezVertexID currentVertexId = null;
+    EventMetaData currentSourceInfo = null;
     try {
       while (true) {
         // poll for new task
@@ -371,6 +401,10 @@ public class YarnTezDagChild {
           taskLock.writeLock().unlock();
         }
 
+        final EventMetaData sourceInfo = new EventMetaData(
+            EventProducerConsumerType.SYSTEM,
+            taskSpec.getVertexName(), "", currentTaskAttemptID);
+        currentSourceInfo = sourceInfo;
 
         // TODO Initiate Java VM metrics
         // JvmMetrics.initSingleton(containerId.toString(), job.getSessionId());
@@ -382,25 +416,29 @@ public class YarnTezDagChild {
         childUGI.doAs(new PrivilegedExceptionAction<Object>() {
           @Override
           public Object run() throws Exception {
-            EventMetaData sourceInfo = new EventMetaData(
-                EventProducerConsumerType.SYSTEM,
-                taskSpec.getVertexName(), "", currentTaskAttemptID);
             try {
               currentTask.initialize();
-              currentTask.run();
-              currentTask.close();
+              if (!currentTask.hadFatalError()) {
+                currentTask.run();
+                currentTask.close();
+              }
               // TODONEWTEZ check if task had a fatal error before
               // sending completed event
-              TezEvent taskCompletedEvent =
-                  new TezEvent(new TaskAttemptCompletedEvent(), sourceInfo);
-              umbilical.taskAttemptCompleted(currentTaskAttemptID,
-                  taskCompletedEvent);
+              if (!currentTask.hadFatalError()) {
+                TezEvent taskCompletedEvent =
+                    new TezEvent(new TaskAttemptCompletedEvent(), sourceInfo);
+                umbilical.taskAttemptCompleted(currentTaskAttemptID,
+                    taskCompletedEvent);
+              }
             } catch (Throwable t) {
-              TezEvent taskAttemptFailedEvent =
-                  new TezEvent(new TaskAttemptFailedEvent(t.getMessage()),
-                      sourceInfo);
-              umbilical.taskAttemptCompleted(currentTaskAttemptID,
-                  taskAttemptFailedEvent);
+              if (!currentTask.hadFatalError()) {
+                TezEvent taskAttemptFailedEvent =
+                    new TezEvent(new TaskAttemptFailedEvent(
+                        StringUtils.stringifyException(t)),
+                        sourceInfo);
+                umbilical.taskAttemptCompleted(currentTaskAttemptID,
+                    taskAttemptFailedEvent);
+              }
             }
             try {
               taskLock.writeLock().lock();
@@ -416,13 +454,20 @@ public class YarnTezDagChild {
       }
     } catch (FSError e) {
       LOG.fatal("FSError from child", e);
-      umbilical.fsError(currentTaskAttemptID, e.getMessage());
+      TezEvent taskAttemptFailedEvent =
+          new TezEvent(new TaskAttemptFailedEvent(
+              StringUtils.stringifyException(e)),
+              currentSourceInfo);
+      umbilical.taskAttemptFailed(currentTaskAttemptID, taskAttemptFailedEvent);
     } catch (Throwable throwable) {
-      LOG.fatal("Error running child : "
-    	        + StringUtils.stringifyException(throwable));
+      String cause = StringUtils.stringifyException(throwable);
+      LOG.fatal("Error running child : " + cause);
       if (currentTaskAttemptID != null) {
-        String cause = StringUtils.stringifyException(throwable);
-        umbilical.fatalError(currentTaskAttemptID, cause);
+        TezEvent taskAttemptFailedEvent =
+            new TezEvent(new TaskAttemptFailedEvent(cause),
+                currentSourceInfo);
+        umbilical.taskAttemptFailed(currentTaskAttemptID,
+            taskAttemptFailedEvent);
       }
     } finally {
       stopped.set(true);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c86e0e40/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
index 604cbff..fff2090 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
@@ -28,22 +28,23 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.engine.newapi.Event;
 import org.apache.tez.engine.newapi.TezInputContext;
 import org.apache.tez.engine.newapi.impl.EventMetaData.EventProducerConsumerType;
+import org.apache.tez.engine.newruntime.RuntimeTask;
 
 public class TezInputContextImpl extends TezTaskContextImpl
     implements TezInputContext {
 
   private final byte[] userPayload;
   private final String sourceVertexName;
-  private final TezUmbilical tezUmbilical;
   private final EventMetaData sourceInfo;
 
   @Private
   public TezInputContextImpl(Configuration conf,
       TezUmbilical tezUmbilical, String taskVertexName,
       String sourceVertexName, TezTaskAttemptID taskAttemptID,
-      TezCounters counters, byte[] userPayload) {
-    super(conf, taskVertexName, taskAttemptID, counters);
-    this.tezUmbilical = tezUmbilical;
+      TezCounters counters, byte[] userPayload,
+      RuntimeTask runtimeTask) {
+    super(conf, taskVertexName, taskAttemptID, counters, runtimeTask,
+        tezUmbilical);
     this.userPayload = userPayload;
     this.sourceVertexName = sourceVertexName;
     this.sourceInfo = new EventMetaData(
@@ -51,7 +52,7 @@ public class TezInputContextImpl extends TezTaskContextImpl
         taskAttemptID);
     this.uniqueIdentifier = String.format("%s_%s_%6d_%2d_%s", taskAttemptID
         .getTaskID().getVertexID().getDAGId().toString(), taskVertexName,
-        getTaskIndex(), getAttemptNumber(), sourceVertexName); 
+        getTaskIndex(), getAttemptNumber(), sourceVertexName);
   }
 
   @Override
@@ -73,4 +74,9 @@ public class TezInputContextImpl extends TezTaskContextImpl
   public String getSourceVertexName() {
     return sourceVertexName;
   }
+
+  @Override
+  public void fatalError(Throwable exception, String message) {
+    super.signalFatalError(exception, message, sourceInfo);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c86e0e40/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
index c4a069e..a0695cc 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
@@ -28,13 +28,13 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.engine.newapi.Event;
 import org.apache.tez.engine.newapi.TezOutputContext;
 import org.apache.tez.engine.newapi.impl.EventMetaData.EventProducerConsumerType;
+import org.apache.tez.engine.newruntime.RuntimeTask;
 
 public class TezOutputContextImpl extends TezTaskContextImpl
     implements TezOutputContext {
 
   private final byte[] userPayload;
   private final String destinationVertexName;
-  private final TezUmbilical tezUmbilical;
   private final EventMetaData sourceInfo;
 
   @Private
@@ -42,11 +42,11 @@ public class TezOutputContextImpl extends TezTaskContextImpl
       TezUmbilical tezUmbilical, String taskVertexName,
       String destinationVertexName,
       TezTaskAttemptID taskAttemptID, TezCounters counters,
-      byte[] userPayload) {
-    super(conf, taskVertexName, taskAttemptID, counters);
+      byte[] userPayload, RuntimeTask runtimeTask) {
+    super(conf, taskVertexName, taskAttemptID, counters, runtimeTask,
+        tezUmbilical);
     this.userPayload = userPayload;
     this.destinationVertexName = destinationVertexName;
-    this.tezUmbilical = tezUmbilical;
     this.sourceInfo = new EventMetaData(EventProducerConsumerType.OUTPUT,
         taskVertexName, destinationVertexName, taskAttemptID);
     this.uniqueIdentifier = String.format("%s_%s_%6d_%2d_%s", taskAttemptID
@@ -74,4 +74,9 @@ public class TezOutputContextImpl extends TezTaskContextImpl
     return destinationVertexName;
   }
 
+  @Override
+  public void fatalError(Throwable exception, String message) {
+    super.signalFatalError(exception, message, sourceInfo);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c86e0e40/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
index 6f61102..4ec55d0 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
@@ -27,21 +27,21 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.engine.newapi.Event;
 import org.apache.tez.engine.newapi.TezProcessorContext;
 import org.apache.tez.engine.newapi.impl.EventMetaData.EventProducerConsumerType;
+import org.apache.tez.engine.newruntime.RuntimeTask;
 
 public class TezProcessorContextImpl extends TezTaskContextImpl
   implements TezProcessorContext {
 
   private final byte[] userPayload;
-  private final TezUmbilical tezUmbilical;
   private final EventMetaData sourceInfo;
 
   public TezProcessorContextImpl(Configuration tezConf,
       TezUmbilical tezUmbilical, String vertexName,
       TezTaskAttemptID taskAttemptID, TezCounters counters,
-      byte[] userPayload) {
-    super(tezConf, vertexName, taskAttemptID, counters);
+      byte[] userPayload, RuntimeTask runtimeTask) {
+    super(tezConf, vertexName, taskAttemptID, counters, runtimeTask,
+        tezUmbilical);
     this.userPayload = userPayload;
-    this.tezUmbilical = tezUmbilical;
     this.sourceInfo = new EventMetaData(EventProducerConsumerType.PROCESSOR,
         taskVertexName, "", taskAttemptID);
     this.uniqueIdentifier = String.format("%s_%s_%6d_%2d", taskAttemptID
@@ -70,4 +70,9 @@ public class TezProcessorContextImpl extends TezTaskContextImpl
 
   }
 
+  @Override
+  public void fatalError(Throwable exception, String message) {
+    super.signalFatalError(exception, message, sourceInfo);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c86e0e40/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java
index b77bcdd..2925d05 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java
@@ -23,13 +23,14 @@ import java.util.Arrays;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.engine.newapi.TezTaskContext;
-import org.apache.tez.engine.shuffle.common.ShuffleUtils;
+import org.apache.tez.engine.newruntime.RuntimeTask;
 
 public abstract class TezTaskContextImpl implements TezTaskContext {
 
@@ -39,18 +40,23 @@ public abstract class TezTaskContextImpl implements TezTaskContext {
   private final TezCounters counters;
   private String[] workDirs;
   protected String uniqueIdentifier;
+  protected final RuntimeTask runtimeTask;
+  protected final TezUmbilical tezUmbilical;
 
   @Private
   public TezTaskContextImpl(Configuration conf,
       String taskVertexName, TezTaskAttemptID taskAttemptID,
-      TezCounters counters) {
+      TezCounters counters, RuntimeTask runtimeTask,
+      TezUmbilical tezUmbilical) {
     this.conf = conf;
     this.taskVertexName = taskVertexName;
     this.taskAttemptID = taskAttemptID;
     this.counters = counters;
     // TODO Maybe change this to be task id specific at some point. For now
     // Shuffle code relies on this being a path specified by YARN
-    this.workDirs = this.conf.getStrings(TezJobConfig.LOCAL_DIRS); 
+    this.workDirs = this.conf.getStrings(TezJobConfig.LOCAL_DIRS);
+    this.runtimeTask = runtimeTask;
+    this.tezUmbilical = tezUmbilical;
   }
 
   @Override
@@ -58,7 +64,7 @@ public abstract class TezTaskContextImpl implements TezTaskContext {
     return taskAttemptID.getTaskID().getVertexID().getDAGId()
         .getApplicationId();
   }
-  
+
   @Override
   public int getTaskIndex() {
     return taskAttemptID.getTaskID().getId();
@@ -75,7 +81,7 @@ public abstract class TezTaskContextImpl implements TezTaskContext {
     // the unique identifier.
     return taskAttemptID.getTaskID().getVertexID().getDAGId().toString();
   }
-  
+
   @Override
   public String getTaskVertexName() {
     return taskVertexName;
@@ -91,26 +97,38 @@ public abstract class TezTaskContextImpl implements TezTaskContext {
   public String[] getWorkDirs() {
     return Arrays.copyOf(workDirs, workDirs.length);
   }
-  
+
   @Override
   public String getUniqueIdentifier() {
     return uniqueIdentifier;
   }
-  
-  @Override
-  public void fatalError(Throwable exception, String message) {
-    // TODO NEWTEZ Implement once the TezContext communication is setup.
-  }
-  
+
   @Override
   public ByteBuffer getServiceConsumerMetaData(String serviceName) {
     // TODO NEWTEZ Make sure this data is set by the AM for the Shuffle service name.
     return null;
   }
-  
+
   @Override
   public ByteBuffer getServiceProviderMetaData(String serviceName) {
     return AuxiliaryServiceHelper.getServiceDataFromEnv(
-        ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID, System.getenv());
+        serviceName, System.getenv());
+  }
+
+  protected void signalFatalError(Throwable t, String message,
+      EventMetaData sourceInfo) {
+    runtimeTask.setFatalError(t, message);
+    String diagnostics;
+    if (t != null && message != null) {
+      diagnostics = "exceptionThrown=" + StringUtils.stringifyException(t)
+          + ", errorMessage=" + message;
+    } else if (t == null && message == null) {
+      diagnostics = "Unknown error";
+    } else {
+      diagnostics = t != null ?
+          "exceptionThrown=" + StringUtils.stringifyException(t)
+          : " errorMessage=" + message;
+    }
+    tezUmbilical.signalFatalError(taskAttemptID, diagnostics, sourceInfo);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c86e0e40/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezUmbilical.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezUmbilical.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezUmbilical.java
index c3065fe..b88dc63 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezUmbilical.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezUmbilical.java
@@ -20,8 +20,14 @@ package org.apache.tez.engine.newapi.impl;
 
 import java.util.Collection;
 
+import org.apache.tez.dag.records.TezTaskAttemptID;
+
 public interface TezUmbilical {
 
   public void addEvents(Collection<TezEvent> events);
 
+  public void signalFatalError(TezTaskAttemptID taskAttemptID,
+      String diagnostics,
+      EventMetaData sourceInfo);
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c86e0e40/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
index f0ac36e..05a28d8 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
@@ -54,11 +54,7 @@ import org.apache.tez.engine.newapi.impl.TezUmbilical;
 import com.google.common.base.Preconditions;
 
 @Private
-public class LogicalIOProcessorRuntimeTask {
-
-  private enum State {
-    NEW, INITED, RUNNING, CLOSED
-  }
+public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
 
   private static final Log LOG = LogFactory
       .getLog(LogicalIOProcessorRuntimeTask.class);
@@ -78,8 +74,6 @@ public class LogicalIOProcessorRuntimeTask {
 
   private final TezCounters tezCounters;
 
-  private State state;
-
   private Map<String, LogicalInput> inputMap;
   private Map<String, LogicalOutput> outputMap;
 
@@ -152,9 +146,11 @@ public class LogicalIOProcessorRuntimeTask {
   }
 
   public void run() throws IOException {
-    Preconditions.checkState(this.state == State.INITED,
-        "Can only run while in INITED state. Current: " + this.state);
-    this.state = State.RUNNING;
+    synchronized (this.state) {
+      Preconditions.checkState(this.state == State.INITED,
+          "Can only run while in INITED state. Current: " + this.state);
+      this.state = State.RUNNING;
+    }
     LogicalIOProcessor lioProcessor = (LogicalIOProcessor) processor;
     lioProcessor.run(inputMap, outputMap);
   }
@@ -223,7 +219,7 @@ public class LogicalIOProcessorRuntimeTask {
     TezInputContext inputContext = new TezInputContextImpl(tezConf,
         tezUmbilical, taskSpec.getVertexName(), inputSpec.getSourceVertexName(),
         taskSpec.getTaskAttemptID(), tezCounters,
-        inputSpec.getInputDescriptor().getUserPayload());
+        inputSpec.getInputDescriptor().getUserPayload(), this);
     return inputContext;
   }
 
@@ -232,14 +228,14 @@ public class LogicalIOProcessorRuntimeTask {
         tezUmbilical, taskSpec.getVertexName(),
         outputSpec.getDestinationVertexName(),
         taskSpec.getTaskAttemptID(), tezCounters,
-        outputSpec.getOutputDescriptor().getUserPayload());
+        outputSpec.getOutputDescriptor().getUserPayload(), this);
     return outputContext;
   }
 
   private TezProcessorContext createProcessorContext() {
     TezProcessorContext processorContext = new TezProcessorContextImpl(tezConf,
         tezUmbilical, taskSpec.getVertexName(), taskSpec.getTaskAttemptID(),
-        tezCounters, processorDescriptor.getUserPayload());
+        tezCounters, processorDescriptor.getUserPayload(), this);
     return processorContext;
   }
 
@@ -320,4 +316,5 @@ public class LogicalIOProcessorRuntimeTask {
       break;
     }
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c86e0e40/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeTask.java b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeTask.java
new file mode 100644
index 0000000..479f917
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeTask.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.newruntime;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public abstract class RuntimeTask {
+
+  protected AtomicBoolean hasFatalError = new AtomicBoolean(false);
+  protected Throwable fatalError = null;
+  protected String fatalErrorMessage = null;
+
+  protected enum State {
+    NEW, INITED, RUNNING, CLOSED;
+  }
+
+  protected State state;
+
+  public void setFatalError(Throwable t, String message) {
+    hasFatalError.set(true);
+    this.fatalError = t;
+    this.fatalErrorMessage = message;
+  }
+
+  public boolean hadFatalError() {
+    return hasFatalError.get();
+  }
+
+}