You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/02/18 23:40:50 UTC

[03/23] tez git commit: TEZ-2024. TaskFinishedEvent may not be logged in recovery (zjffdu)

TEZ-2024. TaskFinishedEvent may not be logged in recovery (zjffdu)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/b74bab47
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/b74bab47
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/b74bab47

Branch: refs/heads/TEZ-2003
Commit: b74bab471649021cf9e8d4f9087bfce0fa9bf757
Parents: c266289
Author: Jeff Zhang <zj...@apache.org>
Authored: Fri Feb 13 11:06:38 2015 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Fri Feb 13 11:06:38 2015 +0800

----------------------------------------------------------------------
 CHANGES.txt                                          |  1 +
 .../org/apache/tez/dag/app/dag/impl/TaskImpl.java    |  1 +
 .../dag/app/dag/impl/TestTaskAttemptRecovery.java    | 15 +++++++++++++++
 .../tez/dag/app/dag/impl/TestTaskRecovery.java       |  7 ++++++-
 4 files changed, 23 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/b74bab47/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 91183eb..1e8c116 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -61,6 +61,7 @@ Release 0.6.1: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2024. TaskFinishedEvent may not be logged in recovery.
   TEZ-2031. Tez UI: horizontal scrollbars do not appear in tables, causing them to look truncated. 
   TEZ-2073. SimpleHistoryLoggingService cannot be read by log aggregation (umask)
   TEZ-2078. Tez UI: Task logs url use in-progress url causing various errors.

http://git-wip-us.apache.org/repos/asf/tez/blob/b74bab47/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index aba20cf..3f9e2cf 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -512,6 +512,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
           this.attempts = new LinkedHashMap<TezTaskAttemptID, TaskAttempt>();
         }
         recoveredState = TaskState.SCHEDULED;
+        historyTaskStartGenerated = true;
         taskAttemptStatus.clear();
         return recoveredState;
       }

http://git-wip-us.apache.org/repos/asf/tez/blob/b74bab47/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
index 9d0e121..b8b09d0 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.dag.api.oldrecords.TaskState;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ContainerContext;
 import org.apache.tez.dag.app.TaskAttemptListener;
@@ -56,6 +57,7 @@ import org.apache.tez.dag.history.HistoryEventHandler;
 import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
 import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
+import org.apache.tez.dag.history.events.TaskFinishedEvent;
 import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
@@ -112,6 +114,19 @@ public class TestTaskAttemptRecovery {
       }
       assertEquals(expectedTimes, actualTimes);
     }
+
+    void verifyTaskFinishedEvent(TezTaskID taskId, TaskState finalState, int expectedTimes) {
+      int actualTimes = 0;
+      for (DAGHistoryEvent event : events) {
+        if (event.getHistoryEvent().getEventType() == HistoryEventType.TASK_FINISHED) {
+          TaskFinishedEvent tfEvent = (TaskFinishedEvent)event.getHistoryEvent();
+          if (tfEvent.getTaskID().equals(taskId) && tfEvent.getState().equals(finalState)) {
+            actualTimes ++;
+          }
+        }
+      }
+      assertEquals(expectedTimes, actualTimes);
+    }
   }
 
   @Before

http://git-wip-us.apache.org/repos/asf/tez/blob/b74bab47/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
index f6e0162..c2185d8 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
@@ -58,6 +58,7 @@ import org.apache.tez.dag.app.dag.event.TaskEvent;
 import org.apache.tez.dag.app.dag.event.TaskEventRecoverTask;
 import org.apache.tez.dag.app.dag.event.TaskEventType;
 import org.apache.tez.dag.app.dag.event.VertexEventType;
+import org.apache.tez.dag.app.dag.impl.TestTaskAttemptRecovery.MockHistoryEventHandler;
 import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
 import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
 import org.apache.tez.dag.history.events.TaskFinishedEvent;
@@ -82,6 +83,7 @@ public class TestTaskRecovery {
 
   private Configuration conf = new Configuration();
   private AppContext mockAppContext;
+  private MockHistoryEventHandler  mockHistoryEventHandler;
   private ApplicationId appId = ApplicationId.newInstance(
       System.currentTimeMillis(), 1);
   private TezDAGID dagId = TezDAGID.getInstance(appId, 1);
@@ -181,7 +183,8 @@ public class TestTaskRecovery {
     mockAppContext = mock(AppContext.class, RETURNS_DEEP_STUBS);
     when(mockAppContext.getCurrentDAG().getVertex(any(TezVertexID.class)))
         .thenReturn(vertex);
-
+    mockHistoryEventHandler = new MockHistoryEventHandler(mockAppContext);
+    when(mockAppContext.getHistoryHandler()).thenReturn(mockHistoryEventHandler);
     task =
         new TaskImpl(vertexId, 0, dispatcher.getEventHandler(),
             new Configuration(), mock(TaskAttemptListener.class),
@@ -313,6 +316,7 @@ public class TestTaskRecovery {
     assertEquals(0, task.failedAttempts);
     assertEquals(0, task.getUncompletedAttemptsCount());
     assertEquals(taId, task.successfulAttempt);
+    mockHistoryEventHandler.verifyTaskFinishedEvent(task.getTaskId(), TaskState.SUCCEEDED, 1);
   }
 
   /**
@@ -420,6 +424,7 @@ public class TestTaskRecovery {
     assertEquals(0, task.failedAttempts);
     assertEquals(0, task.getUncompletedAttemptsCount());
     assertEquals(taId, task.successfulAttempt);
+    mockHistoryEventHandler.verifyTaskFinishedEvent(task.getTaskId(), TaskState.SUCCEEDED, 1);
   }
 
   /**