You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/14 22:58:39 UTC
[21/50] [abbrv] tez git commit: TEZ-2434. Allow tasks to be killed in
the runtime. (sseth)
TEZ-2434. Allow tasks to be killed in the runtime. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/73f73167
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/73f73167
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/73f73167
Branch: refs/heads/TEZ-2003
Commit: 73f73167d4085b137672dbc9ac8c3e4a0c126a8d
Parents: 34f053f
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon May 11 23:34:43 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Aug 14 13:46:44 2015 -0700
----------------------------------------------------------------------
TEZ-2003-CHANGES.txt | 1 +
.../org/apache/tez/runtime/task/EndReason.java | 29 ++
.../tez/runtime/task/TaskRunner2Callable.java | 132 ++++++
.../tez/runtime/task/TaskRunner2Result.java | 48 ++
.../org/apache/tez/runtime/task/TezChild.java | 20 +-
.../apache/tez/runtime/task/TezTaskRunner.java | 1 +
.../apache/tez/runtime/task/TezTaskRunner2.java | 434 +++++++++++++++++++
7 files changed, 655 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/73f73167/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index 66c110f..5d2e40a 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -24,5 +24,6 @@ ALL CHANGES:
TEZ-2420. TaskRunner returning before executing the task.
TEZ-2433. Fixes after rebase 05/08
TEZ-2438. tez-tools version in the branch is incorrect.
+ TEZ-2434. Allow tasks to be killed in the Runtime.
INCOMPATIBLE CHANGES:
http://git-wip-us.apache.org/repos/asf/tez/blob/73f73167/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/EndReason.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/EndReason.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/EndReason.java
new file mode 100644
index 0000000..8dc7a87
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/EndReason.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.task;
+
+/**
+ * Reasons for which the execution of a task attempt can come to an end.
+ *
+ * Each constant is declared with an {@code isActionable} flag; of the values declared here only
+ * {@link #KILL_REQUESTED} sets it.
+ * NOTE(review): the precise meaning of "actionable" is not documented in this change - confirm
+ * with the author before building behaviour on top of it.
+ */
+public enum EndReason {
+  SUCCESS(false),
+  CONTAINER_STOP_REQUESTED(false),
+  KILL_REQUESTED(true),
+  COMMUNICATION_FAILURE(false),
+  TASK_ERROR(false);
+
+  private final boolean isActionable;
+
+  EndReason(boolean isActionable) {
+    this.isActionable = isActionable;
+  }
+
+  /**
+   * Exposes the flag the constant was declared with; previously it was stored but unreadable.
+   *
+   * @return the {@code isActionable} value of this reason
+   */
+  public boolean isActionable() {
+    return isActionable;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/73f73167/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java
new file mode 100644
index 0000000..7315bbd
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.task;
+
+import java.lang.reflect.UndeclaredThrowableException;
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.tez.common.CallableWithNdc;
+import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Drives a single {@link LogicalIOProcessorRuntimeTask} through initialize, run and close
+ * under the supplied {@link UserGroupInformation}.
+ *
+ * Error reporting, heartbeats and the like are not handled here. The outcome is handed back via
+ * the returned {@link TaskRunner2CallableResult}: its {@code error} holds the Throwable caught
+ * while executing the task, or null when nothing was thrown (which includes the case where
+ * execution was cut short because a stop was requested).
+ *
+ * The invoker is responsible for handling any exception which escapes from this class.
+ */
+public class TaskRunner2Callable extends CallableWithNdc<TaskRunner2Callable.TaskRunner2CallableResult> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TaskRunner2Callable.class);
+
+  private final LogicalIOProcessorRuntimeTask task;
+  private final UserGroupInformation ugi;
+  // Flipped once by interruptTask(); consulted between the phases of the task.
+  private final AtomicBoolean stopRequested = new AtomicBoolean(false);
+
+  // The thread executing callInternal(); null until execution has started.
+  private volatile Thread ownThread;
+
+  public TaskRunner2Callable(LogicalIOProcessorRuntimeTask task,
+      UserGroupInformation ugi) {
+    this.task = task;
+    this.ugi = ugi;
+  }
+
+  @Override
+  public TaskRunner2CallableResult callInternal() throws Exception {
+    ownThread = Thread.currentThread();
+    if (stopRequested.get()) {
+      return new TaskRunner2CallableResult(null);
+    }
+    try {
+      return ugi.doAs(new PrivilegedExceptionAction<TaskRunner2CallableResult>() {
+        @Override
+        public TaskRunner2CallableResult run() throws Exception {
+          return executeTask();
+        }
+      });
+    } catch (Throwable t) {
+      // Unwrap the wrapper so that the caller sees the underlying failure.
+      Throwable failure = (t instanceof UndeclaredThrowableException) ? t.getCause() : t;
+      return new TaskRunner2CallableResult(failure);
+    } finally {
+      // If a stop was requested, make sure the interrupt status is set while cleaning up.
+      //
+      // Since nothing is communicated out from here, task complete messages can only be sent
+      // once cleanup has finished. For a successful task this should add almost no delay,
+      // because close has already happened.
+      if (stopRequested.get() && !Thread.currentThread().isInterrupted()) {
+        Thread.currentThread().interrupt();
+      }
+      LOG.info("Cleaning up task {}, stopRequested={}", task.getTaskAttemptID(), stopRequested.get());
+      task.cleanup();
+    }
+  }
+
+  /**
+   * Runs the task phases in order, bailing out before any phase if a stop was requested or the
+   * current thread has been interrupted.
+   */
+  private TaskRunner2CallableResult executeTask() throws Exception {
+    if (shouldStop()) {
+      return new TaskRunner2CallableResult(null);
+    }
+    LOG.info("Initializing task" + ", taskAttemptId=" + task.getTaskAttemptID());
+    task.initialize();
+
+    if (shouldStop()) {
+      LOG.info("Stopped before running the processor.");
+      return new TaskRunner2CallableResult(null);
+    }
+    LOG.info("Running task, taskAttemptId=" + task.getTaskAttemptID());
+    task.run();
+
+    if (shouldStop()) {
+      LOG.info("Stopped before closing the processor");
+      return new TaskRunner2CallableResult(null);
+    }
+    LOG.info("Closing task, taskAttemptId=" + task.getTaskAttemptID());
+    task.close();
+    task.setFrameworkCounters();
+
+    LOG.info("Task completed, taskAttemptId=" + task.getTaskAttemptID() + ", askedToStop=" + stopRequested.get());
+    return new TaskRunner2CallableResult(null);
+  }
+
+  private boolean shouldStop() {
+    return stopRequested.get() || Thread.currentThread().isInterrupted();
+  }
+
+  /**
+   * Requests a stop and interrupts the executing thread if execution has started.
+   * Only the first invocation has any effect.
+   */
+  public void interruptTask() {
+    if (stopRequested.getAndSet(true)) {
+      // Already asked to stop; the task is only ever interrupted once.
+      return;
+    }
+    Thread runner = ownThread;
+    if (runner != null) {
+      runner.interrupt();
+    }
+  }
+
+  /** Carries the Throwable caught while executing the task, or null if there was none. */
+  public static class TaskRunner2CallableResult {
+    final Throwable error;
+
+    public TaskRunner2CallableResult(Throwable error) {
+      this.error = error;
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/73f73167/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Result.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Result.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Result.java
new file mode 100644
index 0000000..07b32ce
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Result.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.task;
+
+/**
+ * Immutable holder for the outcome of a TezTaskRunner2 run: why the task ended, the error
+ * associated with that ending (may be null) and whether a container shutdown was requested.
+ */
+public class TaskRunner2Result {
+  final EndReason endReason;
+  final Throwable error;
+  final boolean containerShutdownRequested;
+
+  public TaskRunner2Result(EndReason endReason, Throwable error, boolean containerShutdownRequested) {
+    this.endReason = endReason;
+    this.error = error;
+    this.containerShutdownRequested = containerShutdownRequested;
+  }
+
+  /** @return the reason for which the task ended */
+  public EndReason getEndReason() {
+    return endReason;
+  }
+
+  /** @return the error recorded for the task, or null if none was recorded */
+  public Throwable getError() {
+    return error;
+  }
+
+  /** @return true if a container shutdown was requested */
+  public boolean isContainerShutdownRequested() {
+    return containerShutdownRequested;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder text = new StringBuilder("TaskRunner2Result{");
+    text.append("endReason=").append(endReason);
+    text.append(", error=").append(error);
+    text.append(", containerShutdownRequested=").append(containerShutdownRequested);
+    text.append('}');
+    return text.toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/73f73167/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
index 4c8bebc..fff39a0 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
@@ -248,27 +248,27 @@ public class TezChild {
cleanupOnTaskChanged(containerTask);
// Execute the Actual Task
- TezTaskRunner taskRunner = new TezTaskRunner(defaultConf, childUGI,
+ TezTaskRunner2 taskRunner = new TezTaskRunner2(defaultConf, childUGI,
localDirs, containerTask.getTaskSpec(), appAttemptNumber,
serviceConsumerMetadata, serviceProviderEnvMap, startedInputsMap, taskReporter,
executor, objectRegistry, pid, executionContext, memAvailable);
boolean shouldDie;
try {
- shouldDie = !taskRunner.run();
+ TaskRunner2Result result = taskRunner.run();
+ shouldDie = result.isContainerShutdownRequested();
if (shouldDie) {
LOG.info("Got a shouldDie notification via heartbeats for container {}. Shutting down", containerIdString);
shutdown();
return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.SUCCESS, null,
"Asked to die by the AM");
}
- } catch (IOException e) {
- handleError(e);
- return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE,
- e, "TaskExecutionFailure: " + e.getMessage());
- } catch (TezException e) {
- handleError(e);
- return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE,
- e, "TaskExecutionFailure: " + e.getMessage());
+ if (result.getError() != null) {
+ Throwable e = result.getError();
+ handleError(result.getError());
+ return new ContainerExecutionResult(
+ ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE,
+ e, "TaskExecutionFailure: " + e.getMessage());
+ }
} finally {
FileSystem.closeAllForUGI(childUGI);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/73f73167/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
index dd4620a..a82d87b 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
@@ -106,6 +106,7 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter {
if (!Thread.currentThread().isInterrupted()) {
taskFuture = executor.submit(callable);
} else {
+ taskReporter.unregisterTask(task.getTaskAttemptID());
return isShutdownRequested();
}
try {
http://git-wip-us.apache.org/repos/asf/tez/blob/73f73167/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
new file mode 100644
index 0000000..73e5c76
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
@@ -0,0 +1,434 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.task;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSError;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
+import org.apache.tez.runtime.api.ExecutionContext;
+import org.apache.tez.runtime.api.ObjectRegistry;
+import org.apache.tez.runtime.api.impl.EventMetaData;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.apache.tez.runtime.api.impl.TezUmbilical;
+import org.apache.tez.runtime.internals.api.TaskReporterInterface;
+import org.apache.tez.runtime.task.TaskRunner2Callable.TaskRunner2CallableResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Runs a single {@link LogicalIOProcessorRuntimeTask} on the supplied executor and tracks the
+ * first event which ends it: success, a task error, a communication failure, a kill request or
+ * a container stop request. Only the first such event decides the reported {@link EndReason}.
+ */
+public class TezTaskRunner2 {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TezTaskRunner2.class);
+
+  private final LogicalIOProcessorRuntimeTask task;
+  private final UserGroupInformation ugi;
+
+  private final TaskReporterInterface taskReporter;
+  private final ListeningExecutorService executor;
+  private final UmbilicalAndErrorHandler umbilicalAndErrorHandler;
+
+  // TODO It may be easier to model this as a state machine.
+
+  // Indicates whether a kill has been requested.
+  private final AtomicBoolean killTaskRequested = new AtomicBoolean(false);
+
+  // Indicates whether a stop container has been requested.
+  private final AtomicBoolean stopContainerRequested = new AtomicBoolean(false);
+
+  // Indicates whether the task is complete.
+  private final AtomicBoolean taskComplete = new AtomicBoolean(false);
+
+  // Separate flag from firstException, since an error can be reported without an exception.
+  private final AtomicBoolean errorSeen = new AtomicBoolean(false);
+
+  private volatile EndReason firstEndReason = null;
+
+  // The first exception which caused the task to fail. This could come in from the
+  // TaskRunnerCallable, a failure to heartbeat, or a signalFatalError on the context.
+  private volatile Throwable firstException;
+  private volatile EventMetaData exceptionSourceInfo;
+  private final AtomicBoolean errorReporterToAm = new AtomicBoolean(false);
+
+  // True while signalFatalError is still reporting a failure from outside the run() thread.
+  private boolean oobSignalErrorInProgress = false;
+  private final Lock oobSignalLock = new ReentrantLock();
+  private final Condition oobSignalCondition = oobSignalLock.newCondition();
+
+  // Time at which the task was first interrupted; 0 if it never was.
+  private volatile long taskKillStartTime = 0;
+
+  // The callable which is being used to execute the task.
+  private volatile TaskRunner2Callable taskRunnerCallable;
+
+  public TezTaskRunner2(Configuration tezConf, UserGroupInformation ugi, String[] localDirs,
+      TaskSpec taskSpec, int appAttemptNumber,
+      Map<String, ByteBuffer> serviceConsumerMetadata,
+      Map<String, String> serviceProviderEnvMap,
+      Multimap<String, String> startedInputsMap,
+      TaskReporterInterface taskReporter, ListeningExecutorService executor,
+      ObjectRegistry objectRegistry, String pid,
+      ExecutionContext executionContext, long memAvailable) throws
+      IOException {
+    this.ugi = ugi;
+    this.taskReporter = taskReporter;
+    this.executor = executor;
+    this.umbilicalAndErrorHandler = new UmbilicalAndErrorHandler();
+    this.task = new LogicalIOProcessorRuntimeTask(taskSpec, appAttemptNumber, tezConf, localDirs,
+        umbilicalAndErrorHandler, serviceConsumerMetadata, serviceProviderEnvMap, startedInputsMap,
+        objectRegistry, pid, executionContext, memAvailable);
+  }
+
+  /**
+   * Executes the task and blocks until it has ended.
+   *
+   * Task errors and communication failures are not thrown; they are returned as part of the
+   * result.
+   *
+   * This takes care of all communication with the AM for a running task - which
+   * includes informing the AM about failures and success.
+   *
+   * If a kill request is made to the task, it will not communicate this information to
+   * the AM - since a task KILL is an external event, and whoever invoked it should
+   * be able to track it.
+   *
+   * @return the reason for which the task ended, the associated error (if any) and whether a
+   *         container shutdown has been requested
+   */
+  public TaskRunner2Result run() {
+    try {
+      ListenableFuture<TaskRunner2CallableResult> future = null;
+      synchronized (this) {
+        if (isRunningState()) {
+          // Safe to do this within a synchronized block because we're providing
+          // the handler on which the Reporter will communicate back. Assuming
+          // the register call doesn't end up hanging.
+          taskRunnerCallable = new TaskRunner2Callable(task, ugi);
+          taskReporter.registerTask(task, umbilicalAndErrorHandler);
+          future = executor.submit(taskRunnerCallable);
+        }
+      }
+
+      if (future == null) {
+        // The task was ended (e.g. killed) before it could be started.
+        return logAndReturnEndResult(firstEndReason, firstException, stopContainerRequested.get());
+      }
+
+      TaskRunner2CallableResult executionResult = null;
+      // The task started. Wait for it to complete.
+      try {
+        executionResult = future.get();
+      } catch (Throwable e) {
+        if (e instanceof ExecutionException) {
+          e = e.getCause();
+        }
+        synchronized (this) {
+          if (isRunningState()) {
+            trySettingEndReason(EndReason.TASK_ERROR);
+            registerFirstException(e, null);
+            LOG.warn("Exception from RunnerCallable", e);
+          }
+        }
+      }
+      if (executionResult != null) {
+        synchronized (this) {
+          // Only the first terminal event counts; an earlier kill / error / stop wins.
+          if (isRunningState()) {
+            if (executionResult.error != null) {
+              trySettingEndReason(EndReason.TASK_ERROR);
+              registerFirstException(executionResult.error, null);
+            } else {
+              trySettingEndReason(EndReason.SUCCESS);
+              taskComplete.set(true);
+            }
+          }
+        }
+      }
+
+      switch (firstEndReason) {
+        case SUCCESS:
+          try {
+            taskReporter.taskSucceeded(task.getTaskAttemptID());
+            return logAndReturnEndResult(EndReason.SUCCESS, null, stopContainerRequested.get());
+          } catch (IOException e) {
+            // Comm failure. Task can't do much.
+            handleFinalStatusUpdateFailure(e, true);
+            return logAndReturnEndResult(EndReason.COMMUNICATION_FAILURE, e, stopContainerRequested.get());
+          } catch (TezException e) {
+            // Failure from AM. Task can't do much.
+            handleFinalStatusUpdateFailure(e, true);
+            return logAndReturnEndResult(EndReason.COMMUNICATION_FAILURE, e, stopContainerRequested.get());
+          }
+        case CONTAINER_STOP_REQUESTED:
+          // Don't need to send any more communication updates to the AM.
+          return logAndReturnEndResult(firstEndReason, null, stopContainerRequested.get());
+        case KILL_REQUESTED:
+          // Kill is currently not reported to the AM via the TaskRunner. Fix this when the umbilical
+          // supports an indication of kill, if required.
+          return logAndReturnEndResult(firstEndReason, null, stopContainerRequested.get());
+        case COMMUNICATION_FAILURE:
+          // Already seen a communication failure. There's no point trying to report another one.
+          return logAndReturnEndResult(firstEndReason, firstException, stopContainerRequested.get());
+        case TASK_ERROR:
+          // Don't report an error again if it was reported via signalFatalError
+          if (errorReporterToAm.get()) {
+            return logAndReturnEndResult(firstEndReason, firstException, stopContainerRequested.get());
+          } else {
+            String message;
+            if (firstException instanceof FSError) {
+              message = "Encountered an FSError while executing task: " + task.getTaskAttemptID();
+            } else if (firstException instanceof Error) {
+              message = "Encountered an Error while executing task: " + task.getTaskAttemptID();
+            } else {
+              message = "Failure while running task: " + task.getTaskAttemptID();
+            }
+            try {
+              taskReporter.taskFailed(task.getTaskAttemptID(), firstException, message, exceptionSourceInfo);
+              return logAndReturnEndResult(firstEndReason, firstException, stopContainerRequested.get());
+            } catch (IOException e) {
+              // Comm failure. Task can't do much. A failure (not a success) was being reported.
+              handleFinalStatusUpdateFailure(e, false);
+              return logAndReturnEndResult(firstEndReason, firstException, stopContainerRequested.get());
+            } catch (TezException e) {
+              // Failure from AM. Task can't do much. A failure (not a success) was being reported.
+              handleFinalStatusUpdateFailure(e, false);
+              return logAndReturnEndResult(firstEndReason, firstException, stopContainerRequested.get());
+            }
+          }
+        default:
+          LOG.error("Unexpected EndReason. File a bug");
+          return logAndReturnEndResult(EndReason.TASK_ERROR, new RuntimeException("Unexpected EndReason"), stopContainerRequested.get());
+
+      }
+    } finally {
+      oobSignalLock.lock();
+      try {
+        // Wait for an in-flight signalFatalError report to finish before unregistering.
+        // This must not use await() + re-interrupt: once the interrupt flag is set, await()
+        // throws immediately without releasing the lock, so this thread would spin while
+        // holding oobSignalLock and the reporting thread could never clear the flag.
+        while (oobSignalErrorInProgress) {
+          oobSignalCondition.awaitUninterruptibly();
+        }
+      } finally {
+        oobSignalLock.unlock();
+      }
+      taskReporter.unregisterTask(task.getTaskAttemptID());
+      if (taskKillStartTime != 0) {
+        LOG.info("Time taken to interrupt task={}", (System.currentTimeMillis() - taskKillStartTime));
+      }
+      // Clear the interrupted status of the blocking thread, in case it was set while waiting
+      // for the task or for the out-of-band error report.
+      Thread.interrupted();
+    }
+  }
+
+  /**
+   * Asks the task to stop by interrupting the thread which is executing it.
+   *
+   * When this is the first terminal event, the end reason becomes
+   * {@link EndReason#KILL_REQUESTED} and the runner leaves the running state, so that a later
+   * result from the callable cannot overwrite it. The handlers in
+   * {@link UmbilicalAndErrorHandler} also call this after they have already recorded their own
+   * end reason; in that case the end reason is left untouched, but the task is still
+   * interrupted. Nothing is interrupted once the task has completed or before it has started.
+   */
+  public void killTask() {
+    synchronized (this) {
+      if (isRunningState()) {
+        trySettingEndReason(EndReason.KILL_REQUESTED);
+        killTaskRequested.set(true);
+      }
+      // Deliberately not guarded by isRunningState(): internal callers have already moved the
+      // runner out of the running state by the time they get here.
+      if (!taskComplete.get() && taskRunnerCallable != null) {
+        if (taskKillStartTime == 0) {
+          taskKillStartTime = System.currentTimeMillis();
+        }
+        // interruptTask() itself only acts on the first invocation.
+        taskRunnerCallable.interruptTask();
+      }
+    }
+  }
+
+
+  // Checks and changes on these states should happen within a synchronized block,
+  // to ensure the first event is the one that is captured and causes specific behaviour.
+  private boolean isRunningState() {
+    return !taskComplete.get() && !killTaskRequested.get() && !stopContainerRequested.get() &&
+        !errorSeen.get();
+  }
+
+  /**
+   * Handed to the task (as its umbilical) and to the TaskReporter (as its error reporter).
+   * Translates their callbacks into state changes on the enclosing runner.
+   */
+  class UmbilicalAndErrorHandler implements TezUmbilical, ErrorReporter {
+
+    @Override
+    public void addEvents(Collection<TezEvent> events) {
+      // Incoming events from the running task.
+      // Only add these if the task is running.
+      if (isRunningState()) {
+        taskReporter.addEvents(task.getTaskAttemptID(), events);
+      }
+    }
+
+    @Override
+    public void signalFatalError(TezTaskAttemptID taskAttemptID, Throwable t, String message,
+        EventMetaData sourceInfo) {
+      // Fatal error reported by the task.
+      boolean isFirstError = false;
+      synchronized (TezTaskRunner2.this) {
+        // trySettingEndReason only succeeds while the runner is still in the running state.
+        if (trySettingEndReason(EndReason.TASK_ERROR)) {
+          if (t == null) {
+            t = new RuntimeException(
+                message == null ? "FatalError: No user message or exception specified" : message);
+          }
+          registerFirstException(t, sourceInfo);
+          LOG.info("Received notification of a fatal error which will cause the task to die", t);
+          isFirstError = true;
+          errorReporterToAm.set(true);
+          oobSignalErrorInProgress = true;
+        } else {
+          LOG.info(
+              "Ignoring fatal error since the task has ended for reason: {}. IgnoredError: {} ",
+              firstEndReason, (t == null ? message : t.getMessage()));
+        }
+      }
+
+      // Informing the TaskReporter here because the running task may not be interruptable.
+      // Has to be outside the lock.
+      if (isFirstError) {
+        killTask();
+        try {
+          taskReporter.taskFailed(taskAttemptID, t, getTaskDiagnosticsString(t, message), sourceInfo);
+        } catch (IOException e) {
+          // Comm failure. Task can't do much. The main exception is already registered.
+          handleFinalStatusUpdateFailure(e, false);
+        } catch (TezException e) {
+          // Failure from AM. Task can't do much. The main exception is already registered.
+          handleFinalStatusUpdateFailure(e, false);
+        } finally {
+          oobSignalLock.lock();
+          try {
+            // This message is being sent outside of the main thread, which may end up completing before
+            // this thread runs. Make sure the main run thread does not end till this completes.
+            oobSignalErrorInProgress = false;
+            oobSignalCondition.signal();
+          } finally {
+            oobSignalLock.unlock();
+          }
+        }
+      }
+    }
+
+    @Override
+    public boolean canCommit(TezTaskAttemptID taskAttemptID) throws IOException {
+      // Task checking whether it can commit.
+
+      // Not getting a lock here. It should be alright for the task to check with the reporter
+      // on whether it can commit.
+      if (isRunningState()) {
+        return taskReporter.canCommit(taskAttemptID);
+        // If there's a communication failure here, let it propagate through to the task.
+        // which may throw it back or handle it appropriately.
+      } else {
+        // Don't throw an error since the task is already in the process of shutting down.
+        LOG.info("returning canCommit=false since task is not in a running state");
+        return false;
+      }
+    }
+
+
+    @Override
+    public void reportError(Throwable t) {
+      // Umbilical reporting an error during heartbeat
+      boolean isFirstError = false;
+      synchronized (TezTaskRunner2.this) {
+        if (isRunningState()) {
+          LOG.info("TaskReporter reporter error which will cause the task to fail", t);
+          if (trySettingEndReason(EndReason.COMMUNICATION_FAILURE)) {
+            registerFirstException(t, null);
+            isFirstError = true;
+          }
+          // A race is possible between a task succeeding, and a subsequent timed heartbeat failing.
+          // These errors can be ignored, since a task can only succeed if the synchronous taskSucceeded
+          // method does not throw an exception, in which case task success is registered with the AM.
+          // Leave subsequent heartbeat errors to the next entity to communicate using the TaskReporter
+        } else {
+          LOG.info("Ignoring Communication failure since task with id=" + task.getTaskAttemptID()
+              + " is already complete, is failing or has been asked to terminate");
+        }
+      }
+      // Since this error came from the taskReporter - there's no point attempting to report a failure back to it.
+      if (isFirstError) {
+        killTask();
+      }
+    }
+
+    @Override
+    public void shutdownRequested() {
+      // Umbilical informing about a shutdown request for the container.
+      boolean isFirstTerminate = false;
+      synchronized (TezTaskRunner2.this) {
+        isFirstTerminate = trySettingEndReason(EndReason.CONTAINER_STOP_REQUESTED);
+        // Respect stopContainerRequested since it can come in at any point, despite a previous failure.
+        stopContainerRequested.set(true);
+      }
+
+      if (isFirstTerminate) {
+        killTask();
+      }
+    }
+  }
+
+  /**
+   * Records the end reason if the runner is still in the running state.
+   *
+   * @return true if the reason was recorded, false if the runner had already left the running
+   *         state
+   */
+  private synchronized boolean trySettingEndReason(EndReason endReason) {
+    if (isRunningState()) {
+      firstEndReason = endReason;
+      return true;
+    }
+    return false;
+  }
+
+
+  // Must be invoked while still in the running state; moves the runner out of it.
+  private void registerFirstException(Throwable t, EventMetaData sourceInfo) {
+    Preconditions.checkState(isRunningState());
+    errorSeen.set(true);
+    firstException = t;
+    this.exceptionSourceInfo = sourceInfo;
+  }
+
+
+  // Builds the diagnostics text sent to the AM from whichever of exception / message is present.
+  private String getTaskDiagnosticsString(Throwable t, String message) {
+    String diagnostics;
+    if (t != null && message != null) {
+      diagnostics = "exceptionThrown=" + ExceptionUtils.getStackTrace(t) + ", errorMessage="
+          + message;
+    } else if (t == null && message == null) {
+      diagnostics = "Unknown error";
+    } else {
+      diagnostics = t != null ? "exceptionThrown=" + ExceptionUtils.getStackTrace(t)
+          : " errorMessage=" + message;
+    }
+    return diagnostics;
+  }
+
+  private TaskRunner2Result logAndReturnEndResult(EndReason endReason, Throwable firstError,
+      boolean stopContainerRequested) {
+    TaskRunner2Result result = new TaskRunner2Result(endReason, firstError, stopContainerRequested);
+    LOG.info("TaskRunnerResult for {} : {} ", task.getTaskAttemptID(), result);
+    return result;
+  }
+
+  /**
+   * Logs a failure to deliver the final status of the task to the AM.
+   *
+   * @param t the failure seen while reporting
+   * @param successReportAttempted true if a success was being reported, false if a failure was
+   */
+  private void handleFinalStatusUpdateFailure(Throwable t, boolean successReportAttempted) {
+    // TODO Ideally differentiate between FAILED/KILLED
+    LOG.warn("Failure while reporting state= {} to AM", (successReportAttempted ? "success" : "failure/killed"), t);
+  }
+}
\ No newline at end of file