You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/05/09 03:43:07 UTC
[29/43] tez git commit: TEZ-2187. Allow TaskCommunicators to report
failed / killed attempts. (sseth)
TEZ-2187. Allow TaskCommunicators to report failed / killed attempts. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/e1ab1914
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/e1ab1914
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/e1ab1914
Branch: refs/heads/TEZ-2003
Commit: e1ab191494658b5470d12d4b25b016530b28d398
Parents: 99a1b85
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue Mar 10 01:25:39 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri May 8 14:41:30 2015 -0700
----------------------------------------------------------------------
TEZ-2003-CHANGES.txt | 1 +
.../org/apache/tez/common/TezUtilsInternal.java | 60 +++++++++
.../tez/dag/api/TaskAttemptEndReason.java | 24 ++++
.../records/TaskAttemptTerminationCause.java | 7 +-
.../apache/tez/dag/api/TaskCommunicator.java | 2 +
.../tez/dag/api/TaskCommunicatorContext.java | 13 +-
.../dag/app/TaskAttemptListenerImpTezDag.java | 33 +++++
.../event/TaskAttemptEventAttemptFailed.java | 2 +
.../event/TaskAttemptEventAttemptKilled.java | 47 +++++++
.../dag/app/dag/event/TaskAttemptEventType.java | 5 +-
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 33 ++++-
.../tez/dag/app/rm/AMSchedulerEventTAEnded.java | 9 +-
.../dag/app/rm/LocalTaskSchedulerService.java | 3 +-
.../dag/app/rm/TaskSchedulerEventHandler.java | 7 +-
.../tez/dag/app/rm/TaskSchedulerService.java | 6 +-
.../dag/app/rm/YarnTaskSchedulerService.java | 8 +-
.../app/TestTaskAttemptListenerImplTezDag.java | 1 +
.../app/TestTaskAttemptListenerImplTezDag2.java | 126 +++++++++++++++++++
.../tez/dag/app/rm/TestContainerReuse.java | 65 +++++-----
.../app/rm/TestLocalTaskSchedulerService.java | 5 +-
.../tez/dag/app/rm/TestTaskScheduler.java | 18 +--
.../rm/TezTestServiceTaskSchedulerService.java | 3 +-
.../TezTestServiceTaskCommunicatorImpl.java | 36 +++++-
.../org/apache/tez/service/ContainerRunner.java | 5 +-
.../tez/service/MiniTezTestServiceCluster.java | 5 +-
.../tez/service/impl/ContainerRunnerImpl.java | 60 +++++++--
.../apache/tez/service/impl/TezTestService.java | 6 +-
.../impl/TezTestServiceProtocolServerImpl.java | 10 +-
.../tez/tests/TestExternalTezServices.java | 29 +++++
29 files changed, 548 insertions(+), 81 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/e1ab1914/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index 7726815..774a685 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -10,5 +10,6 @@ ALL CHANGES:
TEZ-2138. Fix minor bugs in adding default scheduler, getting launchers.
TEZ-2139. Update tez version to 0.7.0-TEZ-2003-SNAPSHOT.
TEZ-2175. Task priority should be available to the TaskCommunicator plugin.
+ TEZ-2187. Allow TaskCommunicators to report failed / killed attempts.
INCOMPATIBLE CHANGES:
http://git-wip-us.apache.org/repos/asf/tez/blob/e1ab1914/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
index 9c78377..347a4f6 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.log4j.Appender;
import org.apache.tez.dag.api.DagTypeConverters;
+import org.apache.tez.dag.api.TaskAttemptEndReason;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
@@ -49,6 +50,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Stopwatch;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
@Private
public class TezUtilsInternal {
@@ -234,4 +236,62 @@ public class TezUtilsInternal {
return sb.toString();
}
+ public static TaskAttemptTerminationCause fromTaskAttemptEndReason(
+ TaskAttemptEndReason taskAttemptEndReason) {
+ if (taskAttemptEndReason == null) {
+ return null;
+ }
+ switch (taskAttemptEndReason) {
+ case COMMUNICATION_ERROR:
+ return TaskAttemptTerminationCause.COMMUNICATION_ERROR;
+ case SERVICE_BUSY:
+ return TaskAttemptTerminationCause.SERVICE_BUSY;
+ case INTERRUPTED_BY_SYSTEM:
+ return TaskAttemptTerminationCause.INTERRUPTED_BY_SYSTEM;
+ case INTERRUPTED_BY_USER:
+ return TaskAttemptTerminationCause.INTERRUPTED_BY_USER;
+ case OTHER:
+ return TaskAttemptTerminationCause.UNKNOWN_ERROR;
+ default:
+ return TaskAttemptTerminationCause.UNKNOWN_ERROR;
+ }
+ }
+
+ public static TaskAttemptEndReason toTaskAttemptEndReason(TaskAttemptTerminationCause cause) {
+ // TODO Post TEZ-2003. Consolidate these states, and mappings.
+ if (cause == null) {
+ return null;
+ }
+ switch (cause) {
+ case COMMUNICATION_ERROR:
+ return TaskAttemptEndReason.COMMUNICATION_ERROR;
+ case SERVICE_BUSY:
+ return TaskAttemptEndReason.SERVICE_BUSY;
+ case INTERRUPTED_BY_SYSTEM:
+ return TaskAttemptEndReason.INTERRUPTED_BY_SYSTEM;
+ case INTERRUPTED_BY_USER:
+ return TaskAttemptEndReason.INTERRUPTED_BY_USER;
+ case UNKNOWN_ERROR:
+ case TERMINATED_BY_CLIENT:
+ case TERMINATED_AT_SHUTDOWN:
+ case INTERNAL_PREEMPTION:
+ case EXTERNAL_PREEMPTION:
+ case TERMINATED_INEFFECTIVE_SPECULATION:
+ case TERMINATED_EFFECTIVE_SPECULATION:
+ case TERMINATED_ORPHANED:
+ case APPLICATION_ERROR:
+ case FRAMEWORK_ERROR:
+ case INPUT_READ_ERROR:
+ case OUTPUT_WRITE_ERROR:
+ case OUTPUT_LOST:
+ case TASK_HEARTBEAT_ERROR:
+ case CONTAINER_LAUNCH_FAILED:
+ case CONTAINER_EXITED:
+ case CONTAINER_STOPPED:
+ case NODE_FAILED:
+ case NODE_DISK_ERROR:
+ default:
+ return TaskAttemptEndReason.OTHER;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/e1ab1914/tez-common/src/main/java/org/apache/tez/dag/api/TaskAttemptEndReason.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/api/TaskAttemptEndReason.java b/tez-common/src/main/java/org/apache/tez/dag/api/TaskAttemptEndReason.java
new file mode 100644
index 0000000..96a4768
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/dag/api/TaskAttemptEndReason.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+// TODO TEZ-2003 Expose as a public API
+public enum TaskAttemptEndReason {
+ COMMUNICATION_ERROR,
+ SERVICE_BUSY,
+ INTERRUPTED_BY_SYSTEM,
+ INTERRUPTED_BY_USER,
+ OTHER
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/e1ab1914/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java b/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java
index ef0bb33..7112d9e 100644
--- a/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java
@@ -41,5 +41,10 @@ public enum TaskAttemptTerminationCause {
CONTAINER_STOPPED, // Container stopped or released by Tez
NODE_FAILED, // Node for the container failed
NODE_DISK_ERROR, // Disk failed on the node running the task
-
+
+ COMMUNICATION_ERROR, // Equivalent to a launch failure
+ SERVICE_BUSY, // Service rejected the task
+ INTERRUPTED_BY_SYSTEM, // Interrupted by the system. e.g. Pre-emption
+ INTERRUPTED_BY_USER, // Interrupted by the user
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/e1ab1914/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
index 82eed20..945091e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
@@ -45,6 +45,8 @@ public abstract class TaskCommunicator extends AbstractService {
Credentials credentials,
boolean credentialsChanged, int priority);
+ // TODO TEZ-2003. Are additional APIs required to mark a container as completed ? - for completeness.
+
// TODO TEZ-2003 Remove reference to TaskAttemptID
public abstract void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID);
http://git-wip-us.apache.org/repos/asf/tez/blob/e1ab1914/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
index 41675fe..a85fb7f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
@@ -14,6 +14,7 @@
package org.apache.tez.dag.api;
+import javax.annotation.Nullable;
import java.io.IOException;
import org.apache.hadoop.security.Credentials;
@@ -37,15 +38,21 @@ public interface TaskCommunicatorContext {
// TODO TEZ-2003 Move to vertex, taskIndex, version
boolean canCommit(TezTaskAttemptID taskAttemptId) throws IOException;
+ // TODO TEZ-2003 Split the heartbeat API to a liveness check and a status update
TaskHeartbeatResponse heartbeat(TaskHeartbeatRequest request) throws IOException, TezException;
boolean isKnownContainer(ContainerId containerId);
- // TODO TEZ-2003 Move to vertex, taskIndex, version
+ // TODO TEZ-2003 Move to vertex, taskIndex, version. Rename to taskAttempt*
void taskStartedRemotely(TezTaskAttemptID taskAttemptID, ContainerId containerId);
- // TODO TEZ-2003 Add an API to register task failure - for example, a communication failure.
- // This will have to take into consideration the TA_FAILED event
+ // TODO TEZ-2003 Move to vertex, taskIndex, version. Rename to taskAttempt*
+ void taskKilled(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason, @Nullable String diagnostics);
+
+ // TODO TEZ-2003 Move to vertex, taskIndex, version. Rename to taskAttempt*
+ void taskFailed(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason, @Nullable String diagnostics);
+
+ // TODO TEZ-2003 API. Should a method exist for task succeeded.
// TODO Eventually Add methods to report availability stats to the scheduler.
}
http://git-wip-us.apache.org/repos/asf/tez/blob/e1ab1914/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index b570301..94f6cae 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -37,14 +37,17 @@ import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
import org.apache.tez.runtime.api.impl.EventType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.dag.api.TaskCommunicator;
import org.apache.tez.dag.api.TaskCommunicatorContext;
+import org.apache.tez.dag.api.TaskAttemptEndReason;
import org.apache.tez.dag.api.TaskHeartbeatResponse;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.TezException;
@@ -54,7 +57,10 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.tez.dag.api.TaskHeartbeatRequest;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.Task;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptKilled;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
import org.apache.tez.dag.app.rm.container.AMContainerTask;
import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -257,6 +263,33 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
pingContainerHeartbeatHandler(containerId);
}
+ @Override
+ public void taskKilled(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason,
+ String diagnostics) {
+ // Regular flow via TaskAttempt will take care of un-registering from the heartbeat handler,
+ // and messages from the scheduler will release the container.
+ // TODO TEZ-2003 Maybe consider un-registering here itself, since the task is not active anymore,
+ // instead of waiting for the unregister to flow through the Container.
+ // Fix along the same lines as TEZ-2124 by introducing an explicit context.
+ context.getEventHandler().handle(new TaskAttemptEventAttemptKilled(taskAttemptId,
+ diagnostics, TezUtilsInternal.fromTaskAttemptEndReason(
+ taskAttemptEndReason)));
+ }
+
+ @Override
+ public void taskFailed(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason,
+ String diagnostics) {
+ // Regular flow via TaskAttempt will take care of un-registering from the heartbeat handler,
+ // and messages from the scheduler will release the container.
+ // TODO TEZ-2003 Maybe consider un-registering here itself, since the task is not active anymore,
+ // instead of waiting for the unregister to flow through the Container.
+ // Fix along the same lines as TEZ-2124 by introducing an explicit context.
+ context.getEventHandler().handle(new TaskAttemptEventAttemptFailed(taskAttemptId,
+ TaskAttemptEventType.TA_FAILED, diagnostics, TezUtilsInternal.fromTaskAttemptEndReason(
+ taskAttemptEndReason)));
+ }
+
+
/**
* Child checking whether it can commit.
* <p/>
http://git-wip-us.apache.org/repos/asf/tez/blob/e1ab1914/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptFailed.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptFailed.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptFailed.java
index b9c1d09..7ec8921 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptFailed.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptFailed.java
@@ -26,6 +26,8 @@ public class TaskAttemptEventAttemptFailed extends TaskAttemptEvent
private final String diagnostics;
private final TaskAttemptTerminationCause errorCause;
+
+ /* Accepted Types - FAILED, TIMED_OUT */
public TaskAttemptEventAttemptFailed(TezTaskAttemptID id,
TaskAttemptEventType type, String diagnostics, TaskAttemptTerminationCause errorCause) {
super(id, type);
http://git-wip-us.apache.org/repos/asf/tez/blob/e1ab1914/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptKilled.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptKilled.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptKilled.java
new file mode 100644
index 0000000..72e6b07
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptKilled.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+
+public class TaskAttemptEventAttemptKilled extends TaskAttemptEvent
+ implements DiagnosableEvent, TaskAttemptEventTerminationCauseEvent {
+
+ private final String diagnostics;
+ private final TaskAttemptTerminationCause errorCause;
+ public TaskAttemptEventAttemptKilled(TezTaskAttemptID id,
+ String diagnostics,
+ TaskAttemptTerminationCause errorCause) {
+ super(id, TaskAttemptEventType.TA_KILLED);
+ this.diagnostics = diagnostics;
+ this.errorCause = errorCause;
+ }
+
+ @Override
+ public String getDiagnosticInfo() {
+ return diagnostics;
+ }
+
+ @Override
+ public TaskAttemptTerminationCause getTerminationCause() {
+ return errorCause;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/e1ab1914/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
index b7aca36..6d20368 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
@@ -26,14 +26,15 @@ public enum TaskAttemptEventType {
//Producer:Task, Speculator
TA_SCHEDULE,
-//Producer: TaskAttemptListener
+//Producer: TaskAttemptListener | Vertex after routing events
TA_STARTED_REMOTELY,
TA_STATUS_UPDATE,
TA_DIAGNOSTICS_UPDATE, // REMOVE THIS - UNUSED
TA_DONE,
TA_FAILED,
+ TA_KILLED, // Generated by TaskCommunicators
TA_TIMED_OUT,
-
+
//Producer: Client, Scheduler, On speculation.
TA_KILL_REQUEST,
http://git-wip-us.apache.org/repos/asf/tez/blob/e1ab1914/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index c80571d..11d4df9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.RackResolver;
import org.apache.hadoop.yarn.util.Records;
+import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.counters.DAGCounter;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.ProcessorDescriptor;
@@ -185,6 +186,11 @@ public class TaskAttemptImpl implements TaskAttempt,
private final StateMachine<TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent> stateMachine;
+ // TODO TEZ-2003 We may need some additional state management for STATUS_UPDATES, FAILED, KILLED coming in before
+ // TASK_STARTED_REMOTELY. In case of a PUSH it's more intuitive to send TASK_STARTED_REMOTELY after communicating
+ // with the listening service and getting a response, which in turn can trigger STATUS_UPDATES / FAILED / KILLED
+
+ // TA_KILLED handled the same as TA_KILL_REQUEST. Just a different name indicating a request / already killed.
private static StateMachineFactory
<TaskAttemptImpl, TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent>
stateMachineFactory
@@ -225,6 +231,10 @@ public class TaskAttemptImpl implements TaskAttempt,
new TerminatedBeforeRunningTransition(KILLED_HELPER))
.addTransition(TaskAttemptStateInternal.START_WAIT,
TaskAttemptStateInternal.KILL_IN_PROGRESS,
+ TaskAttemptEventType.TA_KILLED,
+ new TerminatedBeforeRunningTransition(KILLED_HELPER))
+ .addTransition(TaskAttemptStateInternal.START_WAIT,
+ TaskAttemptStateInternal.KILL_IN_PROGRESS,
TaskAttemptEventType.TA_NODE_FAILED,
new NodeFailedBeforeRunningTransition())
.addTransition(TaskAttemptStateInternal.START_WAIT,
@@ -265,6 +275,10 @@ public class TaskAttemptImpl implements TaskAttempt,
new TerminatedWhileRunningTransition(KILLED_HELPER))
.addTransition(TaskAttemptStateInternal.RUNNING,
TaskAttemptStateInternal.KILL_IN_PROGRESS,
+ TaskAttemptEventType.TA_KILLED,
+ new TerminatedWhileRunningTransition(KILLED_HELPER))
+ .addTransition(TaskAttemptStateInternal.RUNNING,
+ TaskAttemptStateInternal.KILL_IN_PROGRESS,
TaskAttemptEventType.TA_NODE_FAILED,
new TerminatedWhileRunningTransition(KILLED_HELPER))
.addTransition(TaskAttemptStateInternal.RUNNING,
@@ -303,6 +317,7 @@ public class TaskAttemptImpl implements TaskAttempt,
TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED,
TaskAttemptEventType.TA_TIMED_OUT,
TaskAttemptEventType.TA_KILL_REQUEST,
+ TaskAttemptEventType.TA_KILLED,
TaskAttemptEventType.TA_NODE_FAILED,
TaskAttemptEventType.TA_CONTAINER_TERMINATING,
TaskAttemptEventType.TA_OUTPUT_FAILED))
@@ -324,6 +339,7 @@ public class TaskAttemptImpl implements TaskAttempt,
TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED,
TaskAttemptEventType.TA_TIMED_OUT,
TaskAttemptEventType.TA_KILL_REQUEST,
+ TaskAttemptEventType.TA_KILLED,
TaskAttemptEventType.TA_NODE_FAILED,
TaskAttemptEventType.TA_CONTAINER_TERMINATING,
TaskAttemptEventType.TA_OUTPUT_FAILED))
@@ -342,6 +358,7 @@ public class TaskAttemptImpl implements TaskAttempt,
TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED,
TaskAttemptEventType.TA_TIMED_OUT,
TaskAttemptEventType.TA_KILL_REQUEST,
+ TaskAttemptEventType.TA_KILLED,
TaskAttemptEventType.TA_NODE_FAILED,
TaskAttemptEventType.TA_CONTAINER_TERMINATING,
TaskAttemptEventType.TA_CONTAINER_TERMINATED,
@@ -361,6 +378,7 @@ public class TaskAttemptImpl implements TaskAttempt,
TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED,
TaskAttemptEventType.TA_TIMED_OUT,
TaskAttemptEventType.TA_KILL_REQUEST,
+ TaskAttemptEventType.TA_KILLED,
TaskAttemptEventType.TA_NODE_FAILED,
TaskAttemptEventType.TA_CONTAINER_TERMINATING,
TaskAttemptEventType.TA_CONTAINER_TERMINATED,
@@ -383,6 +401,12 @@ public class TaskAttemptImpl implements TaskAttempt,
TaskAttemptStateInternal.SUCCEEDED,
EnumSet.of(TaskAttemptStateInternal.KILLED,
TaskAttemptStateInternal.SUCCEEDED),
+ TaskAttemptEventType.TA_KILLED,
+ new TerminatedAfterSuccessTransition())
+ .addTransition(
+ TaskAttemptStateInternal.SUCCEEDED,
+ EnumSet.of(TaskAttemptStateInternal.KILLED,
+ TaskAttemptStateInternal.SUCCEEDED),
TaskAttemptEventType.TA_NODE_FAILED,
new TerminatedAfterSuccessTransition())
.addTransition(
@@ -434,7 +458,6 @@ public class TaskAttemptImpl implements TaskAttempt,
this.leafVertex = leafVertex;
}
-
@Override
public TezTaskAttemptID getID() {
return attemptId;
@@ -1030,6 +1053,7 @@ public class TaskAttemptImpl implements TaskAttempt,
// Compute node/rack location request even if re-scheduled.
Set<String> racks = new HashSet<String>();
+ // TODO Post TEZ-2003. Allow for a policy in the VMPlugin to define locality for different attempts.
TaskLocationHint locationHint = ta.getTaskLocationHint();
if (locationHint != null) {
if (locationHint.getRacks() != null) {
@@ -1104,6 +1128,8 @@ public class TaskAttemptImpl implements TaskAttempt,
@Override
public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
+ // This transition should not be invoked directly, if a scheduler event has already been sent out.
+ // Sub-classes should be used if a scheduler request has been sent.
ta.setFinishTime();
if (event instanceof DiagnosableEvent) {
@@ -1218,7 +1244,8 @@ public class TaskAttemptImpl implements TaskAttempt,
// Inform the scheduler
if (sendSchedulerEvent()) {
ta.sendEvent(new AMSchedulerEventTAEnded(ta, ta.containerId, helper
- .getTaskAttemptState(), ta.getVertex().getTaskSchedulerIdentifier()));
+ .getTaskAttemptState(), TezUtilsInternal.toTaskAttemptEndReason(ta.terminationCause),
+ ta.getVertex().getTaskSchedulerIdentifier()));
}
}
}
@@ -1300,7 +1327,7 @@ public class TaskAttemptImpl implements TaskAttempt,
// Inform the Scheduler.
ta.sendEvent(new AMSchedulerEventTAEnded(ta, ta.containerId,
- TaskAttemptState.SUCCEEDED, ta.getVertex().getTaskSchedulerIdentifier()));
+ TaskAttemptState.SUCCEEDED, null, ta.getVertex().getTaskSchedulerIdentifier()));
// Inform the task.
ta.sendEvent(new TaskEventTAUpdate(ta.attemptId,
http://git-wip-us.apache.org/repos/asf/tez/blob/e1ab1914/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java
index 2ace642..a775948 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java
@@ -18,6 +18,7 @@
package org.apache.tez.dag.app.rm;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.dag.api.TaskAttemptEndReason;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -27,14 +28,16 @@ public class AMSchedulerEventTAEnded extends AMSchedulerEvent {
private final TaskAttempt attempt;
private final ContainerId containerId;
private final TaskAttemptState state;
+ private final TaskAttemptEndReason taskAttemptEndReason;
private final int schedulerId;
public AMSchedulerEventTAEnded(TaskAttempt attempt, ContainerId containerId,
- TaskAttemptState state, int schedulerId) {
+ TaskAttemptState state, TaskAttemptEndReason taskAttemptEndReason, int schedulerId) {
super(AMSchedulerEventType.S_TA_ENDED);
this.attempt = attempt;
this.containerId = containerId;
this.state = state;
+ this.taskAttemptEndReason = taskAttemptEndReason;
this.schedulerId = schedulerId;
}
@@ -57,4 +60,8 @@ public class AMSchedulerEventTAEnded extends AMSchedulerEvent {
public int getSchedulerId() {
return schedulerId;
}
+
+ public TaskAttemptEndReason getTaskAttemptEndReason() {
+ return taskAttemptEndReason;
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/e1ab1914/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
index 72a074f..a234e07 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.dag.api.TaskAttemptEndReason;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.app.AppContext;
@@ -149,7 +150,7 @@ public class LocalTaskSchedulerService extends TaskSchedulerService {
}
@Override
- public boolean deallocateTask(Object task, boolean taskSucceeded) {
+ public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEndReason endReason) {
return taskRequestHandler.addDeallocateTaskRequest(task);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/e1ab1914/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index 8e5fc71..9f09f68 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -289,7 +289,9 @@ public class TaskSchedulerEventHandler extends AbstractService
private void handleTAUnsuccessfulEnd(AMSchedulerEventTAEnded event) {
TaskAttempt attempt = event.getAttempt();
- boolean wasContainerAllocated = taskSchedulers[event.getSchedulerId()].deallocateTask(attempt, false);
+ // Propagate state and failure cause (if any) when informing the scheduler about the de-allocation.
+ boolean wasContainerAllocated = taskSchedulers[event.getSchedulerId()]
+ .deallocateTask(attempt, false, event.getTaskAttemptEndReason());
// use stored value of container id in case the scheduler has removed this
// assignment because the task has been deallocated earlier.
// retroactive case
@@ -311,6 +313,7 @@ public class TaskSchedulerEventHandler extends AbstractService
sendEvent(new AMContainerEventStopRequest(attemptContainerId));
// Inform the Node - the task has asked to be STOPPED / has already
// stopped.
+ // AMNodeImpl blacklisting logic does not account for KILLED attempts.
sendEvent(new AMNodeEventTaskAttemptEnded(appContext.getAllContainers().
get(attemptContainerId).getContainer().getNodeId(), attemptContainerId,
attempt.getID(), event.getState() == TaskAttemptState.FAILED));
@@ -332,7 +335,7 @@ public class TaskSchedulerEventHandler extends AbstractService
}
boolean wasContainerAllocated = taskSchedulers[event.getSchedulerId()].deallocateTask(attempt,
- true);
+ true, null);
if (!wasContainerAllocated) {
LOG.error("De-allocated successful task: " + attempt.getID()
+ ", but TaskScheduler reported no container assigned to task");
http://git-wip-us.apache.org/repos/asf/tez/blob/e1ab1914/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java
index 48d5455..07dfcd6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.dag.api.TaskAttemptEndReason;
public abstract class TaskSchedulerService extends AbstractService{
@@ -61,8 +62,9 @@ public abstract class TaskSchedulerService extends AbstractService{
public abstract void allocateTask(Object task, Resource capability,
ContainerId containerId, Priority priority, Object containerSignature,
Object clientCookie);
-
- public abstract boolean deallocateTask(Object task, boolean taskSucceeded);
+
+ /** Plugin writers must make sure to de-allocate a container once it's done, so that it can be collected. */
+ public abstract boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEndReason endReason);
public abstract Object deallocateContainer(ContainerId containerId);
http://git-wip-us.apache.org/repos/asf/tez/blob/e1ab1914/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
index 44f5484..1fc9ac2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.RackResolver;
import org.apache.hadoop.yarn.util.resource.Resources;
+import org.apache.tez.dag.api.TaskAttemptEndReason;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.app.AppContext;
@@ -987,10 +988,13 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
* the task to de-allocate.
* @param taskSucceeded
* specify whether the task succeeded or failed.
+ * @param endReason
+ * reason for the task ending
* @return true if a container is assigned to this task.
*/
@Override
- public boolean deallocateTask(Object task, boolean taskSucceeded) {
+ public boolean deallocateTask(Object task, boolean taskSucceeded,
+ TaskAttemptEndReason endReason) {
Map<CookieContainerRequest, Container> assignedContainers = null;
synchronized (this) {
@@ -1180,7 +1184,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
CookieContainerRequest request = entry.getValue();
if (request.getPriority().equals(lowestPriNewContainer.getPriority())) {
LOG.info("Resending request for task again: " + task);
- deallocateTask(task, true);
+ deallocateTask(task, true, null);
allocateTask(task, request.getCapability(),
(request.getNodes() == null ? null :
request.getNodes().toArray(new String[request.getNodes().size()])),
http://git-wip-us.apache.org/repos/asf/tez/blob/e1ab1914/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
index 0cf1959..076f9e0 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
@@ -76,6 +76,7 @@ import org.junit.Test;
import org.mockito.ArgumentCaptor;
@SuppressWarnings("unchecked")
+// TODO TEZ-2003 Rename to TestTezTaskCommunicator
public class TestTaskAttemptListenerImplTezDag {
private ApplicationId appId;
private AppContext appContext;
http://git-wip-us.apache.org/repos/asf/tez/blob/e1ab1914/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java
new file mode 100644
index 0000000..934543f
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.dag.api.TaskAttemptEndReason;
+import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptKilled;
+import org.apache.tez.dag.app.rm.container.AMContainer;
+import org.apache.tez.dag.app.rm.container.AMContainerMap;
+import org.apache.tez.dag.app.rm.container.AMContainerTask;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+// TODO TEZ-2003. Rename to TestTaskAttemptListener | whatever TaskAttemptListener is renamed to.
+public class TestTaskAttemptListenerImplTezDag2 {
+ /** Checks that taskFailed()/taskKilled() calls on the listener reach the event handler as the matching TaskAttempt events. */
+ @Test(timeout = 5000)
+ public void testTaskAttemptFailedKilled() {
+ ApplicationId appId = ApplicationId.newInstance(1000, 1);
+ ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
+ Credentials credentials = new Credentials();
+ AppContext appContext = mock(AppContext.class);
+ EventHandler eventHandler = mock(EventHandler.class);
+ DAG dag = mock(DAG.class);
+ AMContainerMap amContainerMap = mock(AMContainerMap.class);
+ Map<ApplicationAccessType, String> appAcls = new HashMap<ApplicationAccessType, String>();
+ doReturn(eventHandler).when(appContext).getEventHandler();
+ doReturn(dag).when(appContext).getCurrentDAG();
+ doReturn(appAttemptId).when(appContext).getApplicationAttemptId();
+ doReturn(credentials).when(appContext).getAppCredentials();
+ doReturn(appAcls).when(appContext).getApplicationACLs();
+ doReturn(amContainerMap).when(appContext).getAllContainers();
+ NodeId nodeId = NodeId.newInstance("localhost", 0);
+ AMContainer amContainer = mock(AMContainer.class);
+ Container container = mock(Container.class);
+ doReturn(nodeId).when(container).getNodeId();
+ doReturn(amContainer).when(amContainerMap).get(any(ContainerId.class));
+ doReturn(container).when(amContainer).getContainer();
+ // Listener under test, backed by the mocked AppContext configured above.
+ TaskAttemptListenerImpTezDag taskAttemptListener =
+ new TaskAttemptListenerImpTezDag(appContext, mock(TaskHeartbeatHandler.class),
+ mock(ContainerHeartbeatHandler.class), null, null, false);
+ // First attempt: this is the one reported as failed further down.
+ TaskSpec taskSpec1 = mock(TaskSpec.class);
+ TezTaskAttemptID taskAttemptId1 = mock(TezTaskAttemptID.class);
+ doReturn(taskAttemptId1).when(taskSpec1).getTaskAttemptID();
+ AMContainerTask amContainerTask1 = new AMContainerTask(taskSpec1, null, null, false, 10);
+ // Second attempt: this is the one reported as killed further down.
+ TaskSpec taskSpec2 = mock(TaskSpec.class);
+ TezTaskAttemptID taskAttemptId2 = mock(TezTaskAttemptID.class);
+ doReturn(taskAttemptId2).when(taskSpec2).getTaskAttemptID();
+ AMContainerTask amContainerTask2 = new AMContainerTask(taskSpec2, null, null, false, 10);
+ // Register one running container per attempt, then register each attempt against its container.
+ ContainerId containerId1 = createContainerId(appId, 1);
+ taskAttemptListener.registerRunningContainer(containerId1, 0);
+ taskAttemptListener.registerTaskAttempt(amContainerTask1, containerId1, 0);
+ ContainerId containerId2 = createContainerId(appId, 2);
+ taskAttemptListener.registerRunningContainer(containerId2, 0);
+ taskAttemptListener.registerTaskAttempt(amContainerTask2, containerId2, 0);
+ // Report the outcomes through the listener: attempt 1 failed, attempt 2 killed.
+
+ taskAttemptListener
+ .taskFailed(taskAttemptId1, TaskAttemptEndReason.COMMUNICATION_ERROR, "Diagnostics1");
+ taskAttemptListener
+ .taskKilled(taskAttemptId2, TaskAttemptEndReason.SERVICE_BUSY, "Diagnostics2");
+ // Exactly two events must reach the mocked handler, in call order: the failed event, then the killed event.
+ ArgumentCaptor<Event> argumentCaptor = ArgumentCaptor.forClass(Event.class);
+ verify(eventHandler, times(2)).handle(argumentCaptor.capture());
+ assertTrue(argumentCaptor.getAllValues().get(0) instanceof TaskAttemptEventAttemptFailed);
+ assertTrue(argumentCaptor.getAllValues().get(1) instanceof TaskAttemptEventAttemptKilled);
+ TaskAttemptEventAttemptFailed failedEvent =
+ (TaskAttemptEventAttemptFailed) argumentCaptor.getAllValues().get(0);
+ TaskAttemptEventAttemptKilled killedEvent =
+ (TaskAttemptEventAttemptKilled) argumentCaptor.getAllValues().get(1);
+ // The failed event must carry the reported diagnostics and the COMMUNICATION_ERROR termination cause.
+ assertEquals("Diagnostics1", failedEvent.getDiagnosticInfo());
+ assertEquals(TaskAttemptTerminationCause.COMMUNICATION_ERROR,
+ failedEvent.getTerminationCause());
+ // The killed event must carry the reported diagnostics and the SERVICE_BUSY termination cause.
+ assertEquals("Diagnostics2", killedEvent.getDiagnosticInfo());
+ assertEquals(TaskAttemptTerminationCause.SERVICE_BUSY, killedEvent.getTerminationCause());
+ // TODO TEZ-2003. Verify unregistration from the registered list
+ }
+ /** Builds a ContainerId with the given index under application attempt 1 of the given application. */
+ private ContainerId createContainerId(ApplicationId applicationId, int containerIdx) {
+ ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(applicationId, 1);
+ ContainerId containerId = ContainerId.newInstance(appAttemptId, containerIdx);
+ return containerId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/e1ab1914/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
index c1169ef..d45346a 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isNull;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
@@ -57,6 +58,7 @@ import org.apache.tez.common.MockDNSToSwitchMapping;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TaskAttemptEndReason;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TaskLocationHint;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
@@ -92,6 +94,7 @@ import org.apache.tez.runtime.api.impl.OutputSpec;
import org.apache.tez.runtime.api.impl.TaskSpec;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.mockito.internal.matchers.Null;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -223,9 +226,9 @@ public class TestContainerReuse {
taskSchedulerEventHandler.handleEvent(
new AMSchedulerEventTAEnded(
- ta11, containerHost1.getId(), TaskAttemptState.SUCCEEDED, 0));
+ ta11, containerHost1.getId(), TaskAttemptState.SUCCEEDED, null, 0));
drainableAppCallback.drain();
- verify(taskScheduler).deallocateTask(eq(ta11), eq(true));
+ verify(taskScheduler).deallocateTask(eq(ta11), eq(true), eq((TaskAttemptEndReason)null));
verify(taskSchedulerEventHandler, times(1)).taskAllocated(
eq(ta31), any(Object.class), eq(containerHost1));
verify(rmClient, times(0)).releaseAssignedContainer(
@@ -235,7 +238,7 @@ public class TestContainerReuse {
taskSchedulerEventHandler.handleEvent(
new AMSchedulerEventTAEnded(ta21, containerHost2.getId(),
- TaskAttemptState.SUCCEEDED, 0));
+ TaskAttemptState.SUCCEEDED, null, 0));
long currentTs = System.currentTimeMillis();
Throwable exception = null;
@@ -356,9 +359,9 @@ public class TestContainerReuse {
taskSchedulerEventHandler.handleEvent(
new AMSchedulerEventTAEnded(ta21, containerHost2.getId(),
- TaskAttemptState.SUCCEEDED, 0));
+ TaskAttemptState.SUCCEEDED, null, 0));
drainableAppCallback.drain();
- verify(taskScheduler).deallocateTask(eq(ta21), eq(true));
+ verify(taskScheduler).deallocateTask(eq(ta21), eq(true), eq((TaskAttemptEndReason)null));
verify(taskSchedulerEventHandler, times(0)).taskAllocated(
eq(ta31), any(Object.class), eq(containerHost2));
verify(rmClient, times(1)).releaseAssignedContainer(
@@ -459,9 +462,9 @@ public class TestContainerReuse {
verify(taskSchedulerEventHandler).taskAllocated(eq(ta11), any(Object.class), eq(container1));
// Task assigned to container completed successfully. Container should be re-used.
- taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED, 0));
+ taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED, null, 0));
drainableAppCallback.drain();
- verify(taskScheduler).deallocateTask(eq(ta11), eq(true));
+ verify(taskScheduler).deallocateTask(eq(ta11), eq(true), eq((TaskAttemptEndReason)null));
verify(taskSchedulerEventHandler).taskAllocated(eq(ta12), any(Object.class), eq(container1));
verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
@@ -469,19 +472,19 @@ public class TestContainerReuse {
// Task assigned to container completed successfully.
// Verify reuse across hosts.
- taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta12, container1.getId(), TaskAttemptState.SUCCEEDED, 0));
+ taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta12, container1.getId(), TaskAttemptState.SUCCEEDED, null, 0));
drainableAppCallback.drain();
- verify(taskScheduler).deallocateTask(eq(ta12), eq(true));
+ verify(taskScheduler).deallocateTask(eq(ta12), eq(true), eq((TaskAttemptEndReason)null));
verify(taskSchedulerEventHandler).taskAllocated(eq(ta13), any(Object.class), eq(container1));
verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
eventHandler.reset();
// Verify no re-use if a previous task fails.
- taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta13, container1.getId(), TaskAttemptState.FAILED, 0));
+ taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta13, container1.getId(), TaskAttemptState.FAILED, null, 0));
drainableAppCallback.drain();
verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(ta14), any(Object.class), eq(container1));
- verify(taskScheduler).deallocateTask(eq(ta13), eq(false));
+ verify(taskScheduler).deallocateTask(eq(ta13), eq(false), eq((TaskAttemptEndReason)null));
verify(rmClient).releaseAssignedContainer(eq(container1.getId()));
eventHandler.verifyInvocation(AMContainerEventStopRequest.class);
eventHandler.reset();
@@ -496,9 +499,9 @@ public class TestContainerReuse {
verify(taskSchedulerEventHandler).taskAllocated(eq(ta14), any(Object.class), eq(container2));
// Task assigned to container completed successfully. No pending requests. Container should be released.
- taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta14, container2.getId(), TaskAttemptState.SUCCEEDED, 0));
+ taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta14, container2.getId(), TaskAttemptState.SUCCEEDED, null, 0));
drainableAppCallback.drain();
- verify(taskScheduler).deallocateTask(eq(ta14), eq(true));
+ verify(taskScheduler).deallocateTask(eq(ta14), eq(true), eq((TaskAttemptEndReason)null));
verify(rmClient).releaseAssignedContainer(eq(container2.getId()));
eventHandler.verifyInvocation(AMContainerEventStopRequest.class);
eventHandler.reset();
@@ -607,9 +610,9 @@ public class TestContainerReuse {
// First task had profiling on. This container can not be reused further.
taskSchedulerEventHandler.handleEvent(
- new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED, 0));
+ new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED, null, 0));
drainableAppCallback.drain();
- verify(taskScheduler).deallocateTask(eq(ta11), eq(true));
+ verify(taskScheduler).deallocateTask(eq(ta11), eq(true), eq((TaskAttemptEndReason)null));
verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(ta12), any(Object.class),
eq(container1));
verify(rmClient, times(1)).releaseAssignedContainer(eq(container1.getId()));
@@ -653,9 +656,9 @@ public class TestContainerReuse {
// Verify that the container can not be reused when profiling option is turned on
// Even for 2 tasks having same profiling option can have container reusability.
- taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta13, container2.getId(), TaskAttemptState.SUCCEEDED, 0));
+ taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta13, container2.getId(), TaskAttemptState.SUCCEEDED, null, 0));
drainableAppCallback.drain();
- verify(taskScheduler).deallocateTask(eq(ta13), eq(true));
+ verify(taskScheduler).deallocateTask(eq(ta13), eq(true), eq((TaskAttemptEndReason)null));
verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(ta14), any(Object.class),
eq(container2));
verify(rmClient, times(1)).releaseAssignedContainer(eq(container2.getId()));
@@ -698,9 +701,9 @@ public class TestContainerReuse {
verify(taskSchedulerEventHandler).taskAllocated(eq(ta15), any(Object.class), eq(container3));
//Ensure task 6 (of vertex 1) is allocated to same container
- taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta15, container3.getId(), TaskAttemptState.SUCCEEDED, 0));
+ taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta15, container3.getId(), TaskAttemptState.SUCCEEDED, null, 0));
drainableAppCallback.drain();
- verify(taskScheduler).deallocateTask(eq(ta15), eq(true));
+ verify(taskScheduler).deallocateTask(eq(ta15), eq(true), eq((TaskAttemptEndReason)null));
verify(taskSchedulerEventHandler).taskAllocated(eq(ta16), any(Object.class), eq(container3));
eventHandler.reset();
@@ -811,9 +814,9 @@ public class TestContainerReuse {
// until delay expires.
taskSchedulerEventHandler.handleEvent(
new AMSchedulerEventTAEnded(ta11, container1.getId(),
- TaskAttemptState.SUCCEEDED, 0));
+ TaskAttemptState.SUCCEEDED, null, 0));
drainableAppCallback.drain();
- verify(taskScheduler).deallocateTask(eq(ta11), eq(true));
+ verify(taskScheduler).deallocateTask(eq(ta11), eq(true), eq((TaskAttemptEndReason)null));
verify(taskSchedulerEventHandler, times(0)).taskAllocated(
eq(ta12), any(Object.class), eq(container1));
verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
@@ -828,7 +831,7 @@ public class TestContainerReuse {
// TA12 completed.
taskSchedulerEventHandler.handleEvent(
new AMSchedulerEventTAEnded(ta12, container1.getId(),
- TaskAttemptState.SUCCEEDED, 0));
+ TaskAttemptState.SUCCEEDED, null, 0));
drainableAppCallback.drain();
LOG.info("Sleeping to ensure that the scheduling loop runs");
Thread.sleep(3000l);
@@ -946,9 +949,9 @@ public class TestContainerReuse {
// Container should be assigned to task21.
taskSchedulerEventHandler.handleEvent(
new AMSchedulerEventTAEnded(ta11, container1.getId(),
- TaskAttemptState.SUCCEEDED, 0));
+ TaskAttemptState.SUCCEEDED, null, 0));
drainableAppCallback.drain();
- verify(taskScheduler).deallocateTask(eq(ta11), eq(true));
+ verify(taskScheduler).deallocateTask(eq(ta11), eq(true), eq((TaskAttemptEndReason)null));
verify(taskSchedulerEventHandler).taskAllocated(
eq(ta21), any(Object.class), eq(container1));
verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
@@ -956,7 +959,7 @@ public class TestContainerReuse {
// Task 2 completes.
taskSchedulerEventHandler.handleEvent(
new AMSchedulerEventTAEnded(ta21, container1.getId(),
- TaskAttemptState.SUCCEEDED, 0));
+ TaskAttemptState.SUCCEEDED, null, 0));
verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
LOG.info("Sleeping to ensure that the scheduling loop runs");
@@ -1065,9 +1068,9 @@ public class TestContainerReuse {
assertEquals(1, assignEvent.getRemoteTaskLocalResources().size());
// Task assigned to container completed successfully. Container should be re-used.
- taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta111, container1.getId(), TaskAttemptState.SUCCEEDED, 0));
+ taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta111, container1.getId(), TaskAttemptState.SUCCEEDED, null, 0));
drainableAppCallback.drain();
- verify(taskScheduler).deallocateTask(eq(ta111), eq(true));
+ verify(taskScheduler).deallocateTask(eq(ta111), eq(true), eq((TaskAttemptEndReason)null));
verify(taskSchedulerEventHandler).taskAllocated(eq(ta112), any(Object.class), eq(container1));
verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
@@ -1077,9 +1080,9 @@ public class TestContainerReuse {
// Task assigned to container completed successfully.
// Verify reuse across hosts.
- taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta112, container1.getId(), TaskAttemptState.SUCCEEDED, 0));
+ taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta112, container1.getId(), TaskAttemptState.SUCCEEDED, null, 0));
drainableAppCallback.drain();
- verify(taskScheduler).deallocateTask(eq(ta112), eq(true));
+ verify(taskScheduler).deallocateTask(eq(ta112), eq(true), eq((TaskAttemptEndReason)null));
verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
eventHandler.reset();
@@ -1118,9 +1121,9 @@ public class TestContainerReuse {
assertEquals(2, assignEvent.getRemoteTaskLocalResources().size());
eventHandler.reset();
- taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta211, container1.getId(), TaskAttemptState.SUCCEEDED, 0));
+ taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta211, container1.getId(), TaskAttemptState.SUCCEEDED, null, 0));
drainableAppCallback.drain();
- verify(taskScheduler).deallocateTask(eq(ta211), eq(true));
+ verify(taskScheduler).deallocateTask(eq(ta211), eq(true), eq((TaskAttemptEndReason)null));
verify(taskSchedulerEventHandler).taskAllocated(eq(ta212), any(Object.class), eq(container1));
verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
http://git-wip-us.apache.org/repos/asf/tez/blob/e1ab1914/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java
index 25cf4b5..0a642bb 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.dag.api.TaskAttemptEndReason;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback;
@@ -94,7 +95,7 @@ public class TestLocalTaskSchedulerService {
Task task = mock(Task.class);
taskSchedulerService.allocateTask(task, Resource.newInstance(1024, 1), null, null, Priority.newInstance(1), null, null);
- taskSchedulerService.deallocateTask(task, false);
+ taskSchedulerService.deallocateTask(task, false, null);
// start the RequestHandler, DeallocateTaskRequest has higher priority, so will be processed first
taskSchedulerService.startRequestHandlerThread();
@@ -126,7 +127,7 @@ public class TestLocalTaskSchedulerService {
MockAsyncDelegateRequestHandler requestHandler = taskSchedulerService.getRequestHandler();
requestHandler.drainRequest(1);
- taskSchedulerService.deallocateTask(task, false);
+ taskSchedulerService.deallocateTask(task, false, null);
requestHandler.drainRequest(2);
assertEquals(1, requestHandler.deallocateCount);
assertEquals(1, requestHandler.allocateCount);
http://git-wip-us.apache.org/repos/asf/tez/blob/e1ab1914/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
index dabae67..807e772 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
@@ -172,7 +172,7 @@ public class TestTaskScheduler {
addContainerRequest((CookieContainerRequest) any());
// returned from task requests before allocation happens
- assertFalse(scheduler.deallocateTask(mockTask1, true));
+ assertFalse(scheduler.deallocateTask(mockTask1, true, null));
verify(mockApp, times(0)).containerBeingReleased(any(ContainerId.class));
verify(mockRMClient, times(1)).
removeContainerRequest((CookieContainerRequest) any());
@@ -180,7 +180,7 @@ public class TestTaskScheduler {
releaseAssignedContainer((ContainerId) any());
// deallocating unknown task
- assertFalse(scheduler.deallocateTask(mockTask1, true));
+ assertFalse(scheduler.deallocateTask(mockTask1, true, null));
verify(mockApp, times(0)).containerBeingReleased(any(ContainerId.class));
verify(mockRMClient, times(1)).
removeContainerRequest((CookieContainerRequest) any());
@@ -325,7 +325,7 @@ public class TestTaskScheduler {
verify(mockRMClient).releaseAssignedContainer(mockCId4);
// deallocate allocated task
- assertTrue(scheduler.deallocateTask(mockTask1, true));
+ assertTrue(scheduler.deallocateTask(mockTask1, true, null));
drainableAppCallback.drain();
verify(mockApp).containerBeingReleased(mockCId1);
verify(mockRMClient).releaseAssignedContainer(mockCId1);
@@ -445,7 +445,7 @@ public class TestTaskScheduler {
verify(mockApp, times(4)).taskAllocated(any(), any(), (Container) any());
verify(mockApp).taskAllocated(mockTask4, mockCookie4, mockContainer6);
// deallocate allocated task
- assertTrue(scheduler.deallocateTask(mockTask4, true));
+ assertTrue(scheduler.deallocateTask(mockTask4, true, null));
drainableAppCallback.drain();
verify(mockApp).containerBeingReleased(mockCId6);
verify(mockRMClient).releaseAssignedContainer(mockCId6);
@@ -475,7 +475,7 @@ public class TestTaskScheduler {
removeContainerRequest((CookieContainerRequest) any());
verify(mockRMClient, times(8)).addContainerRequest(
(CookieContainerRequest) any());
- assertFalse(scheduler.deallocateTask(mockTask1, true));
+ assertFalse(scheduler.deallocateTask(mockTask1, true, null));
List<NodeReport> mockUpdatedNodes = mock(List.class);
scheduler.onNodesUpdated(mockUpdatedNodes);
@@ -741,7 +741,7 @@ public class TestTaskScheduler {
verify(mockRMClient).releaseAssignedContainer(mockCId4);
// deallocate allocated task
- assertTrue(scheduler.deallocateTask(mockTask1, true));
+ assertTrue(scheduler.deallocateTask(mockTask1, true, null));
drainableAppCallback.drain();
verify(mockApp).containerBeingReleased(mockCId1);
verify(mockRMClient).releaseAssignedContainer(mockCId1);
@@ -871,7 +871,7 @@ public class TestTaskScheduler {
verify(mockApp, times(4)).taskAllocated(any(), any(), (Container) any());
verify(mockApp).taskAllocated(mockTask4, mockCookie4, mockContainer6);
// deallocate allocated task
- assertTrue(scheduler.deallocateTask(mockTask4, true));
+ assertTrue(scheduler.deallocateTask(mockTask4, true, null));
drainableAppCallback.drain();
verify(mockApp).containerBeingReleased(mockCId6);
verify(mockRMClient).releaseAssignedContainer(mockCId6);
@@ -960,8 +960,8 @@ public class TestTaskScheduler {
// container7 allocated to the task with affinity for it
verify(mockApp).taskAllocated(mockTask6, mockCookie6, mockContainer7);
// deallocate allocated task
- assertTrue(scheduler.deallocateTask(mockTask5, true));
- assertTrue(scheduler.deallocateTask(mockTask6, true));
+ assertTrue(scheduler.deallocateTask(mockTask5, true, null));
+ assertTrue(scheduler.deallocateTask(mockTask6, true, null));
drainableAppCallback.drain();
verify(mockApp).containerBeingReleased(mockCId7);
verify(mockApp).containerBeingReleased(mockCId8);
http://git-wip-us.apache.org/repos/asf/tez/blob/e1ab1914/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
index 5657f86..872d592 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.dag.api.TaskAttemptEndReason;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.service.TezTestServiceConfConstants;
@@ -198,7 +199,7 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService {
}
@Override
- public boolean deallocateTask(Object task, boolean taskSucceeded) {
+ public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEndReason endReason) {
ContainerId containerId = runningTasks.remove(task);
if (containerId == null) {
LOG.error("Could not determine ContainerId for task: " + task +
http://git-wip-us.apache.org/repos/asf/tez/blob/e1ab1914/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
index a327caf..e3385a2 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
@@ -19,16 +19,20 @@ import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.RejectedExecutionException;
import com.google.protobuf.ByteString;
+import com.google.protobuf.ServiceException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.tez.dag.api.TaskAttemptEndReason;
import org.apache.tez.dag.api.TaskCommunicatorContext;
import org.apache.tez.dag.app.TezTaskCommunicatorImpl;
import org.apache.tez.dag.app.TezTestServiceCommunicator;
@@ -83,6 +87,7 @@ public class TezTestServiceTaskCommunicatorImpl extends TezTaskCommunicatorImpl
@Override
public void serviceStop() {
super.serviceStop();
+ this.communicator.stop();
}
@@ -123,13 +128,15 @@ public class TezTestServiceTaskCommunicatorImpl extends TezTaskCommunicatorImpl
throw new RuntimeException("ContainerInfo not found for container: " + containerId +
", while trying to launch task: " + taskSpec.getTaskAttemptID());
}
+ // Have to register this up front right now. Otherwise, it's possible for the task to start
+ // sending out status/DONE/KILLED/FAILED messages before TAImpl knows how to handle them.
+ getTaskCommunicatorContext()
+ .taskStartedRemotely(taskSpec.getTaskAttemptID(), containerId);
communicator.submitWork(requestProto, host, port,
new TezTestServiceCommunicator.ExecuteRequestCallback<SubmitWorkResponseProto>() {
@Override
public void setResponse(SubmitWorkResponseProto response) {
LOG.info("Successfully launched task: " + taskSpec.getTaskAttemptID());
- getTaskCommunicatorContext()
- .taskStartedRemotely(taskSpec.getTaskAttemptID(), containerId);
}
@Override
@@ -137,6 +144,31 @@ public class TezTestServiceTaskCommunicatorImpl extends TezTaskCommunicatorImpl
// TODO Handle this error. This is where an API on the context to indicate failure / rejection comes in.
LOG.info("Failed to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " +
containerId, t);
+ if (t instanceof ServiceException) {
+ ServiceException se = (ServiceException) t;
+ t = se.getCause();
+ }
+ if (t instanceof RemoteException) {
+ RemoteException re = (RemoteException)t;
+ String message = re.toString();
+ if (message.contains(RejectedExecutionException.class.getName())) {
+ getTaskCommunicatorContext().taskKilled(taskSpec.getTaskAttemptID(),
+ TaskAttemptEndReason.SERVICE_BUSY, "Service Busy");
+ } else {
+ getTaskCommunicatorContext()
+ .taskFailed(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.OTHER,
+ t.toString());
+ }
+ } else {
+ if (t instanceof IOException) {
+ getTaskCommunicatorContext().taskKilled(taskSpec.getTaskAttemptID(),
+ TaskAttemptEndReason.COMMUNICATION_ERROR, "Communication Error");
+ } else {
+ getTaskCommunicatorContext()
+ .taskFailed(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.OTHER,
+ t.getMessage());
+ }
+ }
}
});
}
http://git-wip-us.apache.org/repos/asf/tez/blob/e1ab1914/tez-ext-service-tests/src/test/java/org/apache/tez/service/ContainerRunner.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/ContainerRunner.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/ContainerRunner.java
index 2bca4ed..28c2286 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/service/ContainerRunner.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/ContainerRunner.java
@@ -16,12 +16,13 @@ package org.apache.tez.service;
import java.io.IOException;
+import org.apache.tez.dag.api.TezException;
import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos;
import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.RunContainerRequestProto;
import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.SubmitWorkRequestProto;
public interface ContainerRunner {
- void queueContainer(RunContainerRequestProto request) throws IOException;
- void submitWork(SubmitWorkRequestProto request) throws IOException;
+ void queueContainer(RunContainerRequestProto request) throws TezException;
+ void submitWork(SubmitWorkRequestProto request) throws TezException;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/e1ab1914/tez-ext-service-tests/src/test/java/org/apache/tez/service/MiniTezTestServiceCluster.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/MiniTezTestServiceCluster.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/MiniTezTestServiceCluster.java
index f47bd67..0ac0b33 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/service/MiniTezTestServiceCluster.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/MiniTezTestServiceCluster.java
@@ -133,7 +133,10 @@ public class MiniTezTestServiceCluster extends AbstractService {
@Override
public void serviceStop() {
- tezTestService.stop();
+ if (tezTestService != null) {
+ tezTestService.stop();
+ tezTestService = null;
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/tez/blob/e1ab1914/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
index 25d6030..379d952 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
@@ -25,6 +25,7 @@ import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -58,6 +59,7 @@ import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.TokenCache;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
+import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.task.TaskReporter;
import org.apache.tez.runtime.task.TezTaskRunner;
import org.apache.tez.service.ContainerRunner;
@@ -68,14 +70,18 @@ import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl;
import org.apache.tez.runtime.task.TezChild;
import org.apache.tez.runtime.task.TezChild.ContainerExecutionResult;
import org.apache.tez.shufflehandler.ShuffleHandler;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos;
import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.RunContainerRequestProto;
import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.SubmitWorkRequestProto;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.TaskSpecProto;
import org.apache.tez.util.ProtoConverters;
public class ContainerRunnerImpl extends AbstractService implements ContainerRunner {
private static final Logger LOG = Logger.getLogger(ContainerRunnerImpl.class);
+ public static final String DAG_NAME_INSTRUMENTED_FAILURES = "InstrumentedFailures";
+
private final ListeningExecutorService executorService;
private final AtomicReference<InetSocketAddress> localAddress;
private final String[] localDirsBase;
@@ -146,10 +152,10 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
* Submit a container which is ready for running.
* The regular pull mechanism will be used to fetch work from the AM
* @param request
- * @throws IOException
+ * @throws TezException
*/
@Override
- public void queueContainer(RunContainerRequestProto request) throws IOException {
+ public void queueContainer(RunContainerRequestProto request) throws TezException {
LOG.info("Queuing container for execution: " + request);
Map<String, String> env = new HashMap<String, String>();
@@ -162,7 +168,11 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
for (int i = 0; i < localDirsBase.length; i++) {
localDirs[i] = createAppSpecificLocalDir(localDirsBase[i], request.getApplicationIdString(),
request.getUser());
- localFs.mkdirs(new Path(localDirs[i]));
+ try {
+ localFs.mkdirs(new Path(localDirs[i]));
+ } catch (IOException e) {
+ throw new TezException(e);
+ }
}
LOG.info("DEBUG: Dirs are: " + Arrays.toString(localDirs));
@@ -175,7 +185,11 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
DataInputBuffer dib = new DataInputBuffer();
byte[] tokenBytes = request.getCredentialsBinary().toByteArray();
dib.reset(tokenBytes, tokenBytes.length);
- credentials.readTokenStorageStream(dib);
+ try {
+ credentials.readTokenStorageStream(dib);
+ } catch (IOException e) {
+ throw new TezException(e);
+ }
Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials);
@@ -197,13 +211,14 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
* This is intended for a task push from the AM
*
* @param request
- * @throws IOException
+ * @throws org.apache.tez.dag.api.TezException
*/
@Override
- public void submitWork(SubmitWorkRequestProto request) throws
- IOException {
+ public void submitWork(SubmitWorkRequestProto request) throws TezException {
LOG.info("Queuing work for execution: " + request);
+ checkAndThrowExceptionForTests(request);
+
Map<String, String> env = new HashMap<String, String>();
env.putAll(localEnv);
env.put(ApplicationConstants.Environment.USER.name(), request.getUser());
@@ -214,7 +229,11 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
for (int i = 0; i < localDirsBase.length; i++) {
localDirs[i] = createAppSpecificLocalDir(localDirsBase[i], request.getApplicationIdString(),
request.getUser());
- localFs.mkdirs(new Path(localDirs[i]));
+ try {
+ localFs.mkdirs(new Path(localDirs[i]));
+ } catch (IOException e) {
+ throw new TezException(e);
+ }
}
if (LOG.isDebugEnabled()) {
LOG.debug("Dirs are: " + Arrays.toString(localDirs));
@@ -228,7 +247,11 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
DataInputBuffer dib = new DataInputBuffer();
byte[] tokenBytes = request.getCredentialsBinary().toByteArray();
dib.reset(tokenBytes, tokenBytes.length);
- credentials.readTokenStorageStream(dib);
+ try {
+ credentials.readTokenStorageStream(dib);
+ } catch (IOException e) {
+ throw new TezException(e);
+ }
Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials);
@@ -509,4 +532,23 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
}
}
+
+ private void checkAndThrowExceptionForTests(SubmitWorkRequestProto request) throws TezException {
+ if (!request.getTaskSpec().getDagName().equals(DAG_NAME_INSTRUMENTED_FAILURES)) {
+ return;
+ }
+
+ TaskSpec taskSpec = ProtoConverters.getTaskSpecfromProto(request.getTaskSpec());
+ if (taskSpec.getTaskAttemptID().getTaskID().getId() == 0 &&
+ taskSpec.getTaskAttemptID().getId() == 0) {
+ LOG.info("Simulating Rejected work");
+ throw new RejectedExecutionException(
+ "Simulating Rejected work for taskAttemptId=" + taskSpec.getTaskAttemptID());
+ } else if (taskSpec.getTaskAttemptID().getTaskID().getId() == 1 &&
+ taskSpec.getTaskAttemptID().getId() == 0) {
+ LOG.info("Simulating Task Setup Failure during launch");
+ throw new TezException("Simulating Task Setup Failure during launch for taskAttemptId=" +
+ taskSpec.getTaskAttemptID());
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tez/blob/e1ab1914/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestService.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestService.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestService.java
index 012e352..855f1b0 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestService.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestService.java
@@ -14,7 +14,6 @@
package org.apache.tez.service.impl;
-import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
@@ -25,6 +24,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.StringUtils;
import org.apache.log4j.Logger;
+import org.apache.tez.dag.api.TezException;
import org.apache.tez.service.ContainerRunner;
import org.apache.tez.shufflehandler.ShuffleHandler;
import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos;
@@ -108,14 +108,14 @@ public class TezTestService extends AbstractService implements ContainerRunner {
@Override
- public void queueContainer(RunContainerRequestProto request) throws IOException {
+ public void queueContainer(RunContainerRequestProto request) throws TezException {
numSubmissions.incrementAndGet();
containerRunner.queueContainer(request);
}
@Override
public void submitWork(TezTestServiceProtocolProtos.SubmitWorkRequestProto request) throws
- IOException {
+ TezException {
numSubmissions.incrementAndGet();
containerRunner.submitWork(request);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/e1ab1914/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestServiceProtocolServerImpl.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestServiceProtocolServerImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestServiceProtocolServerImpl.java
index d7f8444..39d7156 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestServiceProtocolServerImpl.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestServiceProtocolServerImpl.java
@@ -30,11 +30,13 @@ import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.service.AbstractService;
+import org.apache.tez.dag.api.TezException;
import org.apache.tez.service.ContainerRunner;
import org.apache.tez.service.TezTestServiceProtocolBlockingPB;
import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos;
import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.RunContainerRequestProto;
import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.RunContainerResponseProto;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.SubmitWorkRequestProto;
import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.SubmitWorkResponseProto;
public class TezTestServiceProtocolServerImpl extends AbstractService
@@ -61,20 +63,20 @@ public class TezTestServiceProtocolServerImpl extends AbstractService
LOG.info("Received request: " + request);
try {
containerRunner.queueContainer(request);
- } catch (IOException e) {
+ } catch (TezException e) {
throw new ServiceException(e);
}
return RunContainerResponseProto.getDefaultInstance();
}
@Override
- public SubmitWorkResponseProto submitWork(RpcController controller, TezTestServiceProtocolProtos.SubmitWorkRequestProto request) throws
+ public SubmitWorkResponseProto submitWork(RpcController controller, SubmitWorkRequestProto request) throws
ServiceException {
LOG.info("Received submitWork request: " + request);
try {
containerRunner.submitWork(request);
- } catch (IOException e) {
- e.printStackTrace();
+ } catch (TezException e) {
+ throw new ServiceException(e);
}
return SubmitWorkResponseProto.getDefaultInstance();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/e1ab1914/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
index 0ec972b..b6a166d 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
@@ -27,16 +27,23 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.tez.client.TezClient;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.app.launcher.TezTestServiceNoOpContainerLauncher;
import org.apache.tez.dag.app.rm.TezTestServiceTaskSchedulerService;
import org.apache.tez.dag.app.taskcomm.TezTestServiceTaskCommunicatorImpl;
import org.apache.tez.examples.HashJoinExample;
import org.apache.tez.examples.JoinDataGen;
import org.apache.tez.examples.JoinValidateConfigured;
+import org.apache.tez.runtime.library.processor.SleepProcessor;
import org.apache.tez.service.MiniTezTestServiceCluster;
+import org.apache.tez.service.impl.ContainerRunnerImpl;
import org.apache.tez.test.MiniTezCluster;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -283,6 +290,28 @@ public class TestExternalTezServices {
PROPS_IN_AM, PROPS_REGULAR_CONTAINERS);
}
+ @Test(timeout = 60000)
+ public void testErrorPropagation() throws TezException, InterruptedException, IOException {
+ runExceptionSimulation();
+ }
+
+
+
+ private void runExceptionSimulation() throws IOException, TezException, InterruptedException {
+ DAG dag = DAG.create(ContainerRunnerImpl.DAG_NAME_INSTRUMENTED_FAILURES);
+ Vertex v =Vertex.create("Vertex1", ProcessorDescriptor.create(SleepProcessor.class.getName()),
+ 3);
+ for (Map.Entry<String, String> prop : PROPS_EXT_SERVICE_PUSH.entrySet()) {
+ v.setConf(prop.getKey(), prop.getValue());
+ }
+ dag.addVertex(v);
+ DAGClient dagClient = sharedTezClient.submitDAG(dag);
+ DAGStatus dagStatus = dagClient.waitForCompletion();
+ assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
+ assertEquals(1, dagStatus.getDAGProgress().getFailedTaskAttemptCount());
+ assertEquals(1, dagStatus.getDAGProgress().getKilledTaskAttemptCount());
+
+ }
private void runJoinValidate(String name, int extExpectedCount, Map<String, String> lhsProps,
Map<String, String> rhsProps,