You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2019/04/26 20:24:08 UTC

[tez] branch branch-0.9 updated: TEZ-4062. Speculative attempt scheduling should be aborted when Task has completed

This is an automated email from the ASF dual-hosted git repository.

jeagles pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/tez.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new 1208fe1  TEZ-4062. Speculative attempt scheduling should be aborted when Task has completed
1208fe1 is described below

commit 1208fe1ae8f75a6bd1861845f240a40d9379ee1d
Author: Ying Han <hy...@outlook.com>
AuthorDate: Fri Apr 26 15:20:29 2019 -0500

    TEZ-4062. Speculative attempt scheduling should be aborted when Task has completed
    
    Signed-off-by: Jonathan Eagles <je...@apache.org>
    (cherry picked from commit 0f71b0b0622f6a9af71bf9d8b3bf22c1a040cccb)
---
 .../java/org/apache/tez/dag/app/dag/impl/TaskImpl.java  | 11 +++++++++++
 .../org/apache/tez/dag/app/dag/impl/TestTaskImpl.java   | 17 +++++++++++++++++
 2 files changed, 28 insertions(+)

diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 9289d8f..e563fe9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -1030,8 +1030,19 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
         if (!ta.isFinished()) {
           earliestUnfinishedAttempt = ta;
           task.nodesWithRunningAttempts.add(ta.getNodeId());
+        } else {
+          if (TaskAttemptState.SUCCEEDED.equals(ta.getState())) {
+            LOG.info("Ignore speculation scheduling for task {} since it has succeeded with attempt {}.",
+                task.getTaskId(), ta.getID());
+            return;
+          }
         }
       }
+      if (earliestUnfinishedAttempt == null) {
+        // no running (or SUCCEEDED) task attempt at this moment, no need to schedule speculative attempt either
+        LOG.info("Ignore speculation scheduling since there is no running attempt on task {}.", task.getTaskId());
+        return;
+      }
       task.addAndScheduleAttempt(earliestUnfinishedAttempt.getID());
     }
   }
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index 81cd675..2d4adcc 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -982,6 +982,23 @@ public class TestTaskImpl {
     Assert.assertEquals(mockDestId, newAttempt.getSchedulingCausalTA());
   }
 
+  @Test(timeout = 20000)
+  public void testIgnoreSpeculationOnSuccessfulOriginalAttempt() {
+    TezTaskID taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    MockTaskAttemptImpl firstAttempt = mockTask.getLastAttempt();
+    launchTaskAttempt(firstAttempt.getID());
+    // Mock success of the first task attempt
+    updateAttemptState(firstAttempt, TaskAttemptState.SUCCEEDED);
+    firstAttempt.handle(new TaskAttemptEvent(firstAttempt.getID(), TaskAttemptEventType.TA_DONE));
+
+    // Verify the speculation scheduling is ignored and no speculative attempt was added to the task
+    mockTask.handle(createTaskTAAddSpecAttempt(firstAttempt.getID()));
+    MockTaskAttemptImpl specAttempt = mockTask.getLastAttempt();
+    launchTaskAttempt(specAttempt.getID());
+    assertEquals(1, mockTask.getAttemptList().size());
+  }
+
   @SuppressWarnings("rawtypes")
   @Test
   public void testSucceededAttemptStatusWithRetroActiveFailures() throws InterruptedException {