You are viewing a plain-text version of this content; the hyperlink to its canonical version is not preserved in this plain-text rendering.
Posted to commits@tez.apache.org by zj...@apache.org on 2015/06/03 02:37:48 UTC

tez git commit: TEZ-2304. InvalidStateTransitonException TA_SCHEDULE at START_WAIT during recovery (zjffdu)

Repository: tez
Updated Branches:
  refs/heads/master 631c3e9c6 -> 44a6bb686


TEZ-2304. InvalidStateTransitonException TA_SCHEDULE at START_WAIT during recovery (zjffdu)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/44a6bb68
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/44a6bb68
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/44a6bb68

Branch: refs/heads/master
Commit: 44a6bb686b08326b45c232fb7466b43c574fe7c1
Parents: 631c3e9
Author: Jeff Zhang <zj...@apache.org>
Authored: Wed Jun 3 08:37:39 2015 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Wed Jun 3 08:37:39 2015 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   | 17 +-----
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   | 42 ++++----------
 .../tez/dag/app/dag/impl/TestTaskAttempt.java   | 59 +++++++++++++++++++-
 .../app/dag/impl/TestTaskAttemptRecovery.java   | 14 ++---
 .../tez/dag/app/dag/impl/TestTaskImpl.java      |  8 +++
 .../tez/dag/app/dag/impl/TestTaskRecovery.java  | 10 ++++
 7 files changed, 95 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/44a6bb68/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b4c4967..0f1eabd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -418,6 +418,7 @@ TEZ-UI CHANGES (TEZ-8):
 Release 0.5.4: Unreleased
 
 ALL CHANGES:
+  TEZ-2304. InvalidStateTransitonException TA_SCHEDULE at START_WAIT during recovery
   TEZ-2488. Tez AM crashes if a submitted DAG is configured to use invalid resource sizes.
   TEZ-2080. Localclient should be using tezconf in init instead of yarnconf.
   TEZ-2369. Add a few unit tests for RootInputInitializerManager.

http://git-wip-us.apache.org/repos/asf/tez/blob/44a6bb68/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 036022e..2650be2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -752,10 +752,6 @@ public class TaskAttemptImpl implements TaskAttempt,
         }
         case TASK_ATTEMPT_FINISHED:
         {
-          if (!recoveryStartEventSeen) {
-            throw new RuntimeException("Finished Event seen but"
-                + " no Started Event was encountered earlier");
-          }
           TaskAttemptFinishedEvent tEvent = (TaskAttemptFinishedEvent) historyEvent;
           this.finishTime = tEvent.getFinishTime();
           this.reportedStatus.counters = tEvent.getCounters();
@@ -1117,17 +1113,8 @@ public class TaskAttemptImpl implements TaskAttempt,
 
       ta.sendEvent(createDAGCounterUpdateEventTAFinished(ta,
           helper.getTaskAttemptState()));
-      if (ta.getLaunchTime() != 0) {
-        // TODO For cases like this, recovery goes for a toss, since the the
-        // attempt will not exist in the history file.
-        ta.logJobHistoryAttemptUnsuccesfulCompletion(helper
-            .getTaskAttemptState());
-      } else {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Not generating HistoryFinish event since start event not "
-              + "generated for taskAttempt: " + ta.getID());
-        }
-      }
+      ta.logJobHistoryAttemptUnsuccesfulCompletion(helper
+          .getTaskAttemptState());
       // Send out events to the Task - indicating TaskAttemptTermination(F/K)
       ta.sendEvent(new TaskEventTAUpdate(ta.attemptId, helper
           .getTaskEventType()));

http://git-wip-us.apache.org/repos/asf/tez/blob/44a6bb68/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index b2eb81e..1251ac4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -144,8 +144,6 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   // track the status of TaskAttempt (true mean completed, false mean uncompleted)
   private final Map<Integer, Boolean> taskAttemptStatus = new HashMap<Integer,Boolean>();
 
-  private boolean historyTaskStartGenerated = false;
-
   private static final SingleArcTransition<TaskImpl , TaskEvent>
      ATTEMPT_KILLED_TRANSITION = new AttemptKilledTransition();
   private static final SingleArcTransition<TaskImpl, TaskEvent>
@@ -570,7 +568,6 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
             this.attempts = new LinkedHashMap<TezTaskAttemptID, TaskAttempt>();
           }
           recoveredState = TaskState.SCHEDULED;
-          historyTaskStartGenerated = true;
           taskAttemptStatus.clear();
           return recoveredState;
         }
@@ -602,8 +599,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
             LOG.debug("Adding restored attempt into known attempts map"
                 + ", taskAttemptId=" + taskAttemptStartedEvent.getTaskAttemptID());
           }
-          this.attempts.put(taskAttemptStartedEvent.getTaskAttemptID(),
-              recoveredAttempt);
+          Preconditions.checkArgument(this.attempts.put(taskAttemptStartedEvent.getTaskAttemptID(),
+              recoveredAttempt) == null, taskAttemptStartedEvent.getTaskAttemptID() + " already existed.");
           this.taskAttemptStatus.put(taskAttemptStartedEvent.getTaskAttemptID().getId(), false);
           this.recoveredState = TaskState.RUNNING;
           return recoveredState;
@@ -850,11 +847,13 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
             = new LinkedHashMap<TezTaskAttemptID, TaskAttempt>(maxFailedAttempts);
         newAttempts.putAll(attempts);
         attempts = newAttempts;
-        attempts.put(attempt.getID(), attempt);
+        Preconditions.checkArgument(attempts.put(attempt.getID(), attempt) == null,
+            attempt.getID() + " already existed");
         break;
 
       default:
-        attempts.put(attempt.getID(), attempt);
+        Preconditions.checkArgument(attempts.put(attempt.getID(), attempt) == null,
+            attempt.getID() + " already existed");
         break;
     }
 
@@ -1050,7 +1049,6 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
       task.addAndScheduleAttempt();
       task.scheduledTime = task.clock.getTime();
       task.logJobHistoryTaskStartedEvent();
-      task.historyTaskStartGenerated = true;
     }
   }
 
@@ -1092,9 +1090,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
       task.eventHandler.handle(new VertexEventTaskCompleted(
           task.taskId, TaskState.SUCCEEDED));
       LOG.info("Task succeeded with attempt " + task.successfulAttempt);
-      if (task.historyTaskStartGenerated) {
-        task.logJobHistoryTaskFinishedEvent();
-      }
+      task.logJobHistoryTaskFinishedEvent();
       TaskAttempt successfulAttempt = task.attempts.get(successTaId);
 
       // issue kill to all other attempts
@@ -1297,13 +1293,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
       task.taskAttemptStatus.put(castEvent.getTaskAttemptID().getId(), true);
       // check whether all attempts are finished
       if (task.getFinishedAttemptsCount() == task.attempts.size()) {
-        if (task.historyTaskStartGenerated) {
-          task.logJobHistoryTaskFailedEvent(getExternalState(TaskStateInternal.KILLED));
-        } else {
-          LOG.debug("Not generating HistoryFinish event since start event not" +
-          		" generated for task: " + task.getTaskId());
-        }
-
+        task.logJobHistoryTaskFailedEvent(getExternalState(TaskStateInternal.KILLED));
         task.eventHandler.handle(
             new VertexEventTaskCompleted(
                 task.taskId, getExternalState(TaskStateInternal.KILLED)));
@@ -1349,13 +1339,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
         task.handleTaskAttemptCompletion(
             ((TaskEventTAUpdate) event).getTaskAttemptID(),
             TaskAttemptStateInternal.FAILED);
-
-        if (task.historyTaskStartGenerated) {
-          task.logJobHistoryTaskFailedEvent(TaskState.FAILED);
-        } else {
-          LOG.debug("Not generating HistoryFinish event since start event not" +
-          		" generated for task: " + task.getTaskId());
-        }
+        task.logJobHistoryTaskFailedEvent(TaskState.FAILED);
         task.eventHandler.handle(
             new VertexEventTaskCompleted(task.taskId, TaskState.FAILED));
         return task.finished(TaskStateInternal.FAILED);
@@ -1464,13 +1448,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     public void transition(TaskImpl task, TaskEvent event) {
       TaskEventTermination terminateEvent = (TaskEventTermination)event;
       task.addDiagnosticInfo(terminateEvent.getDiagnosticInfo());
-      if (task.historyTaskStartGenerated) {
-        task.logJobHistoryTaskFailedEvent(TaskState.KILLED);
-      } else {
-        LOG.debug("Not generating HistoryFinish event since start event not" +
-        		" generated for task: " + task.getTaskId());
-      }
-
+      task.logJobHistoryTaskFailedEvent(TaskState.KILLED);
       task.eventHandler.handle(
           new VertexEventTaskCompleted(task.taskId, TaskState.KILLED));
       // TODO Metrics

http://git-wip-us.apache.org/repos/asf/tez/blob/44a6bb68/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 86251cc..15d5609 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -1212,6 +1212,59 @@ public class TestTaskAttempt {
         arg.capture());
   }
 
+  @SuppressWarnings("deprecation")
+  @Test(timeout = 5000)
+  public void testKilledInNew() {
+    ApplicationId appId = ApplicationId.newInstance(1, 2);
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
+        appId, 0);
+    TezDAGID dagID = TezDAGID.getInstance(appId, 1);
+    TezVertexID vertexID = TezVertexID.getInstance(dagID, 1);
+    TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
+
+    MockEventHandler eventHandler = spy(new MockEventHandler());
+    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+    when(taListener.getAddress()).thenReturn(
+        new InetSocketAddress("localhost", 0));
+
+    Configuration taskConf = new Configuration();
+    taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    taskConf.setBoolean("fs.file.impl.disable.cache", true);
+
+    locationHint = TaskLocationHint.createTaskLocationHint(
+        new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null);
+    Resource resource = Resource.newInstance(1024, 1);
+
+    NodeId nid = NodeId.newInstance("127.0.0.1", 0);
+    ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(contId);
+    when(container.getNodeId()).thenReturn(nid);
+    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+    AppContext appCtx = mock(AppContext.class);
+    AMContainerMap containers = new AMContainerMap(
+        mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
+        new ContainerContextMatcher(), appCtx);
+    containers.addContainerIfNew(container);
+
+    doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
+    doReturn(containers).when(appCtx).getAllContainers();
+
+    TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class);
+    MockTaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
+        taListener, taskConf, new SystemClock(),
+        mockHeartbeatHandler, appCtx, false,
+        resource, createFakeContainerContext(), true);
+    Assert.assertEquals(TaskAttemptStateInternal.NEW, taImpl.getInternalState());
+    taImpl.handle(new TaskAttemptEventKillRequest(taImpl.getID(), "kill it",
+        TaskAttemptTerminationCause.TERMINATED_BY_CLIENT));
+    Assert.assertEquals(TaskAttemptStateInternal.KILLED, taImpl.getInternalState());
+
+    Assert.assertEquals(0, taImpl.taskAttemptStartedEventLogged);
+    Assert.assertEquals(1, taImpl.taskAttemptFinishedEventLogged);
+  }
+
   private Event verifyEventType(List<Event> events,
       Class<? extends Event> eventClass, int expectedOccurences) {
     int count = 0;
@@ -1244,6 +1297,8 @@ public class TestTaskAttempt {
 
   private class MockTaskAttemptImpl extends TaskAttemptImpl {
     
+    public int taskAttemptStartedEventLogged = 0;
+    public int taskAttemptFinishedEventLogged = 0;
     public MockTaskAttemptImpl(TezTaskID taskId, int attemptNumber,
         EventHandler eventHandler, TaskAttemptListener tal,
         Configuration conf, Clock clock,
@@ -1273,17 +1328,19 @@ public class TestTaskAttempt {
 
     @Override
     protected void logJobHistoryAttemptStarted() {
+      taskAttemptStartedEventLogged++;
     }
 
     @Override
     protected void logJobHistoryAttemptFinishedEvent(
         TaskAttemptStateInternal state) {
-
+      taskAttemptFinishedEventLogged++;
     }
 
     @Override
     protected void logJobHistoryAttemptUnsuccesfulCompletion(
         TaskAttemptState state) {
+      taskAttemptFinishedEventLogged++;
     }
     
     @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/44a6bb68/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
index d6d874d..0665b1e 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
@@ -297,16 +297,14 @@ public class TestTaskAttemptRecovery {
   }
 
   /**
-   * restoreFromTAFinishedEvent ( no TAStartEvent before TAFinishedEvent )
+   * restoreFromTAFinishedEvent ( killed before started)
    */
   @Test(timeout = 5000)
   public void testRecover_FINISH_BUT_NO_START() {
-    try {
-      restoreFromTAFinishedEvent(TaskAttemptState.SUCCEEDED);
-      fail("Should fail due to no TaskAttemptStartEvent before TaskAttemptFinishedEvent");
-    } catch (Throwable e) {
-      assertEquals("Finished Event seen but"
-          + " no Started Event was encountered earlier", e.getMessage());
-    }
+    TaskAttemptState recoveredState =
+        ta.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
+            startTime, finishTime, TaskAttemptState.KILLED,
+            TaskAttemptTerminationCause.APPLICATION_ERROR, "", new TezCounters()));
+    assertEquals(TaskAttemptState.KILLED, recoveredState);
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/44a6bb68/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index 1ecabef..87dd2fa 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -634,6 +634,8 @@ public class TestTaskImpl {
     mockTask.handle(new TaskEventTermination(taskId, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT, null));
     assertEquals(1, mockTask.getDiagnostics().size());
     assertTrue(mockTask.getDiagnostics().get(0).contains(TaskAttemptTerminationCause.TERMINATED_BY_CLIENT.name()));
+    assertEquals(0, mockTask.taskStartedEventLogged);
+    assertEquals(1, mockTask.taskFinishedEventLogged);
   }
   
   @Test(timeout = 5000)
@@ -674,6 +676,9 @@ public class TestTaskImpl {
   @SuppressWarnings("rawtypes")
   private class MockTaskImpl extends TaskImpl {
 
+    public int taskStartedEventLogged = 0;
+    public int taskFinishedEventLogged = 0;
+
     private List<MockTaskAttemptImpl> taskAttempts = new LinkedList<MockTaskAttemptImpl>();
     private Vertex vertex;
 
@@ -719,12 +724,15 @@ public class TestTaskImpl {
     }
 
     protected void logJobHistoryTaskStartedEvent() {
+      taskStartedEventLogged++;
     }
 
     protected void logJobHistoryTaskFinishedEvent() {
+      taskFinishedEventLogged++;
     }
 
     protected void logJobHistoryTaskFailedEvent(TaskState finalState) {
+      taskFinishedEventLogged++;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/44a6bb68/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
index 2a49826..a16fad2 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
@@ -250,6 +250,16 @@ public class TestTaskRecovery {
   }
 
   /**
+   * -> restoreFromTaskFinishEvent ( no TaskStartEvent )
+   */
+  @Test(timeout = 5000)
+  public void testRecoveryNewToKilled_NoStartEvent() {
+    task.restoreFromEvent(new TaskFinishedEvent(task.getTaskId(), vertexName,
+        taskStartTime, taskFinishTime, null, TaskState.KILLED, "",
+        new TezCounters(), 0));
+  }
+
+  /**
    * restoreFromTaskStartedEvent -> RecoverTransition
    */
   @Test(timeout = 5000)