You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2019/12/17 15:18:13 UTC
[tez] branch branch-0.9 updated: TEZ-4108. NullPointerException during speculative execution race condition
This is an automated email from the ASF dual-hosted git repository.
jeagles pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/branch-0.9 by this push:
new 4a03321 TEZ-4108. NullPointerException during speculative execution race condition
4a03321 is described below
commit 4a0332160567b78b334054986fb7d9e7320adb23
Author: Jonathan Eagles <je...@apache.org>
AuthorDate: Tue Dec 17 09:06:05 2019 -0600
TEZ-4108. NullPointerException during speculative execution race condition
(cherry picked from commit 689d7388592196c320319f026961b3fef0167dc5)
---
.../org/apache/tez/dag/app/dag/impl/TaskImpl.java | 4 ++-
.../apache/tez/dag/app/dag/impl/TestTaskImpl.java | 39 ++++++++++++++++++++++
2 files changed, 42 insertions(+), 1 deletion(-)
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 2d0688f..39e2b4c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -1029,7 +1029,9 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
// find the oldest running attempt
if (!ta.isFinished()) {
earliestUnfinishedAttempt = ta;
- task.nodesWithRunningAttempts.add(ta.getNodeId());
+ if (ta.getNodeId() != null) {
+ task.nodesWithRunningAttempts.add(ta.getNodeId());
+ }
} else {
if (TaskAttemptState.SUCCEEDED.equals(ta.getState())) {
LOG.info("Ignore speculation scheduling for task {} since it has succeeded with attempt {}.",
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index a3de936..1af6092 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -887,6 +887,45 @@ public class TestTaskImpl {
assertEquals(2, mockTask.getAttemptList().size());
}
+ @Test
+ public void testKilledBeforeSpeculatedSucceeded() {
+ conf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 1);
+ Vertex vertex = mock(Vertex.class);
+ doReturn(new VertexImpl.VertexConfigImpl(conf)).when(vertex).getVertexConfig();
+ mockTask = new MockTaskImpl(vertexId, partition,
+ eventHandler, conf, taskCommunicatorManagerInterface, clock,
+ taskHeartbeatHandler, appContext, leafVertex,
+ taskResource, containerContext, vertex);
+ TezTaskID taskId = getNewTaskID();
+ scheduleTaskAttempt(taskId);
+ MockTaskAttemptImpl firstAttempt = mockTask.getLastAttempt();
+ launchTaskAttempt(firstAttempt.getID());
+ updateAttemptState(firstAttempt, TaskAttemptState.RUNNING);
+ // Kill the first attempt; the task must remain RUNNING afterwards.
+ mockTask.handle(createTaskTAKilledEvent(firstAttempt.getID()));
+ assertEquals(TaskStateInternal.RUNNING, mockTask.getInternalState());
+
+ // Save and null out mockNodeId so unfinished attempts presumably report a
+ // null node id while the speculative attempt is added; before the TEZ-4108
+ // null check this raised an NPE in the state machine transition.
+ NodeId nodeId = mockNodeId;
+ mockNodeId = null;
+
+ // Add a speculative attempt, then restore the saved node id.
+ mockTask.handle(createTaskTAAddSpecAttempt(mockTask.getLastAttempt().getID()));
+ mockNodeId = nodeId;
+ MockTaskAttemptImpl specAttempt = mockTask.getLastAttempt();
+ launchTaskAttempt(specAttempt.getID());
+ updateAttemptState(specAttempt, TaskAttemptState.RUNNING);
+ assertEquals(3, mockTask.getAttemptList().size());
+
+ // Succeed the speculative attempt; the task must reach SUCCEEDED.
+ updateAttemptState(specAttempt, TaskAttemptState.SUCCEEDED);
+ mockTask.handle(createTaskTASucceededEvent(specAttempt.getID()));
+ assertEquals(TaskState.SUCCEEDED, mockTask.getState());
+ assertEquals(3, mockTask.getAttemptList().size());
+ }
+
@Test(timeout = 20000)
public void testKilledAttemptUpdatesDAGScheduler() {
TezTaskID taskId = getNewTaskID();