You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was lost in the plain-text conversion).
Posted to commits@tez.apache.org by jl...@apache.org on 2016/01/21 20:04:39 UTC
[1/2] tez git commit: TEZ-3052. Task internal error due to Invalid event: T_ATTEMPT_FAILED at FAILED (jlowe)
Repository: tez
Updated Branches:
refs/heads/branch-0.7 09d8c82d6 -> 238c3ad8f
refs/heads/master 73e993cba -> ca447ba5c
TEZ-3052. Task internal error due to Invalid event: T_ATTEMPT_FAILED at FAILED (jlowe)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ca447ba5
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ca447ba5
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ca447ba5
Branch: refs/heads/master
Commit: ca447ba5c940d0ec8520166646695c49f2cd9dc3
Parents: 73e993c
Author: Jason Lowe <jl...@apache.org>
Authored: Thu Jan 21 18:57:52 2016 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Thu Jan 21 18:57:52 2016 +0000
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../apache/tez/dag/app/dag/impl/TaskImpl.java | 4 +-
.../tez/dag/app/dag/impl/TestTaskImpl.java | 73 ++++++++++++++++++++
3 files changed, 78 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/ca447ba5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5d2c446..bec7dd4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.8.3: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3052. Task internal error due to Invalid event: T_ATTEMPT_FAILED at FAILED
TEZ-2594. Fix LICENSE for missing entries for full and minimal tarballs.
TEZ-3053. Containers timeout if they do not receive a task within the container timeout interval.
TEZ-2898. tez tools : swimlanes.py is broken.
@@ -318,6 +319,7 @@ INCOMPATIBLE CHANGES
TEZ-2949. Allow duplicate dag names within session for Tez.
ALL CHANGES
+ TEZ-3052. Task internal error due to Invalid event: T_ATTEMPT_FAILED at FAILED
TEZ-2937. Can Processor.close() be called after closing inputs and outputs?
TEZ-3037. History URL should be set regardless of which history logging service is enabled.
TEZ-3032. DAG start time getting logged using system time instead of recorded time in startTime field.
http://git-wip-us.apache.org/repos/asf/tez/blob/ca447ba5/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index c00d674..9ec7ce8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -256,7 +256,9 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
TaskEventType.T_TERMINATE,
TaskEventType.T_SCHEDULE,
TaskEventType.T_ADD_SPEC_ATTEMPT,
- TaskEventType.T_ATTEMPT_KILLED))
+ TaskEventType.T_ATTEMPT_FAILED,
+ TaskEventType.T_ATTEMPT_KILLED,
+ TaskEventType.T_ATTEMPT_SUCCEEDED))
// Transitions from KILLED state
// Ignorable event: T_ATTEMPT_KILLED
http://git-wip-us.apache.org/repos/asf/tez/blob/ca447ba5/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index 0414c99..1274378 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tez.dag.api.TaskLocationHint;
+import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.api.oldrecords.TaskState;
import org.apache.tez.dag.app.AppContext;
@@ -661,6 +662,78 @@ public class TestTaskImpl {
assertTrue(mockTask.getDiagnostics().get(0).contains(TaskAttemptTerminationCause.TERMINATED_AT_SHUTDOWN.name()));
}
+ @Test(timeout = 20000)
+ public void testFailedThenSpeculativeFailed() {
+ conf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 1);
+ mockTask = new MockTaskImpl(vertexId, partition,
+ eventHandler, conf, taskCommunicatorManagerInterface, clock,
+ taskHeartbeatHandler, appContext, leafVertex,
+ taskResource, containerContext, mock(Vertex.class));
+ TezTaskID taskId = getNewTaskID();
+ scheduleTaskAttempt(taskId);
+ MockTaskAttemptImpl firstAttempt = mockTask.getLastAttempt();
+ launchTaskAttempt(firstAttempt.getID());
+ updateAttemptState(firstAttempt, TaskAttemptState.RUNNING);
+
+ // Add a speculative task attempt
+ mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
+ TaskEventType.T_ADD_SPEC_ATTEMPT));
+ MockTaskAttemptImpl specAttempt = mockTask.getLastAttempt();
+ launchTaskAttempt(specAttempt.getID());
+ updateAttemptState(specAttempt, TaskAttemptState.RUNNING);
+ assertEquals(2, mockTask.getAttemptList().size());
+
+ // Fail the first attempt
+ updateAttemptState(firstAttempt, TaskAttemptState.FAILED);
+ mockTask.handle(new TaskEventTAUpdate(firstAttempt.getID(),
+ TaskEventType.T_ATTEMPT_FAILED));
+ assertEquals(TaskState.FAILED, mockTask.getState());
+ assertEquals(2, mockTask.getAttemptList().size());
+
+ // Now fail the speculative attempt
+ updateAttemptState(specAttempt, TaskAttemptState.FAILED);
+ mockTask.handle(new TaskEventTAUpdate(specAttempt.getID(),
+ TaskEventType.T_ATTEMPT_FAILED));
+ assertEquals(TaskState.FAILED, mockTask.getState());
+ assertEquals(2, mockTask.getAttemptList().size());
+ }
+
+ @Test(timeout = 20000)
+ public void testFailedThenSpeculativeSucceeded() {
+ conf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 1);
+ mockTask = new MockTaskImpl(vertexId, partition,
+ eventHandler, conf, taskCommunicatorManagerInterface, clock,
+ taskHeartbeatHandler, appContext, leafVertex,
+ taskResource, containerContext, mock(Vertex.class));
+ TezTaskID taskId = getNewTaskID();
+ scheduleTaskAttempt(taskId);
+ MockTaskAttemptImpl firstAttempt = mockTask.getLastAttempt();
+ launchTaskAttempt(firstAttempt.getID());
+ updateAttemptState(firstAttempt, TaskAttemptState.RUNNING);
+
+ // Add a speculative task attempt
+ mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
+ TaskEventType.T_ADD_SPEC_ATTEMPT));
+ MockTaskAttemptImpl specAttempt = mockTask.getLastAttempt();
+ launchTaskAttempt(specAttempt.getID());
+ updateAttemptState(specAttempt, TaskAttemptState.RUNNING);
+ assertEquals(2, mockTask.getAttemptList().size());
+
+ // Fail the first attempt
+ updateAttemptState(firstAttempt, TaskAttemptState.FAILED);
+ mockTask.handle(new TaskEventTAUpdate(firstAttempt.getID(),
+ TaskEventType.T_ATTEMPT_FAILED));
+ assertEquals(TaskState.FAILED, mockTask.getState());
+ assertEquals(2, mockTask.getAttemptList().size());
+
+ // Now succeed the speculative attempt
+ updateAttemptState(specAttempt, TaskAttemptState.SUCCEEDED);
+ mockTask.handle(new TaskEventTAUpdate(specAttempt.getID(),
+ TaskEventType.T_ATTEMPT_SUCCEEDED));
+ assertEquals(TaskState.FAILED, mockTask.getState());
+ assertEquals(2, mockTask.getAttemptList().size());
+ }
+
// TODO Add test to validate the correct commit attempt.
[2/2] tez git commit: TEZ-3052. Task internal error due to Invalid event: T_ATTEMPT_FAILED at FAILED (jlowe)
Posted by jl...@apache.org.
TEZ-3052. Task internal error due to Invalid event: T_ATTEMPT_FAILED at FAILED (jlowe)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/238c3ad8
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/238c3ad8
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/238c3ad8
Branch: refs/heads/branch-0.7
Commit: 238c3ad8fb007447830ce40fa2b4c498fb80a7f5
Parents: 09d8c82
Author: Jason Lowe <jl...@apache.org>
Authored: Thu Jan 21 19:04:17 2016 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Thu Jan 21 19:04:17 2016 +0000
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/tez/dag/app/dag/impl/TaskImpl.java | 4 +-
.../tez/dag/app/dag/impl/TestTaskImpl.java | 73 ++++++++++++++++++++
3 files changed, 77 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/238c3ad8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4457dc4..bd3e64a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
TEZ-2972. Avoid task rescheduling when a node turns unhealthy
ALL CHANGES
+ TEZ-3052. Task internal error due to Invalid event: T_ATTEMPT_FAILED at FAILED
TEZ-3046. Compilation issue in tez-runtime-internals of branch-0.7
TEZ-2937. Can Processor.close() be called after closing inputs and outputs?
TEZ-3037. History URL should be set regardless of which history logging service is enabled.
http://git-wip-us.apache.org/repos/asf/tez/blob/238c3ad8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index e5e0a37..f78932b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -261,7 +261,9 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
TaskEventType.T_TERMINATE,
TaskEventType.T_SCHEDULE,
TaskEventType.T_ADD_SPEC_ATTEMPT,
- TaskEventType.T_ATTEMPT_KILLED))
+ TaskEventType.T_ATTEMPT_FAILED,
+ TaskEventType.T_ATTEMPT_KILLED,
+ TaskEventType.T_ATTEMPT_SUCCEEDED))
// Transitions from KILLED state
// Ignorable event: T_ATTEMPT_KILLED
http://git-wip-us.apache.org/repos/asf/tez/blob/238c3ad8/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index 1ee6c4e..8852d93 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tez.dag.api.TaskLocationHint;
+import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.api.oldrecords.TaskState;
import org.apache.tez.dag.app.AppContext;
@@ -677,6 +678,78 @@ public class TestTaskImpl {
assertTrue(mockTask.getDiagnostics().get(0).contains(TaskAttemptTerminationCause.TERMINATED_AT_SHUTDOWN.name()));
}
+ @Test(timeout = 20000)
+ public void testFailedThenSpeculativeFailed() {
+ conf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 1);
+ mockTask = new MockTaskImpl(vertexId, partition,
+ eventHandler, conf, taskAttemptListener, clock,
+ taskHeartbeatHandler, appContext, leafVertex,
+ taskResource, containerContext, mock(Vertex.class));
+ TezTaskID taskId = getNewTaskID();
+ scheduleTaskAttempt(taskId);
+ MockTaskAttemptImpl firstAttempt = mockTask.getLastAttempt();
+ launchTaskAttempt(firstAttempt.getID());
+ updateAttemptState(firstAttempt, TaskAttemptState.RUNNING);
+
+ // Add a speculative task attempt
+ mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
+ TaskEventType.T_ADD_SPEC_ATTEMPT));
+ MockTaskAttemptImpl specAttempt = mockTask.getLastAttempt();
+ launchTaskAttempt(specAttempt.getID());
+ updateAttemptState(specAttempt, TaskAttemptState.RUNNING);
+ assertEquals(2, mockTask.getAttemptList().size());
+
+ // Fail the first attempt
+ updateAttemptState(firstAttempt, TaskAttemptState.FAILED);
+ mockTask.handle(new TaskEventTAUpdate(firstAttempt.getID(),
+ TaskEventType.T_ATTEMPT_FAILED));
+ assertEquals(TaskState.FAILED, mockTask.getState());
+ assertEquals(2, mockTask.getAttemptList().size());
+
+ // Now fail the speculative attempt
+ updateAttemptState(specAttempt, TaskAttemptState.FAILED);
+ mockTask.handle(new TaskEventTAUpdate(specAttempt.getID(),
+ TaskEventType.T_ATTEMPT_FAILED));
+ assertEquals(TaskState.FAILED, mockTask.getState());
+ assertEquals(2, mockTask.getAttemptList().size());
+ }
+
+ @Test(timeout = 20000)
+ public void testFailedThenSpeculativeSucceeded() {
+ conf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 1);
+ mockTask = new MockTaskImpl(vertexId, partition,
+ eventHandler, conf, taskAttemptListener, clock,
+ taskHeartbeatHandler, appContext, leafVertex,
+ taskResource, containerContext, mock(Vertex.class));
+ TezTaskID taskId = getNewTaskID();
+ scheduleTaskAttempt(taskId);
+ MockTaskAttemptImpl firstAttempt = mockTask.getLastAttempt();
+ launchTaskAttempt(firstAttempt.getID());
+ updateAttemptState(firstAttempt, TaskAttemptState.RUNNING);
+
+ // Add a speculative task attempt
+ mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
+ TaskEventType.T_ADD_SPEC_ATTEMPT));
+ MockTaskAttemptImpl specAttempt = mockTask.getLastAttempt();
+ launchTaskAttempt(specAttempt.getID());
+ updateAttemptState(specAttempt, TaskAttemptState.RUNNING);
+ assertEquals(2, mockTask.getAttemptList().size());
+
+ // Fail the first attempt
+ updateAttemptState(firstAttempt, TaskAttemptState.FAILED);
+ mockTask.handle(new TaskEventTAUpdate(firstAttempt.getID(),
+ TaskEventType.T_ATTEMPT_FAILED));
+ assertEquals(TaskState.FAILED, mockTask.getState());
+ assertEquals(2, mockTask.getAttemptList().size());
+
+ // Now succeed the speculative attempt
+ updateAttemptState(specAttempt, TaskAttemptState.SUCCEEDED);
+ mockTask.handle(new TaskEventTAUpdate(specAttempt.getID(),
+ TaskEventType.T_ATTEMPT_SUCCEEDED));
+ assertEquals(TaskState.FAILED, mockTask.getState());
+ assertEquals(2, mockTask.getAttemptList().size());
+ }
+
// TODO Add test to validate the correct commit attempt.