You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2014/03/06 00:35:18 UTC
[3/4] TEZ-847. Support basic AM recovery. (hitesh)
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 7fe07af..9572f6c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -25,6 +25,7 @@ import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -84,6 +85,7 @@ import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
import org.apache.tez.dag.app.rm.AMSchedulerEventTAEnded;
import org.apache.tez.dag.app.rm.AMSchedulerEventTALaunchRequest;
import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
import org.apache.tez.dag.records.TezDAGID;
@@ -91,6 +93,7 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.dag.utils.TezBuilderUtils;
+import org.apache.tez.runtime.api.OutputCommitter;
import org.apache.tez.runtime.api.events.InputFailedEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
@@ -193,6 +196,14 @@ public class TaskAttemptImpl implements TaskAttempt,
TaskAttemptEventType.TA_KILL_REQUEST,
new TerminateTransition(KILLED_HELPER))
+ .addTransition(TaskAttemptStateInternal.NEW,
+ EnumSet.of(TaskAttemptStateInternal.NEW,
+ TaskAttemptStateInternal.RUNNING,
+ TaskAttemptStateInternal.KILLED,
+ TaskAttemptStateInternal.FAILED,
+ TaskAttemptStateInternal.SUCCEEDED),
+ TaskAttemptEventType.TA_RECOVER, new RecoverTransition())
+
.addTransition(TaskAttemptStateInternal.START_WAIT,
TaskAttemptStateInternal.RUNNING,
TaskAttemptEventType.TA_STARTED_REMOTELY, new StartedTransition())
@@ -462,6 +473,8 @@ public class TaskAttemptImpl implements TaskAttempt,
.installTopology();
+ private TaskAttemptState recoveredState = TaskAttemptState.NEW;
+ private boolean recoveryStartEventSeen = false;
@SuppressWarnings("rawtypes")
public TaskAttemptImpl(TezTaskID taskId, int attemptNumber, EventHandler eventHandler,
@@ -782,6 +795,39 @@ public class TaskAttemptImpl implements TaskAttempt,
return isRescheduled;
}
+  /**
+   * Replays one recovered history event onto this attempt and reports the
+   * attempt state implied by the events replayed so far.
+   *
+   * @param historyEvent a TASK_ATTEMPT_STARTED or TASK_ATTEMPT_FINISHED event
+   * @return the recovered attempt state after applying the event
+   * @throws RuntimeException for a finished event that was not preceded by a
+   *     started event, or for any other event type
+   */
+  @Override
+  public TaskAttemptState restoreFromEvent(HistoryEvent historyEvent) {
+    switch (historyEvent.getEventType()) {
+      case TASK_ATTEMPT_STARTED:
+        TaskAttemptStartedEvent startedEvt = (TaskAttemptStartedEvent) historyEvent;
+        this.launchTime = startedEvt.getStartTime();
+        recoveryStartEventSeen = true;
+        recoveredState = TaskAttemptState.RUNNING;
+        break;
+      case TASK_ATTEMPT_FINISHED:
+        // A finish without a start means the recovered history is inconsistent
+        if (!recoveryStartEventSeen) {
+          throw new RuntimeException("Finished Event seen but"
+              + " no Started Event was encountered earlier");
+        }
+        TaskAttemptFinishedEvent finishedEvt = (TaskAttemptFinishedEvent) historyEvent;
+        this.finishTime = finishedEvt.getFinishTime();
+        // Restore the last reported status as a completed attempt
+        this.reportedStatus.counters = finishedEvt.getCounters();
+        this.reportedStatus.progress = 1f;
+        this.reportedStatus.state = finishedEvt.getState();
+        this.diagnostics.add(finishedEvt.getDiagnostics());
+        this.recoveredState = finishedEvt.getState();
+        break;
+      default:
+        throw new RuntimeException("Unexpected event received for restoring"
+            + " state, eventType=" + historyEvent.getEventType());
+    }
+    return recoveredState;
+  }
+
@SuppressWarnings("unchecked")
private void sendEvent(Event<?> event) {
this.eventHandler.handle(event);
@@ -1366,7 +1412,46 @@ public class TaskAttemptImpl implements TaskAttempt,
}
}
-
+
+  /**
+   * Handles TA_RECOVER on a NEW attempt by moving it directly to the state
+   * rebuilt from the previous AM attempt's history (see restoreFromEvent).
+   */
+  protected static class RecoverTransition implements
+      MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent, TaskAttemptStateInternal> {
+
+    @Override
+    public TaskAttemptStateInternal transition(TaskAttemptImpl taskAttempt,
+        TaskAttemptEvent taskAttemptEvent) {
+      TaskAttemptState recovered = taskAttempt.recoveredState;
+      switch (recovered) {
+        case NEW:
+        case RUNNING:
+          // FIXME once running containers can be recovered, this
+          // should be handled differently
+          // TODO abort taskattempt
+          // The attempt had not finished, so its Task is told it failed
+          taskAttempt.sendEvent(new TaskEventTAUpdate(taskAttempt.attemptId,
+              TaskEventType.T_ATTEMPT_FAILED));
+          return TaskAttemptStateInternal.FAILED;
+        // For the terminal states below, do not inform Task as it already
+        // knows about completed attempts
+        case SUCCEEDED:
+          return TaskAttemptStateInternal.SUCCEEDED;
+        case FAILED:
+          return TaskAttemptStateInternal.FAILED;
+        case KILLED:
+          return TaskAttemptStateInternal.KILLED;
+        default:
+          throw new RuntimeException("Failed to recover from non-handled state"
+              + ", taskAttemptId=" + taskAttempt.getID()
+              + ", state=" + taskAttempt.recoveredState);
+      }
+    }
+  }
+
protected static class TerminatedAfterSuccessTransition implements
MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent, TaskAttemptStateInternal> {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 793c12a..16c063a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -24,6 +24,7 @@ import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -63,7 +64,9 @@ import org.apache.tez.dag.app.dag.event.DAGEvent;
import org.apache.tez.dag.app.dag.event.DAGEventDiagnosticsUpdate;
import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
import org.apache.tez.dag.app.dag.event.DAGEventType;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventKillRequest;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.TaskEvent;
import org.apache.tez.dag.app.dag.event.TaskEventAddTezEvent;
import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
@@ -74,11 +77,15 @@ import org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule;
import org.apache.tez.dag.app.rm.container.AMContainer;
import org.apache.tez.dag.app.rm.node.AMNodeEventTaskAttemptEnded;
import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
import org.apache.tez.dag.history.events.TaskFinishedEvent;
import org.apache.tez.dag.history.events.TaskStartedEvent;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.OutputCommitter;
import org.apache.tez.runtime.api.impl.TezEvent;
import com.google.common.annotations.VisibleForTesting;
@@ -130,6 +137,9 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
private static final SingleArcTransition<TaskImpl, TaskEvent>
ADD_TEZ_EVENT_TRANSITION = new AddTezEventTransition();
+ // Recovery related flags
+ boolean recoveryStartEventSeen = false;
+
private static final StateMachineFactory
<TaskImpl, TaskStateInternal, TaskEventType, TaskEvent>
stateMachineFactory
@@ -142,11 +152,18 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
.addTransition(TaskStateInternal.NEW, TaskStateInternal.SCHEDULED,
TaskEventType.T_SCHEDULE, new InitialScheduleTransition())
.addTransition(TaskStateInternal.NEW, TaskStateInternal.KILLED,
- TaskEventType.T_TERMINATE,
- new KillNewTransition())
+ TaskEventType.T_TERMINATE,
+ new KillNewTransition())
.addTransition(TaskStateInternal.NEW, TaskStateInternal.NEW,
TaskEventType.T_ADD_TEZ_EVENT, ADD_TEZ_EVENT_TRANSITION)
+ // Recover transition
+ .addTransition(TaskStateInternal.NEW,
+ EnumSet.of(TaskStateInternal.NEW,
+ TaskStateInternal.SCHEDULED,
+ TaskStateInternal.RUNNING, TaskStateInternal.SUCCEEDED,
+ TaskStateInternal.FAILED, TaskStateInternal.KILLED),
+ TaskEventType.T_RECOVER, new RecoverTransition())
// Transitions from SCHEDULED state
//when the first attempt is launched, the task state is set to RUNNING
@@ -154,7 +171,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
TaskEventType.T_ATTEMPT_LAUNCHED, new LaunchTransition())
.addTransition(TaskStateInternal.SCHEDULED, TaskStateInternal.KILL_WAIT,
TaskEventType.T_TERMINATE,
- KILL_TRANSITION)
+ KILL_TRANSITION)
.addTransition(TaskStateInternal.SCHEDULED, TaskStateInternal.SCHEDULED,
TaskEventType.T_ATTEMPT_KILLED, ATTEMPT_KILLED_TRANSITION)
.addTransition(TaskStateInternal.SCHEDULED,
@@ -190,7 +207,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
KILL_TRANSITION)
.addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING,
TaskEventType.T_ADD_TEZ_EVENT, ADD_TEZ_EVENT_TRANSITION)
-
+ .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING,
+ TaskEventType.T_SCHEDULE)
// Transitions from KILL_WAIT state
.addTransition(TaskStateInternal.KILL_WAIT,
@@ -235,6 +253,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
TaskEventType.T_TERMINATE,
TaskEventType.T_ATTEMPT_SUCCEEDED, // Maybe track and reuse later
TaskEventType.T_ATTEMPT_LAUNCHED))
+ .addTransition(TaskStateInternal.SUCCEEDED, TaskStateInternal.SUCCEEDED,
+ TaskEventType.T_SCHEDULE)
// Transitions from FAILED state
.addTransition(TaskStateInternal.FAILED, TaskStateInternal.FAILED,
@@ -251,6 +271,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
EnumSet.of(
TaskEventType.T_TERMINATE,
TaskEventType.T_ADD_SPEC_ATTEMPT))
+ .addTransition(TaskStateInternal.FAILED, TaskStateInternal.FAILED,
+ TaskEventType.T_SCHEDULE)
// create the topology tables
.installTopology();
@@ -292,6 +314,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
private int finishedAttempts;//finish are total of success, failed and killed
private final boolean leafVertex;
+ private TaskState recoveredState = TaskState.NEW;
@Override
public TaskState getState() {
@@ -507,6 +530,92 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
return diagnostics;
}
+  /**
+   * Creates the attempt object for a TASK_ATTEMPT_STARTED event replayed during
+   * recovery, reusing the attempt number recorded in the event's attempt id.
+   * (Despite its name, this returns a TaskAttempt, not an event.)
+   */
+  private TaskAttempt createRecoveredEvent(
+      TaskAttemptStartedEvent taskAttemptStartedEvent) {
+    return createAttempt(taskAttemptStartedEvent.getTaskAttemptID().getId());
+  }
+
+  /**
+   * Replays one recovered history event onto this task and returns the task
+   * state implied by the events replayed so far.
+   *
+   * <p>Attempt-level events are forwarded to the matching attempt; the attempt
+   * object itself is created when its TASK_ATTEMPT_STARTED event is replayed.
+   *
+   * @param historyEvent a TASK_STARTED, TASK_FINISHED, TASK_ATTEMPT_STARTED or
+   *     TASK_ATTEMPT_FINISHED event
+   * @return the recovered task state after applying the event
+   * @throws RuntimeException if events arrive out of order, reference an
+   *     unknown attempt, or have an unexpected type
+   */
+  @Override
+  public TaskState restoreFromEvent(HistoryEvent historyEvent) {
+    switch (historyEvent.getEventType()) {
+      case TASK_STARTED:
+      {
+        TaskStartedEvent tEvent = (TaskStartedEvent) historyEvent;
+        recoveryStartEventSeen = true;
+        this.scheduledTime = tEvent.getScheduledTime();
+        // Make sure there is a LinkedHashMap that recovered attempts can be
+        // added to by the TASK_ATTEMPT_STARTED branch.
+        if (this.attempts == null
+            || this.attempts.isEmpty()) {
+          this.attempts = new LinkedHashMap<TezTaskAttemptID, TaskAttempt>();
+        }
+        recoveredState = TaskState.SCHEDULED;
+        finishedAttempts = 0;
+        return recoveredState;
+      }
+      case TASK_FINISHED:
+      {
+        if (!recoveryStartEventSeen) {
+          throw new RuntimeException("Finished Event seen but"
+              + " no Started Event was encountered earlier");
+        }
+        TaskFinishedEvent tEvent = (TaskFinishedEvent) historyEvent;
+        recoveredState = tEvent.getState();
+        if (tEvent.getState() == TaskState.SUCCEEDED
+            && tEvent.getSuccessfulAttemptID() != null) {
+          successfulAttempt = tEvent.getSuccessfulAttemptID();
+        }
+        return recoveredState;
+      }
+      case TASK_ATTEMPT_STARTED:
+      {
+        // TASK_STARTED is what sets up the attempts map, so an attempt started
+        // before it is out of order; report that clearly instead of failing
+        // later on the attempts map.
+        if (!recoveryStartEventSeen) {
+          throw new RuntimeException("Attempt Started Event seen but"
+              + " no Task Started Event was encountered earlier");
+        }
+        TaskAttemptStartedEvent taskAttemptStartedEvent =
+            (TaskAttemptStartedEvent) historyEvent;
+        TaskAttempt recoveredAttempt = createRecoveredEvent(taskAttemptStartedEvent);
+        recoveredAttempt.restoreFromEvent(taskAttemptStartedEvent);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Adding restored attempt into known attempts map"
+              + ", taskAttemptId=" + taskAttemptStartedEvent.getTaskAttemptID());
+        }
+        this.attempts.put(taskAttemptStartedEvent.getTaskAttemptID(),
+            recoveredAttempt);
+        ++numberUncompletedAttempts;
+        this.recoveredState = TaskState.RUNNING;
+        return recoveredState;
+      }
+      case TASK_ATTEMPT_FINISHED:
+      {
+        TaskAttemptFinishedEvent taskAttemptFinishedEvent =
+            (TaskAttemptFinishedEvent) historyEvent;
+        // Validate the event before touching the attempt counters so that a
+        // rejected event does not leave them half-updated.
+        if (numberUncompletedAttempts <= 0) {
+          throw new RuntimeException("Invalid recovery event for attempt finished"
+              + ", more completions than starts encountered"
+              + ", finishedAttempts=" + finishedAttempts
+              + ", incompleteAttempts=" + numberUncompletedAttempts);
+        }
+        TaskAttempt taskAttempt = this.attempts.get(
+            taskAttemptFinishedEvent.getTaskAttemptID());
+        if (taskAttempt == null) {
+          throw new RuntimeException("Could not find task attempt"
+              + " when trying to recover"
+              + ", taskAttemptId=" + taskAttemptFinishedEvent.getTaskAttemptID());
+        }
+        finishedAttempts++;
+        --numberUncompletedAttempts;
+        TaskAttemptState taskAttemptState = taskAttempt.restoreFromEvent(
+            taskAttemptFinishedEvent);
+        if (taskAttemptState == TaskAttemptState.SUCCEEDED) {
+          recoveredState = TaskState.SUCCEEDED;
+          successfulAttempt = taskAttempt.getID();
+        }
+        return recoveredState;
+      }
+      default:
+        throw new RuntimeException("Unexpected event received for restoring"
+            + " state, eventType=" + historyEvent.getEventType());
+    }
+  }
+
@VisibleForTesting
public TaskStateInternal getInternalState() {
readLock.lock();
@@ -851,6 +960,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
// is called from within a transition
TaskFinishedEvent finishEvt = new TaskFinishedEvent(taskId,
getVertex().getName(), getLaunchTime(), clock.getTime(),
+ successfulAttempt,
TaskState.SUCCEEDED, getCounters());
this.appContext.getHistoryHandler().handle(
new DAGHistoryEvent(taskId.getVertexID().getDAGId(), finishEvt));
@@ -858,7 +968,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
protected void logJobHistoryTaskFailedEvent(TaskState finalState) {
TaskFinishedEvent finishEvt = new TaskFinishedEvent(taskId,
- getVertex().getName(), getLaunchTime(), clock.getTime(),
+ getVertex().getName(), getLaunchTime(), clock.getTime(), null,
finalState, getCounters());
this.appContext.getHistoryHandler().handle(
new DAGHistoryEvent(taskId.getVertexID().getDAGId(), finishEvt));
@@ -992,6 +1102,120 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
}
}
+ /**
+ * Handles T_RECOVER on a NEW task using the state rebuilt by restoreFromEvent.
+ *
+ * <p>Every known attempt is first sent TA_RECOVER. The task then moves to a
+ * state based on the recovered task state:
+ * <ul>
+ * <li>NEW: stays NEW until the vertex schedules it.</li>
+ * <li>SCHEDULED / RUNNING / SUCCEEDED: if a successful attempt was recovered
+ * and every output committer of the vertex recovers that task's output, the
+ * vertex is informed and the task ends SUCCEEDED. Otherwise the task FAILS
+ * once attempts.size() reaches maxAttempts, or else goes RUNNING (scheduling
+ * a new attempt when all known attempts have finished).</li>
+ * <li>KILLED / FAILED: the vertex is informed and the task stays in that
+ * terminal state.</li>
+ * </ul>
+ */
+ private static class RecoverTransition implements
+ MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {
+
+ @Override
+ public TaskStateInternal transition(TaskImpl task, TaskEvent taskEvent) {
+ TaskStateInternal endState = TaskStateInternal.NEW;
+ // Let every recovered attempt move to its own recovered state first
+ if (task.attempts != null) {
+ for (TaskAttempt taskAttempt : task.attempts.values()) {
+ task.eventHandler.handle(new TaskAttemptEvent(
+ taskAttempt.getID(), TaskAttemptEventType.TA_RECOVER));
+ }
+ }
+ LOG.info("Trying to recover task"
+ + ", taskId=" + task.getTaskId()
+ + ", recoveredState=" + task.recoveredState);
+ switch(task.recoveredState) {
+ case NEW:
+ // Nothing to do until the vertex schedules this task
+ endState = TaskStateInternal.NEW;
+ break;
+ case SCHEDULED:
+ case RUNNING:
+ case SUCCEEDED:
+ if (task.successfulAttempt != null) {
+ //Found successful attempt
+ //Recover data
+ // Every output committer must support and succeed at task recovery;
+ // the first one that does not stops the loop and invalidates the attempt
+ boolean recoveredData = true;
+ if (task.getVertex().getOutputCommitters() != null
+ && !task.getVertex().getOutputCommitters().isEmpty()) {
+ for (Entry<String, OutputCommitter> entry
+ : task.getVertex().getOutputCommitters().entrySet()) {
+ LOG.info("Recovering data for task from previous DAG attempt"
+ + ", taskId=" + task.getTaskId()
+ + ", output=" + entry.getKey());
+ OutputCommitter committer = entry.getValue();
+ if (!committer.isTaskRecoverySupported()) {
+ LOG.info("Task recovery not supported by committer"
+ + ", failing task attempt"
+ + ", taskId=" + task.getTaskId()
+ + ", attemptId=" + task.successfulAttempt
+ + ", output=" + entry.getKey());
+ recoveredData = false;
+ break;
+ }
+ try {
+ // NOTE(review): second argument is presumably the previous AM
+ // attempt's id (current attempt id - 1); confirm against the
+ // OutputCommitter.recoverTask contract
+ committer.recoverTask(task.getTaskId().getId(),
+ task.appContext.getApplicationAttemptId().getAttemptId()-1);
+ } catch (Exception e) {
+ LOG.warn("Task recovery failed by committer"
+ + ", taskId=" + task.getTaskId()
+ + ", attemptId=" + task.successfulAttempt
+ + ", output=" + entry.getKey(), e);
+ recoveredData = false;
+ break;
+ }
+ }
+ }
+ if (!recoveredData) {
+ // Output could not be recovered: forget the successful attempt and
+ // fall through to the max-attempts / reschedule logic below
+ task.successfulAttempt = null;
+ } else {
+ LOG.info("Recovered a successful attempt"
+ + ", taskAttemptId=" + task.successfulAttempt.toString());
+ task.logJobHistoryTaskFinishedEvent();
+ task.eventHandler.handle(
+ new VertexEventTaskCompleted(task.taskId,
+ getExternalState(TaskStateInternal.SUCCEEDED)));
+ task.eventHandler.handle(
+ new VertexEventTaskAttemptCompleted(
+ task.successfulAttempt, TaskAttemptStateInternal.SUCCEEDED));
+ endState = TaskStateInternal.SUCCEEDED;
+ // Leaves the switch; the retry logic below is skipped
+ break;
+ }
+ }
+
+ // NOTE(review): task.attempts is null-checked at the top of this method
+ // but dereferenced unconditionally here and below
+ if (endState != TaskStateInternal.SUCCEEDED &&
+ task.attempts.size() >= task.maxAttempts) {
+ // Exceeded max attempts
+ task.finished(TaskStateInternal.FAILED);
+ endState = TaskStateInternal.FAILED;
+ break;
+ }
+
+ // no successful attempt and all attempts completed
+ // schedule a new one
+ // If any are incomplete, the running attempt will be moved to failed and
+ // its update will trigger a new attempt if possible
+ if (task.attempts.size() == task.finishedAttempts) {
+ task.addAndScheduleAttempt();
+ }
+ endState = TaskStateInternal.RUNNING;
+ break;
+ case KILLED:
+ // Nothing to do
+ // Inform vertex
+ task.eventHandler.handle(
+ new VertexEventTaskCompleted(task.taskId,
+ getExternalState(TaskStateInternal.KILLED)));
+ endState = TaskStateInternal.KILLED;
+ break;
+ case FAILED:
+ // Nothing to do
+ // Inform vertex
+ task.eventHandler.handle(
+ new VertexEventTaskCompleted(task.taskId,
+ getExternalState(TaskStateInternal.FAILED)));
+
+ endState = TaskStateInternal.FAILED;
+ break;
+ // NOTE(review): no default branch; any other recovered state leaves
+ // endState as NEW
+ }
+
+ return endState;
+ }
+ }
+
private static class KillWaitAttemptCompletedTransition implements
MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index d3e07cb..67f978a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -52,11 +52,9 @@ import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DagTypeConverters;
-import org.apache.tez.dag.api.EdgeManagerContext;
import org.apache.tez.dag.api.EdgeManagerDescriptor;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
-import org.apache.tez.dag.api.EdgeManager;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
@@ -94,13 +92,16 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.TaskEvent;
import org.apache.tez.dag.app.dag.event.TaskEventAddTezEvent;
+import org.apache.tez.dag.app.dag.event.TaskEventRecoverTask;
import org.apache.tez.dag.app.dag.event.TaskEventTermination;
import org.apache.tez.dag.app.dag.event.TaskEventType;
import org.apache.tez.dag.app.dag.event.VertexEvent;
+import org.apache.tez.dag.app.dag.event.VertexEventRecoverVertex;
import org.apache.tez.dag.app.dag.event.VertexEventRootInputFailed;
import org.apache.tez.dag.app.dag.event.VertexEventRootInputInitialized;
import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
import org.apache.tez.dag.app.dag.event.VertexEventSourceTaskAttemptCompleted;
+import org.apache.tez.dag.app.dag.event.VertexEventSourceVertexRecovered;
import org.apache.tez.dag.app.dag.event.VertexEventSourceVertexStarted;
import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptCompleted;
import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted;
@@ -110,9 +111,12 @@ import org.apache.tez.dag.app.dag.event.VertexEventType;
import org.apache.tez.dag.app.dag.event.VertexEventOneToOneSourceSplit;
import org.apache.tez.dag.app.dag.impl.DAGImpl.VertexGroupInfo;
import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.events.VertexCommitStartedEvent;
import org.apache.tez.dag.history.events.VertexDataMovementEventsGeneratedEvent;
import org.apache.tez.dag.history.events.VertexFinishedEvent;
import org.apache.tez.dag.history.events.VertexInitializedEvent;
+import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
import org.apache.tez.dag.history.events.VertexStartedEvent;
import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
import org.apache.tez.dag.records.TezDAGID;
@@ -185,6 +189,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
private int numStartedSourceVertices = 0;
private int numInitedSourceVertices = 0;
+ private int numRecoveredSourceVertices = 0;
+
private int distanceFromRoot = 0;
private final List<String> diagnostics = new ArrayList<String>();
@@ -209,6 +215,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
SOURCE_TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION =
new SourceTaskAttemptCompletedEventTransition();
+ private VertexState recoveredState = VertexState.NEW;
+ private List<TezEvent> recoveredEvents = new ArrayList<TezEvent>();
+
protected static final
StateMachineFactory<VertexImpl, VertexState, VertexEventType, VertexEvent>
stateMachineFactory
@@ -222,6 +231,33 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
VertexState.INITIALIZING, VertexState.FAILED),
VertexEventType.V_INIT,
new InitTransition())
+ .addTransition
+ (VertexState.NEW,
+ EnumSet.of(VertexState.NEW, VertexState.INITED,
+ VertexState.INITIALIZING, VertexState.RUNNING,
+ VertexState.SUCCEEDED, VertexState.FAILED,
+ VertexState.KILLED, VertexState.ERROR,
+ VertexState.RECOVERING),
+ VertexEventType.V_RECOVER,
+ new StartRecoverTransition())
+ .addTransition
+ (VertexState.RECOVERING,
+ EnumSet.of(VertexState.NEW, VertexState.INITED,
+ VertexState.INITIALIZING, VertexState.RUNNING,
+ VertexState.SUCCEEDED, VertexState.FAILED,
+ VertexState.KILLED, VertexState.ERROR,
+ VertexState.RECOVERING),
+ VertexEventType.V_SOURCE_VERTEX_RECOVERED,
+ new RecoverTransition())
+ .addTransition
+ (VertexState.NEW,
+ EnumSet.of(VertexState.INITED,
+ VertexState.INITIALIZING, VertexState.RUNNING,
+ VertexState.SUCCEEDED, VertexState.FAILED,
+ VertexState.KILLED, VertexState.ERROR,
+ VertexState.RECOVERING),
+ VertexEventType.V_SOURCE_VERTEX_RECOVERED,
+ new RecoverTransition())
.addTransition(VertexState.NEW, VertexState.NEW,
VertexEventType.V_SOURCE_VERTEX_STARTED,
new SourceVertexStartedTransition())
@@ -367,6 +403,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
// reruns.
VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
VertexEventType.V_TASK_COMPLETED))
+ .addTransition(VertexState.SUCCEEDED, VertexState.SUCCEEDED,
+ VertexEventType.V_TASK_ATTEMPT_COMPLETED,
+ new TaskAttemptCompletedEventTransition())
// Transitions from FAILED state
.addTransition(
@@ -491,6 +530,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
private VertexTerminationCause terminationCause;
private String logIdentifier;
+ private boolean recoveryCommitInProgress = false;
+ private Map<String,EdgeManagerDescriptor> recoveredSourceEdgeManagers = null;
+
+ // Recovery related flags
+ boolean recoveryInitEventSeen = false;
+ boolean recoveryStartEventSeen = false;
public VertexImpl(TezVertexID vertexId, VertexPlan vertexPlan,
String vertexName, Configuration conf, EventHandler eventHandler,
@@ -515,7 +560,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
this.vertexLocationHint = vertexLocationHint;
if (LOG.isDebugEnabled()) {
- logLocationHints(this.vertexLocationHint);
+ logLocationHints(this.vertexName, this.vertexLocationHint);
}
this.dagUgi = appContext.getCurrentDAG().getDagUGI();
@@ -800,6 +845,104 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
return this.appContext;
}
+  /**
+   * Applies a recovered parallelism change to this vertex.
+   *
+   * <p>Keeps only the first {@code newParallelism} entries of {@code tasks}
+   * (in insertion order) and removes the rest; it never adds tasks. The
+   * recovered source edge managers are stored in
+   * {@code recoveredSourceEdgeManagers}.
+   *
+   * @param newParallelism number of tasks to retain
+   * @param sourceEdgeManagers edge managers recorded with the parallelism update
+   */
+  private void handleParallelismUpdate(int newParallelism,
+      Map<String, EdgeManagerDescriptor> sourceEdgeManagers) {
+    // tasks is a LinkedHashMap, so iteration follows insertion order
+    Iterator<Map.Entry<TezTaskID, Task>> iter = this.tasks.entrySet().iterator();
+    int seen = 0;
+    while (iter.hasNext()) {
+      iter.next();
+      seen++;
+      if (seen > newParallelism) {
+        iter.remove();
+      }
+    }
+    this.recoveredSourceEdgeManagers = sourceEdgeManagers;
+  }
+
+  /**
+   * Replays one recovered history event onto this vertex and returns the
+   * vertex state implied by the events replayed so far.
+   *
+   * @param historyEvent a vertex-level history event (initialized, started,
+   *     parallelism updated, commit started, finished, or data movement events
+   *     generated)
+   * @return the recovered vertex state after applying the event
+   * @throws RuntimeException if events arrive out of order or have an
+   *     unexpected type
+   */
+  @Override
+  public VertexState restoreFromEvent(HistoryEvent historyEvent) {
+    switch (historyEvent.getEventType()) {
+      case VERTEX_INITIALIZED:
+        recoveryInitEventSeen = true;
+        recoveredState = setupVertex((VertexInitializedEvent) historyEvent);
+        createTasks();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Recovered state for vertex after Init event"
+              + ", vertex=" + logIdentifier
+              + ", recoveredState=" + recoveredState);
+        }
+        return recoveredState;
+      case VERTEX_STARTED:
+        if (!recoveryInitEventSeen) {
+          throw new RuntimeException("Started Event seen but"
+              + " no Init Event was encountered earlier");
+        }
+        recoveryStartEventSeen = true;
+        VertexStartedEvent startedEvent = (VertexStartedEvent) historyEvent;
+        startTimeRequested = startedEvent.getStartRequestedTime();
+        startedTime = startedEvent.getStartTime();
+        recoveredState = VertexState.RUNNING;
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Recovered state for vertex after Started event"
+              + ", vertex=" + logIdentifier
+              + ", recoveredState=" + recoveredState);
+        }
+        return recoveredState;
+      case VERTEX_PARALLELISM_UPDATED:
+        VertexParallelismUpdatedEvent updatedEvent =
+            (VertexParallelismUpdatedEvent) historyEvent;
+        if (updatedEvent.getVertexLocationHint() != null) {
+          vertexLocationHint = updatedEvent.getVertexLocationHint();
+        }
+        numTasks = updatedEvent.getNumTasks();
+        handleParallelismUpdate(numTasks, updatedEvent.getSourceEdgeManagers());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Recovered state for vertex after parallelism updated event"
+              + ", vertex=" + logIdentifier
+              + ", recoveredState=" + recoveredState);
+        }
+        return recoveredState;
+      case VERTEX_COMMIT_STARTED:
+        if (recoveredState != VertexState.RUNNING) {
+          throw new RuntimeException("Commit Started Event seen but"
+              + " recovered state is not RUNNING"
+              + ", recoveredState=" + recoveredState);
+        }
+        recoveryCommitInProgress = true;
+        return recoveredState;
+      case VERTEX_FINISHED:
+        if (!recoveryStartEventSeen) {
+          throw new RuntimeException("Finished Event seen but"
+              + " no Started Event was encountered earlier");
+        }
+        recoveryCommitInProgress = false;
+        VertexFinishedEvent finishedEvent = (VertexFinishedEvent) historyEvent;
+        recoveredState = finishedEvent.getState();
+        // Successful vertices are logged with empty diagnostics; do not let
+        // recovery append blank (or null) entries to the diagnostics list.
+        String finishDiagnostics = finishedEvent.getDiagnostics();
+        if (finishDiagnostics != null && !finishDiagnostics.isEmpty()) {
+          diagnostics.add(finishDiagnostics);
+        }
+        finishTime = finishedEvent.getFinishTime();
+        // TODO counters ??
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Recovered state for vertex after finished event"
+              + ", vertex=" + logIdentifier
+              + ", recoveredState=" + recoveredState);
+        }
+        return recoveredState;
+      case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
+        // Buffer the recovered events; they do not change the vertex state
+        VertexDataMovementEventsGeneratedEvent vEvent =
+            (VertexDataMovementEventsGeneratedEvent) historyEvent;
+        this.recoveredEvents.addAll(vEvent.getTezEvents());
+        return recoveredState;
+      default:
+        throw new RuntimeException("Unexpected event received for restoring"
+            + " state, eventType=" + historyEvent.getEventType());
+    }
+  }
+
// TODO Create InputReadyVertexManager that schedules when there is something
// to read and use that as default instead of ImmediateStart.TEZ-480
@Override
@@ -829,8 +972,40 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
@Override
public boolean setParallelism(int parallelism, VertexLocationHint vertexLocationHint,
Map<String, EdgeManagerDescriptor> sourceEdgeManagers) {
- writeLock.lock();
+ return setParallelism(parallelism, vertexLocationHint, sourceEdgeManagers, false);
+ }
+
+ private boolean setParallelism(int parallelism, VertexLocationHint vertexLocationHint,
+ Map<String, EdgeManagerDescriptor> sourceEdgeManagers,
+ boolean recovering) {
+ if (recovering) {
+ writeLock.lock();
+ try {
+ if (sourceEdgeManagers != null) {
+ for(Map.Entry<String, EdgeManagerDescriptor> entry :
+ sourceEdgeManagers.entrySet()) {
+ LOG.info("Recovering edge manager for source:"
+ + entry.getKey() + " destination: " + getVertexId());
+ Vertex sourceVertex = appContext.getCurrentDAG().getVertex(entry.getKey());
+ Edge edge = sourceVertices.get(sourceVertex);
+ try {
+ edge.setCustomEdgeManager(entry.getValue());
+ } catch (Exception e) {
+ LOG.warn("Failed to initialize edge manager for edge"
+ + ", sourceVertexName=" + sourceVertex.getName()
+ + ", destinationVertexName=" + edge.getDestinationVertexName(),
+ e);
+ return false;
+ }
+ }
+ }
+ return true;
+ } finally {
+ writeLock.unlock();
+ }
+ }
setVertexLocationHint(vertexLocationHint);
+ writeLock.lock();
try {
if (parallelismSet == true) {
LOG.info("Parallelism can only be set dynamically once per vertex");
@@ -928,7 +1103,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
}
}
-
+
+ VertexParallelismUpdatedEvent parallelismUpdatedEvent =
+ new VertexParallelismUpdatedEvent(vertexId, numTasks,
+ vertexLocationHint,
+ sourceEdgeManagers);
+ appContext.getHistoryHandler().handle(new DAGHistoryEvent(getDAGId(),
+ parallelismUpdatedEvent));
+
// stop buffering events
for (Edge edge : sourceVertices.values()) {
edge.stopEventBuffering();
@@ -962,7 +1144,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
try {
this.vertexLocationHint = vertexLocationHint;
if (LOG.isDebugEnabled()) {
- logLocationHints(this.vertexLocationHint);
+ logLocationHints(this.vertexName, this.vertexLocationHint);
}
} finally {
writeLock.unlock();
@@ -1039,7 +1221,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
void logJobHistoryVertexInitializedEvent() {
VertexInitializedEvent initEvt = new VertexInitializedEvent(vertexId, vertexName,
initTimeRequested, initedTime, numTasks,
- getProcessorName());
+ getProcessorName(), getAdditionalInputs());
this.appContext.getHistoryHandler().handle(
new DAGHistoryEvent(getDAGId(), initEvt));
}
@@ -1055,13 +1237,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
this.setFinishTime();
VertexFinishedEvent finishEvt = new VertexFinishedEvent(vertexId,
vertexName, initTimeRequested, initedTime, startTimeRequested,
- startedTime, finishTime, VertexStatus.State.SUCCEEDED, "",
+ startedTime, finishTime, VertexState.SUCCEEDED, "",
getAllCounters());
this.appContext.getHistoryHandler().handle(
new DAGHistoryEvent(getDAGId(), finishEvt));
}
- void logJobHistoryVertexFailedEvent(VertexStatus.State state) {
+ void logJobHistoryVertexFailedEvent(VertexState state) {
VertexFinishedEvent finishEvt = new VertexFinishedEvent(vertexId,
vertexName, initTimeRequested, initedTime, startTimeRequested,
startedTime, clock.getTime(), state, StringUtils.join(LINE_SEPARATOR,
@@ -1073,7 +1255,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
static VertexState checkVertexForCompletion(final VertexImpl vertex) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Checking for vertex completion"
+ LOG.debug("Checking for vertex completion for "
+ + vertex.logIdentifier
+ + ", numTasks=" + vertex.numTasks
+ ", failedTaskCount=" + vertex.failedTaskCount
+ ", killedTaskCount=" + vertex.killedTaskCount
+ ", successfulTaskCount=" + vertex.succeededTaskCount
@@ -1084,6 +1268,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
//check for vertex failure first
if (vertex.completedTaskCount > vertex.tasks.size()) {
LOG.error("task completion accounting issue: completedTaskCount > nTasks:"
+ + " for vertex " + vertex.logIdentifier
+ + ", numTasks=" + vertex.numTasks
+ ", failedTaskCount=" + vertex.failedTaskCount
+ ", killedTaskCount=" + vertex.killedTaskCount
+ ", successfulTaskCount=" + vertex.succeededTaskCount
@@ -1096,6 +1282,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
if(vertex.succeededTaskCount == vertex.tasks.size() && vertex.terminationCause == null) {
LOG.info("Vertex succeeded: " + vertex.logIdentifier);
try {
+ if (vertex.outputCommitters != null) {
+ vertex.appContext.getHistoryHandler().handle(
+ new DAGHistoryEvent(vertex.getDAGId(),
+ new VertexCommitStartedEvent(vertex.vertexId)));
+ }
if (vertex.commitVertexOutputs && !vertex.committed.getAndSet(true)) {
// commit only once. Dont commit shared outputs
LOG.info("Invoking committer commit for vertex, vertexId="
@@ -1197,20 +1388,16 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
if (finishTime == 0) setFinishTime();
switch (finalState) {
- case KILLED:
- eventHandler.handle(new DAGEventVertexCompleted(getVertexId(),
- finalState, terminationCause));
- logJobHistoryVertexFailedEvent(VertexStatus.State.KILLED);
- break;
case ERROR:
eventHandler.handle(new DAGEvent(getDAGId(),
DAGEventType.INTERNAL_ERROR));
- logJobHistoryVertexFailedEvent(VertexStatus.State.FAILED);
+ logJobHistoryVertexFailedEvent(finalState);
break;
+ case KILLED:
case FAILED:
eventHandler.handle(new DAGEventVertexCompleted(getVertexId(),
finalState, terminationCause));
- logJobHistoryVertexFailedEvent(VertexStatus.State.FAILED);
+ logJobHistoryVertexFailedEvent(finalState);
break;
case SUCCEEDED:
eventHandler.handle(new DAGEventVertexCompleted(getVertexId(),
@@ -1227,58 +1414,65 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
return finished(finalState, null);
}
- private VertexState initializeVertex() {
+
+ /**
+ * Creates, initializes and sets up an OutputCommitter for each additional
+ * (leaf) output that names an initializer class; outputs without one are
+ * skipped with a log message. Each committer is created and invoked inside
+ * dagUgi.doAs(...) and stored in outputCommitters keyed by output name.
+ * Any exception is propagated to the caller (initializeVertex and the
+ * recovery transitions decide how to fail the vertex).
+ * NOTE(review): the guard checks additionalOutputSpecs but the loop iterates
+ * additionalOutputs - confirm both are always populated together.
+ * NOTE(review): the recovery transitions call this again on an AM restart, so
+ * setupOutput() is re-run - confirm committers tolerate repeated setup.
+ */
+ private void initializeCommitters() throws Exception {
if (!this.additionalOutputSpecs.isEmpty()) {
- try {
- LOG.info("Invoking committer inits for vertex, vertexId=" + logIdentifier);
- for (Entry<String, RootInputLeafOutputDescriptor<OutputDescriptor>> entry:
+ LOG.info("Invoking committer inits for vertex, vertexId=" + logIdentifier);
+ for (Entry<String, RootInputLeafOutputDescriptor<OutputDescriptor>> entry:
additionalOutputs.entrySet()) {
- final String outputName = entry.getKey();
- final RootInputLeafOutputDescriptor<OutputDescriptor> od = entry.getValue();
- if (od.getInitializerClassName() == null
+ final String outputName = entry.getKey();
+ final RootInputLeafOutputDescriptor<OutputDescriptor> od = entry.getValue();
+ if (od.getInitializerClassName() == null
|| od.getInitializerClassName().isEmpty()) {
- LOG.info("Ignoring committer as none specified for output="
- + outputName
+ LOG.info("Ignoring committer as none specified for output="
+ + outputName
+ + ", vertexId=" + logIdentifier);
+ continue;
+ }
+ LOG.info("Instantiating committer for output=" + outputName
+ + ", vertexId=" + logIdentifier
+ + ", committerClass=" + od.getInitializerClassName());
+
+ // Committer construction, init and setup run as the DAG's user.
+ dagUgi.doAs(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ OutputCommitter outputCommitter = RuntimeUtils.createClazzInstance(
+ od.getInitializerClassName());
+ OutputCommitterContext outputCommitterContext =
+ new OutputCommitterContextImpl(appContext.getApplicationID(),
+ appContext.getApplicationAttemptId().getAttemptId(),
+ appContext.getCurrentDAG().getName(),
+ vertexName,
+ outputName,
+ od.getDescriptor().getUserPayload(),
+ vertexId.getId());
+
+ LOG.info("Invoking committer init for output=" + outputName
+ ", vertexId=" + logIdentifier);
- continue;
+ outputCommitter.initialize(outputCommitterContext);
+ // Registered before setupOutput(), so a committer whose setup throws is
+ // still present in outputCommitters.
+ outputCommitters.put(outputName, outputCommitter);
+ LOG.info("Invoking committer setup for output=" + outputName
+ + ", vertexId=" + logIdentifier);
+ outputCommitter.setupOutput();
+ return null;
}
- LOG.info("Instantiating committer for output=" + outputName
- + ", vertexId=" + logIdentifier
- + ", committerClass=" + od.getInitializerClassName());
-
- dagUgi.doAs(new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws Exception {
- OutputCommitter outputCommitter = RuntimeUtils.createClazzInstance(
- od.getInitializerClassName());
- OutputCommitterContext outputCommitterContext =
- new OutputCommitterContextImpl(appContext.getApplicationID(),
- appContext.getApplicationAttemptId().getAttemptId(),
- appContext.getCurrentDAG().getName(),
- vertexName,
- outputName,
- od.getDescriptor().getUserPayload());
-
- LOG.info("Invoking committer init for output=" + outputName
- + ", vertexId=" + logIdentifier);
- outputCommitter.initialize(outputCommitterContext);
- outputCommitters.put(outputName, outputCommitter);
- LOG.info("Invoking committer setup for output=" + outputName
- + ", vertexId=" + logIdentifier);
- outputCommitter.setupOutput();
- return null;
- }
- });
- }
- } catch (Exception e) {
- LOG.warn("Vertex Committer init failed, vertexId=" + logIdentifier, e);
- addDiagnostic("Vertex init failed : "
- + StringUtils.stringifyException(e));
- trySetTerminationCause(VertexTerminationCause.INIT_FAILURE);
- abortVertex(VertexStatus.State.FAILED);
- return finished(VertexState.FAILED);
+ });
}
}
+ }
+
+ private VertexState initializeVertex() {
+ try {
+ initializeCommitters();
+ } catch (Exception e) {
+ LOG.warn("Vertex Committer init failed, vertexId=" + logIdentifier, e);
+ addDiagnostic("Vertex init failed : "
+ + StringUtils.stringifyException(e));
+ trySetTerminationCause(VertexTerminationCause.INIT_FAILURE);
+ abortVertex(VertexStatus.State.FAILED);
+ return finished(VertexState.FAILED);
+ }
+
// TODO: Metrics
initedTime = clock.getTime();
@@ -1329,130 +1523,645 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
- public static class InitTransition implements
- MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
+ /**
+ * Sets up this vertex for a normal (non-recovery) initialization, i.e. with
+ * no recovered VertexInitializedEvent to restore state from.
+ *
+ * @return the state produced by setupVertex(VertexInitializedEvent)
+ */
+ private VertexState setupVertex() {
+ final VertexInitializedEvent noRecoveredInitEvent = null;
+ return setupVertex(noRecoveredInitEvent);
+ }
- @Override
- public VertexState transition(VertexImpl vertex, VertexEvent event) {
- VertexState vertexState = VertexState.NEW;
- vertex.numInitedSourceVertices++;
- if (vertex.sourceVertices == null || vertex.sourceVertices.isEmpty() ||
- vertex.numInitedSourceVertices == vertex.sourceVertices.size()) {
- vertexState = handleInitEvent(vertex, event);
- if (vertexState != VertexState.FAILED) {
- if (vertex.targetVertices != null && !vertex.targetVertices.isEmpty()) {
- for (Vertex target : vertex.targetVertices.keySet()) {
- vertex.getEventHandler().handle(new VertexEvent(target.getVertexId(),
- VertexEventType.V_INIT));
- }
- }
+ /**
+ * Performs vertex setup shared by normal initialization and recovery.
+ * <p>
+ * When {@code event} is null the init-request time is taken from the clock,
+ * inputs with initializer classes are collected and the task count comes
+ * from the vertex plan. When a recovered VertexInitializedEvent is supplied,
+ * the times, additional inputs and task count are restored from it instead.
+ * In both cases group input specs are built, a VertexManager is chosen and
+ * initialized, and task limits are checked.
+ * <p>
+ * Failure handling: on the normal path (null event) failures go through
+ * finished(...) so the vertex is terminated. On the recovery path a bare
+ * VertexState.FAILED is returned and the caller handles it.
+ *
+ * @param event recovered init event, or null for a fresh initialization
+ * @return VertexState.INITED on success, otherwise a FAILED state
+ */
+ private VertexState setupVertex(VertexInitializedEvent event) {
+
+ if (event == null) {
+ initTimeRequested = clock.getTime();
+ } else {
+ initTimeRequested = event.getInitRequestedTime();
+ initedTime = event.getInitedTime();
+ }
+
+ // VertexManager needs to be setup before attempting to Initialize any
+ // Inputs - since events generated by them will be routed to the
+ // VertexManager for handling.
+
+ if (dagVertexGroups != null && !dagVertexGroups.isEmpty()) {
+ List<GroupInputSpec> groupSpecList = Lists.newLinkedList();
+ for (VertexGroupInfo groupInfo : dagVertexGroups.values()) {
+ if (groupInfo.edgeMergedInputs.containsKey(getName())) {
+ InputDescriptor mergedInput = groupInfo.edgeMergedInputs.get(getName());
+ groupSpecList.add(new GroupInputSpec(groupInfo.groupName,
+ Lists.newLinkedList(groupInfo.groupMembers), mergedInput));
}
}
- return vertexState;
+ if (!groupSpecList.isEmpty()) {
+ groupInputSpecList = groupSpecList;
+ }
}
- private VertexState handleInitEvent(VertexImpl vertex, VertexEvent event) {
- vertex.initTimeRequested = vertex.clock.getTime();
-
- // VertexManager needs to be setup before attempting to Initialize any
- // Inputs - since events generated by them will be routed to the
- // VertexManager for handling.
-
- if (vertex.dagVertexGroups != null && !vertex.dagVertexGroups.isEmpty()) {
- List<GroupInputSpec> groupSpecList = Lists.newLinkedList();
- for (VertexGroupInfo groupInfo : vertex.dagVertexGroups.values()) {
- if (groupInfo.edgeMergedInputs.containsKey(vertex.getName())) {
- InputDescriptor mergedInput = groupInfo.edgeMergedInputs.get(vertex.getName());
- groupSpecList.add(new GroupInputSpec(groupInfo.groupName,
- Lists.newLinkedList(groupInfo.groupMembers), mergedInput));
+ // Check if any inputs need initializers
+ if (event != null) {
+ this.additionalInputs = event.getAdditionalInputs();
+ if (additionalInputs != null) {
+ // FIXME References to descriptor kept in both objects
+ for (InputSpec inputSpec : this.additionalInputSpecs) {
+ if (additionalInputs.containsKey(inputSpec.getSourceVertexName())
+ && additionalInputs.get(inputSpec.getSourceVertexName()).getDescriptor() != null) {
+ inputSpec.setInputDescriptor(
+ additionalInputs.get(inputSpec.getSourceVertexName()).getDescriptor());
}
}
- if (!groupSpecList.isEmpty()) {
- vertex.groupInputSpecList = groupSpecList;
- }
}
-
- // Check if any inputs need initializers
- if (vertex.additionalInputs != null) {
- LOG.info("Root Inputs exist for Vertex: " + vertex.getName() + " : "
- + vertex.additionalInputs);
- for (RootInputLeafOutputDescriptor<InputDescriptor> input : vertex.additionalInputs.values()) {
+ } else {
+ if (additionalInputs != null) {
+ LOG.info("Root Inputs exist for Vertex: " + getName() + " : "
+ + additionalInputs);
+ for (RootInputLeafOutputDescriptor<InputDescriptor> input : additionalInputs.values()) {
if (input.getInitializerClassName() != null) {
- if (vertex.inputsWithInitializers == null) {
- vertex.inputsWithInitializers = Sets.newHashSet();
+ if (inputsWithInitializers == null) {
+ inputsWithInitializers = Sets.newHashSet();
}
- vertex.inputsWithInitializers.add(input.getEntityName());
+ inputsWithInitializers.add(input.getEntityName());
LOG.info("Starting root input initializer for input: "
+ input.getEntityName() + ", with class: ["
+ input.getInitializerClassName() + "]");
}
}
}
+ }
+
+ boolean hasBipartite = false;
+ if (sourceVertices != null) {
+ for (Edge edge : sourceVertices.values()) {
+ if (edge.getEdgeProperty().getDataMovementType() == DataMovementType.SCATTER_GATHER) {
+ hasBipartite = true;
+ break;
+ }
+ }
+ }
+
+ if (hasBipartite && inputsWithInitializers != null) {
+ LOG.fatal("A vertex with an Initial Input and a Shuffle Input are not supported at the moment");
+ if (event != null) {
+ return VertexState.FAILED;
+ } else {
+ return finished(VertexState.FAILED);
+ }
+ }
+
+ boolean hasUserVertexManager = vertexPlan.hasVertexManagerPlugin();
+
+ if (hasUserVertexManager) {
+ VertexManagerPluginDescriptor pluginDesc = DagTypeConverters
+ .convertVertexManagerPluginDescriptorFromDAGPlan(vertexPlan
+ .getVertexManagerPlugin());
+ LOG.info("Setting user vertex manager plugin: "
+ + pluginDesc.getClassName() + " on vertex: " + getName());
+ vertexManager = new VertexManager(pluginDesc, this, appContext);
+ } else {
+ if (hasBipartite) {
+ // setup vertex manager
+ // TODO this needs to consider data size and perhaps API.
+ // Currently implicitly BIPARTITE is the only edge type
+ LOG.info("Setting vertexManager to ShuffleVertexManager for "
+ + logIdentifier);
+ vertexManager = new VertexManager(new ShuffleVertexManager(),
+ this, appContext);
+ } else if (inputsWithInitializers != null) {
+ LOG.info("Setting vertexManager to RootInputVertexManager for "
+ + logIdentifier);
+ vertexManager = new VertexManager(new RootInputVertexManager(),
+ this, appContext);
+ } else {
+ // schedule all tasks upon vertex start. Default behavior.
+ LOG.info("Setting vertexManager to ImmediateStartVertexManager for "
+ + logIdentifier);
+ vertexManager = new VertexManager(
+ new ImmediateStartVertexManager(), this, appContext);
+ }
+ }
+
+ vertexManager.initialize();
+
+ // Setup tasks early if possible. If the VertexManager is not being used
+ // to set parallelism, sending events to Tasks is safe (and less confusing
+ // then relying on tasks to be created after TaskEvents are generated).
+ // For VertexManagers setting parallelism, the setParallelism call needs
+ // to be inline.
+ if (event != null) {
+ numTasks = event.getNumTasks();
+ } else {
+ numTasks = getVertexPlan().getTaskConfig().getNumTasks();
+ }
+
+ if (!(numTasks == -1 || numTasks >= 0)) {
+ addDiagnostic("Invalid task count for vertex"
+ + ", numTasks=" + numTasks);
+ trySetTerminationCause(VertexTerminationCause.INVALID_NUM_OF_TASKS);
+ // Same split as the scatter-gather check above: only the normal init
+ // path aborts and terminates the vertex; recovery returns a bare FAILED.
+ if (event != null) {
+ return VertexState.FAILED;
+ } else {
+ abortVertex(VertexStatus.State.FAILED);
+ return finished(VertexState.FAILED);
+ }
+ }
+
+ checkTaskLimits();
+ return VertexState.INITED;
+ }
+
+ /**
+ * Handles the VertexEventRecoverVertex that starts recovery of a vertex.
+ * <p>
+ * If the DAG supplies a terminal desired state (SUCCEEDED, KILLED, FAILED or
+ * ERROR), the task counters are set to match, every task is sent a
+ * TaskEventRecoverTask with the corresponding TaskState, and that state is
+ * returned. An unknown desired state moves the vertex to ERROR.
+ * For desired state RUNNING, recovery proceeds from vertex.recoveredState.
+ * Unless the result is RECOVERING, downstream vertices are then sent a
+ * VertexEventSourceVertexRecovered, and the recovered events are either
+ * routed (RUNNING/SUCCEEDED/INITED) or required to be empty.
+ */
+ public static class StartRecoverTransition implements
+ MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
+
+ @Override
+ public VertexState transition(VertexImpl vertex, VertexEvent vertexEvent) {
+ VertexEventRecoverVertex recoverEvent = (VertexEventRecoverVertex) vertexEvent;
+ VertexState desiredState = recoverEvent.getDesiredState();
- boolean hasBipartite = false;
- if (vertex.sourceVertices != null) {
- for (Edge edge : vertex.sourceVertices.values()) {
- if (edge.getEdgeProperty().getDataMovementType() == DataMovementType.SCATTER_GATHER) {
- hasBipartite = true;
+ switch (desiredState) {
+ case RUNNING:
+ break;
+ case SUCCEEDED:
+ case KILLED:
+ case FAILED:
+ case ERROR:
+ switch (desiredState) {
+ case SUCCEEDED:
+ vertex.succeededTaskCount = vertex.numTasks;
+ vertex.completedTaskCount = vertex.numTasks;
+ break;
+ case KILLED:
+ vertex.killedTaskCount = vertex.numTasks;
+ break;
+ case FAILED:
+ case ERROR:
+ vertex.failedTaskCount = vertex.numTasks;
+ break;
+ }
+ if (vertex.tasks != null) {
+ TaskState taskState = TaskState.KILLED;
+ switch (desiredState) {
+ case SUCCEEDED:
+ taskState = TaskState.SUCCEEDED;
+ break;
+ case KILLED:
+ taskState = TaskState.KILLED;
+ break;
+ case FAILED:
+ case ERROR:
+ taskState = TaskState.FAILED;
+ break;
+ }
+ for (Task task : vertex.tasks.values()) {
+ vertex.eventHandler.handle(
+ new TaskEventRecoverTask(task.getTaskId(),
+ taskState));
+ }
+ }
+ LOG.info("DAG informed Vertex of its final completed state"
+ + ", vertex=" + vertex.logIdentifier
+ + ", state=" + desiredState);
+ return desiredState;
+ default:
+ LOG.info("Unhandled desired state provided by DAG"
+ + ", vertex=" + vertex.logIdentifier
+ + ", state=" + desiredState);
+ // Stop here: continuing would run state-based recovery (and possibly
+ // send V_INIT/V_START) for a vertex that was just moved to ERROR.
+ return vertex.finished(VertexState.ERROR);
+ }
+
+ // Desired state is RUNNING: recover from the state found in the recovery data.
+ VertexState endState;
+ switch (vertex.recoveredState) {
+ case NEW:
+ // Trigger init and start as desired state is RUNNING
+ // Drop all root events
+ Iterator<TezEvent> iterator = vertex.recoveredEvents.iterator();
+ while (iterator.hasNext()) {
+ if (iterator.next().getEventType().equals(
+ EventType.ROOT_INPUT_DATA_INFORMATION_EVENT)) {
+ iterator.remove();
+ }
+ }
+ vertex.eventHandler.handle(new VertexEvent(vertex.vertexId,
+ VertexEventType.V_INIT));
+ vertex.eventHandler.handle(new VertexEvent(vertex.vertexId,
+ VertexEventType.V_START));
+ endState = VertexState.NEW;
+ break;
+ case INITED:
+ try {
+ vertex.initializeCommitters();
+ } catch (Exception e) {
+ LOG.info("Failed to initialize committers", e);
+ vertex.finished(VertexState.FAILED,
+ VertexTerminationCause.INIT_FAILURE);
+ endState = VertexState.FAILED;
+ break;
+ }
+
+ // Recover tasks
+ if (vertex.tasks != null) {
+ for (Task task : vertex.tasks.values()) {
+ vertex.eventHandler.handle(
+ new TaskEventRecoverTask(task.getTaskId()));
+ }
+ }
+ // Update tasks with their input payloads as needed
+
+ vertex.eventHandler.handle(new VertexEvent(vertex.vertexId,
+ VertexEventType.V_START));
+ if (vertex.getInputVertices().isEmpty()) {
+ endState = VertexState.INITED;
+ } else {
+ endState = VertexState.RECOVERING;
+ }
+ break;
+ case RUNNING:
+ vertex.tasksNotYetScheduled = false;
+ try {
+ vertex.initializeCommitters();
+ } catch (Exception e) {
+ LOG.info("Failed to initialize committers", e);
+ vertex.finished(VertexState.FAILED,
+ VertexTerminationCause.INIT_FAILURE);
+ endState = VertexState.FAILED;
break;
}
+
+ // if commit in progress and desired state is not a succeeded one,
+ // move to failed
+ if (vertex.recoveryCommitInProgress) {
+ LOG.info("Recovered vertex was in the middle of a commit"
+ + ", failing Vertex=" + vertex.logIdentifier);
+ vertex.finished(VertexState.FAILED,
+ VertexTerminationCause.COMMIT_FAILURE);
+ endState = VertexState.FAILED;
+ break;
+ }
+ // tasks may legitimately be null (handled below); don't dereference it here.
+ assert vertex.tasks == null || vertex.tasks.size() == vertex.numTasks;
+ if (vertex.tasks != null) {
+ for (Task task : vertex.tasks.values()) {
+ vertex.eventHandler.handle(
+ new TaskEventRecoverTask(task.getTaskId()));
+ }
+ vertex.vertexManager.onVertexStarted(vertex.pendingReportedSrcCompletions);
+ endState = VertexState.RUNNING;
+ } else {
+ endState = VertexState.SUCCEEDED;
+ vertex.finished(endState);
+ }
+ break;
+ case SUCCEEDED:
+ case FAILED:
+ case KILLED:
+ vertex.tasksNotYetScheduled = false;
+ // recover tasks
+ if (vertex.tasks != null) {
+ TaskState taskState = TaskState.KILLED;
+ switch (vertex.recoveredState) {
+ case SUCCEEDED:
+ taskState = TaskState.SUCCEEDED;
+ break;
+ case KILLED:
+ taskState = TaskState.KILLED;
+ break;
+ case FAILED:
+ taskState = TaskState.FAILED;
+ break;
+ }
+ for (Task task : vertex.tasks.values()) {
+ vertex.eventHandler.handle(
+ new TaskEventRecoverTask(task.getTaskId(),
+ taskState));
+ }
+ vertex.vertexManager.onVertexStarted(vertex.pendingReportedSrcCompletions);
+ endState = VertexState.RUNNING;
+ } else {
+ endState = vertex.recoveredState;
+ vertex.finished(endState);
+ }
+ break;
+ default:
+ LOG.warn("Invalid recoveredState found when trying to recover"
+ + " vertex, recoveredState=" + vertex.recoveredState);
+ vertex.finished(VertexState.ERROR);
+ endState = VertexState.ERROR;
+ break;
+ }
+ if (!endState.equals(VertexState.RECOVERING)) {
+ LOG.info("Recovered Vertex State"
+ + ", vertexId=" + vertex.logIdentifier
+ + ", state=" + endState
+ + ", numInitedSourceVertices=" + vertex.numInitedSourceVertices
+ + ", numStartedSourceVertices=" + vertex.numStartedSourceVertices
+ + ", numRecoveredSourceVertices=" + vertex.numRecoveredSourceVertices
+ + ", recoveredEvents="
+ + ( vertex.recoveredEvents == null ? "null" : vertex.recoveredEvents.size())
+ + ", tasksIsNull=" + (vertex.tasks == null)
+ + ", numTasks=" + ( vertex.tasks == null ? "null" : vertex.tasks.size()));
+ for (Entry<Vertex, Edge> entry : vertex.getOutputVertices().entrySet()) {
+ vertex.eventHandler.handle(new VertexEventSourceVertexRecovered(
+ entry.getKey().getVertexId(),
+ vertex.vertexId, endState, null));
}
}
-
- if (hasBipartite && vertex.inputsWithInitializers != null) {
- LOG.fatal("A vertex with an Initial Input and a Shuffle Input are not supported at the moment");
- return vertex.finished(VertexState.FAILED);
+ if (EnumSet.of(VertexState.RUNNING, VertexState.SUCCEEDED, VertexState.INITED)
+ .contains(endState)) {
+ // Send events downstream
+ vertex.routeRecoveredEvents(endState, vertex.recoveredEvents);
+ vertex.recoveredEvents.clear();
+ } else {
+ // Ensure no recovered events
+ if (!vertex.recoveredEvents.isEmpty()) {
+ throw new RuntimeException("Invalid Vertex state"
+ + ", found non-zero recovered events in invalid state"
+ + ", recoveredState=" + endState
+ + ", recoveredEvents=" + vertex.recoveredEvents.size());
+ }
}
-
- boolean hasUserVertexManager = vertex.vertexPlan.hasVertexManagerPlugin();
-
- if (hasUserVertexManager) {
- VertexManagerPluginDescriptor pluginDesc = DagTypeConverters
- .convertVertexManagerPluginDescriptorFromDAGPlan(vertex.vertexPlan
- .getVertexManagerPlugin());
- LOG.info("Setting user vertex manager plugin: "
- + pluginDesc.getClassName() + " on vertex: " + vertex.getName());
- vertex.vertexManager = new VertexManager(pluginDesc, vertex, vertex.appContext);
+ return endState;
+ }
+
+ }
+
+ /**
+ * Re-routes events that were recovered from a previous AM attempt.
+ * <p>
+ * Data movement, composite data movement and input-failed events first get
+ * their version set to the id of the source task attempt.
+ * Root input data information events are routed back to this vertex, but
+ * only when {@code vertexState} is RUNNING or INITED; otherwise they are
+ * dropped. Every other event is routed to the target vertex named by its
+ * source metadata. All routed events are flagged as recovered.
+ *
+ * @param vertexState the state this vertex recovered into
+ * @param tezEvents the recovered events to route
+ * @throws TezUncheckedException if an event names a vertex that is not a
+ * target of this vertex
+ */
+ private void routeRecoveredEvents(VertexState vertexState,
+ List<TezEvent> tezEvents) {
+ for (TezEvent tezEvent : tezEvents) {
+ EventMetaData sourceMeta = tezEvent.getSourceInfo();
+ TezTaskAttemptID srcTaId = sourceMeta.getTaskAttemptID();
+ if (tezEvent.getEventType() == EventType.DATA_MOVEMENT_EVENT) {
+ ((DataMovementEvent) tezEvent.getEvent()).setVersion(srcTaId.getId());
+ } else if (tezEvent.getEventType() == EventType.COMPOSITE_DATA_MOVEMENT_EVENT) {
+ ((CompositeDataMovementEvent) tezEvent.getEvent()).setVersion(srcTaId.getId());
+ } else if (tezEvent.getEventType() == EventType.INPUT_FAILED_EVENT) {
+ ((InputFailedEvent) tezEvent.getEvent()).setVersion(srcTaId.getId());
+ } else if (tezEvent.getEventType() == EventType.ROOT_INPUT_DATA_INFORMATION_EVENT) {
+ if (vertexState == VertexState.RUNNING
+ || vertexState == VertexState.INITED) {
+ // Only routed if vertex is still running
+ eventHandler.handle(new VertexEventRouteEvent(
+ this.getVertexId(), Collections.singletonList(tezEvent), true));
+ }
+ continue;
+ }
+
+ Vertex destVertex = getDAG().getVertex(sourceMeta.getEdgeVertexName());
+ Edge destEdge = targetVertices.get(destVertex);
+ if (destEdge == null) {
+ throw new TezUncheckedException("Bad destination vertex: " +
+ sourceMeta.getEdgeVertexName() + " for event vertex: " +
+ getVertexId());
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Routing recovered event"
+ + ", eventType=" + tezEvent.getEventType()
+ + ", sourceInfo=" + sourceMeta
+ + ", destinationVertex=" + destVertex.getName());
+ }
+ eventHandler.handle(new VertexEventRouteEvent(destVertex
+ .getVertexId(), Collections.singletonList(tezEvent), true));
+ }
+ }
+
+ /**
+ * Handles a VertexEventSourceVertexRecovered from one source vertex.
+ * <p>
+ * Each event bumps numRecoveredSourceVertices and, depending on the
+ * source's recovered state, numInitedSourceVertices and
+ * numStartedSourceVertices. Reported completed source attempts are
+ * collected into pendingReportedSrcCompletions. The vertex stays RECOVERING
+ * until every source has reported. It then completes its own recovery from
+ * vertex.recoveredState, notifies downstream vertices, and either routes the
+ * recovered events (RUNNING/SUCCEEDED/INITED) or requires them to be empty.
+ * NOTE(review): completedTaskAttempts sent downstream is always an empty list
+ * here - confirm whether it should carry this vertex's completed attempts.
+ */
+ public static class RecoverTransition implements
+ MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
+
+ @Override
+ public VertexState transition(VertexImpl vertex, VertexEvent vertexEvent) {
+ VertexEventSourceVertexRecovered sourceRecoveredEvent =
+ (VertexEventSourceVertexRecovered) vertexEvent;
+ ++vertex.numRecoveredSourceVertices;
+
+ switch (sourceRecoveredEvent.getSourceVertexState()) {
+ case NEW:
+ // Nothing to do
+ break;
+ case INITED:
+ ++vertex.numInitedSourceVertices;
+ break;
+ case RUNNING:
+ case SUCCEEDED:
+ ++vertex.numInitedSourceVertices;
+ ++vertex.numStartedSourceVertices;
+ if (sourceRecoveredEvent.getCompletedTaskAttempts() != null) {
+ vertex.pendingReportedSrcCompletions.addAll(
+ sourceRecoveredEvent.getCompletedTaskAttempts());
+ }
+ break;
+ case FAILED:
+ case KILLED:
+ case ERROR:
+ // Nothing to do
+ // Recover as if source vertices have not inited/started
+ break;
+ default:
+ LOG.warn("Received invalid SourceVertexRecovered event"
+ + ", sourceVertex=" + sourceRecoveredEvent.getSourceVertexID()
+ + ", sourceVertexState=" + sourceRecoveredEvent.getSourceVertexState());
+ return vertex.finished(VertexState.ERROR);
+ }
+
+ if (vertex.numRecoveredSourceVertices !=
+ vertex.getInputVerticesCount()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Waiting for source vertices to recover"
+ + ", numRecoveredSourceVertices=" + vertex.numRecoveredSourceVertices
+ + ", totalSourceVertices=" + vertex.getInputVerticesCount());
+ }
+ return VertexState.RECOVERING;
+ }
+
+
+ // Complete recovery
+ VertexState endState = VertexState.NEW;
+ List<TezTaskAttemptID> completedTaskAttempts = Lists.newLinkedList();
+ switch (vertex.recoveredState) {
+ case NEW:
+ // Drop all root events if not inited properly
+ Iterator<TezEvent> iterator = vertex.recoveredEvents.iterator();
+ while (iterator.hasNext()) {
+ if (iterator.next().getEventType().equals(
+ EventType.ROOT_INPUT_DATA_INFORMATION_EVENT)) {
+ iterator.remove();
+ }
+ }
+ // Trigger init if all sources initialized
+ if (vertex.numInitedSourceVertices == vertex.getInputVerticesCount()) {
+ vertex.eventHandler.handle(new VertexEvent(vertex.vertexId,
+ VertexEventType.V_INIT));
+ }
+ if (vertex.numStartedSourceVertices == vertex.getInputVerticesCount()) {
+ vertex.eventHandler.handle(new VertexEvent(vertex.vertexId,
+ VertexEventType.V_START));
+ }
+ endState = VertexState.NEW;
+ break;
+ case INITED:
+ try {
+ vertex.initializeCommitters();
+ } catch (Exception e) {
+ LOG.info("Failed to initialize committers", e);
+ vertex.finished(VertexState.FAILED,
+ VertexTerminationCause.INIT_FAILURE);
+ endState = VertexState.FAILED;
+ break;
+ }
+ if (!vertex.setParallelism(0,
+ null, vertex.recoveredSourceEdgeManagers, true)) {
+ LOG.info("Failed to recover edge managers");
+ vertex.finished(VertexState.FAILED,
+ VertexTerminationCause.INIT_FAILURE);
+ endState = VertexState.FAILED;
+ break;
+ }
+ // Recover tasks
+ if (vertex.tasks != null) {
+ for (Task task : vertex.tasks.values()) {
+ vertex.eventHandler.handle(
+ new TaskEventRecoverTask(task.getTaskId()));
+ }
+ }
+ if (vertex.numStartedSourceVertices == vertex.getInputVerticesCount()) {
+ vertex.eventHandler.handle(new VertexEvent(vertex.vertexId,
+ VertexEventType.V_START));
+ }
+ endState = VertexState.INITED;
+ break;
+ case RUNNING:
+ vertex.tasksNotYetScheduled = false;
+ // if commit in progress and desired state is not a succeeded one,
+ // move to failed
+ if (vertex.recoveryCommitInProgress) {
+ LOG.info("Recovered vertex was in the middle of a commit"
+ + ", failing Vertex=" + vertex.logIdentifier);
+ vertex.finished(VertexState.FAILED,
+ VertexTerminationCause.COMMIT_FAILURE);
+ endState = VertexState.FAILED;
+ break;
+ }
+ try {
+ vertex.initializeCommitters();
+ } catch (Exception e) {
+ LOG.info("Failed to initialize committers", e);
+ vertex.finished(VertexState.FAILED,
+ VertexTerminationCause.INIT_FAILURE);
+ endState = VertexState.FAILED;
+ break;
+ }
+ if (!vertex.setParallelism(0, null, vertex.recoveredSourceEdgeManagers, true)) {
+ LOG.info("Failed to recover edge managers");
+ vertex.finished(VertexState.FAILED,
+ VertexTerminationCause.INIT_FAILURE);
+ endState = VertexState.FAILED;
+ break;
+ }
+ // tasks may legitimately be null (handled below); don't dereference it here.
+ assert vertex.tasks == null || vertex.tasks.size() == vertex.numTasks;
+ if (vertex.tasks != null) {
+ for (Task task : vertex.tasks.values()) {
+ vertex.eventHandler.handle(
+ new TaskEventRecoverTask(task.getTaskId()));
+ }
+ vertex.vertexManager.onVertexStarted(vertex.pendingReportedSrcCompletions);
+ endState = VertexState.RUNNING;
+ } else {
+ endState = VertexState.SUCCEEDED;
+ vertex.finished(endState);
+ }
+ break;
+ case SUCCEEDED:
+ case FAILED:
+ case KILLED:
+ vertex.tasksNotYetScheduled = false;
+ // recover tasks
+ // tasks may legitimately be null (handled below); don't dereference it here.
+ assert vertex.tasks == null || vertex.tasks.size() == vertex.numTasks;
+ if (vertex.tasks != null) {
+ TaskState taskState = TaskState.KILLED;
+ switch (vertex.recoveredState) {
+ case SUCCEEDED:
+ taskState = TaskState.SUCCEEDED;
+ break;
+ case KILLED:
+ taskState = TaskState.KILLED;
+ break;
+ case FAILED:
+ taskState = TaskState.FAILED;
+ break;
+ }
+ for (Task task : vertex.tasks.values()) {
+ vertex.eventHandler.handle(
+ new TaskEventRecoverTask(task.getTaskId(),
+ taskState));
+ }
+ // Wait for all tasks to recover and report back
+ vertex.vertexManager.onVertexStarted(vertex.pendingReportedSrcCompletions);
+ endState = VertexState.RUNNING;
+ } else {
+ endState = vertex.recoveredState;
+ vertex.finished(endState);
+ }
+ break;
+ default:
+ LOG.warn("Invalid recoveredState found when trying to recover"
+ + " vertex, recoveredState=" + vertex.recoveredState);
+ vertex.finished(VertexState.ERROR);
+ endState = VertexState.ERROR;
+ break;
+ }
+
+ LOG.info("Recovered Vertex State"
+ + ", vertexId=" + vertex.logIdentifier
+ + ", state=" + endState
+ + ", numInitedSourceVertices=" + vertex.numInitedSourceVertices
+ + ", numStartedSourceVertices=" + vertex.numStartedSourceVertices
+ + ", numRecoveredSourceVertices=" + vertex.numRecoveredSourceVertices
+ + ", tasksIsNull=" + (vertex.tasks == null)
+ + ", numTasks=" + ( vertex.tasks == null ? 0 : vertex.tasks.size()));
+
+ for (Entry<Vertex, Edge> entry : vertex.getOutputVertices().entrySet()) {
+ vertex.eventHandler.handle(new VertexEventSourceVertexRecovered(
+ entry.getKey().getVertexId(),
+ vertex.vertexId, endState, completedTaskAttempts));
+ }
+ if (EnumSet.of(VertexState.RUNNING, VertexState.SUCCEEDED, VertexState.INITED)
+ .contains(endState)) {
+ // Send events downstream
+ vertex.routeRecoveredEvents(endState, vertex.recoveredEvents);
+ vertex.recoveredEvents.clear();
} else {
- if (hasBipartite) {
- // setup vertex manager
- // TODO this needs to consider data size and perhaps API.
- // Currently implicitly BIPARTITE is the only edge type
- LOG.info("Setting vertexManager to ShuffleVertexManager for "
- + vertex.logIdentifier);
- vertex.vertexManager = new VertexManager(new ShuffleVertexManager(),
- vertex, vertex.appContext);
- } else if (vertex.inputsWithInitializers != null) {
- LOG.info("Setting vertexManager to RootInputVertexManager for "
- + vertex.logIdentifier);
- vertex.vertexManager = new VertexManager(new RootInputVertexManager(),
- vertex, vertex.appContext);
- } else {
- // schedule all tasks upon vertex start. Default behavior.
- LOG.info("Setting vertexManager to ImmediateStartVertexManager for "
- + vertex.logIdentifier);
- vertex.vertexManager = new VertexManager(
- new ImmediateStartVertexManager(), vertex, vertex.appContext);
+ // Ensure no recovered events
+ if (!vertex.recoveredEvents.isEmpty()) {
+ throw new RuntimeException("Invalid Vertex state"
+ + ", found non-zero recovered events in invalid state"
+ + ", recoveredState=" + endState
+ + ", recoveredEvents=" + vertex.recoveredEvents.size());
}
}
-
- vertex.vertexManager.initialize();
-
- // Setup tasks early if possible. If the VertexManager is not being used
- // to set parallelism, sending events to Tasks is safe (and less confusing
- // then relying on tasks to be created after TaskEvents are generated).
- // For VertexManagers setting parallelism, the setParallelism call needs
- // to be inline.
- vertex.numTasks = vertex.getVertexPlan().getTaskConfig().getNumTasks();
- if (!(vertex.numTasks == -1 || vertex.numTasks >= 0)) {
- vertex.addDiagnostic("Invalid task count for vertex"
- + ", numTasks=" + vertex.numTasks);
- vertex.trySetTerminationCause(VertexTerminationCause.INVALID_NUM_OF_TASKS);
- vertex.abortVertex(VertexStatus.State.FAILED);
- return vertex.finished(VertexState.FAILED);
+ return endState;
+ }
+
+ }
+
+ public static class InitTransition implements
+ MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
+
+ /**
+ * Handles V_INIT. Each V_INIT bumps numInitedSourceVertices; the vertex is
+ * actually initialized (via handleInitEvent) only when it has no source
+ * vertices or every source vertex has reported. Unless initialization
+ * returned FAILED, V_INIT is then forwarded to every target vertex.
+ * Otherwise the vertex remains in NEW.
+ */
+ @Override
+ public VertexState transition(VertexImpl vertex, VertexEvent event) {
+ VertexState vertexState = VertexState.NEW;
+ vertex.numInitedSourceVertices++;
+ if (vertex.sourceVertices == null || vertex.sourceVertices.isEmpty() ||
+ vertex.numInitedSourceVertices == vertex.sourceVertices.size()) {
+ vertexState = handleInitEvent(vertex, event);
+ if (vertexState != VertexState.FAILED) {
+ // Propagate initialization downstream.
+ if (vertex.targetVertices != null && !vertex.targetVertices.isEmpty()) {
+ for (Vertex target : vertex.targetVertices.keySet()) {
+ vertex.getEventHandler().handle(new VertexEvent(target.getVertexId(),
+ VertexEventType.V_INIT));
+ }
+ }
+ }
}
+ return vertexState;
+ }
- vertex.checkTaskLimits();
+ private VertexState handleInitEvent(VertexImpl vertex, VertexEvent event) {
+ VertexState state = vertex.setupVertex();
+ if (state.equals(VertexState.FAILED)) {
+ return state;
+ }
// Create tasks based on initial configuration, but don't start them yet.
if (vertex.numTasks == -1) {
@@ -1862,7 +2571,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
vertex.numSuccessSourceAttemptCompletions++;
if (vertex.getState() == VertexState.RUNNING) {
vertex.vertexManager.onSourceTaskCompleted(completionEvent
- .getTaskAttemptId());
+ .getTaskAttemptId().getTaskID());
} else {
vertex.pendingReportedSrcCompletions.add(completionEvent.getTaskAttemptId());
}
@@ -2034,9 +2743,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
@Override
public void transition(VertexImpl vertex, VertexEvent event) {
VertexEventRouteEvent rEvent = (VertexEventRouteEvent) event;
+ boolean recovered = rEvent.isRecovered();
List<TezEvent> tezEvents = rEvent.getEvents();
if (vertex.getAppContext().isRecoveryEnabled()
+ && !recovered
&& !tezEvents.isEmpty()) {
List<TezEvent> dataMovementEvents =
Lists.newArrayList();
@@ -2061,7 +2772,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
for(TezEvent tezEvent : tezEvents) {
if (LOG.isDebugEnabled()) {
LOG.debug("Vertex: " + vertex.getName() + " routing event: "
- + tezEvent.getEventType());
+ + tezEvent.getEventType()
+ + " Recovered:" + recovered);
}
EventMetaData sourceMeta = tezEvent.getSourceInfo();
switch(tezEvent.getEventType()) {
@@ -2074,7 +2786,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
TezTaskAttemptID srcTaId = sourceMeta.getTaskAttemptID();
if (tezEvent.getEventType() == EventType.DATA_MOVEMENT_EVENT) {
((DataMovementEvent) tezEvent.getEvent()).setVersion(srcTaId.getId());
- } else if (tezEvent.getEventType() == EventType.COMPOSITE_DATA_MOVEMENT_EVENT) {
+ } else if (tezEvent.getEventType() == EventType.COMPOSITE_DATA_MOVEMENT_EVENT) {
((CompositeDataMovementEvent) tezEvent.getEvent()).setVersion(srcTaId.getId());
} else {
((InputFailedEvent) tezEvent.getEvent()).setVersion(srcTaId.getId());
@@ -2396,7 +3108,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
return this.vertexManager;
}
- private static void logLocationHints(VertexLocationHint locationHint) {
+ private static void logLocationHints(String vertexName,
+ VertexLocationHint locationHint) {
+ if (locationHint == null) {
+ LOG.debug("No Vertex LocationHint specified for vertex=" + vertexName);
+ return;
+ }
Multiset<String> hosts = HashMultiset.create();
Multiset<String> racks = HashMultiset.create();
int counter = 0;
@@ -2421,18 +3138,19 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
sb.append(rack).append(", ");
}
}
- LOG.debug("Location: " + counter + " : " + sb.toString());
+ LOG.debug("Vertex: " + vertexName + ", Location: "
+ + counter + " : " + sb.toString());
counter++;
}
- LOG.debug("Host Counts");
+ LOG.debug("Vertex: " + vertexName + ", Host Counts");
for (Multiset.Entry<String> host : hosts.entrySet()) {
- LOG.debug("host: " + host.toString());
+ LOG.debug("Vertex: " + vertexName + ", host: " + host.toString());
}
- LOG.debug("Rack Counts");
+ LOG.debug("Vertex: " + vertexName + ", Rack Counts");
for (Multiset.Entry<String> rack : racks.entrySet()) {
- LOG.debug("rack: " + rack.toString());
+ LOG.debug("Vertex: " + vertexName + ", rack: " + rack.toString());
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
index e08e7ed..6f75481 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
@@ -147,6 +147,11 @@ public class VertexManager {
managedVertex.setVertexLocationHint(locationHint);
}
+ @Override
+ public int getDAGAttemptNumber() {
+ return appContext.getApplicationAttemptId().getAttemptId();
+ }
+
private void verifyIsRootInput(String inputName) {
Preconditions.checkState(managedVertex.getAdditionalInputs().get(inputName) != null,
"Cannot add events for non-root inputs");
@@ -222,23 +227,24 @@ public class VertexManager {
public void onVertexStarted(List<TezTaskAttemptID> completions) {
Map<String, List<Integer>> pluginCompletionsMap = Maps.newHashMap();
- for (TezTaskAttemptID attemptId : completions) {
- TezTaskID tezTaskId = attemptId.getTaskID();
- Integer taskId = new Integer(tezTaskId.getId());
- String vertexName =
- appContext.getCurrentDAG().getVertex(tezTaskId.getVertexID()).getName();
- List<Integer> taskIdList = pluginCompletionsMap.get(vertexName);
- if (taskIdList == null) {
- taskIdList = Lists.newArrayList();
- pluginCompletionsMap.put(vertexName, taskIdList);
+ if (completions != null && !completions.isEmpty()) {
+ for (TezTaskAttemptID tezTaskAttemptID : completions) {
+ Integer taskId = new Integer(tezTaskAttemptID.getTaskID().getId());
+ String vertexName =
+ appContext.getCurrentDAG().getVertex(
+ tezTaskAttemptID.getTaskID().getVertexID()).getName();
+ List<Integer> taskIdList = pluginCompletionsMap.get(vertexName);
+ if (taskIdList == null) {
+ taskIdList = Lists.newArrayList();
+ pluginCompletionsMap.put(vertexName, taskIdList);
+ }
+ taskIdList.add(taskId);
}
- taskIdList.add(taskId);
}
plugin.onVertexStarted(pluginCompletionsMap);
}
- public void onSourceTaskCompleted(TezTaskAttemptID attemptId) {
- TezTaskID tezTaskId = attemptId.getTaskID();
+ public void onSourceTaskCompleted(TezTaskID tezTaskId) {
Integer taskId = new Integer(tezTaskId.getId());
String vertexName =
appContext.getCurrentDAG().getVertex(tezTaskId.getVertexID()).getName();
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
index a71686b..7b2087a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
@@ -27,11 +27,14 @@ public enum HistoryEventType {
DAG_FINISHED,
VERTEX_INITIALIZED,
VERTEX_STARTED,
+ VERTEX_PARALLELISM_UPDATED,
VERTEX_FINISHED,
TASK_STARTED,
TASK_FINISHED,
TASK_ATTEMPT_STARTED,
TASK_ATTEMPT_FINISHED,
CONTAINER_LAUNCHED,
- VERTEX_DATA_MOVEMENT_EVENTS_GENERATED
+ VERTEX_DATA_MOVEMENT_EVENTS_GENERATED,
+ DAG_COMMIT_STARTED,
+ VERTEX_COMMIT_STARTED
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/ats/ATSService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/ats/ATSService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/ats/ATSService.java
index 4ec0632..690e850 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/ats/ATSService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/ats/ATSService.java
@@ -22,16 +22,9 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
-import org.apache.tez.dag.history.recovery.RecoveryService;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java
index 4794a7b..54bc658 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java
@@ -98,7 +98,7 @@ public class AMLaunchedEvent implements HistoryEvent {
@Override
public boolean isRecoveryEvent() {
- return true;
+ return false;
}
@Override
@@ -139,4 +139,16 @@ public class AMLaunchedEvent implements HistoryEvent {
fromProto(proto);
}
+ public ApplicationAttemptId getApplicationAttemptId() {
+ return applicationAttemptId;
+ }
+
+ public long getLaunchTime() {
+ return launchTime;
+ }
+
+ public long getAppSubmitTime() {
+ return appSubmitTime;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java
index b3cbb5c..e66141b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java
@@ -129,4 +129,13 @@ public class AMStartedEvent implements HistoryEvent {
fromProto(proto);
}
+ public ApplicationAttemptId getApplicationAttemptId() {
+ return applicationAttemptId;
+ }
+
+ public long getStartTime() {
+ return startTime;
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java
index 066f315..471ddd1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java
@@ -138,4 +138,16 @@ public class ContainerLaunchedEvent implements HistoryEvent {
+ ", launchTime=" + launchTime;
}
+ public ContainerId getContainerId() {
+ return containerId;
+ }
+
+ public long getLaunchTime() {
+ return launchTime;
+ }
+
+ public ApplicationAttemptId getApplicationAttemptId() {
+ return applicationAttemptId;
+ }
+
}