You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@tez.apache.org by zj...@apache.org on 2015/06/03 02:37:48 UTC
tez git commit: TEZ-2304. InvalidStateTransitonException TA_SCHEDULE at START_WAIT during recovery (zjffdu)
Repository: tez
Updated Branches:
refs/heads/master 631c3e9c6 -> 44a6bb686
TEZ-2304. InvalidStateTransitonException TA_SCHEDULE at START_WAIT during recovery (zjffdu)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/44a6bb68
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/44a6bb68
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/44a6bb68
Branch: refs/heads/master
Commit: 44a6bb686b08326b45c232fb7466b43c574fe7c1
Parents: 631c3e9
Author: Jeff Zhang <zj...@apache.org>
Authored: Wed Jun 3 08:37:39 2015 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Wed Jun 3 08:37:39 2015 +0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 17 +-----
.../apache/tez/dag/app/dag/impl/TaskImpl.java | 42 ++++----------
.../tez/dag/app/dag/impl/TestTaskAttempt.java | 59 +++++++++++++++++++-
.../app/dag/impl/TestTaskAttemptRecovery.java | 14 ++---
.../tez/dag/app/dag/impl/TestTaskImpl.java | 8 +++
.../tez/dag/app/dag/impl/TestTaskRecovery.java | 10 ++++
7 files changed, 95 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/44a6bb68/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b4c4967..0f1eabd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -418,6 +418,7 @@ TEZ-UI CHANGES (TEZ-8):
Release 0.5.4: Unreleased
ALL CHANGES:
+ TEZ-2304. InvalidStateTransitonException TA_SCHEDULE at START_WAIT during recovery
TEZ-2488. Tez AM crashes if a submitted DAG is configured to use invalid resource sizes.
TEZ-2080. Localclient should be using tezconf in init instead of yarnconf.
TEZ-2369. Add a few unit tests for RootInputInitializerManager.
http://git-wip-us.apache.org/repos/asf/tez/blob/44a6bb68/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 036022e..2650be2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -752,10 +752,6 @@ public class TaskAttemptImpl implements TaskAttempt,
}
case TASK_ATTEMPT_FINISHED:
{
- if (!recoveryStartEventSeen) {
- throw new RuntimeException("Finished Event seen but"
- + " no Started Event was encountered earlier");
- }
TaskAttemptFinishedEvent tEvent = (TaskAttemptFinishedEvent) historyEvent;
this.finishTime = tEvent.getFinishTime();
this.reportedStatus.counters = tEvent.getCounters();
@@ -1117,17 +1113,8 @@ public class TaskAttemptImpl implements TaskAttempt,
ta.sendEvent(createDAGCounterUpdateEventTAFinished(ta,
helper.getTaskAttemptState()));
- if (ta.getLaunchTime() != 0) {
- // TODO For cases like this, recovery goes for a toss, since the the
- // attempt will not exist in the history file.
- ta.logJobHistoryAttemptUnsuccesfulCompletion(helper
- .getTaskAttemptState());
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Not generating HistoryFinish event since start event not "
- + "generated for taskAttempt: " + ta.getID());
- }
- }
+ ta.logJobHistoryAttemptUnsuccesfulCompletion(helper
+ .getTaskAttemptState());
// Send out events to the Task - indicating TaskAttemptTermination(F/K)
ta.sendEvent(new TaskEventTAUpdate(ta.attemptId, helper
.getTaskEventType()));
http://git-wip-us.apache.org/repos/asf/tez/blob/44a6bb68/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index b2eb81e..1251ac4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -144,8 +144,6 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
// track the status of TaskAttempt (true mean completed, false mean uncompleted)
private final Map<Integer, Boolean> taskAttemptStatus = new HashMap<Integer,Boolean>();
- private boolean historyTaskStartGenerated = false;
-
private static final SingleArcTransition<TaskImpl , TaskEvent>
ATTEMPT_KILLED_TRANSITION = new AttemptKilledTransition();
private static final SingleArcTransition<TaskImpl, TaskEvent>
@@ -570,7 +568,6 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
this.attempts = new LinkedHashMap<TezTaskAttemptID, TaskAttempt>();
}
recoveredState = TaskState.SCHEDULED;
- historyTaskStartGenerated = true;
taskAttemptStatus.clear();
return recoveredState;
}
@@ -602,8 +599,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
LOG.debug("Adding restored attempt into known attempts map"
+ ", taskAttemptId=" + taskAttemptStartedEvent.getTaskAttemptID());
}
- this.attempts.put(taskAttemptStartedEvent.getTaskAttemptID(),
- recoveredAttempt);
+ Preconditions.checkArgument(this.attempts.put(taskAttemptStartedEvent.getTaskAttemptID(),
+ recoveredAttempt) == null, taskAttemptStartedEvent.getTaskAttemptID() + " already existed.");
this.taskAttemptStatus.put(taskAttemptStartedEvent.getTaskAttemptID().getId(), false);
this.recoveredState = TaskState.RUNNING;
return recoveredState;
@@ -850,11 +847,13 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
= new LinkedHashMap<TezTaskAttemptID, TaskAttempt>(maxFailedAttempts);
newAttempts.putAll(attempts);
attempts = newAttempts;
- attempts.put(attempt.getID(), attempt);
+ Preconditions.checkArgument(attempts.put(attempt.getID(), attempt) == null,
+ attempt.getID() + " already existed");
break;
default:
- attempts.put(attempt.getID(), attempt);
+ Preconditions.checkArgument(attempts.put(attempt.getID(), attempt) == null,
+ attempt.getID() + " already existed");
break;
}
@@ -1050,7 +1049,6 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
task.addAndScheduleAttempt();
task.scheduledTime = task.clock.getTime();
task.logJobHistoryTaskStartedEvent();
- task.historyTaskStartGenerated = true;
}
}
@@ -1092,9 +1090,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
task.eventHandler.handle(new VertexEventTaskCompleted(
task.taskId, TaskState.SUCCEEDED));
LOG.info("Task succeeded with attempt " + task.successfulAttempt);
- if (task.historyTaskStartGenerated) {
- task.logJobHistoryTaskFinishedEvent();
- }
+ task.logJobHistoryTaskFinishedEvent();
TaskAttempt successfulAttempt = task.attempts.get(successTaId);
// issue kill to all other attempts
@@ -1297,13 +1293,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
task.taskAttemptStatus.put(castEvent.getTaskAttemptID().getId(), true);
// check whether all attempts are finished
if (task.getFinishedAttemptsCount() == task.attempts.size()) {
- if (task.historyTaskStartGenerated) {
- task.logJobHistoryTaskFailedEvent(getExternalState(TaskStateInternal.KILLED));
- } else {
- LOG.debug("Not generating HistoryFinish event since start event not" +
- " generated for task: " + task.getTaskId());
- }
-
+ task.logJobHistoryTaskFailedEvent(getExternalState(TaskStateInternal.KILLED));
task.eventHandler.handle(
new VertexEventTaskCompleted(
task.taskId, getExternalState(TaskStateInternal.KILLED)));
@@ -1349,13 +1339,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
task.handleTaskAttemptCompletion(
((TaskEventTAUpdate) event).getTaskAttemptID(),
TaskAttemptStateInternal.FAILED);
-
- if (task.historyTaskStartGenerated) {
- task.logJobHistoryTaskFailedEvent(TaskState.FAILED);
- } else {
- LOG.debug("Not generating HistoryFinish event since start event not" +
- " generated for task: " + task.getTaskId());
- }
+ task.logJobHistoryTaskFailedEvent(TaskState.FAILED);
task.eventHandler.handle(
new VertexEventTaskCompleted(task.taskId, TaskState.FAILED));
return task.finished(TaskStateInternal.FAILED);
@@ -1464,13 +1448,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
public void transition(TaskImpl task, TaskEvent event) {
TaskEventTermination terminateEvent = (TaskEventTermination)event;
task.addDiagnosticInfo(terminateEvent.getDiagnosticInfo());
- if (task.historyTaskStartGenerated) {
- task.logJobHistoryTaskFailedEvent(TaskState.KILLED);
- } else {
- LOG.debug("Not generating HistoryFinish event since start event not" +
- " generated for task: " + task.getTaskId());
- }
-
+ task.logJobHistoryTaskFailedEvent(TaskState.KILLED);
task.eventHandler.handle(
new VertexEventTaskCompleted(task.taskId, TaskState.KILLED));
// TODO Metrics
http://git-wip-us.apache.org/repos/asf/tez/blob/44a6bb68/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 86251cc..15d5609 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -1212,6 +1212,59 @@ public class TestTaskAttempt {
arg.capture());
}
+ @SuppressWarnings("deprecation")
+ @Test(timeout = 5000)
+ public void testKilledInNew() {
+ ApplicationId appId = ApplicationId.newInstance(1, 2);
+ ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
+ appId, 0);
+ TezDAGID dagID = TezDAGID.getInstance(appId, 1);
+ TezVertexID vertexID = TezVertexID.getInstance(dagID, 1);
+ TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
+
+ MockEventHandler eventHandler = spy(new MockEventHandler());
+ TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+ when(taListener.getAddress()).thenReturn(
+ new InetSocketAddress("localhost", 0));
+
+ Configuration taskConf = new Configuration();
+ taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+ taskConf.setBoolean("fs.file.impl.disable.cache", true);
+
+ locationHint = TaskLocationHint.createTaskLocationHint(
+ new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null);
+ Resource resource = Resource.newInstance(1024, 1);
+
+ NodeId nid = NodeId.newInstance("127.0.0.1", 0);
+ ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+ Container container = mock(Container.class);
+ when(container.getId()).thenReturn(contId);
+ when(container.getNodeId()).thenReturn(nid);
+ when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+ AppContext appCtx = mock(AppContext.class);
+ AMContainerMap containers = new AMContainerMap(
+ mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
+ new ContainerContextMatcher(), appCtx);
+ containers.addContainerIfNew(container);
+
+ doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
+ doReturn(containers).when(appCtx).getAllContainers();
+
+ TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class);
+ MockTaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
+ taListener, taskConf, new SystemClock(),
+ mockHeartbeatHandler, appCtx, false,
+ resource, createFakeContainerContext(), true);
+ Assert.assertEquals(TaskAttemptStateInternal.NEW, taImpl.getInternalState());
+ taImpl.handle(new TaskAttemptEventKillRequest(taImpl.getID(), "kill it",
+ TaskAttemptTerminationCause.TERMINATED_BY_CLIENT));
+ Assert.assertEquals(TaskAttemptStateInternal.KILLED, taImpl.getInternalState());
+
+ Assert.assertEquals(0, taImpl.taskAttemptStartedEventLogged);
+ Assert.assertEquals(1, taImpl.taskAttemptFinishedEventLogged);
+ }
+
private Event verifyEventType(List<Event> events,
Class<? extends Event> eventClass, int expectedOccurences) {
int count = 0;
@@ -1244,6 +1297,8 @@ public class TestTaskAttempt {
private class MockTaskAttemptImpl extends TaskAttemptImpl {
+ public int taskAttemptStartedEventLogged = 0;
+ public int taskAttemptFinishedEventLogged = 0;
public MockTaskAttemptImpl(TezTaskID taskId, int attemptNumber,
EventHandler eventHandler, TaskAttemptListener tal,
Configuration conf, Clock clock,
@@ -1273,17 +1328,19 @@ public class TestTaskAttempt {
@Override
protected void logJobHistoryAttemptStarted() {
+ taskAttemptStartedEventLogged++;
}
@Override
protected void logJobHistoryAttemptFinishedEvent(
TaskAttemptStateInternal state) {
-
+ taskAttemptFinishedEventLogged++;
}
@Override
protected void logJobHistoryAttemptUnsuccesfulCompletion(
TaskAttemptState state) {
+ taskAttemptFinishedEventLogged++;
}
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/44a6bb68/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
index d6d874d..0665b1e 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
@@ -297,16 +297,14 @@ public class TestTaskAttemptRecovery {
}
/**
- * restoreFromTAFinishedEvent ( no TAStartEvent before TAFinishedEvent )
+ * restoreFromTAFinishedEvent ( killed before started)
*/
@Test(timeout = 5000)
public void testRecover_FINISH_BUT_NO_START() {
- try {
- restoreFromTAFinishedEvent(TaskAttemptState.SUCCEEDED);
- fail("Should fail due to no TaskAttemptStartEvent before TaskAttemptFinishedEvent");
- } catch (Throwable e) {
- assertEquals("Finished Event seen but"
- + " no Started Event was encountered earlier", e.getMessage());
- }
+ TaskAttemptState recoveredState =
+ ta.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
+ startTime, finishTime, TaskAttemptState.KILLED,
+ TaskAttemptTerminationCause.APPLICATION_ERROR, "", new TezCounters()));
+ assertEquals(TaskAttemptState.KILLED, recoveredState);
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/44a6bb68/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index 1ecabef..87dd2fa 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -634,6 +634,8 @@ public class TestTaskImpl {
mockTask.handle(new TaskEventTermination(taskId, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT, null));
assertEquals(1, mockTask.getDiagnostics().size());
assertTrue(mockTask.getDiagnostics().get(0).contains(TaskAttemptTerminationCause.TERMINATED_BY_CLIENT.name()));
+ assertEquals(0, mockTask.taskStartedEventLogged);
+ assertEquals(1, mockTask.taskFinishedEventLogged);
}
@Test(timeout = 5000)
@@ -674,6 +676,9 @@ public class TestTaskImpl {
@SuppressWarnings("rawtypes")
private class MockTaskImpl extends TaskImpl {
+ public int taskStartedEventLogged = 0;
+ public int taskFinishedEventLogged = 0;
+
private List<MockTaskAttemptImpl> taskAttempts = new LinkedList<MockTaskAttemptImpl>();
private Vertex vertex;
@@ -719,12 +724,15 @@ public class TestTaskImpl {
}
protected void logJobHistoryTaskStartedEvent() {
+ taskStartedEventLogged++;
}
protected void logJobHistoryTaskFinishedEvent() {
+ taskFinishedEventLogged++;
}
protected void logJobHistoryTaskFailedEvent(TaskState finalState) {
+ taskFinishedEventLogged++;
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/44a6bb68/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
index 2a49826..a16fad2 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
@@ -250,6 +250,16 @@ public class TestTaskRecovery {
}
/**
+ * -> restoreFromTaskFinishEvent ( no TaskStartEvent )
+ */
+ @Test(timeout = 5000)
+ public void testRecoveryNewToKilled_NoStartEvent() {
+ task.restoreFromEvent(new TaskFinishedEvent(task.getTaskId(), vertexName,
+ taskStartTime, taskFinishTime, null, TaskState.KILLED, "",
+ new TezCounters(), 0));
+ }
+
+ /**
* restoreFromTaskStartedEvent -> RecoverTransition
*/
@Test(timeout = 5000)