You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2014/11/17 20:22:35 UTC
[26/50] [abbrv] tez git commit: TEZ-1627. Remove OUTPUT_CONSUMABLE and related Event in TaskAttemptImpl (zjffdu)
TEZ-1627. Remove OUTPUT_CONSUMABLE and related Event in TaskAttemptImpl (zjffdu)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/31c1de6d
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/31c1de6d
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/31c1de6d
Branch: refs/heads/TEZ-8
Commit: 31c1de6daa7b6eff1a32a8d10999680084085fc1
Parents: 71ca74e
Author: Jeff Zhang <zj...@apache.org>
Authored: Fri Nov 7 10:20:37 2014 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Fri Nov 7 10:20:37 2014 +0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../java/org/apache/tez/dag/app/dag/Task.java | 19 -----
.../dag/app/dag/TaskAttemptStateInternal.java | 1 -
.../dag/app/dag/event/TaskAttemptEventType.java | 1 -
.../tez/dag/app/dag/event/TaskEventType.java | 1 -
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 72 -----------------
.../apache/tez/dag/app/dag/impl/TaskImpl.java | 84 --------------------
7 files changed, 1 insertion(+), 178 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/31c1de6d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2e58f0a..8fad485 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,6 +8,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
TEZ-1733. TezMerger should sort FileChunks on size when merging
TEZ-1738. Tez tfile parser for log parsing
+ TEZ-1627. Remove OUTPUT_CONSUMABLE and related Event in TaskAttemptImpl
Release 0.5.3: Unreleased
http://git-wip-us.apache.org/repos/asf/tez/blob/31c1de6d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
index b5fa45f..98a85cf 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
@@ -55,25 +55,6 @@ public interface Task {
* @return whether the attempt's output can be committed or not.
*/
boolean canCommit(TezTaskAttemptID taskAttemptID);
-
-
- /**
- * Do the running tasks need to stick around after they're done processing and
- * generating output. Required for tasks which have custom output handling
- * such as in-memory shuffle.
- *
- * @return whether the task needs to stick around.
- */
- boolean needsWaitAfterOutputConsumable();
-
- /**
- * Get the attempt id which has reported in as output ready. null if not
- * applicable.
- *
- * @return the attempt id which has reported in as output ready. null if not
- * applicable.
- */
- TezTaskAttemptID getOutputConsumableAttempt();
public Vertex getVertex();
http://git-wip-us.apache.org/repos/asf/tez/blob/31c1de6d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttemptStateInternal.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttemptStateInternal.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttemptStateInternal.java
index a49c2a3..8d0d83e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttemptStateInternal.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttemptStateInternal.java
@@ -29,7 +29,6 @@ public enum TaskAttemptStateInternal {
NEW,
START_WAIT,
RUNNING,
- OUTPUT_CONSUMABLE,
KILL_IN_PROGRESS,
FAIL_IN_PROGRESS,
KILLED,
http://git-wip-us.apache.org/repos/asf/tez/blob/31c1de6d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
index e7db8d1..c8eec1b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
@@ -30,7 +30,6 @@ public enum TaskAttemptEventType {
TA_STARTED_REMOTELY,
TA_STATUS_UPDATE,
TA_DIAGNOSTICS_UPDATE,
- TA_OUTPUT_CONSUMABLE, // TODO History event to indicate this ?
TA_COMMIT_PENDING,
TA_DONE,
TA_FAILED,
http://git-wip-us.apache.org/repos/asf/tez/blob/31c1de6d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java
index fe4dd5a..baec5f0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java
@@ -34,7 +34,6 @@ public enum TaskEventType {
//Producer:TaskAttempt
T_ATTEMPT_LAUNCHED,
- T_ATTEMPT_OUTPUT_CONSUMABLE,
T_ATTEMPT_FAILED,
T_ATTEMPT_SUCCEEDED,
T_ATTEMPT_KILLED,
http://git-wip-us.apache.org/repos/asf/tez/blob/31c1de6d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 45ca543..deaba42 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -236,10 +236,6 @@ public class TaskAttemptImpl implements TaskAttempt,
TaskAttemptStateInternal.RUNNING,
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
- .addTransition(TaskAttemptStateInternal.RUNNING,
- TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
- TaskAttemptEventType.TA_OUTPUT_CONSUMABLE,
- new OutputConsumableTransition())
// Optional, may not come in for all tasks.
.addTransition(TaskAttemptStateInternal.RUNNING,
TaskAttemptStateInternal.SUCCEEDED, TaskAttemptEventType.TA_DONE,
@@ -279,57 +275,6 @@ public class TaskAttemptImpl implements TaskAttempt,
TaskAttemptEventType.TA_OUTPUT_FAILED,
new OutputReportedFailedTransition())
- .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
- TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
- TaskAttemptEventType.TA_STATUS_UPDATE, STATUS_UPDATER)
- .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
- TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
- TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
- DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
- .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
- TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
- TaskAttemptEventType.TA_OUTPUT_CONSUMABLE)
- // Stuck RPC. The client retries in a loop.
- .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
- TaskAttemptStateInternal.SUCCEEDED, TaskAttemptEventType.TA_DONE,
- new SucceededTransition())
- .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
- TaskAttemptStateInternal.FAIL_IN_PROGRESS,
- TaskAttemptEventType.TA_FAILED,
- new TerminatedWhileRunningTransition(FAILED_HELPER))
- .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
- TaskAttemptStateInternal.FAIL_IN_PROGRESS,
- TaskAttemptEventType.TA_TIMED_OUT,
- new TerminatedWhileRunningTransition(FAILED_HELPER))
- // TODO CREUSE Ensure TaskCompletionEvents are updated to reflect this.
- // Something needs to go out to the job.
- .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
- TaskAttemptStateInternal.KILL_IN_PROGRESS,
- TaskAttemptEventType.TA_KILL_REQUEST,
- new TerminatedWhileRunningTransition(KILLED_HELPER))
- .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
- TaskAttemptStateInternal.KILL_IN_PROGRESS,
- TaskAttemptEventType.TA_NODE_FAILED,
- new TerminatedWhileRunningTransition(KILLED_HELPER))
- .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
- TaskAttemptStateInternal.FAIL_IN_PROGRESS,
- TaskAttemptEventType.TA_CONTAINER_TERMINATING,
- new TerminatedWhileRunningTransition(FAILED_HELPER))
- .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
- TaskAttemptStateInternal.FAILED,
- TaskAttemptEventType.TA_CONTAINER_TERMINATED,
- new ContainerCompletedBeforeRunningTransition())
- .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
- TaskAttemptStateInternal.KILLED,
- TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM,
- new ContainerCompletedBeforeRunningTransition(KILLED_HELPER))
- .addTransition(
- TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
- EnumSet.of(TaskAttemptStateInternal.FAIL_IN_PROGRESS,
- TaskAttemptStateInternal.OUTPUT_CONSUMABLE),
- TaskAttemptEventType.TA_OUTPUT_FAILED,
- new OutputReportedFailedTransition())
-
.addTransition(TaskAttemptStateInternal.KILL_IN_PROGRESS,
TaskAttemptStateInternal.KILLED,
TaskAttemptEventType.TA_CONTAINER_TERMINATED,
@@ -344,7 +289,6 @@ public class TaskAttemptImpl implements TaskAttempt,
EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY,
TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM,
TaskAttemptEventType.TA_STATUS_UPDATE,
- TaskAttemptEventType.TA_OUTPUT_CONSUMABLE,
TaskAttemptEventType.TA_COMMIT_PENDING,
TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED,
TaskAttemptEventType.TA_TIMED_OUT,
@@ -367,7 +311,6 @@ public class TaskAttemptImpl implements TaskAttempt,
EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY,
TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM,
TaskAttemptEventType.TA_STATUS_UPDATE,
- TaskAttemptEventType.TA_OUTPUT_CONSUMABLE,
TaskAttemptEventType.TA_COMMIT_PENDING,
TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED,
TaskAttemptEventType.TA_TIMED_OUT,
@@ -387,7 +330,6 @@ public class TaskAttemptImpl implements TaskAttempt,
TaskAttemptEventType.TA_SCHEDULE,
TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM,
TaskAttemptEventType.TA_STATUS_UPDATE,
- TaskAttemptEventType.TA_OUTPUT_CONSUMABLE,
TaskAttemptEventType.TA_COMMIT_PENDING,
TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED,
TaskAttemptEventType.TA_TIMED_OUT,
@@ -408,7 +350,6 @@ public class TaskAttemptImpl implements TaskAttempt,
TaskAttemptEventType.TA_SCHEDULE,
TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM,
TaskAttemptEventType.TA_STATUS_UPDATE,
- TaskAttemptEventType.TA_OUTPUT_CONSUMABLE,
TaskAttemptEventType.TA_COMMIT_PENDING,
TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED,
TaskAttemptEventType.TA_TIMED_OUT,
@@ -762,7 +703,6 @@ public class TaskAttemptImpl implements TaskAttempt,
case START_WAIT:
return TaskAttemptState.STARTING;
case RUNNING:
- case OUTPUT_CONSUMABLE:
return TaskAttemptState.RUNNING;
case FAILED:
case FAIL_IN_PROGRESS:
@@ -1298,18 +1238,6 @@ public class TaskAttemptImpl implements TaskAttempt,
}
}
- protected static class OutputConsumableTransition implements
- SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
-
- @Override
- public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
- //TaskAttemptEventOutputConsumable orEvent = (TaskAttemptEventOutputConsumable) event;
- //ta.shufflePort = orEvent.getOutputContext().getShufflePort();
- ta.sendEvent(new TaskEventTAUpdate(ta.attemptId,
- TaskEventType.T_ATTEMPT_OUTPUT_CONSUMABLE));
- }
- }
-
protected static class SucceededTransition implements
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/31c1de6d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 7bafb3b..4ded9be 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -187,10 +187,6 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
// Transitions from RUNNING state
.addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING,
TaskEventType.T_ATTEMPT_LAUNCHED) //more attempts may start later
- // This is an optional event.
- .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING,
- TaskEventType.T_ATTEMPT_OUTPUT_CONSUMABLE,
- new AttemptProcessingCompleteTransition())
.addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING,
TaskEventType.T_ADD_SPEC_ATTEMPT, new RedundantScheduleTransition())
.addTransition(TaskStateInternal.RUNNING, TaskStateInternal.SUCCEEDED,
@@ -229,12 +225,9 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
EnumSet.of(
TaskEventType.T_TERMINATE,
TaskEventType.T_ATTEMPT_LAUNCHED,
- TaskEventType.T_ATTEMPT_OUTPUT_CONSUMABLE,
TaskEventType.T_ADD_SPEC_ATTEMPT))
// Transitions from SUCCEEDED state
- // TODO May required different handling if OUTPUT_CONSUMABLE is one of
- // the stages. i.e. Task would only SUCCEED after all output consumed.
.addTransition(TaskStateInternal.SUCCEEDED, //only possible for map tasks
EnumSet.of(TaskStateInternal.SCHEDULED, TaskStateInternal.SUCCEEDED, TaskStateInternal.FAILED),
TaskEventType.T_ATTEMPT_FAILED, new TaskRetroactiveFailureTransition())
@@ -298,9 +291,6 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
*/
- private TezTaskAttemptID outputConsumableAttempt;
- private boolean outputConsumableAttemptSuccessSent = false;
-
//should be set to one which comes first
//saying COMMIT_PENDING
private TezTaskAttemptID commitAttempt;
@@ -750,30 +740,6 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
}
}
- // TODO remove hacky name lookup
- @Override
- public boolean needsWaitAfterOutputConsumable() {
- Vertex vertex = getVertex();
- ProcessorDescriptor processorDescriptor = vertex.getProcessorDescriptor();
- if (processorDescriptor != null &&
- processorDescriptor.getClassName().contains("InitialTaskWithInMemSort")) {
- return true;
- } else {
- return false;
- }
- }
-
-
- @Override
- public TezTaskAttemptID getOutputConsumableAttempt() {
- readLock.lock();
- try {
- return this.outputConsumableAttempt;
- } finally {
- readLock.unlock();
- }
- }
-
TaskAttemptImpl createAttempt(int attemptNumber) {
return new TaskAttemptImpl(getTaskId(), attemptNumber, eventHandler,
taskAttemptListener, conf, clock, taskHeartbeatHandler, appContext,
@@ -877,18 +843,6 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
private void sendTaskAttemptCompletionEvent(TezTaskAttemptID attemptId,
TaskAttemptStateInternal attemptState) {
- // raise the completion event only if the container is assigned
- // to nextAttemptNumber
- if (needsWaitAfterOutputConsumable()) {
- // An event may have been sent out during the OUTPUT_READY state itself.
- // Make sure the same event is not being sent out again.
- if (attemptId == outputConsumableAttempt
- && attemptState == TaskAttemptStateInternal.SUCCEEDED) {
- if (outputConsumableAttemptSuccessSent) {
- return;
- }
- }
- }
eventHandler.handle(new VertexEventTaskAttemptCompleted(attemptId, attemptState));
}
@@ -1037,38 +991,6 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
}
}
- private static class AttemptProcessingCompleteTransition implements
- SingleArcTransition<TaskImpl, TaskEvent> {
-
- @Override
- public void transition(TaskImpl task, TaskEvent event) {
- TaskEventTAUpdate taEvent = (TaskEventTAUpdate) event;
- TezTaskAttemptID attemptId = taEvent.getTaskAttemptID();
-
- if (task.outputConsumableAttempt == null) {
- task.sendTaskAttemptCompletionEvent(attemptId,
- TaskAttemptStateInternal.SUCCEEDED);
- task.outputConsumableAttempt = attemptId;
- task.outputConsumableAttemptSuccessSent = true;
- if (LOG.isDebugEnabled()) {
- LOG.debug("TezTaskAttemptID: " + attemptId
- + " set as the OUTPUT_READY attempt");
- }
- } else {
- // Nothing to do. This task will eventually be told to die, or will be
- // killed.
- if (LOG.isDebugEnabled()) {
- LOG.debug("TezTaskAttemptID: "
- + attemptId + " reporting OUTPUT_READY."
- + " Will be asked to die since another attempt "
- + task.outputConsumableAttempt + " already has output ready");
- }
- task.eventHandler.handle(new TaskAttemptEventKillRequest(attemptId,
- "Alternate attemptId already serving output"));
- }
-
- }
- }
private static class AttemptSucceededTransition
implements SingleArcTransition<TaskImpl, TaskEvent> {
@@ -1313,12 +1235,6 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
castEvent.getTaskAttemptID().equals(task.commitAttempt)) {
task.commitAttempt = null;
}
- if (castEvent.getTaskAttemptID().equals(task.outputConsumableAttempt)) {
- task.outputConsumableAttempt = null;
- task.handleTaskAttemptCompletion(castEvent.getTaskAttemptID(),
- TaskAttemptStateInternal.FAILED);
- }
-
// The attempt would have informed the scheduler about it's failure
task.taskAttemptStatus.put(castEvent.getTaskAttemptID().getId(), true);