You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/06/02 01:37:23 UTC

[45/50] [abbrv] tez git commit: TEZ-2434. Allow tasks to be killed in the runtime. (sseth)

TEZ-2434. Allow tasks to be killed in the runtime. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/1ed50e1e
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/1ed50e1e
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/1ed50e1e

Branch: refs/heads/TEZ-2003
Commit: 1ed50e1e5655c7b849667bd594ab3751861cedab
Parents: c4b800d
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon May 11 23:34:43 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Jun 1 14:24:30 2015 -0700

----------------------------------------------------------------------
 TEZ-2003-CHANGES.txt                            |   1 +
 .../org/apache/tez/runtime/task/EndReason.java  |  29 ++
 .../tez/runtime/task/TaskRunner2Callable.java   | 132 ++++++
 .../tez/runtime/task/TaskRunner2Result.java     |  48 ++
 .../org/apache/tez/runtime/task/TezChild.java   |  20 +-
 .../apache/tez/runtime/task/TezTaskRunner.java  |   1 +
 .../apache/tez/runtime/task/TezTaskRunner2.java | 434 +++++++++++++++++++
 7 files changed, 655 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/1ed50e1e/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index 66c110f..5d2e40a 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -24,5 +24,6 @@ ALL CHANGES:
   TEZ-2420. TaskRunner returning before executing the task.
   TEZ-2433. Fixes after rebase 05/08
   TEZ-2438. tez-tools version in the branch is incorrect.
+  TEZ-2434. Allow tasks to be killed in the Runtime.
 
 INCOMPATIBLE CHANGES:

http://git-wip-us.apache.org/repos/asf/tez/blob/1ed50e1e/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/EndReason.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/EndReason.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/EndReason.java
new file mode 100644
index 0000000..8dc7a87
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/EndReason.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.task;
+
+public enum EndReason {
+  SUCCESS(false),
+  CONTAINER_STOP_REQUESTED(false),
+  KILL_REQUESTED(true),
+  COMMUNICATION_FAILURE(false),
+  TASK_ERROR(false);
+
+  private final boolean isActionable;
+
+  EndReason(boolean isActionable) {
+    this.isActionable = isActionable;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/1ed50e1e/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java
new file mode 100644
index 0000000..7315bbd
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.task;
+
+import java.lang.reflect.UndeclaredThrowableException;
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.tez.common.CallableWithNdc;
+import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is responsible for running a {@link LogicalIOProcessorRuntimeTask}.
+ * It does not worry about reporting errors, heartbeats etc.
+ *
+ * Returns success / interrupt / failure status via its return value.
+ *
+ * It's the responsibility of the invoker to handle whatever exceptions may be generated by this.
+ */
+public class TaskRunner2Callable extends CallableWithNdc<TaskRunner2Callable.TaskRunner2CallableResult> {
+
+
+
+  private static final Logger LOG = LoggerFactory.getLogger(TaskRunner2Callable.class);
+
+  private final LogicalIOProcessorRuntimeTask task;
+  private final UserGroupInformation ugi;
+  private final AtomicBoolean stopRequested = new AtomicBoolean(false);
+
+  private volatile Thread ownThread;
+
+  public TaskRunner2Callable(LogicalIOProcessorRuntimeTask task,
+                             UserGroupInformation ugi) {
+    this.task = task;
+    this.ugi = ugi;
+  }
+
+  @Override
+  public TaskRunner2CallableResult callInternal() throws Exception {
+    ownThread = Thread.currentThread();
+    if (stopRequested.get()) {
+      return new TaskRunner2CallableResult(null);
+    }
+    try {
+      return ugi.doAs(new PrivilegedExceptionAction<TaskRunner2CallableResult>() {
+        @Override
+        public TaskRunner2CallableResult run() throws Exception {
+          if (stopRequested.get() || Thread.currentThread().isInterrupted()) {
+            return new TaskRunner2CallableResult(null);
+          }
+          LOG.info("Initializing task" + ", taskAttemptId=" + task.getTaskAttemptID());
+          task.initialize();
+
+          if (!stopRequested.get() && !Thread.currentThread().isInterrupted()) {
+            LOG.info("Running task, taskAttemptId=" + task.getTaskAttemptID());
+            task.run();
+          } else {
+            LOG.info("Stopped before running the processor.");
+            return new TaskRunner2CallableResult(null);
+          }
+
+          if (!stopRequested.get() && !Thread.currentThread().isInterrupted()) {
+            LOG.info("Closing task, taskAttemptId=" + task.getTaskAttemptID());
+            task.close();
+            task.setFrameworkCounters();
+          } else {
+            LOG.info("Stopped before closing the processor");
+            return new TaskRunner2CallableResult(null);
+          }
+          LOG.info("Task completed, taskAttemptId=" + task.getTaskAttemptID() + ", askedToStop=" + stopRequested.get());
+
+
+          return new TaskRunner2CallableResult(null);
+        }
+      });
+    } catch (Throwable t) {
+      if (t instanceof UndeclaredThrowableException) {
+        t = t.getCause();
+      }
+      return new TaskRunner2CallableResult(t);
+    } finally {
+      // If a stop was requested, make sure the interrupt status is set during the cleanup.
+
+      // One drawback of not communicating out from here is that task complete messages will only
+      // be sent out after cleanup is complete.
+      // For a successful task, however, this should be almost no delay since close has already happened.
+      maybeFixInterruptStatus();
+      LOG.info("Cleaning up task {}, stopRequested={}", task.getTaskAttemptID(), stopRequested.get());
+      task.cleanup();
+    }
+  }
+
+  private void maybeFixInterruptStatus() {
+    if (stopRequested.get() && !Thread.currentThread().isInterrupted()) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
+
+  public void interruptTask() {
+    // Ensure the task is only interrupted once.
+    if (!stopRequested.getAndSet(true)) {
+      if (ownThread != null) {
+        ownThread.interrupt();
+      }
+    }
+  }
+
+  public static class TaskRunner2CallableResult {
+    final Throwable error;
+
+    public TaskRunner2CallableResult(Throwable error) {
+      this.error = error;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/1ed50e1e/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Result.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Result.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Result.java
new file mode 100644
index 0000000..07b32ce
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Result.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.task;
+
+public class TaskRunner2Result {
+  final EndReason endReason;
+  final Throwable error;
+  final boolean containerShutdownRequested;
+
+  public TaskRunner2Result(EndReason endReason, Throwable error, boolean containerShutdownRequested) {
+    this.endReason = endReason;
+    this.error = error;
+    this.containerShutdownRequested = containerShutdownRequested;
+  }
+
+  public EndReason getEndReason() {
+    return endReason;
+  }
+
+  public Throwable getError() {
+    return error;
+  }
+
+  public boolean isContainerShutdownRequested() {
+    return containerShutdownRequested;
+  }
+
+  @Override
+  public String toString() {
+    return "TaskRunner2Result{" +
+        "endReason=" + endReason +
+        ", error=" + error +
+        ", containerShutdownRequested=" + containerShutdownRequested +
+        '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/1ed50e1e/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
index 7fbc0f7..f3f86a9 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
@@ -247,27 +247,27 @@ public class TezChild {
         cleanupOnTaskChanged(containerTask);
 
         // Execute the Actual Task
-        TezTaskRunner taskRunner = new TezTaskRunner(defaultConf, childUGI,
+        TezTaskRunner2 taskRunner = new TezTaskRunner2(defaultConf, childUGI,
             localDirs, containerTask.getTaskSpec(), appAttemptNumber,
             serviceConsumerMetadata, serviceProviderEnvMap, startedInputsMap, taskReporter,
             executor, objectRegistry, pid, executionContext, memAvailable);
         boolean shouldDie;
         try {
-          shouldDie = !taskRunner.run();
+          TaskRunner2Result result = taskRunner.run();
+          shouldDie = result.isContainerShutdownRequested();
           if (shouldDie) {
             LOG.info("Got a shouldDie notification via hearbeats. Shutting down");
             shutdown();
             return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.SUCCESS, null,
                 "Asked to die by the AM");
           }
-        } catch (IOException e) {
-          handleError(e);
-          return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE,
-              e, "TaskExecutionFailure: " + e.getMessage());
-        } catch (TezException e) {
-          handleError(e);
-          return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE,
-              e, "TaskExecutionFailure: " + e.getMessage());
+          if (result.getError() != null) {
+            Throwable e = result.getError();
+            handleError(result.getError());
+            return new ContainerExecutionResult(
+                ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE,
+                e, "TaskExecutionFailure: " + e.getMessage());
+          }
         } finally {
           FileSystem.closeAllForUGI(childUGI);
         }

http://git-wip-us.apache.org/repos/asf/tez/blob/1ed50e1e/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
index dd4620a..a82d87b 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
@@ -106,6 +106,7 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter {
     if (!Thread.currentThread().isInterrupted()) {
       taskFuture = executor.submit(callable);
     } else {
+      taskReporter.unregisterTask(task.getTaskAttemptID());
       return isShutdownRequested();
     }
     try {

http://git-wip-us.apache.org/repos/asf/tez/blob/1ed50e1e/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
new file mode 100644
index 0000000..73e5c76
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
@@ -0,0 +1,434 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.task;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSError;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
+import org.apache.tez.runtime.api.ExecutionContext;
+import org.apache.tez.runtime.api.ObjectRegistry;
+import org.apache.tez.runtime.api.impl.EventMetaData;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.apache.tez.runtime.api.impl.TezUmbilical;
+import org.apache.tez.runtime.internals.api.TaskReporterInterface;
+import org.apache.tez.runtime.task.TaskRunner2Callable.TaskRunner2CallableResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TezTaskRunner2 {
+
+
+  private static final Logger LOG = LoggerFactory.getLogger(TezTaskRunner2.class);
+
+  private final LogicalIOProcessorRuntimeTask task;
+  private final UserGroupInformation ugi;
+
+  private final TaskReporterInterface taskReporter;
+  private final ListeningExecutorService executor;
+  private final UmbilicalAndErrorHandler umbilicalAndErrorHandler;
+
+  // TODO It may be easier to model this as a state machine.
+
+  // Indicates whether a kill has been requested. NOTE(review): nothing in this class sets it to true - confirm.
+  private final AtomicBoolean killTaskRequested = new AtomicBoolean(false);
+
+  // Indicates whether a stop container has been requested.
+  private final AtomicBoolean stopContainerRequested = new AtomicBoolean(false);
+
+  // Indicates whether the task is complete.
+  private final AtomicBoolean taskComplete = new AtomicBoolean(false);
+
+  // Separate flag from firstException, since an error can be reported without an exception.
+  private final AtomicBoolean errorSeen = new AtomicBoolean(false);
+
+  private volatile EndReason firstEndReason = null;
+
+  // The first exception which caused the task to fail. This could come in from the
+  // TaskRunnerCallable, a failure to heartbeat, or a signalFatalError on the context.
+  private volatile Throwable firstException;
+  private volatile EventMetaData exceptionSourceInfo;
+  private final AtomicBoolean errorReporterToAm = new AtomicBoolean(false);
+
+  private boolean oobSignalErrorInProgress = false;
+  private final Lock oobSignalLock = new ReentrantLock();
+  private final Condition oobSignalCondition = oobSignalLock.newCondition();
+
+  private volatile long taskKillStartTime  = 0;
+
+  // The callable which is being used to execute the task.
+  private volatile TaskRunner2Callable taskRunnerCallable;
+
+  public TezTaskRunner2(Configuration tezConf, UserGroupInformation ugi, String[] localDirs,
+                        TaskSpec taskSpec, int appAttemptNumber,
+                        Map<String, ByteBuffer> serviceConsumerMetadata,
+                        Map<String, String> serviceProviderEnvMap,
+                        Multimap<String, String> startedInputsMap,
+                        TaskReporterInterface taskReporter, ListeningExecutorService executor,
+                        ObjectRegistry objectRegistry, String pid,
+                        ExecutionContext executionContext, long memAvailable) throws
+      IOException {
+    this.ugi = ugi;
+    this.taskReporter = taskReporter;
+    this.executor = executor;
+    this.umbilicalAndErrorHandler = new UmbilicalAndErrorHandler();
+    this.task = new LogicalIOProcessorRuntimeTask(taskSpec, appAttemptNumber, tezConf, localDirs,
+        umbilicalAndErrorHandler, serviceConsumerMetadata, serviceProviderEnvMap, startedInputsMap,
+        objectRegistry, pid, executionContext, memAvailable);
+  }
+
+  /**
+   * Runs the task to completion and returns the outcome. Communication errors hit while
+   * sending the final status to the AM are caught and logged rather than thrown.
+   *
+   * This takes care of all communication with the AM for a running task - which
+   * includes informing the AM about Failures and Success.
+   *
+   * If a kill request is made to the task, it will not communicate this information to
+   * the AM - since a task KILL is an external event, and whoever invoked it should
+   * be able to track it.
+   *
+   * @return the end reason, the first error (if any) and whether a container stop was requested
+   */
+  public TaskRunner2Result run() {
+    try {
+      ListenableFuture<TaskRunner2CallableResult> future = null;
+      synchronized (this) {
+        if (isRunningState()) {
+          // Safe to do this within a synchronized block because we're providing
+          // the handler on which the Reporter will communicate back. Assuming
+          // the register call doesn't end up hanging.
+          taskRunnerCallable = new TaskRunner2Callable(task, ugi);
+          taskReporter.registerTask(task, umbilicalAndErrorHandler);
+          future = executor.submit(taskRunnerCallable);
+        }
+      }
+
+      if (future == null) {
+        return logAndReturnEndResult(firstEndReason, firstException, stopContainerRequested.get());
+      }
+
+      TaskRunner2CallableResult executionResult = null;
+      // The task started. Wait for it to complete.
+      try {
+        executionResult = future.get();
+      } catch (Throwable e) {
+        if (e instanceof ExecutionException) {
+          e = e.getCause();
+        }
+        synchronized (this) {
+          if (isRunningState()) {
+            trySettingEndReason(EndReason.TASK_ERROR);
+            registerFirstException(e, null);
+            LOG.warn("Exception from RunnerCallable", e);
+          }
+        }
+      }
+      if (executionResult != null) {
+        synchronized (this) {
+          if (isRunningState()) {
+            if (executionResult.error != null) {
+              trySettingEndReason(EndReason.TASK_ERROR);
+              registerFirstException(executionResult.error, null);
+            } else {
+              trySettingEndReason(EndReason.SUCCESS);
+              taskComplete.set(true);
+            }
+          }
+        }
+      }
+
+      switch (firstEndReason) {
+        case SUCCESS:
+          try {
+            taskReporter.taskSucceeded(task.getTaskAttemptID());
+            return logAndReturnEndResult(EndReason.SUCCESS, null, stopContainerRequested.get());
+          } catch (IOException e) {
+            // Comm failure. Task can't do much.
+            handleFinalStatusUpdateFailure(e, true);
+            return logAndReturnEndResult(EndReason.COMMUNICATION_FAILURE, e, stopContainerRequested.get());
+          } catch (TezException e) {
+            // Failure from AM. Task can't do much.
+            handleFinalStatusUpdateFailure(e, true);
+            return logAndReturnEndResult(EndReason.COMMUNICATION_FAILURE, e, stopContainerRequested.get());
+          }
+        case CONTAINER_STOP_REQUESTED:
+          // Don't need to send any more communication updates to the AM.
+          return logAndReturnEndResult(firstEndReason, null, stopContainerRequested.get());
+        case KILL_REQUESTED:
+          // Kill is currently not reported to the AM via the TaskRunner. Fix this when the umbilical
+          // supports an indication of kill, if required.
+          return logAndReturnEndResult(firstEndReason, null, stopContainerRequested.get());
+        case COMMUNICATION_FAILURE:
+          // Already seen a communication failure. There's no point trying to report another one.
+          return logAndReturnEndResult(firstEndReason, firstException, stopContainerRequested.get());
+        case TASK_ERROR:
+          // Don't report an error again if it was reported via signalFatalError
+          if (errorReporterToAm.get()) {
+            return logAndReturnEndResult(firstEndReason, firstException, stopContainerRequested.get());
+          } else {
+            String message;
+            if (firstException instanceof FSError) {
+              message = "Encountered an FSError while executing task: " + task.getTaskAttemptID();
+            } else if (firstException instanceof Error) {
+              message = "Encountered an Error while executing task: " + task.getTaskAttemptID();
+            } else {
+              message = "Failure while running task: " + task.getTaskAttemptID();
+            }
+            try {
+              taskReporter.taskFailed(task.getTaskAttemptID(), firstException, message, exceptionSourceInfo);
+              return logAndReturnEndResult(firstEndReason, firstException, stopContainerRequested.get());
+            } catch (IOException e) {
+              // Comm failure. Task can't do much.
+              handleFinalStatusUpdateFailure(e, true);
+              return logAndReturnEndResult(firstEndReason, firstException, stopContainerRequested.get());
+            } catch (TezException e) {
+              // Failure from AM. Task can't do much.
+              handleFinalStatusUpdateFailure(e, true);
+              return logAndReturnEndResult(firstEndReason, firstException, stopContainerRequested.get());
+            }
+          }
+        default:
+          LOG.error("Unexpected EndReason. File a bug");
+          return logAndReturnEndResult(EndReason.TASK_ERROR, new RuntimeException("Unexpected EndReason"), stopContainerRequested.get());
+
+      }
+    } finally {
+      // Wait for any in-flight out-of-band fatal error report to finish, then unregister the
+      // task and clear the interrupted status of this thread in case it was set late.
+      oobSignalLock.lock();
+      try {
+        while (oobSignalErrorInProgress) {
+          try {
+            oobSignalCondition.await();
+          } catch (InterruptedException e) {
+            LOG.warn("Interrupted while waiting for OOB fatal error to complete");
+            Thread.currentThread().interrupt();
+          }
+        }
+      } finally {
+        oobSignalLock.unlock();
+      }
+      taskReporter.unregisterTask(task.getTaskAttemptID());
+      if (taskKillStartTime != 0) {
+        LOG.info("Time taken to interrupt task={}", (System.currentTimeMillis() - taskKillStartTime));
+      }
+      Thread.interrupted();
+    }
+  }
+
+  public void killTask() {
+    synchronized (this) {
+      if (isRunningState()) {
+        trySettingEndReason(EndReason.KILL_REQUESTED);
+        if (taskRunnerCallable != null) {
+          taskKillStartTime = System.currentTimeMillis();
+          taskRunnerCallable.interruptTask();
+        }
+      }
+    }
+  }
+
+
+  // Checks and changes on these states should happen within a synchronized block,
+  // to ensure the first event is the one that is captured and causes specific behaviour.
+  private boolean isRunningState() {
+    return !taskComplete.get() && !killTaskRequested.get() && !stopContainerRequested.get() &&
+        !errorSeen.get();
+  }
+
+  class UmbilicalAndErrorHandler implements TezUmbilical, ErrorReporter {
+
+    @Override
+    public void addEvents(Collection<TezEvent> events) {
+      // Incoming events from the running task.
+      // Only add these if the task is running.
+      if (isRunningState()) {
+        taskReporter.addEvents(task.getTaskAttemptID(), events);
+      }
+    }
+
+    @Override
+    public void signalFatalError(TezTaskAttemptID taskAttemptID, Throwable t, String message,
+                                 EventMetaData sourceInfo) {
+      // Fatal error reported by the task.
+      boolean isFirstError = false;
+      synchronized (TezTaskRunner2.this) {
+        if (isRunningState()) {
+          if (trySettingEndReason(EndReason.TASK_ERROR)) {
+            if (t == null) {
+              t = new RuntimeException(
+                  message == null ? "FatalError: No user message or exception specified" : message);
+            }
+            registerFirstException(t, sourceInfo);
+            LOG.info("Received notification of a fatal error which will cause the task to die", t);
+            isFirstError = true;
+            errorReporterToAm.set(true);
+            oobSignalErrorInProgress = true;
+          } else {
+            LOG.info(
+                "Ignoring fatal error since the task has ended for reason: {}. IgnoredError: {} ",
+                firstEndReason, (t == null ? message : t.getMessage()));
+          }
+        }
+      }
+
+      // Informing the TaskReporter here because the running task may not be interruptible.
+      // Has to be outside the lock. NOTE(review): errorSeen is already set, so killTask() looks like a no-op - confirm.
+      if (isFirstError) {
+        killTask();
+        try {
+          taskReporter.taskFailed(taskAttemptID, t, getTaskDiagnosticsString(t, message), sourceInfo);
+        } catch (IOException e) {
+          // Comm failure. Task can't do much. The main exception is already registered.
+          handleFinalStatusUpdateFailure(e, true);
+        } catch (TezException e) {
+          // Failure from AM. Task can't do much. The main exception is already registered.
+          handleFinalStatusUpdateFailure(e, true);
+        } finally {
+          oobSignalLock.lock();
+          try {
+            // This message is being sent outside of the main thread, which may end up completing before
+            // this thread runs. Make sure the main run thread does not end till this completes.
+            oobSignalErrorInProgress = false;
+            oobSignalCondition.signal();
+          } finally {
+            oobSignalLock.unlock();
+          }
+        }
+      }
+    }
+
+    @Override
+    public boolean canCommit(TezTaskAttemptID taskAttemptID) throws IOException {
+      // Task checking whether it can commit.
+
+      // Not getting a lock here. It should be alright for the task to check with the reporter
+      // on whether a task can commit.
+      if (isRunningState()) {
+        return taskReporter.canCommit(taskAttemptID);
+        // If there's a communication failure here, let it propagate through to the task.
+        // which may throw it back or handle it appropriately.
+      } else {
+        // Don't throw an error since the task is already in the process of shutting down.
+        LOG.info("returning canCommit=false since task is not in a running state");
+        return false;
+      }
+    }
+
+
+    @Override
+    public void reportError(Throwable t) {
+      // Umbilical reporting an error during heartbeat
+      boolean isFirstError = false;
+      synchronized (TezTaskRunner2.this) {
+        if (isRunningState()) {
+          LOG.info("TaskReporter reporter error which will cause the task to fail", t);
+          if (trySettingEndReason(EndReason.COMMUNICATION_FAILURE)) {
+            registerFirstException(t, null);
+            isFirstError = true;
+          }
+          // A race is possible between a task succeeding, and a subsequent timed heartbeat failing.
+          // These errors can be ignored, since a task can only succeed if the synchronous taskSucceeded
+          // method does not throw an exception, in which case task success is registered with the AM.
+          // Leave subsequent heartbeat errors to the next entity to communicate using the TaskReporter
+        } else {
+          LOG.info("Ignoring Communication failure since task with id=" + task.getTaskAttemptID()
+              + " is already complete, is failing or has been asked to terminate");
+        }
+      }
+      // Since this error came from the taskReporter - there's no point attempting to report a failure back to it.
+      if (isFirstError) {
+        killTask();
+      }
+    }
+
+    @Override
+    public void shutdownRequested() {
+      // Umbilical informing about a shutdown request for the container.
+      boolean isFirstTerminate = false;
+      synchronized (TezTaskRunner2.this) {
+        isFirstTerminate = trySettingEndReason(EndReason.CONTAINER_STOP_REQUESTED);
+        // Respect stopContainerRequested since it can come in at any point, despite a previous failure.
+        stopContainerRequested.set(true);
+      }
+
+      if (isFirstTerminate) {
+        killTask();
+      }
+    }
+  }
+
+  private synchronized boolean trySettingEndReason(EndReason endReason) {
+    if (isRunningState()) {
+      firstEndReason = endReason;
+      return true;
+    }
+    return false;
+  }
+
+
+  private void registerFirstException(Throwable t, EventMetaData sourceInfo) {
+    Preconditions.checkState(isRunningState());
+    errorSeen.set(true);
+    firstException = t;
+    this.exceptionSourceInfo = sourceInfo;
+  }
+
+
+  private String getTaskDiagnosticsString(Throwable t, String message) {
+    String diagnostics;
+    if (t != null && message != null) {
+      diagnostics = "exceptionThrown=" + ExceptionUtils.getStackTrace(t) + ", errorMessage="
+          + message;
+    } else if (t == null && message == null) {
+      diagnostics = "Unknown error";
+    } else {
+      diagnostics = t != null ? "exceptionThrown=" + ExceptionUtils.getStackTrace(t)
+          : " errorMessage=" + message;
+    }
+    return diagnostics;
+  }
+
+  private TaskRunner2Result logAndReturnEndResult(EndReason endReason, Throwable firstError,
+                                                  boolean stopContainerRequested) {
+    TaskRunner2Result result = new TaskRunner2Result(endReason, firstError, stopContainerRequested);
+    LOG.info("TaskRunnerResult for {} : {}  ", task.getTaskAttemptID(), result);
+    return result;
+  }
+
+  private void handleFinalStatusUpdateFailure(Throwable t, boolean successReportAttempted) {
+    // TODO Ideally differentiate between FAILED/KILLED
+    LOG.warn("Failure while reporting state= {} to AM", (successReportAttempted ? "success" : "failure/killed"), t);
+  }
+}
\ No newline at end of file