You are viewing a plain text version of this content. The canonical link was not preserved in this plain text copy.
Posted to commits@tez.apache.org by zj...@apache.org on 2015/02/13 03:02:23 UTC

tez git commit: TEZ-2037. Should log TaskAttemptFinishedEvent if taskattempt is recovered to KILLED (zjffdu)

Repository: tez
Updated Branches:
  refs/heads/master bc6a490df -> c266289cd


TEZ-2037. Should log TaskAttemptFinishedEvent if taskattempt is recovered to KILLED (zjffdu)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/c266289c
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/c266289c
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/c266289c

Branch: refs/heads/master
Commit: c266289cde6693e8a586e3c4b2bdffbd6b98b9ca
Parents: bc6a490
Author: Jeff Zhang <zj...@apache.org>
Authored: Fri Feb 13 10:01:33 2015 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Fri Feb 13 10:01:33 2015 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   |  1 +
 .../app/dag/impl/TestTaskAttemptRecovery.java   | 66 +++++++++++++++++++-
 3 files changed, 66 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/c266289c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b33c454..91183eb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -195,6 +195,7 @@ TEZ-UI CHANGES (TEZ-8):
 Release 0.5.4: Unreleased
 
 ALL CHANGES:
+  TEZ-2037. Should log TaskAttemptFinishedEvent if taskattempt is recovered to KILLED.
   TEZ-2071. TestAMRecovery should set test names for test DAGs.
   TEZ-1928. Tez local mode hang in Pig tez local mode.
   TEZ-1893. Verify invalid -1 parallelism in DAG.verify().

http://git-wip-us.apache.org/repos/asf/tez/blob/c266289c/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index ccac620..4a4506e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -1392,6 +1392,7 @@ public class TaskAttemptImpl implements TaskAttempt,
               TaskEventType.T_ATTEMPT_KILLED));
           taskAttempt.sendEvent(createDAGCounterUpdateEventTAFinished(taskAttempt,
               getExternalState(TaskAttemptStateInternal.KILLED)));
+          taskAttempt.logJobHistoryAttemptUnsuccesfulCompletion(TaskAttemptState.KILLED);
           endState = TaskAttemptStateInternal.KILLED;
           break;
         case SUCCEEDED:

http://git-wip-us.apache.org/repos/asf/tez/blob/c266289c/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
index e5fcd72..9d0e121 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
@@ -20,10 +20,15 @@ package org.apache.tez.dag.app.dag.impl;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
@@ -39,16 +44,22 @@ import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ContainerContext;
 import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
+import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
+import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.event.DAGEventCounterUpdate;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
+import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.HistoryEventHandler;
+import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
 import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
 import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
@@ -61,11 +72,59 @@ public class TestTaskAttemptRecovery {
   private long startTime = System.currentTimeMillis();
   private long finishTime = startTime + 5000;
 
-  private TezTaskAttemptID taId = mock(TezTaskAttemptID.class);
+  private TezTaskAttemptID taId;
   private String vertexName = "v1";
 
+  private AppContext mockAppContext;
+  private MockHistoryEventHandler mockHistoryEventHandler;
+  private Task mockTask;
+  private Vertex mockVertex;
+
+  public static class MockHistoryEventHandler extends HistoryEventHandler {
+
+    private List<DAGHistoryEvent> events;
+
+    public MockHistoryEventHandler(AppContext context) {
+      super(context);
+      events = new ArrayList<DAGHistoryEvent>();
+    }
+
+    @Override
+    public void handle(DAGHistoryEvent event) {
+      events.add(event);
+    }
+
+    @Override
+    public void handleCriticalEvent(DAGHistoryEvent event) throws IOException {
+      events.add(event);
+    }
+
+    void verfiyTaskAttemptFinishedEvent(TezTaskAttemptID taId, TaskAttemptState finalState, int expectedTimes) {
+      int actualTimes = 0;
+      for (DAGHistoryEvent event : events) {
+        if (event.getHistoryEvent().getEventType() == HistoryEventType.TASK_ATTEMPT_FINISHED) {
+          TaskAttemptFinishedEvent tfEvent = (TaskAttemptFinishedEvent)event.getHistoryEvent();
+          if (tfEvent.getTaskAttemptID().equals(taId) &&
+              tfEvent.getState().equals(finalState)) {
+            actualTimes ++;
+          }
+        }
+      }
+      assertEquals(expectedTimes, actualTimes);
+    }
+  }
+
   @Before
   public void setUp() {
+    mockTask = mock(Task.class);
+    mockVertex = mock(Vertex.class);
+    when(mockTask.getVertex()).thenReturn(mockVertex);
+    mockAppContext = mock(AppContext.class, RETURNS_DEEP_STUBS);
+    when(mockAppContext.getCurrentDAG().getVertex(any(TezVertexID.class))
+      .getTask(any(TezTaskID.class)))
+      .thenReturn(mockTask);
+    mockHistoryEventHandler = new MockHistoryEventHandler(mockAppContext);
+    when(mockAppContext.getHistoryHandler()).thenReturn(mockHistoryEventHandler);
     mockEventHandler = mock(EventHandler.class);
     TezTaskID taskId =
         TezTaskID.fromString("task_1407371892933_0001_1_00_000000");
@@ -73,8 +132,9 @@ public class TestTaskAttemptRecovery {
         new TaskAttemptImpl(taskId, 0, mockEventHandler,
             mock(TaskAttemptListener.class), new Configuration(),
             new SystemClock(), mock(TaskHeartbeatHandler.class),
-            mock(AppContext.class), false, Resource.newInstance(1, 1),
+            mockAppContext, false, Resource.newInstance(1, 1),
             mock(ContainerContext.class), false);
+    taId = ta.getID();
   }
 
   private void restoreFromTAStartEvent() {
@@ -157,6 +217,8 @@ public class TestTaskAttemptRecovery {
     verifyEvents(events, TaskEventTAUpdate.class, 1);
     // one for task launch, one for task killed
     verifyEvents(events, DAGEventCounterUpdate.class, 2);
+
+    mockHistoryEventHandler.verfiyTaskAttemptFinishedEvent(taId, TaskAttemptState.KILLED, 1);
   }
 
   /**