You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2019/12/17 15:06:25 UTC

[tez] branch master updated: TEZ-4108. NullPointerException during speculative execution race condition

This is an automated email from the ASF dual-hosted git repository.

jeagles pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git


The following commit(s) were added to refs/heads/master by this push:
     new 689d738  TEZ-4108. NullPointerException during speculative execution race condition
689d738 is described below

commit 689d7388592196c320319f026961b3fef0167dc5
Author: Jonathan Eagles <je...@apache.org>
AuthorDate: Tue Dec 17 09:06:05 2019 -0600

    TEZ-4108. NullPointerException during speculative execution race condition
---
 .../org/apache/tez/dag/app/dag/impl/TaskImpl.java  |  4 ++-
 .../apache/tez/dag/app/dag/impl/TestTaskImpl.java  | 39 ++++++++++++++++++++++
 2 files changed, 42 insertions(+), 1 deletion(-)

diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 2d0688f..39e2b4c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -1029,7 +1029,9 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
         // find the oldest running attempt
         if (!ta.isFinished()) {
           earliestUnfinishedAttempt = ta;
-          task.nodesWithRunningAttempts.add(ta.getNodeId());
+          if (ta.getNodeId() != null) {
+            task.nodesWithRunningAttempts.add(ta.getNodeId());
+          }
         } else {
           if (TaskAttemptState.SUCCEEDED.equals(ta.getState())) {
             LOG.info("Ignore speculation scheduling for task {} since it has succeeded with attempt {}.",
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index a3de936..1af6092 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -887,6 +887,45 @@ public class TestTaskImpl {
     assertEquals(2, mockTask.getAttemptList().size());
   }
 
+  @Test
+  public void testKilledBeforeSpeculatedSucceeded() {
+    conf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 1);
+    Vertex vertex = mock(Vertex.class);
+    doReturn(new VertexImpl.VertexConfigImpl(conf)).when(vertex).getVertexConfig();
+    mockTask = new MockTaskImpl(vertexId, partition,
+        eventHandler, conf, taskCommunicatorManagerInterface, clock,
+        taskHeartbeatHandler, appContext, leafVertex,
+        taskResource, containerContext, vertex);
+    TezTaskID taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    MockTaskAttemptImpl firstAttempt = mockTask.getLastAttempt();
+    launchTaskAttempt(firstAttempt.getID());
+    updateAttemptState(firstAttempt, TaskAttemptState.RUNNING);
+
+    mockTask.handle(createTaskTAKilledEvent(firstAttempt.getID()));
+    assertEquals(TaskStateInternal.RUNNING, mockTask.getInternalState());
+
+    // Save the shared mock node id and null it out while the speculative
+    // attempt is requested. This simulates the killed-before-speculated race
+    // (TEZ-4108) that used to induce an NPE in the state machine transition.
+    NodeId nodeId = mockNodeId;
+    mockNodeId = null;
+
+    // Request a speculative attempt with the node id still null, then restore it
+    mockTask.handle(createTaskTAAddSpecAttempt(mockTask.getLastAttempt().getID()));
+    mockNodeId = nodeId;
+    MockTaskAttemptImpl specAttempt = mockTask.getLastAttempt();
+    launchTaskAttempt(specAttempt.getID());
+    updateAttemptState(specAttempt, TaskAttemptState.RUNNING);
+    assertEquals(3, mockTask.getAttemptList().size());
+
+    // Succeed the speculative attempt; the task itself must then be SUCCEEDED
+    updateAttemptState(specAttempt, TaskAttemptState.SUCCEEDED);
+    mockTask.handle(createTaskTASucceededEvent(specAttempt.getID()));
+    assertEquals(TaskState.SUCCEEDED, mockTask.getState());
+    assertEquals(3, mockTask.getAttemptList().size());
+  }
+
   @Test(timeout = 20000)
   public void testKilledAttemptUpdatesDAGScheduler() {
     TezTaskID taskId = getNewTaskID();