You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2019/05/10 10:00:02 UTC

[tez] 02/02: TEZ-4068. Prevent new speculative attempt after task has issued canCommit to an attempt

This is an automated email from the ASF dual-hosted git repository.

jeagles pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git

commit 4125639619d5e3252fde62579d431ce5071414ea
Author: Ying Han <hy...@outlook.com>
AuthorDate: Fri May 10 04:59:02 2019 -0500

    TEZ-4068. Prevent new speculative attempt after task has issued canCommit to an attempt
    
    Signed-off-by: Jonathan Eagles <je...@apache.org>
---
 .../java/org/apache/tez/dag/app/dag/impl/TaskImpl.java  |  5 +++++
 .../org/apache/tez/dag/app/dag/impl/TestTaskImpl.java   | 17 +++++++++++++++++
 2 files changed, 22 insertions(+)

diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index e563fe9..2d0688f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -1043,6 +1043,11 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
         LOG.info("Ignore speculation scheduling since there is no running attempt on task {}.", task.getTaskId());
         return;
       }
+      if (task.commitAttempt != null) {
+        LOG.info("Ignore speculation scheduling for task {} since commit has started with commitAttempt {}.",
+            task.getTaskId(), task.commitAttempt);
+        return;
+      }
       task.addAndScheduleAttempt(earliestUnfinishedAttempt.getID());
     }
   }
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index 2d4adcc..a3de936 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -999,6 +999,23 @@ public class TestTaskImpl {
     assertEquals(1, mockTask.getAttemptList().size());
   }
 
+  @Test(timeout = 20000)
+  public void testIgnoreSpeculationAfterOriginalAttemptCommit() {
+    TezTaskID taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    MockTaskAttemptImpl firstAttempt = mockTask.getLastAttempt();
+    launchTaskAttempt(firstAttempt.getID());
+    updateAttemptState(firstAttempt, TaskAttemptState.RUNNING);
+    // Mock commit of the first task attempt
+    mockTask.canCommit(firstAttempt.getID());
+
+    // Verify the speculation scheduling is ignored and no speculative attempt was added to the task
+    mockTask.handle(createTaskTAAddSpecAttempt(firstAttempt.getID()));
+    MockTaskAttemptImpl specAttempt = mockTask.getLastAttempt();
+    launchTaskAttempt(specAttempt.getID());
+    assertEquals(1, mockTask.getAttemptList().size());
+  }
+
   @SuppressWarnings("rawtypes")
   @Test
   public void testSucceededAttemptStatusWithRetroActiveFailures() throws InterruptedException {