You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/02/12 20:00:08 UTC
git commit: TEZ-831. Reduce line length of state machines (bikas)
Updated Branches:
refs/heads/master fea8ffa8d -> 54e232211
TEZ-831. Reduce line length of state machines (bikas)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/54e23221
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/54e23221
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/54e23221
Branch: refs/heads/master
Commit: 54e232211b0bc11889c87bfddb484bda1f60295f
Parents: fea8ffa
Author: Bikas Saha <bi...@apache.org>
Authored: Wed Feb 12 10:59:53 2014 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Wed Feb 12 10:59:53 2014 -0800
----------------------------------------------------------------------
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 346 +++++++++++++++----
.../dag/app/rm/container/AMContainerImpl.java | 240 ++++++++++---
.../apache/tez/dag/app/rm/node/AMNodeImpl.java | 117 +++++--
3 files changed, 553 insertions(+), 150 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/54e23221/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 02cbe38..d555e04 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -177,70 +177,288 @@ public class TaskAttemptImpl implements TaskAttempt,
<TaskAttemptImpl, TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent>
(TaskAttemptStateInternal.NEW)
- .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.START_WAIT, TaskAttemptEventType.TA_SCHEDULE, new ScheduleTaskattemptTransition())
- .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.NEW, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
- .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_FAIL_REQUEST, new TerminateTransition(FAILED_HELPER))
- .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_KILL_REQUEST, new TerminateTransition(KILLED_HELPER))
-
- .addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.RUNNING, TaskAttemptEventType.TA_STARTED_REMOTELY, new StartedTransition())
- .addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.START_WAIT, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
- .addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_FAIL_REQUEST, new TerminatedBeforeRunningTransition(FAILED_HELPER))
- .addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_KILL_REQUEST, new TerminatedBeforeRunningTransition(KILLED_HELPER))
- .addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_NODE_FAILED, new NodeFailedBeforeRunningTransition())
- .addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_CONTAINER_TERMINATING, new ContainerTerminatingBeforeRunningTransition())
- .addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, new ContainerCompletedBeforeRunningTransition())
- .addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_CONTAINER_PREEMPTED, new ContainerCompletedBeforeRunningTransition(KILLED_HELPER))
-
- .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.RUNNING, TaskAttemptEventType.TA_STATUS_UPDATE, STATUS_UPDATER)
- .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.RUNNING, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
- .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, new OutputConsumableTransition()) //Optional, may not come in for all tasks.
- .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.SUCCEEDED, TaskAttemptEventType.TA_DONE, new SucceededTransition())
- .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_FAILED, new TerminatedWhileRunningTransition(FAILED_HELPER))
- .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_TIMED_OUT, new TerminatedWhileRunningTransition(FAILED_HELPER))
- .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_FAIL_REQUEST, new TerminatedWhileRunningTransition(FAILED_HELPER))
- .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_KILL_REQUEST, new TerminatedWhileRunningTransition(KILLED_HELPER))
- .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_NODE_FAILED, new TerminatedWhileRunningTransition(KILLED_HELPER))
- .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_CONTAINER_TERMINATING, new TerminatedWhileRunningTransition(FAILED_HELPER))
- .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, new ContainerCompletedWhileRunningTransition())
- .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_CONTAINER_PREEMPTED, new ContainerCompletedWhileRunningTransition(KILLED_HELPER))
- .addTransition(TaskAttemptStateInternal.RUNNING, EnumSet.of(TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptStateInternal.RUNNING), TaskAttemptEventType.TA_OUTPUT_FAILED, new OutputReportedFailedTransition())
-
- .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_STATUS_UPDATE, STATUS_UPDATER)
- .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
- .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE) // Stuck RPC. The client retries in a loop.
- .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.SUCCEEDED, TaskAttemptEventType.TA_DONE, new SucceededTransition())
- .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_FAILED, new TerminatedWhileRunningTransition(FAILED_HELPER))
- .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_TIMED_OUT, new TerminatedWhileRunningTransition(FAILED_HELPER))
- .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_FAIL_REQUEST, new TerminatedWhileRunningTransition(FAILED_HELPER))
- // TODO CREUSE Ensure TaskCompletionEvents are updated to reflect this. Something needs to go out to the job.
- .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_KILL_REQUEST, new TerminatedWhileRunningTransition(KILLED_HELPER))
- .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_NODE_FAILED, new TerminatedWhileRunningTransition(KILLED_HELPER))
- .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_CONTAINER_TERMINATING, new TerminatedWhileRunningTransition(FAILED_HELPER))
- .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, new ContainerCompletedBeforeRunningTransition())
- .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_CONTAINER_PREEMPTED, new ContainerCompletedBeforeRunningTransition(KILLED_HELPER))
- .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, EnumSet.of(TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptStateInternal.OUTPUT_CONSUMABLE), TaskAttemptEventType.TA_OUTPUT_FAILED, new OutputReportedFailedTransition())
-
- .addTransition(TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, new ContainerCompletedWhileTerminating())
- .addTransition(TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
- .addTransition(TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptStateInternal.KILL_IN_PROGRESS, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_CONTAINER_PREEMPTED, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_OUTPUT_FAILED))
-
- .addTransition(TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, new ContainerCompletedWhileTerminating())
- .addTransition(TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
- .addTransition(TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptStateInternal.FAIL_IN_PROGRESS, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_CONTAINER_PREEMPTED, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_OUTPUT_FAILED))
-
- .addTransition(TaskAttemptStateInternal.KILLED, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
- .addTransition(TaskAttemptStateInternal.KILLED, TaskAttemptStateInternal.KILLED, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_SCHEDULE, TaskAttemptEventType.TA_CONTAINER_PREEMPTED, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_CONTAINER_TERMINATED, TaskAttemptEventType.TA_OUTPUT_FAILED))
-
- .addTransition(TaskAttemptStateInternal.FAILED, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
- .addTransition(TaskAttemptStateInternal.FAILED, TaskAttemptStateInternal.FAILED, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_SCHEDULE, TaskAttemptEventType.TA_CONTAINER_PREEMPTED, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_CONTAINER_TERMINATED, TaskAttemptEventType.TA_OUTPUT_FAILED))
-
- // How will duplicate history events be handled ?
- // TODO Maybe consider not failing REDUCE tasks in this case. Also, MAP_TASKS in case there's only one phase in the job.
- .addTransition(TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.SUCCEEDED, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
- .addTransition(TaskAttemptStateInternal.SUCCEEDED, EnumSet.of(TaskAttemptStateInternal.KILLED, TaskAttemptStateInternal.SUCCEEDED), TaskAttemptEventType.TA_KILL_REQUEST, new TerminatedAfterSuccessTransition())
- .addTransition(TaskAttemptStateInternal.SUCCEEDED, EnumSet.of(TaskAttemptStateInternal.KILLED, TaskAttemptStateInternal.SUCCEEDED), TaskAttemptEventType.TA_NODE_FAILED, new TerminatedAfterSuccessTransition())
- .addTransition(TaskAttemptStateInternal.SUCCEEDED, EnumSet.of(TaskAttemptStateInternal.FAILED, TaskAttemptStateInternal.SUCCEEDED), TaskAttemptEventType.TA_OUTPUT_FAILED, new OutputReportedFailedTransition())
- .addTransition(TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.SUCCEEDED, EnumSet.of(TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_CONTAINER_TERMINATED, TaskAttemptEventType.TA_CONTAINER_PREEMPTED))
+ .addTransition(TaskAttemptStateInternal.NEW,
+ TaskAttemptStateInternal.START_WAIT,
+ TaskAttemptEventType.TA_SCHEDULE, new ScheduleTaskattemptTransition())
+ .addTransition(TaskAttemptStateInternal.NEW,
+ TaskAttemptStateInternal.NEW,
+ TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+ DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
+ .addTransition(TaskAttemptStateInternal.NEW,
+ TaskAttemptStateInternal.FAILED,
+ TaskAttemptEventType.TA_FAIL_REQUEST,
+ new TerminateTransition(FAILED_HELPER))
+ .addTransition(TaskAttemptStateInternal.NEW,
+ TaskAttemptStateInternal.KILLED,
+ TaskAttemptEventType.TA_KILL_REQUEST,
+ new TerminateTransition(KILLED_HELPER))
+
+ .addTransition(TaskAttemptStateInternal.START_WAIT,
+ TaskAttemptStateInternal.RUNNING,
+ TaskAttemptEventType.TA_STARTED_REMOTELY, new StartedTransition())
+ .addTransition(TaskAttemptStateInternal.START_WAIT,
+ TaskAttemptStateInternal.START_WAIT,
+ TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+ DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
+ .addTransition(TaskAttemptStateInternal.START_WAIT,
+ TaskAttemptStateInternal.FAIL_IN_PROGRESS,
+ TaskAttemptEventType.TA_FAIL_REQUEST,
+ new TerminatedBeforeRunningTransition(FAILED_HELPER))
+ .addTransition(TaskAttemptStateInternal.START_WAIT,
+ TaskAttemptStateInternal.KILL_IN_PROGRESS,
+ TaskAttemptEventType.TA_KILL_REQUEST,
+ new TerminatedBeforeRunningTransition(KILLED_HELPER))
+ .addTransition(TaskAttemptStateInternal.START_WAIT,
+ TaskAttemptStateInternal.KILL_IN_PROGRESS,
+ TaskAttemptEventType.TA_NODE_FAILED,
+ new NodeFailedBeforeRunningTransition())
+ .addTransition(TaskAttemptStateInternal.START_WAIT,
+ TaskAttemptStateInternal.FAIL_IN_PROGRESS,
+ TaskAttemptEventType.TA_CONTAINER_TERMINATING,
+ new ContainerTerminatingBeforeRunningTransition())
+ .addTransition(TaskAttemptStateInternal.START_WAIT,
+ TaskAttemptStateInternal.FAILED,
+ TaskAttemptEventType.TA_CONTAINER_TERMINATED,
+ new ContainerCompletedBeforeRunningTransition())
+ .addTransition(TaskAttemptStateInternal.START_WAIT,
+ TaskAttemptStateInternal.KILLED,
+ TaskAttemptEventType.TA_CONTAINER_PREEMPTED,
+ new ContainerCompletedBeforeRunningTransition(KILLED_HELPER))
+
+ .addTransition(TaskAttemptStateInternal.RUNNING,
+ TaskAttemptStateInternal.RUNNING,
+ TaskAttemptEventType.TA_STATUS_UPDATE, STATUS_UPDATER)
+ .addTransition(TaskAttemptStateInternal.RUNNING,
+ TaskAttemptStateInternal.RUNNING,
+ TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+ DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
+ .addTransition(TaskAttemptStateInternal.RUNNING,
+ TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
+ TaskAttemptEventType.TA_OUTPUT_CONSUMABLE,
+ new OutputConsumableTransition())
+ // Optional, may not come in for all tasks.
+ .addTransition(TaskAttemptStateInternal.RUNNING,
+ TaskAttemptStateInternal.SUCCEEDED, TaskAttemptEventType.TA_DONE,
+ new SucceededTransition())
+ .addTransition(TaskAttemptStateInternal.RUNNING,
+ TaskAttemptStateInternal.FAIL_IN_PROGRESS,
+ TaskAttemptEventType.TA_FAILED,
+ new TerminatedWhileRunningTransition(FAILED_HELPER))
+ .addTransition(TaskAttemptStateInternal.RUNNING,
+ TaskAttemptStateInternal.FAIL_IN_PROGRESS,
+ TaskAttemptEventType.TA_TIMED_OUT,
+ new TerminatedWhileRunningTransition(FAILED_HELPER))
+ .addTransition(TaskAttemptStateInternal.RUNNING,
+ TaskAttemptStateInternal.FAIL_IN_PROGRESS,
+ TaskAttemptEventType.TA_FAIL_REQUEST,
+ new TerminatedWhileRunningTransition(FAILED_HELPER))
+ .addTransition(TaskAttemptStateInternal.RUNNING,
+ TaskAttemptStateInternal.KILL_IN_PROGRESS,
+ TaskAttemptEventType.TA_KILL_REQUEST,
+ new TerminatedWhileRunningTransition(KILLED_HELPER))
+ .addTransition(TaskAttemptStateInternal.RUNNING,
+ TaskAttemptStateInternal.KILL_IN_PROGRESS,
+ TaskAttemptEventType.TA_NODE_FAILED,
+ new TerminatedWhileRunningTransition(KILLED_HELPER))
+ .addTransition(TaskAttemptStateInternal.RUNNING,
+ TaskAttemptStateInternal.FAIL_IN_PROGRESS,
+ TaskAttemptEventType.TA_CONTAINER_TERMINATING,
+ new TerminatedWhileRunningTransition(FAILED_HELPER))
+ .addTransition(TaskAttemptStateInternal.RUNNING,
+ TaskAttemptStateInternal.FAILED,
+ TaskAttemptEventType.TA_CONTAINER_TERMINATED,
+ new ContainerCompletedWhileRunningTransition())
+ .addTransition(TaskAttemptStateInternal.RUNNING,
+ TaskAttemptStateInternal.KILLED,
+ TaskAttemptEventType.TA_CONTAINER_PREEMPTED,
+ new ContainerCompletedWhileRunningTransition(KILLED_HELPER))
+ .addTransition(
+ TaskAttemptStateInternal.RUNNING,
+ EnumSet.of(TaskAttemptStateInternal.FAIL_IN_PROGRESS,
+ TaskAttemptStateInternal.RUNNING),
+ TaskAttemptEventType.TA_OUTPUT_FAILED,
+ new OutputReportedFailedTransition())
+
+ .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
+ TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
+ TaskAttemptEventType.TA_STATUS_UPDATE, STATUS_UPDATER)
+ .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
+ TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
+ TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+ DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
+ .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
+ TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
+ TaskAttemptEventType.TA_OUTPUT_CONSUMABLE)
+ // Stuck RPC. The client retries in a loop.
+ .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
+ TaskAttemptStateInternal.SUCCEEDED, TaskAttemptEventType.TA_DONE,
+ new SucceededTransition())
+ .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
+ TaskAttemptStateInternal.FAIL_IN_PROGRESS,
+ TaskAttemptEventType.TA_FAILED,
+ new TerminatedWhileRunningTransition(FAILED_HELPER))
+ .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
+ TaskAttemptStateInternal.FAIL_IN_PROGRESS,
+ TaskAttemptEventType.TA_TIMED_OUT,
+ new TerminatedWhileRunningTransition(FAILED_HELPER))
+ .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
+ TaskAttemptStateInternal.FAIL_IN_PROGRESS,
+ TaskAttemptEventType.TA_FAIL_REQUEST,
+ new TerminatedWhileRunningTransition(FAILED_HELPER))
+ // TODO CREUSE Ensure TaskCompletionEvents are updated to reflect this.
+ // Something needs to go out to the job.
+ .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
+ TaskAttemptStateInternal.KILL_IN_PROGRESS,
+ TaskAttemptEventType.TA_KILL_REQUEST,
+ new TerminatedWhileRunningTransition(KILLED_HELPER))
+ .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
+ TaskAttemptStateInternal.KILL_IN_PROGRESS,
+ TaskAttemptEventType.TA_NODE_FAILED,
+ new TerminatedWhileRunningTransition(KILLED_HELPER))
+ .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
+ TaskAttemptStateInternal.FAIL_IN_PROGRESS,
+ TaskAttemptEventType.TA_CONTAINER_TERMINATING,
+ new TerminatedWhileRunningTransition(FAILED_HELPER))
+ .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
+ TaskAttemptStateInternal.FAILED,
+ TaskAttemptEventType.TA_CONTAINER_TERMINATED,
+ new ContainerCompletedBeforeRunningTransition())
+ .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
+ TaskAttemptStateInternal.KILLED,
+ TaskAttemptEventType.TA_CONTAINER_PREEMPTED,
+ new ContainerCompletedBeforeRunningTransition(KILLED_HELPER))
+ .addTransition(
+ TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
+ EnumSet.of(TaskAttemptStateInternal.FAIL_IN_PROGRESS,
+ TaskAttemptStateInternal.OUTPUT_CONSUMABLE),
+ TaskAttemptEventType.TA_OUTPUT_FAILED,
+ new OutputReportedFailedTransition())
+
+ .addTransition(TaskAttemptStateInternal.KILL_IN_PROGRESS,
+ TaskAttemptStateInternal.KILLED,
+ TaskAttemptEventType.TA_CONTAINER_TERMINATED,
+ new ContainerCompletedWhileTerminating())
+ .addTransition(TaskAttemptStateInternal.KILL_IN_PROGRESS,
+ TaskAttemptStateInternal.KILL_IN_PROGRESS,
+ TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+ DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
+ .addTransition(
+ TaskAttemptStateInternal.KILL_IN_PROGRESS,
+ TaskAttemptStateInternal.KILL_IN_PROGRESS,
+ EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY,
+ TaskAttemptEventType.TA_CONTAINER_PREEMPTED,
+ TaskAttemptEventType.TA_STATUS_UPDATE,
+ TaskAttemptEventType.TA_OUTPUT_CONSUMABLE,
+ TaskAttemptEventType.TA_COMMIT_PENDING,
+ TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED,
+ TaskAttemptEventType.TA_TIMED_OUT,
+ TaskAttemptEventType.TA_FAIL_REQUEST,
+ TaskAttemptEventType.TA_KILL_REQUEST,
+ TaskAttemptEventType.TA_NODE_FAILED,
+ TaskAttemptEventType.TA_CONTAINER_TERMINATING,
+ TaskAttemptEventType.TA_OUTPUT_FAILED))
+
+ .addTransition(TaskAttemptStateInternal.FAIL_IN_PROGRESS,
+ TaskAttemptStateInternal.FAILED,
+ TaskAttemptEventType.TA_CONTAINER_TERMINATED,
+ new ContainerCompletedWhileTerminating())
+ .addTransition(TaskAttemptStateInternal.FAIL_IN_PROGRESS,
+ TaskAttemptStateInternal.FAIL_IN_PROGRESS,
+ TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+ DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
+ .addTransition(
+ TaskAttemptStateInternal.FAIL_IN_PROGRESS,
+ TaskAttemptStateInternal.FAIL_IN_PROGRESS,
+ EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY,
+ TaskAttemptEventType.TA_CONTAINER_PREEMPTED,
+ TaskAttemptEventType.TA_STATUS_UPDATE,
+ TaskAttemptEventType.TA_OUTPUT_CONSUMABLE,
+ TaskAttemptEventType.TA_COMMIT_PENDING,
+ TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED,
+ TaskAttemptEventType.TA_TIMED_OUT,
+ TaskAttemptEventType.TA_FAIL_REQUEST,
+ TaskAttemptEventType.TA_KILL_REQUEST,
+ TaskAttemptEventType.TA_NODE_FAILED,
+ TaskAttemptEventType.TA_CONTAINER_TERMINATING,
+ TaskAttemptEventType.TA_OUTPUT_FAILED))
+
+ .addTransition(TaskAttemptStateInternal.KILLED,
+ TaskAttemptStateInternal.KILLED,
+ TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+ DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
+ .addTransition(
+ TaskAttemptStateInternal.KILLED,
+ TaskAttemptStateInternal.KILLED,
+ EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY,
+ TaskAttemptEventType.TA_SCHEDULE,
+ TaskAttemptEventType.TA_CONTAINER_PREEMPTED,
+ TaskAttemptEventType.TA_STATUS_UPDATE,
+ TaskAttemptEventType.TA_OUTPUT_CONSUMABLE,
+ TaskAttemptEventType.TA_COMMIT_PENDING,
+ TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED,
+ TaskAttemptEventType.TA_TIMED_OUT,
+ TaskAttemptEventType.TA_FAIL_REQUEST,
+ TaskAttemptEventType.TA_KILL_REQUEST,
+ TaskAttemptEventType.TA_NODE_FAILED,
+ TaskAttemptEventType.TA_CONTAINER_TERMINATING,
+ TaskAttemptEventType.TA_CONTAINER_TERMINATED,
+ TaskAttemptEventType.TA_OUTPUT_FAILED))
+
+ .addTransition(TaskAttemptStateInternal.FAILED,
+ TaskAttemptStateInternal.FAILED,
+ TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+ DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
+ .addTransition(
+ TaskAttemptStateInternal.FAILED,
+ TaskAttemptStateInternal.FAILED,
+ EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY,
+ TaskAttemptEventType.TA_SCHEDULE,
+ TaskAttemptEventType.TA_CONTAINER_PREEMPTED,
+ TaskAttemptEventType.TA_STATUS_UPDATE,
+ TaskAttemptEventType.TA_OUTPUT_CONSUMABLE,
+ TaskAttemptEventType.TA_COMMIT_PENDING,
+ TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED,
+ TaskAttemptEventType.TA_TIMED_OUT,
+ TaskAttemptEventType.TA_FAIL_REQUEST,
+ TaskAttemptEventType.TA_KILL_REQUEST,
+ TaskAttemptEventType.TA_NODE_FAILED,
+ TaskAttemptEventType.TA_CONTAINER_TERMINATING,
+ TaskAttemptEventType.TA_CONTAINER_TERMINATED,
+ TaskAttemptEventType.TA_OUTPUT_FAILED))
+
+ // How will duplicate history events be handled ?
+ // TODO Maybe consider not failing REDUCE tasks in this case. Also,
+ // MAP_TASKS in case there's only one phase in the job.
+ .addTransition(TaskAttemptStateInternal.SUCCEEDED,
+ TaskAttemptStateInternal.SUCCEEDED,
+ TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+ DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
+ .addTransition(
+ TaskAttemptStateInternal.SUCCEEDED,
+ EnumSet.of(TaskAttemptStateInternal.KILLED,
+ TaskAttemptStateInternal.SUCCEEDED),
+ TaskAttemptEventType.TA_KILL_REQUEST,
+ new TerminatedAfterSuccessTransition())
+ .addTransition(
+ TaskAttemptStateInternal.SUCCEEDED,
+ EnumSet.of(TaskAttemptStateInternal.KILLED,
+ TaskAttemptStateInternal.SUCCEEDED),
+ TaskAttemptEventType.TA_NODE_FAILED,
+ new TerminatedAfterSuccessTransition())
+ .addTransition(
+ TaskAttemptStateInternal.SUCCEEDED,
+ EnumSet.of(TaskAttemptStateInternal.FAILED,
+ TaskAttemptStateInternal.SUCCEEDED),
+ TaskAttemptEventType.TA_OUTPUT_FAILED,
+ new OutputReportedFailedTransition())
+ .addTransition(
+ TaskAttemptStateInternal.SUCCEEDED,
+ TaskAttemptStateInternal.SUCCEEDED,
+ EnumSet.of(TaskAttemptEventType.TA_TIMED_OUT,
+ TaskAttemptEventType.TA_FAIL_REQUEST,
+ TaskAttemptEventType.TA_CONTAINER_TERMINATING,
+ TaskAttemptEventType.TA_CONTAINER_TERMINATED,
+ TaskAttemptEventType.TA_CONTAINER_PREEMPTED))
.installTopology();
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/54e23221/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index 95cb69d..1ce0d89 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -140,61 +140,191 @@ public class AMContainerImpl implements AMContainer {
new StateMachineFactory<AMContainerImpl, AMContainerState, AMContainerEventType, AMContainerEvent>(
AMContainerState.ALLOCATED)
- .addTransition(AMContainerState.ALLOCATED, AMContainerState.LAUNCHING, AMContainerEventType.C_LAUNCH_REQUEST, new LaunchRequestTransition())
- .addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED, AMContainerEventType.C_ASSIGN_TA, new AssignTaskAttemptAtAllocatedTransition())
- .addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, new CompletedAtAllocatedTransition())
- .addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED, AMContainerEventType.C_STOP_REQUEST, new StopRequestAtAllocatedTransition())
- .addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED, AMContainerEventType.C_NODE_FAILED, new NodeFailedAtAllocatedTransition())
- .addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED, EnumSet.of(AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_PULL_TA, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED, AMContainerEventType.C_TIMED_OUT), new ErrorTransition())
-
- .addTransition(AMContainerState.LAUNCHING, EnumSet.of(AMContainerState.LAUNCHING, AMContainerState.STOP_REQUESTED), AMContainerEventType.C_ASSIGN_TA, new AssignTaskAttemptTransition())
- .addTransition(AMContainerState.LAUNCHING, AMContainerState.IDLE, AMContainerEventType.C_LAUNCHED, new LaunchedTransition())
- .addTransition(AMContainerState.LAUNCHING, AMContainerState.STOPPING, AMContainerEventType.C_LAUNCH_FAILED, new LaunchFailedTransition())
- // TODO CREUSE : Maybe, consider sending back an attempt if the container asks for one in this state. Waiting for a LAUNCHED event from the NMComm may delay the task allocation.
- .addTransition(AMContainerState.LAUNCHING, AMContainerState.LAUNCHING, AMContainerEventType.C_PULL_TA) // Is assuming the pullAttempt will be null.
- .addTransition(AMContainerState.LAUNCHING, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, new CompletedAtLaunchingTransition())
- .addTransition(AMContainerState.LAUNCHING, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_STOP_REQUEST, new StopRequestAtLaunchingTransition())
- .addTransition(AMContainerState.LAUNCHING, AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED, new NodeFailedAtLaunchingTransition())
- .addTransition(AMContainerState.LAUNCHING, AMContainerState.STOP_REQUESTED, EnumSet.of(AMContainerEventType.C_LAUNCH_REQUEST, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED, AMContainerEventType.C_TIMED_OUT), new ErrorAtLaunchingTransition())
-
- .addTransition(AMContainerState.IDLE, EnumSet.of(AMContainerState.IDLE, AMContainerState.STOP_REQUESTED), AMContainerEventType.C_ASSIGN_TA, new AssignTaskAttemptAtIdleTransition())
- .addTransition(AMContainerState.IDLE, EnumSet.of(AMContainerState.RUNNING, AMContainerState.IDLE), AMContainerEventType.C_PULL_TA, new PullTAAtIdleTransition())
- .addTransition(AMContainerState.IDLE, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, new CompletedAtIdleTransition())
- .addTransition(AMContainerState.IDLE, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_STOP_REQUEST, new StopRequestAtIdleTransition())
- .addTransition(AMContainerState.IDLE, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_TIMED_OUT, new TimedOutAtIdleTransition())
- .addTransition(AMContainerState.IDLE, AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED, new NodeFailedAtIdleTransition())
- .addTransition(AMContainerState.IDLE, AMContainerState.STOP_REQUESTED, EnumSet.of(AMContainerEventType.C_LAUNCH_REQUEST, AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED), new ErrorAtIdleTransition())
-
- .addTransition(AMContainerState.RUNNING, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_ASSIGN_TA, new AssignTaskAttemptAtRunningTransition())
- .addTransition(AMContainerState.RUNNING, AMContainerState.RUNNING, AMContainerEventType.C_PULL_TA)
- .addTransition(AMContainerState.RUNNING, AMContainerState.IDLE, AMContainerEventType.C_TA_SUCCEEDED, new TASucceededAtRunningTransition())
- .addTransition(AMContainerState.RUNNING, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, new CompletedAtRunningTransition())
- .addTransition(AMContainerState.RUNNING, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_STOP_REQUEST, new StopRequestAtRunningTransition())
- .addTransition(AMContainerState.RUNNING, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_TIMED_OUT, new TimedOutAtRunningTransition())
- .addTransition(AMContainerState.RUNNING, AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED, new NodeFailedAtRunningTransition())
- .addTransition(AMContainerState.RUNNING, AMContainerState.STOP_REQUESTED, EnumSet.of(AMContainerEventType.C_LAUNCH_REQUEST, AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED), new ErrorAtRunningTransition())
-
- .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_ASSIGN_TA, new AssignTAAtWindDownTransition())
- .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_PULL_TA, new PullTAAfterStopTransition())
- .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, new CompletedAtWindDownTransition())
- .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOPPING, AMContainerEventType.C_NM_STOP_SENT)
- .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOPPING, AMContainerEventType.C_NM_STOP_FAILED, new NMStopRequestFailedTransition())
- .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED, new NodeFailedAtNMStopRequestedTransition())
- .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOP_REQUESTED, EnumSet.of(AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_STOP_REQUEST, AMContainerEventType.C_TIMED_OUT))
- .addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_LAUNCH_REQUEST, new ErrorAtNMStopRequestedTransition())
-
- .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, AMContainerEventType.C_ASSIGN_TA, new AssignTAAtWindDownTransition())
- .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, AMContainerEventType.C_PULL_TA, new PullTAAfterStopTransition())
- // TODO This transition is wrong. Should be a noop / error.
- .addTransition(AMContainerState.STOPPING, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, new CompletedAtWindDownTransition())
- .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED, new NodeFailedBaseTransition())
- .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, EnumSet.of(AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_STOP_REQUEST, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED, AMContainerEventType.C_TIMED_OUT))
- .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, AMContainerEventType.C_LAUNCH_REQUEST, new ErrorAtStoppingTransition())
-
- .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, AMContainerEventType.C_ASSIGN_TA, new AssignTAAtCompletedTransition())
- .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, AMContainerEventType.C_PULL_TA, new PullTAAfterStopTransition())
- .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, AMContainerEventType.C_NODE_FAILED, new NodeFailedBaseTransition())
- .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, EnumSet.of(AMContainerEventType.C_LAUNCH_REQUEST, AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_COMPLETED, AMContainerEventType.C_STOP_REQUEST, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED, AMContainerEventType.C_TIMED_OUT))
+ .addTransition(AMContainerState.ALLOCATED, AMContainerState.LAUNCHING,
+ AMContainerEventType.C_LAUNCH_REQUEST, new LaunchRequestTransition())
+ .addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED,
+ AMContainerEventType.C_ASSIGN_TA,
+ new AssignTaskAttemptAtAllocatedTransition())
+ .addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED,
+ AMContainerEventType.C_COMPLETED,
+ new CompletedAtAllocatedTransition())
+ .addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED,
+ AMContainerEventType.C_STOP_REQUEST,
+ new StopRequestAtAllocatedTransition())
+ .addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED,
+ AMContainerEventType.C_NODE_FAILED,
+ new NodeFailedAtAllocatedTransition())
+ .addTransition(
+ AMContainerState.ALLOCATED,
+ AMContainerState.COMPLETED,
+ EnumSet.of(AMContainerEventType.C_LAUNCHED,
+ AMContainerEventType.C_LAUNCH_FAILED,
+ AMContainerEventType.C_PULL_TA,
+ AMContainerEventType.C_TA_SUCCEEDED,
+ AMContainerEventType.C_NM_STOP_SENT,
+ AMContainerEventType.C_NM_STOP_FAILED,
+ AMContainerEventType.C_TIMED_OUT), new ErrorTransition())
+
+ .addTransition(
+ AMContainerState.LAUNCHING,
+ EnumSet.of(AMContainerState.LAUNCHING,
+ AMContainerState.STOP_REQUESTED),
+ AMContainerEventType.C_ASSIGN_TA, new AssignTaskAttemptTransition())
+ .addTransition(AMContainerState.LAUNCHING, AMContainerState.IDLE,
+ AMContainerEventType.C_LAUNCHED, new LaunchedTransition())
+ .addTransition(AMContainerState.LAUNCHING, AMContainerState.STOPPING,
+ AMContainerEventType.C_LAUNCH_FAILED, new LaunchFailedTransition())
+ // TODO CREUSE : Maybe, consider sending back an attempt if the container
+ // asks for one in this state. Waiting for a LAUNCHED event from the
+ // NMComm may delay the task allocation.
+ .addTransition(AMContainerState.LAUNCHING, AMContainerState.LAUNCHING,
+ AMContainerEventType.C_PULL_TA)
+ // Is assuming the pullAttempt will be null.
+ .addTransition(AMContainerState.LAUNCHING, AMContainerState.COMPLETED,
+ AMContainerEventType.C_COMPLETED,
+ new CompletedAtLaunchingTransition())
+ .addTransition(AMContainerState.LAUNCHING,
+ AMContainerState.STOP_REQUESTED, AMContainerEventType.C_STOP_REQUEST,
+ new StopRequestAtLaunchingTransition())
+ .addTransition(AMContainerState.LAUNCHING, AMContainerState.STOPPING,
+ AMContainerEventType.C_NODE_FAILED,
+ new NodeFailedAtLaunchingTransition())
+ .addTransition(
+ AMContainerState.LAUNCHING,
+ AMContainerState.STOP_REQUESTED,
+ EnumSet.of(AMContainerEventType.C_LAUNCH_REQUEST,
+ AMContainerEventType.C_TA_SUCCEEDED,
+ AMContainerEventType.C_NM_STOP_SENT,
+ AMContainerEventType.C_NM_STOP_FAILED,
+ AMContainerEventType.C_TIMED_OUT),
+ new ErrorAtLaunchingTransition())
+
+ .addTransition(AMContainerState.IDLE,
+ EnumSet.of(AMContainerState.IDLE, AMContainerState.STOP_REQUESTED),
+ AMContainerEventType.C_ASSIGN_TA,
+ new AssignTaskAttemptAtIdleTransition())
+ .addTransition(AMContainerState.IDLE,
+ EnumSet.of(AMContainerState.RUNNING, AMContainerState.IDLE),
+ AMContainerEventType.C_PULL_TA, new PullTAAtIdleTransition())
+ .addTransition(AMContainerState.IDLE, AMContainerState.COMPLETED,
+ AMContainerEventType.C_COMPLETED, new CompletedAtIdleTransition())
+ .addTransition(AMContainerState.IDLE, AMContainerState.STOP_REQUESTED,
+ AMContainerEventType.C_STOP_REQUEST,
+ new StopRequestAtIdleTransition())
+ .addTransition(AMContainerState.IDLE, AMContainerState.STOP_REQUESTED,
+ AMContainerEventType.C_TIMED_OUT, new TimedOutAtIdleTransition())
+ .addTransition(AMContainerState.IDLE, AMContainerState.STOPPING,
+ AMContainerEventType.C_NODE_FAILED, new NodeFailedAtIdleTransition())
+ .addTransition(
+ AMContainerState.IDLE,
+ AMContainerState.STOP_REQUESTED,
+ EnumSet.of(AMContainerEventType.C_LAUNCH_REQUEST,
+ AMContainerEventType.C_LAUNCHED,
+ AMContainerEventType.C_LAUNCH_FAILED,
+ AMContainerEventType.C_TA_SUCCEEDED,
+ AMContainerEventType.C_NM_STOP_SENT,
+ AMContainerEventType.C_NM_STOP_FAILED),
+ new ErrorAtIdleTransition())
+
+ .addTransition(AMContainerState.RUNNING, AMContainerState.STOP_REQUESTED,
+ AMContainerEventType.C_ASSIGN_TA,
+ new AssignTaskAttemptAtRunningTransition())
+ .addTransition(AMContainerState.RUNNING, AMContainerState.RUNNING,
+ AMContainerEventType.C_PULL_TA)
+ .addTransition(AMContainerState.RUNNING, AMContainerState.IDLE,
+ AMContainerEventType.C_TA_SUCCEEDED,
+ new TASucceededAtRunningTransition())
+ .addTransition(AMContainerState.RUNNING, AMContainerState.COMPLETED,
+ AMContainerEventType.C_COMPLETED, new CompletedAtRunningTransition())
+ .addTransition(AMContainerState.RUNNING, AMContainerState.STOP_REQUESTED,
+ AMContainerEventType.C_STOP_REQUEST,
+ new StopRequestAtRunningTransition())
+ .addTransition(AMContainerState.RUNNING, AMContainerState.STOP_REQUESTED,
+ AMContainerEventType.C_TIMED_OUT, new TimedOutAtRunningTransition())
+ .addTransition(AMContainerState.RUNNING, AMContainerState.STOPPING,
+ AMContainerEventType.C_NODE_FAILED,
+ new NodeFailedAtRunningTransition())
+ .addTransition(
+ AMContainerState.RUNNING,
+ AMContainerState.STOP_REQUESTED,
+ EnumSet.of(AMContainerEventType.C_LAUNCH_REQUEST,
+ AMContainerEventType.C_LAUNCHED,
+ AMContainerEventType.C_LAUNCH_FAILED,
+ AMContainerEventType.C_NM_STOP_SENT,
+ AMContainerEventType.C_NM_STOP_FAILED),
+ new ErrorAtRunningTransition())
+
+ .addTransition(AMContainerState.STOP_REQUESTED,
+ AMContainerState.STOP_REQUESTED, AMContainerEventType.C_ASSIGN_TA,
+ new AssignTAAtWindDownTransition())
+ .addTransition(AMContainerState.STOP_REQUESTED,
+ AMContainerState.STOP_REQUESTED, AMContainerEventType.C_PULL_TA,
+ new PullTAAfterStopTransition())
+ .addTransition(AMContainerState.STOP_REQUESTED,
+ AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED,
+ new CompletedAtWindDownTransition())
+ .addTransition(AMContainerState.STOP_REQUESTED,
+ AMContainerState.STOPPING, AMContainerEventType.C_NM_STOP_SENT)
+ .addTransition(AMContainerState.STOP_REQUESTED,
+ AMContainerState.STOPPING, AMContainerEventType.C_NM_STOP_FAILED,
+ new NMStopRequestFailedTransition())
+ .addTransition(AMContainerState.STOP_REQUESTED,
+ AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED,
+ new NodeFailedAtNMStopRequestedTransition())
+ .addTransition(
+ AMContainerState.STOP_REQUESTED,
+ AMContainerState.STOP_REQUESTED,
+ EnumSet.of(AMContainerEventType.C_LAUNCHED,
+ AMContainerEventType.C_LAUNCH_FAILED,
+ AMContainerEventType.C_TA_SUCCEEDED,
+ AMContainerEventType.C_STOP_REQUEST,
+ AMContainerEventType.C_TIMED_OUT))
+ .addTransition(AMContainerState.STOP_REQUESTED,
+ AMContainerState.STOP_REQUESTED,
+ AMContainerEventType.C_LAUNCH_REQUEST,
+ new ErrorAtNMStopRequestedTransition())
+
+ .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING,
+ AMContainerEventType.C_ASSIGN_TA, new AssignTAAtWindDownTransition())
+ .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING,
+ AMContainerEventType.C_PULL_TA, new PullTAAfterStopTransition())
+ // TODO This transition is wrong. Should be a noop / error.
+ .addTransition(AMContainerState.STOPPING, AMContainerState.COMPLETED,
+ AMContainerEventType.C_COMPLETED, new CompletedAtWindDownTransition())
+ .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING,
+ AMContainerEventType.C_NODE_FAILED, new NodeFailedBaseTransition())
+ .addTransition(
+ AMContainerState.STOPPING,
+ AMContainerState.STOPPING,
+ EnumSet.of(AMContainerEventType.C_LAUNCHED,
+ AMContainerEventType.C_LAUNCH_FAILED,
+ AMContainerEventType.C_TA_SUCCEEDED,
+ AMContainerEventType.C_STOP_REQUEST,
+ AMContainerEventType.C_NM_STOP_SENT,
+ AMContainerEventType.C_NM_STOP_FAILED,
+ AMContainerEventType.C_TIMED_OUT))
+ .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING,
+ AMContainerEventType.C_LAUNCH_REQUEST,
+ new ErrorAtStoppingTransition())
+
+ .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED,
+ AMContainerEventType.C_ASSIGN_TA, new AssignTAAtCompletedTransition())
+ .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED,
+ AMContainerEventType.C_PULL_TA, new PullTAAfterStopTransition())
+ .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED,
+ AMContainerEventType.C_NODE_FAILED, new NodeFailedBaseTransition())
+ .addTransition(
+ AMContainerState.COMPLETED,
+ AMContainerState.COMPLETED,
+ EnumSet.of(AMContainerEventType.C_LAUNCH_REQUEST,
+ AMContainerEventType.C_LAUNCHED,
+ AMContainerEventType.C_LAUNCH_FAILED,
+ AMContainerEventType.C_TA_SUCCEEDED,
+ AMContainerEventType.C_COMPLETED,
+ AMContainerEventType.C_STOP_REQUEST,
+ AMContainerEventType.C_NM_STOP_SENT,
+ AMContainerEventType.C_NM_STOP_FAILED,
+ AMContainerEventType.C_TIMED_OUT))
.installTopology();
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/54e23221/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
index a79358b..9ee8a3c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
@@ -74,37 +74,92 @@ public class AMNodeImpl implements AMNode {
new StateMachineFactory<AMNodeImpl, AMNodeState, AMNodeEventType, AMNodeEvent>(
AMNodeState.ACTIVE)
// Transitions from ACTIVE state.
- .addTransition(AMNodeState.ACTIVE, AMNodeState.ACTIVE, AMNodeEventType.N_CONTAINER_ALLOCATED, new ContainerAllocatedTransition())
- .addTransition(AMNodeState.ACTIVE, AMNodeState.ACTIVE, AMNodeEventType.N_TA_SUCCEEDED, new TaskAttemptSucceededTransition())
- .addTransition(AMNodeState.ACTIVE, EnumSet.of(AMNodeState.ACTIVE, AMNodeState.BLACKLISTED), AMNodeEventType.N_TA_ENDED, new TaskAttemptFailedTransition())
- .addTransition(AMNodeState.ACTIVE, AMNodeState.UNHEALTHY, AMNodeEventType.N_TURNED_UNHEALTHY, new NodeTurnedUnhealthyTransition())
- .addTransition(AMNodeState.ACTIVE, EnumSet.of(AMNodeState.ACTIVE, AMNodeState.BLACKLISTED), AMNodeEventType.N_IGNORE_BLACKLISTING_DISABLED, new IgnoreBlacklistingDisabledTransition())
- .addTransition(AMNodeState.ACTIVE, AMNodeState.FORCED_ACTIVE, AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED, new IgnoreBlacklistingStateChangeTransition(true))
- .addTransition(AMNodeState.ACTIVE, AMNodeState.ACTIVE, AMNodeEventType.N_TURNED_HEALTHY)
-
- // Transitions from BLACKLISTED state.
- .addTransition(AMNodeState.BLACKLISTED, AMNodeState.BLACKLISTED, AMNodeEventType.N_CONTAINER_ALLOCATED, new ContainerAllocatedWhileBlacklistedTransition())
- .addTransition(AMNodeState.BLACKLISTED, EnumSet.of(AMNodeState.BLACKLISTED, AMNodeState.ACTIVE), AMNodeEventType.N_TA_SUCCEEDED, new TaskAttemptSucceededWhileBlacklistedTransition())
- .addTransition(AMNodeState.BLACKLISTED, AMNodeState.BLACKLISTED, AMNodeEventType.N_TA_ENDED, new CountFailedTaskAttemptTransition())
- .addTransition(AMNodeState.BLACKLISTED, AMNodeState.UNHEALTHY, AMNodeEventType.N_TURNED_UNHEALTHY, new NodeTurnedUnhealthyTransition())
- .addTransition(AMNodeState.BLACKLISTED, AMNodeState.FORCED_ACTIVE, AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED, new IgnoreBlacklistingStateChangeTransition(true))
- .addTransition(AMNodeState.BLACKLISTED, AMNodeState.BLACKLISTED, EnumSet.of(AMNodeEventType.N_TURNED_HEALTHY, AMNodeEventType.N_IGNORE_BLACKLISTING_DISABLED), new GenericErrorTransition())
-
- //Transitions from FORCED_ACTIVE state.
- .addTransition(AMNodeState.FORCED_ACTIVE, AMNodeState.FORCED_ACTIVE, AMNodeEventType.N_CONTAINER_ALLOCATED, new ContainerAllocatedTransition())
- .addTransition(AMNodeState.FORCED_ACTIVE, AMNodeState.FORCED_ACTIVE, AMNodeEventType.N_TA_SUCCEEDED, new TaskAttemptSucceededTransition())
- .addTransition(AMNodeState.FORCED_ACTIVE, AMNodeState.FORCED_ACTIVE, AMNodeEventType.N_TA_ENDED, new CountFailedTaskAttemptTransition())
- .addTransition(AMNodeState.FORCED_ACTIVE, AMNodeState.UNHEALTHY, AMNodeEventType.N_TURNED_UNHEALTHY, new NodeTurnedUnhealthyTransition())
- .addTransition(AMNodeState.FORCED_ACTIVE, EnumSet.of(AMNodeState.BLACKLISTED, AMNodeState.ACTIVE), AMNodeEventType.N_IGNORE_BLACKLISTING_DISABLED, new IgnoreBlacklistingDisabledTransition())
- .addTransition(AMNodeState.FORCED_ACTIVE, AMNodeState.FORCED_ACTIVE, EnumSet.of(AMNodeEventType.N_TURNED_HEALTHY, AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED), new GenericErrorTransition())
-
- // Transitions from UNHEALTHY state.
- .addTransition(AMNodeState.UNHEALTHY, AMNodeState.UNHEALTHY, AMNodeEventType.N_CONTAINER_ALLOCATED, new ContainerAllocatedWhileUnhealthyTransition())
- .addTransition(AMNodeState.UNHEALTHY, AMNodeState.UNHEALTHY, EnumSet.of(AMNodeEventType.N_TA_SUCCEEDED, AMNodeEventType.N_TA_ENDED))
- .addTransition(AMNodeState.UNHEALTHY, AMNodeState.UNHEALTHY, AMNodeEventType.N_IGNORE_BLACKLISTING_DISABLED, new IgnoreBlacklistingStateChangeTransition(false))
- .addTransition(AMNodeState.UNHEALTHY, AMNodeState.UNHEALTHY, AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED, new IgnoreBlacklistingStateChangeTransition(true))
- .addTransition(AMNodeState.UNHEALTHY, EnumSet.of(AMNodeState.ACTIVE, AMNodeState.FORCED_ACTIVE), AMNodeEventType.N_TURNED_HEALTHY, new NodeTurnedHealthyTransition())
- .addTransition(AMNodeState.UNHEALTHY, AMNodeState.UNHEALTHY, AMNodeEventType.N_TURNED_UNHEALTHY, new GenericErrorTransition())
+ .addTransition(AMNodeState.ACTIVE, AMNodeState.ACTIVE,
+ AMNodeEventType.N_CONTAINER_ALLOCATED,
+ new ContainerAllocatedTransition())
+ .addTransition(AMNodeState.ACTIVE, AMNodeState.ACTIVE,
+ AMNodeEventType.N_TA_SUCCEEDED, new TaskAttemptSucceededTransition())
+ .addTransition(AMNodeState.ACTIVE,
+ EnumSet.of(AMNodeState.ACTIVE, AMNodeState.BLACKLISTED),
+ AMNodeEventType.N_TA_ENDED, new TaskAttemptFailedTransition())
+ .addTransition(AMNodeState.ACTIVE, AMNodeState.UNHEALTHY,
+ AMNodeEventType.N_TURNED_UNHEALTHY,
+ new NodeTurnedUnhealthyTransition())
+ .addTransition(AMNodeState.ACTIVE,
+ EnumSet.of(AMNodeState.ACTIVE, AMNodeState.BLACKLISTED),
+ AMNodeEventType.N_IGNORE_BLACKLISTING_DISABLED,
+ new IgnoreBlacklistingDisabledTransition())
+ .addTransition(AMNodeState.ACTIVE, AMNodeState.FORCED_ACTIVE,
+ AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED,
+ new IgnoreBlacklistingStateChangeTransition(true))
+ .addTransition(AMNodeState.ACTIVE, AMNodeState.ACTIVE,
+ AMNodeEventType.N_TURNED_HEALTHY)
+
+ // Transitions from BLACKLISTED state.
+ .addTransition(AMNodeState.BLACKLISTED, AMNodeState.BLACKLISTED,
+ AMNodeEventType.N_CONTAINER_ALLOCATED,
+ new ContainerAllocatedWhileBlacklistedTransition())
+ .addTransition(AMNodeState.BLACKLISTED,
+ EnumSet.of(AMNodeState.BLACKLISTED, AMNodeState.ACTIVE),
+ AMNodeEventType.N_TA_SUCCEEDED,
+ new TaskAttemptSucceededWhileBlacklistedTransition())
+ .addTransition(AMNodeState.BLACKLISTED, AMNodeState.BLACKLISTED,
+ AMNodeEventType.N_TA_ENDED, new CountFailedTaskAttemptTransition())
+ .addTransition(AMNodeState.BLACKLISTED, AMNodeState.UNHEALTHY,
+ AMNodeEventType.N_TURNED_UNHEALTHY,
+ new NodeTurnedUnhealthyTransition())
+ .addTransition(AMNodeState.BLACKLISTED, AMNodeState.FORCED_ACTIVE,
+ AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED,
+ new IgnoreBlacklistingStateChangeTransition(true))
+ .addTransition(
+ AMNodeState.BLACKLISTED,
+ AMNodeState.BLACKLISTED,
+ EnumSet.of(AMNodeEventType.N_TURNED_HEALTHY,
+ AMNodeEventType.N_IGNORE_BLACKLISTING_DISABLED),
+ new GenericErrorTransition())
+
+ // Transitions from FORCED_ACTIVE state.
+ .addTransition(AMNodeState.FORCED_ACTIVE, AMNodeState.FORCED_ACTIVE,
+ AMNodeEventType.N_CONTAINER_ALLOCATED,
+ new ContainerAllocatedTransition())
+ .addTransition(AMNodeState.FORCED_ACTIVE, AMNodeState.FORCED_ACTIVE,
+ AMNodeEventType.N_TA_SUCCEEDED, new TaskAttemptSucceededTransition())
+ .addTransition(AMNodeState.FORCED_ACTIVE, AMNodeState.FORCED_ACTIVE,
+ AMNodeEventType.N_TA_ENDED, new CountFailedTaskAttemptTransition())
+ .addTransition(AMNodeState.FORCED_ACTIVE, AMNodeState.UNHEALTHY,
+ AMNodeEventType.N_TURNED_UNHEALTHY,
+ new NodeTurnedUnhealthyTransition())
+ .addTransition(AMNodeState.FORCED_ACTIVE,
+ EnumSet.of(AMNodeState.BLACKLISTED, AMNodeState.ACTIVE),
+ AMNodeEventType.N_IGNORE_BLACKLISTING_DISABLED,
+ new IgnoreBlacklistingDisabledTransition())
+ .addTransition(
+ AMNodeState.FORCED_ACTIVE,
+ AMNodeState.FORCED_ACTIVE,
+ EnumSet.of(AMNodeEventType.N_TURNED_HEALTHY,
+ AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED),
+ new GenericErrorTransition())
+
+ // Transitions from UNHEALTHY state.
+ .addTransition(AMNodeState.UNHEALTHY, AMNodeState.UNHEALTHY,
+ AMNodeEventType.N_CONTAINER_ALLOCATED,
+ new ContainerAllocatedWhileUnhealthyTransition())
+ .addTransition(
+ AMNodeState.UNHEALTHY,
+ AMNodeState.UNHEALTHY,
+ EnumSet
+ .of(AMNodeEventType.N_TA_SUCCEEDED, AMNodeEventType.N_TA_ENDED))
+ .addTransition(AMNodeState.UNHEALTHY, AMNodeState.UNHEALTHY,
+ AMNodeEventType.N_IGNORE_BLACKLISTING_DISABLED,
+ new IgnoreBlacklistingStateChangeTransition(false))
+ .addTransition(AMNodeState.UNHEALTHY, AMNodeState.UNHEALTHY,
+ AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED,
+ new IgnoreBlacklistingStateChangeTransition(true))
+ .addTransition(AMNodeState.UNHEALTHY,
+ EnumSet.of(AMNodeState.ACTIVE, AMNodeState.FORCED_ACTIVE),
+ AMNodeEventType.N_TURNED_HEALTHY, new NodeTurnedHealthyTransition())
+ .addTransition(AMNodeState.UNHEALTHY, AMNodeState.UNHEALTHY,
+ AMNodeEventType.N_TURNED_UNHEALTHY, new GenericErrorTransition())
.installTopology();