You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by zj...@apache.org on 2015/02/13 03:02:23 UTC
tez git commit: TEZ-2037. Should log TaskAttemptFinishedEvent if taskattempt is recovered to KILLED (zjffdu)
Repository: tez
Updated Branches:
refs/heads/master bc6a490df -> c266289cd
TEZ-2037. Should log TaskAttemptFinishedEvent if taskattempt is recovered to KILLED (zjffdu)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/c266289c
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/c266289c
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/c266289c
Branch: refs/heads/master
Commit: c266289cde6693e8a586e3c4b2bdffbd6b98b9ca
Parents: bc6a490
Author: Jeff Zhang <zj...@apache.org>
Authored: Fri Feb 13 10:01:33 2015 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Fri Feb 13 10:01:33 2015 +0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 1 +
.../app/dag/impl/TestTaskAttemptRecovery.java | 66 +++++++++++++++++++-
3 files changed, 66 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/c266289c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b33c454..91183eb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -195,6 +195,7 @@ TEZ-UI CHANGES (TEZ-8):
Release 0.5.4: Unreleased
ALL CHANGES:
+ TEZ-2037. Should log TaskAttemptFinishedEvent if taskattempt is recovered to KILLED.
TEZ-2071. TestAMRecovery should set test names for test DAGs.
TEZ-1928. Tez local mode hang in Pig tez local mode.
TEZ-1893. Verify invalid -1 parallelism in DAG.verify().
http://git-wip-us.apache.org/repos/asf/tez/blob/c266289c/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index ccac620..4a4506e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -1392,6 +1392,7 @@ public class TaskAttemptImpl implements TaskAttempt,
TaskEventType.T_ATTEMPT_KILLED));
taskAttempt.sendEvent(createDAGCounterUpdateEventTAFinished(taskAttempt,
getExternalState(TaskAttemptStateInternal.KILLED)));
+ taskAttempt.logJobHistoryAttemptUnsuccesfulCompletion(TaskAttemptState.KILLED);
endState = TaskAttemptStateInternal.KILLED;
break;
case SUCCEEDED:
http://git-wip-us.apache.org/repos/asf/tez/blob/c266289c/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
index e5fcd72..9d0e121 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
@@ -20,10 +20,15 @@ package org.apache.tez.dag.app.dag.impl;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
@@ -39,16 +44,22 @@ import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ContainerContext;
import org.apache.tez.dag.app.TaskAttemptListener;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
+import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
+import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.event.DAGEventCounterUpdate;
import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
+import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.HistoryEventHandler;
+import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
@@ -61,11 +72,59 @@ public class TestTaskAttemptRecovery {
private long startTime = System.currentTimeMillis();
private long finishTime = startTime + 5000;
- private TezTaskAttemptID taId = mock(TezTaskAttemptID.class);
+ private TezTaskAttemptID taId;
private String vertexName = "v1";
+ private AppContext mockAppContext;
+ private MockHistoryEventHandler mockHistoryEventHandler;
+ private Task mockTask;
+ private Vertex mockVertex;
+
+ public static class MockHistoryEventHandler extends HistoryEventHandler {
+
+ private List<DAGHistoryEvent> events;
+
+ public MockHistoryEventHandler(AppContext context) {
+ super(context);
+ events = new ArrayList<DAGHistoryEvent>();
+ }
+
+ @Override
+ public void handle(DAGHistoryEvent event) {
+ events.add(event);
+ }
+
+ @Override
+ public void handleCriticalEvent(DAGHistoryEvent event) throws IOException {
+ events.add(event);
+ }
+
+ void verfiyTaskAttemptFinishedEvent(TezTaskAttemptID taId, TaskAttemptState finalState, int expectedTimes) {
+ int actualTimes = 0;
+ for (DAGHistoryEvent event : events) {
+ if (event.getHistoryEvent().getEventType() == HistoryEventType.TASK_ATTEMPT_FINISHED) {
+ TaskAttemptFinishedEvent tfEvent = (TaskAttemptFinishedEvent)event.getHistoryEvent();
+ if (tfEvent.getTaskAttemptID().equals(taId) &&
+ tfEvent.getState().equals(finalState)) {
+ actualTimes ++;
+ }
+ }
+ }
+ assertEquals(expectedTimes, actualTimes);
+ }
+ }
+
@Before
public void setUp() {
+ mockTask = mock(Task.class);
+ mockVertex = mock(Vertex.class);
+ when(mockTask.getVertex()).thenReturn(mockVertex);
+ mockAppContext = mock(AppContext.class, RETURNS_DEEP_STUBS);
+ when(mockAppContext.getCurrentDAG().getVertex(any(TezVertexID.class))
+ .getTask(any(TezTaskID.class)))
+ .thenReturn(mockTask);
+ mockHistoryEventHandler = new MockHistoryEventHandler(mockAppContext);
+ when(mockAppContext.getHistoryHandler()).thenReturn(mockHistoryEventHandler);
mockEventHandler = mock(EventHandler.class);
TezTaskID taskId =
TezTaskID.fromString("task_1407371892933_0001_1_00_000000");
@@ -73,8 +132,9 @@ public class TestTaskAttemptRecovery {
new TaskAttemptImpl(taskId, 0, mockEventHandler,
mock(TaskAttemptListener.class), new Configuration(),
new SystemClock(), mock(TaskHeartbeatHandler.class),
- mock(AppContext.class), false, Resource.newInstance(1, 1),
+ mockAppContext, false, Resource.newInstance(1, 1),
mock(ContainerContext.class), false);
+ taId = ta.getID();
}
private void restoreFromTAStartEvent() {
@@ -157,6 +217,8 @@ public class TestTaskAttemptRecovery {
verifyEvents(events, TaskEventTAUpdate.class, 1);
// one for task launch, one for task killed
verifyEvents(events, DAGEventCounterUpdate.class, 2);
+
+ mockHistoryEventHandler.verfiyTaskAttemptFinishedEvent(taId, TaskAttemptState.KILLED, 1);
}
/**