You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/09/12 00:59:21 UTC
git commit: TEZ-435. Handle out-of-band fatal errors from
inputs/outputs. (hitesh)
Updated Branches:
refs/heads/TEZ-398 8d89485fd -> c86e0e40d
TEZ-435. Handle out-of-band fatal errors from inputs/outputs. (hitesh)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/c86e0e40
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/c86e0e40
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/c86e0e40
Branch: refs/heads/TEZ-398
Commit: c86e0e40ddaba6f3565bc54c74581ee55f9393bd
Parents: 8d89485
Author: Hitesh Shah <hi...@apache.org>
Authored: Wed Sep 11 15:59:06 2013 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Wed Sep 11 15:59:06 2013 -0700
----------------------------------------------------------------------
.../apache/hadoop/mapred/YarnTezDagChild.java | 85 +++++++++++++++-----
.../engine/newapi/impl/TezInputContextImpl.java | 16 ++--
.../newapi/impl/TezOutputContextImpl.java | 13 ++-
.../newapi/impl/TezProcessorContextImpl.java | 13 ++-
.../engine/newapi/impl/TezTaskContextImpl.java | 46 +++++++----
.../tez/engine/newapi/impl/TezUmbilical.java | 6 ++
.../LogicalIOProcessorRuntimeTask.java | 23 +++---
.../tez/engine/newruntime/RuntimeTask.java | 45 +++++++++++
8 files changed, 187 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c86e0e40/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
index 1106d7c..5f72982 100644
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
+++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
@@ -183,7 +183,20 @@ public class YarnTezDagChild {
currentTask.handleEvent(e);
} catch (Throwable t) {
LOG.warn("Failed to handle event", t);
- // TODONEWTEZ
+ currentTask.setFatalError(t, "Failed to handle event");
+ TezEvent taskAttemptFailedEvent = new TezEvent(
+ new TaskAttemptFailedEvent(
+ StringUtils.stringifyException(t)),
+ new EventMetaData(EventProducerConsumerType.SYSTEM,
+ "", "", currentTaskAttemptID));
+ try {
+ umbilical.taskAttemptFailed(currentTaskAttemptID,
+ taskAttemptFailedEvent);
+ } catch (IOException ioe) {
+ // TODO Auto-generated catch block
+ ioe.printStackTrace();
+ // TODO NEWTEZ System exit?
+ }
}
}
} finally {
@@ -304,6 +317,22 @@ public class YarnTezDagChild {
public void addEvents(Collection<TezEvent> events) {
eventsToSend.addAll(events);
}
+
+ @Override
+ public void signalFatalError(TezTaskAttemptID taskAttemptID,
+ String diagnostics,
+ EventMetaData sourceInfo) {
+ TezEvent taskAttemptFailedEvent =
+ new TezEvent(new TaskAttemptFailedEvent(diagnostics),
+ sourceInfo);
+ try {
+ umbilical.taskAttemptFailed(taskAttemptID, taskAttemptFailedEvent);
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ // TODONEWTEZ System.exit ?
+ }
+ }
};
// report non-pid to application master
@@ -320,6 +349,7 @@ public class YarnTezDagChild {
TezConfiguration.TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX_DEFAULT);
int taskCount = 0;
TezVertexID currentVertexId = null;
+ EventMetaData currentSourceInfo = null;
try {
while (true) {
// poll for new task
@@ -371,6 +401,10 @@ public class YarnTezDagChild {
taskLock.writeLock().unlock();
}
+ final EventMetaData sourceInfo = new EventMetaData(
+ EventProducerConsumerType.SYSTEM,
+ taskSpec.getVertexName(), "", currentTaskAttemptID);
+ currentSourceInfo = sourceInfo;
// TODO Initiate Java VM metrics
// JvmMetrics.initSingleton(containerId.toString(), job.getSessionId());
@@ -382,25 +416,29 @@ public class YarnTezDagChild {
childUGI.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
- EventMetaData sourceInfo = new EventMetaData(
- EventProducerConsumerType.SYSTEM,
- taskSpec.getVertexName(), "", currentTaskAttemptID);
try {
currentTask.initialize();
- currentTask.run();
- currentTask.close();
+ if (!currentTask.hadFatalError()) {
+ currentTask.run();
+ currentTask.close();
+ }
// TODONEWTEZ check if task had a fatal error before
// sending completed event
- TezEvent taskCompletedEvent =
- new TezEvent(new TaskAttemptCompletedEvent(), sourceInfo);
- umbilical.taskAttemptCompleted(currentTaskAttemptID,
- taskCompletedEvent);
+ if (!currentTask.hadFatalError()) {
+ TezEvent taskCompletedEvent =
+ new TezEvent(new TaskAttemptCompletedEvent(), sourceInfo);
+ umbilical.taskAttemptCompleted(currentTaskAttemptID,
+ taskCompletedEvent);
+ }
} catch (Throwable t) {
- TezEvent taskAttemptFailedEvent =
- new TezEvent(new TaskAttemptFailedEvent(t.getMessage()),
- sourceInfo);
- umbilical.taskAttemptCompleted(currentTaskAttemptID,
- taskAttemptFailedEvent);
+ if (!currentTask.hadFatalError()) {
+ TezEvent taskAttemptFailedEvent =
+ new TezEvent(new TaskAttemptFailedEvent(
+ StringUtils.stringifyException(t)),
+ sourceInfo);
+ umbilical.taskAttemptCompleted(currentTaskAttemptID,
+ taskAttemptFailedEvent);
+ }
}
try {
taskLock.writeLock().lock();
@@ -416,13 +454,20 @@ public class YarnTezDagChild {
}
} catch (FSError e) {
LOG.fatal("FSError from child", e);
- umbilical.fsError(currentTaskAttemptID, e.getMessage());
+ TezEvent taskAttemptFailedEvent =
+ new TezEvent(new TaskAttemptFailedEvent(
+ StringUtils.stringifyException(e)),
+ currentSourceInfo);
+ umbilical.taskAttemptFailed(currentTaskAttemptID, taskAttemptFailedEvent);
} catch (Throwable throwable) {
- LOG.fatal("Error running child : "
- + StringUtils.stringifyException(throwable));
+ String cause = StringUtils.stringifyException(throwable);
+ LOG.fatal("Error running child : " + cause);
if (currentTaskAttemptID != null) {
- String cause = StringUtils.stringifyException(throwable);
- umbilical.fatalError(currentTaskAttemptID, cause);
+ TezEvent taskAttemptFailedEvent =
+ new TezEvent(new TaskAttemptFailedEvent(cause),
+ currentSourceInfo);
+ umbilical.taskAttemptFailed(currentTaskAttemptID,
+ taskAttemptFailedEvent);
}
} finally {
stopped.set(true);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c86e0e40/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
index 604cbff..fff2090 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
@@ -28,22 +28,23 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.engine.newapi.Event;
import org.apache.tez.engine.newapi.TezInputContext;
import org.apache.tez.engine.newapi.impl.EventMetaData.EventProducerConsumerType;
+import org.apache.tez.engine.newruntime.RuntimeTask;
public class TezInputContextImpl extends TezTaskContextImpl
implements TezInputContext {
private final byte[] userPayload;
private final String sourceVertexName;
- private final TezUmbilical tezUmbilical;
private final EventMetaData sourceInfo;
@Private
public TezInputContextImpl(Configuration conf,
TezUmbilical tezUmbilical, String taskVertexName,
String sourceVertexName, TezTaskAttemptID taskAttemptID,
- TezCounters counters, byte[] userPayload) {
- super(conf, taskVertexName, taskAttemptID, counters);
- this.tezUmbilical = tezUmbilical;
+ TezCounters counters, byte[] userPayload,
+ RuntimeTask runtimeTask) {
+ super(conf, taskVertexName, taskAttemptID, counters, runtimeTask,
+ tezUmbilical);
this.userPayload = userPayload;
this.sourceVertexName = sourceVertexName;
this.sourceInfo = new EventMetaData(
@@ -51,7 +52,7 @@ public class TezInputContextImpl extends TezTaskContextImpl
taskAttemptID);
this.uniqueIdentifier = String.format("%s_%s_%6d_%2d_%s", taskAttemptID
.getTaskID().getVertexID().getDAGId().toString(), taskVertexName,
- getTaskIndex(), getAttemptNumber(), sourceVertexName);
+ getTaskIndex(), getAttemptNumber(), sourceVertexName);
}
@Override
@@ -73,4 +74,9 @@ public class TezInputContextImpl extends TezTaskContextImpl
public String getSourceVertexName() {
return sourceVertexName;
}
+
+ @Override
+ public void fatalError(Throwable exception, String message) {
+ super.signalFatalError(exception, message, sourceInfo);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c86e0e40/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
index c4a069e..a0695cc 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
@@ -28,13 +28,13 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.engine.newapi.Event;
import org.apache.tez.engine.newapi.TezOutputContext;
import org.apache.tez.engine.newapi.impl.EventMetaData.EventProducerConsumerType;
+import org.apache.tez.engine.newruntime.RuntimeTask;
public class TezOutputContextImpl extends TezTaskContextImpl
implements TezOutputContext {
private final byte[] userPayload;
private final String destinationVertexName;
- private final TezUmbilical tezUmbilical;
private final EventMetaData sourceInfo;
@Private
@@ -42,11 +42,11 @@ public class TezOutputContextImpl extends TezTaskContextImpl
TezUmbilical tezUmbilical, String taskVertexName,
String destinationVertexName,
TezTaskAttemptID taskAttemptID, TezCounters counters,
- byte[] userPayload) {
- super(conf, taskVertexName, taskAttemptID, counters);
+ byte[] userPayload, RuntimeTask runtimeTask) {
+ super(conf, taskVertexName, taskAttemptID, counters, runtimeTask,
+ tezUmbilical);
this.userPayload = userPayload;
this.destinationVertexName = destinationVertexName;
- this.tezUmbilical = tezUmbilical;
this.sourceInfo = new EventMetaData(EventProducerConsumerType.OUTPUT,
taskVertexName, destinationVertexName, taskAttemptID);
this.uniqueIdentifier = String.format("%s_%s_%6d_%2d_%s", taskAttemptID
@@ -74,4 +74,9 @@ public class TezOutputContextImpl extends TezTaskContextImpl
return destinationVertexName;
}
+ @Override
+ public void fatalError(Throwable exception, String message) {
+ super.signalFatalError(exception, message, sourceInfo);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c86e0e40/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
index 6f61102..4ec55d0 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
@@ -27,21 +27,21 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.engine.newapi.Event;
import org.apache.tez.engine.newapi.TezProcessorContext;
import org.apache.tez.engine.newapi.impl.EventMetaData.EventProducerConsumerType;
+import org.apache.tez.engine.newruntime.RuntimeTask;
public class TezProcessorContextImpl extends TezTaskContextImpl
implements TezProcessorContext {
private final byte[] userPayload;
- private final TezUmbilical tezUmbilical;
private final EventMetaData sourceInfo;
public TezProcessorContextImpl(Configuration tezConf,
TezUmbilical tezUmbilical, String vertexName,
TezTaskAttemptID taskAttemptID, TezCounters counters,
- byte[] userPayload) {
- super(tezConf, vertexName, taskAttemptID, counters);
+ byte[] userPayload, RuntimeTask runtimeTask) {
+ super(tezConf, vertexName, taskAttemptID, counters, runtimeTask,
+ tezUmbilical);
this.userPayload = userPayload;
- this.tezUmbilical = tezUmbilical;
this.sourceInfo = new EventMetaData(EventProducerConsumerType.PROCESSOR,
taskVertexName, "", taskAttemptID);
this.uniqueIdentifier = String.format("%s_%s_%6d_%2d", taskAttemptID
@@ -70,4 +70,9 @@ public class TezProcessorContextImpl extends TezTaskContextImpl
}
+ @Override
+ public void fatalError(Throwable exception, String message) {
+ super.signalFatalError(exception, message, sourceInfo);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c86e0e40/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java
index b77bcdd..2925d05 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java
@@ -23,13 +23,14 @@ import java.util.Arrays;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
import org.apache.tez.common.TezJobConfig;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.engine.newapi.TezTaskContext;
-import org.apache.tez.engine.shuffle.common.ShuffleUtils;
+import org.apache.tez.engine.newruntime.RuntimeTask;
public abstract class TezTaskContextImpl implements TezTaskContext {
@@ -39,18 +40,23 @@ public abstract class TezTaskContextImpl implements TezTaskContext {
private final TezCounters counters;
private String[] workDirs;
protected String uniqueIdentifier;
+ protected final RuntimeTask runtimeTask;
+ protected final TezUmbilical tezUmbilical;
@Private
public TezTaskContextImpl(Configuration conf,
String taskVertexName, TezTaskAttemptID taskAttemptID,
- TezCounters counters) {
+ TezCounters counters, RuntimeTask runtimeTask,
+ TezUmbilical tezUmbilical) {
this.conf = conf;
this.taskVertexName = taskVertexName;
this.taskAttemptID = taskAttemptID;
this.counters = counters;
// TODO Maybe change this to be task id specific at some point. For now
// Shuffle code relies on this being a path specified by YARN
- this.workDirs = this.conf.getStrings(TezJobConfig.LOCAL_DIRS);
+ this.workDirs = this.conf.getStrings(TezJobConfig.LOCAL_DIRS);
+ this.runtimeTask = runtimeTask;
+ this.tezUmbilical = tezUmbilical;
}
@Override
@@ -58,7 +64,7 @@ public abstract class TezTaskContextImpl implements TezTaskContext {
return taskAttemptID.getTaskID().getVertexID().getDAGId()
.getApplicationId();
}
-
+
@Override
public int getTaskIndex() {
return taskAttemptID.getTaskID().getId();
@@ -75,7 +81,7 @@ public abstract class TezTaskContextImpl implements TezTaskContext {
// the unique identifier.
return taskAttemptID.getTaskID().getVertexID().getDAGId().toString();
}
-
+
@Override
public String getTaskVertexName() {
return taskVertexName;
@@ -91,26 +97,38 @@ public abstract class TezTaskContextImpl implements TezTaskContext {
public String[] getWorkDirs() {
return Arrays.copyOf(workDirs, workDirs.length);
}
-
+
@Override
public String getUniqueIdentifier() {
return uniqueIdentifier;
}
-
- @Override
- public void fatalError(Throwable exception, String message) {
- // TODO NEWTEZ Implement once the TezContext communication is setup.
- }
-
+
@Override
public ByteBuffer getServiceConsumerMetaData(String serviceName) {
// TODO NEWTEZ Make sure this data is set by the AM for the Shuffle service name.
return null;
}
-
+
@Override
public ByteBuffer getServiceProviderMetaData(String serviceName) {
return AuxiliaryServiceHelper.getServiceDataFromEnv(
- ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID, System.getenv());
+ serviceName, System.getenv());
+ }
+
+ protected void signalFatalError(Throwable t, String message,
+ EventMetaData sourceInfo) {
+ runtimeTask.setFatalError(t, message);
+ String diagnostics;
+ if (t != null && message != null) {
+ diagnostics = "exceptionThrown=" + StringUtils.stringifyException(t)
+ + ", errorMessage=" + message;
+ } else if (t == null && message == null) {
+ diagnostics = "Unknown error";
+ } else {
+ diagnostics = t != null ?
+ "exceptionThrown=" + StringUtils.stringifyException(t)
+ : " errorMessage=" + message;
+ }
+ tezUmbilical.signalFatalError(taskAttemptID, diagnostics, sourceInfo);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c86e0e40/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezUmbilical.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezUmbilical.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezUmbilical.java
index c3065fe..b88dc63 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezUmbilical.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezUmbilical.java
@@ -20,8 +20,14 @@ package org.apache.tez.engine.newapi.impl;
import java.util.Collection;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+
public interface TezUmbilical {
public void addEvents(Collection<TezEvent> events);
+ public void signalFatalError(TezTaskAttemptID taskAttemptID,
+ String diagnostics,
+ EventMetaData sourceInfo);
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c86e0e40/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
index f0ac36e..05a28d8 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
@@ -54,11 +54,7 @@ import org.apache.tez.engine.newapi.impl.TezUmbilical;
import com.google.common.base.Preconditions;
@Private
-public class LogicalIOProcessorRuntimeTask {
-
- private enum State {
- NEW, INITED, RUNNING, CLOSED
- }
+public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
private static final Log LOG = LogFactory
.getLog(LogicalIOProcessorRuntimeTask.class);
@@ -78,8 +74,6 @@ public class LogicalIOProcessorRuntimeTask {
private final TezCounters tezCounters;
- private State state;
-
private Map<String, LogicalInput> inputMap;
private Map<String, LogicalOutput> outputMap;
@@ -152,9 +146,11 @@ public class LogicalIOProcessorRuntimeTask {
}
public void run() throws IOException {
- Preconditions.checkState(this.state == State.INITED,
- "Can only run while in INITED state. Current: " + this.state);
- this.state = State.RUNNING;
+ synchronized (this.state) {
+ Preconditions.checkState(this.state == State.INITED,
+ "Can only run while in INITED state. Current: " + this.state);
+ this.state = State.RUNNING;
+ }
LogicalIOProcessor lioProcessor = (LogicalIOProcessor) processor;
lioProcessor.run(inputMap, outputMap);
}
@@ -223,7 +219,7 @@ public class LogicalIOProcessorRuntimeTask {
TezInputContext inputContext = new TezInputContextImpl(tezConf,
tezUmbilical, taskSpec.getVertexName(), inputSpec.getSourceVertexName(),
taskSpec.getTaskAttemptID(), tezCounters,
- inputSpec.getInputDescriptor().getUserPayload());
+ inputSpec.getInputDescriptor().getUserPayload(), this);
return inputContext;
}
@@ -232,14 +228,14 @@ public class LogicalIOProcessorRuntimeTask {
tezUmbilical, taskSpec.getVertexName(),
outputSpec.getDestinationVertexName(),
taskSpec.getTaskAttemptID(), tezCounters,
- outputSpec.getOutputDescriptor().getUserPayload());
+ outputSpec.getOutputDescriptor().getUserPayload(), this);
return outputContext;
}
private TezProcessorContext createProcessorContext() {
TezProcessorContext processorContext = new TezProcessorContextImpl(tezConf,
tezUmbilical, taskSpec.getVertexName(), taskSpec.getTaskAttemptID(),
- tezCounters, processorDescriptor.getUserPayload());
+ tezCounters, processorDescriptor.getUserPayload(), this);
return processorContext;
}
@@ -320,4 +316,5 @@ public class LogicalIOProcessorRuntimeTask {
break;
}
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c86e0e40/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeTask.java b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeTask.java
new file mode 100644
index 0000000..479f917
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeTask.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.newruntime;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public abstract class RuntimeTask {
+
+ protected AtomicBoolean hasFatalError = new AtomicBoolean(false);
+ protected Throwable fatalError = null;
+ protected String fatalErrorMessage = null;
+
+ protected enum State {
+ NEW, INITED, RUNNING, CLOSED;
+ }
+
+ protected State state;
+
+ public void setFatalError(Throwable t, String message) {
+ hasFatalError.set(true);
+ this.fatalError = t;
+ this.fatalErrorMessage = message;
+ }
+
+ public boolean hadFatalError() {
+ return hasFatalError.get();
+ }
+
+}