You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2013/09/25 03:43:58 UTC
[1/2] TEZ-431. Implement fault tolerance,
retries and event flow for dealing with failed inputs (bikas)
Updated Branches:
refs/heads/TEZ-398 b212ca1d2 -> 3749a18fa
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3749a18f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 520473d..cff71ab 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -22,7 +22,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
@@ -74,6 +73,7 @@ import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.EdgeManager;
import org.apache.tez.dag.app.dag.Task;
+import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
import org.apache.tez.dag.app.dag.TaskTerminationCause;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.VertexScheduler;
@@ -83,6 +83,7 @@ import org.apache.tez.dag.app.dag.event.DAGEvent;
import org.apache.tez.dag.app.dag.event.DAGEventDiagnosticsUpdate;
import org.apache.tez.dag.app.dag.event.DAGEventType;
import org.apache.tez.dag.app.dag.event.DAGEventVertexCompleted;
+import org.apache.tez.dag.app.dag.event.DAGEventVertexReRunning;
import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
@@ -95,8 +96,8 @@ import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
import org.apache.tez.dag.app.dag.event.VertexEventSourceTaskAttemptCompleted;
import org.apache.tez.dag.app.dag.event.VertexEventSourceVertexStarted;
import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptCompleted;
-import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptFetchFailure;
import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted;
+import org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule;
import org.apache.tez.dag.app.dag.event.VertexEventTermination;
import org.apache.tez.dag.app.dag.event.VertexEventType;
import org.apache.tez.dag.history.DAGHistoryEvent;
@@ -114,7 +115,6 @@ import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.InputSpec;
import org.apache.tez.runtime.api.impl.OutputSpec;
import org.apache.tez.runtime.api.impl.TezEvent;
-import org.apache.tez.runtime.records.TezDependentTaskCompletionEvent;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.HashMultiset;
@@ -130,18 +130,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
private static final String LINE_SEPARATOR = System
.getProperty("line.separator");
- private static final TezDependentTaskCompletionEvent[]
- EMPTY_TASK_ATTEMPT_COMPLETION_EVENTS =
- new TezDependentTaskCompletionEvent[0];
private static final Log LOG = LogFactory.getLog(VertexImpl.class);
- //The maximum fraction of fetch failures allowed for a map
- private static final double MAX_ALLOWED_FETCH_FAILURES_FRACTION = 0.5;
-
- // Maximum no. of fetch-failure notifications after which map task is failed
- private static final int MAX_FETCH_FAILURES_NOTIFICATIONS = 3;
-
//final fields
private final Clock clock;
@@ -160,7 +151,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
private boolean lazyTasksCopyNeeded = false;
// must be a linked map for ordering
volatile LinkedHashMap<TezTaskID, Task> tasks = new LinkedHashMap<TezTaskID, Task>();
- private List<byte[]> taskUserPayloads = null;
private Object fullCountersLock = new Object();
private TezCounters fullCounters = null;
private Resource taskResource;
@@ -172,15 +162,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
private int numStartedSourceVertices = 0;
private int distanceFromRoot = 0;
- private List<TezDependentTaskCompletionEvent> sourceTaskAttemptCompletionEvents;
private final List<String> diagnostics = new ArrayList<String>();
//task/attempt related datastructures
@VisibleForTesting
- final Map<TezTaskID, Integer> successSourceAttemptCompletionEventNoMap =
- new HashMap<TezTaskID, Integer>();
- private final Map<TezTaskAttemptID, Integer> fetchFailuresMapping =
- new HashMap<TezTaskAttemptID, Integer>();
+ int numSuccessSourceAttemptCompletions = 0;
List<InputSpec> inputSpecList;
List<OutputSpec> outputSpecList;
@@ -212,7 +198,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
VertexEventType.V_TERMINATE,
new TerminateNewVertexTransition())
.addTransition(VertexState.NEW, VertexState.ERROR,
- VertexEventType.INTERNAL_ERROR,
+ VertexEventType.V_INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
// Transitions from INITED state
@@ -227,7 +213,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
VertexEventType.V_TERMINATE,
new TerminateInitedVertexTransition())
.addTransition(VertexState.INITED, VertexState.ERROR,
- VertexEventType.INTERNAL_ERROR,
+ VertexEventType.V_INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
// Transitions from RUNNING state
@@ -249,12 +235,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
.addTransition(VertexState.RUNNING, VertexState.RUNNING,
VertexEventType.V_TASK_RESCHEDULED,
new TaskRescheduledTransition())
- .addTransition(VertexState.RUNNING, VertexState.RUNNING,
- VertexEventType.V_TASK_ATTEMPT_FETCH_FAILURE,
- new TaskAttemptFetchFailureTransition())
.addTransition(
VertexState.RUNNING,
- VertexState.ERROR, VertexEventType.INTERNAL_ERROR,
+ VertexState.ERROR, VertexEventType.V_INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
.addTransition(
VertexState.RUNNING,
@@ -275,48 +258,49 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
SOURCE_TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION)
.addTransition(
VertexState.TERMINATING,
- VertexState.ERROR, VertexEventType.INTERNAL_ERROR,
+ VertexState.ERROR, VertexEventType.V_INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
// Ignore-able events
.addTransition(VertexState.TERMINATING, VertexState.TERMINATING,
EnumSet.of(VertexEventType.V_TERMINATE,
- VertexEventType.V_TASK_RESCHEDULED,
- VertexEventType.V_TASK_ATTEMPT_FETCH_FAILURE))
+ VertexEventType.V_TASK_RESCHEDULED))
// Transitions from SUCCEEDED state
.addTransition(
VertexState.SUCCEEDED,
- VertexState.ERROR, VertexEventType.INTERNAL_ERROR,
+ VertexState.ERROR, VertexEventType.V_INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
+ .addTransition(VertexState.SUCCEEDED,
+ EnumSet.of(VertexState.RUNNING, VertexState.FAILED),
+ VertexEventType.V_TASK_RESCHEDULED,
+ new TaskRescheduledAfterVertexSuccessTransition())
+
// Ignore-able events
.addTransition(VertexState.SUCCEEDED, VertexState.SUCCEEDED,
EnumSet.of(VertexEventType.V_TERMINATE,
- VertexEventType.V_TASK_ATTEMPT_FETCH_FAILURE,
VertexEventType.V_TASK_ATTEMPT_COMPLETED,
VertexEventType.V_TASK_COMPLETED))
// Transitions from FAILED state
.addTransition(
VertexState.FAILED,
- VertexState.ERROR, VertexEventType.INTERNAL_ERROR,
+ VertexState.ERROR, VertexEventType.V_INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
// Ignore-able events
.addTransition(VertexState.FAILED, VertexState.FAILED,
EnumSet.of(VertexEventType.V_TERMINATE,
- VertexEventType.V_TASK_ATTEMPT_FETCH_FAILURE,
VertexEventType.V_TASK_ATTEMPT_COMPLETED,
VertexEventType.V_TASK_COMPLETED))
// Transitions from KILLED state
.addTransition(
VertexState.KILLED,
- VertexState.ERROR, VertexEventType.INTERNAL_ERROR,
+ VertexState.ERROR, VertexEventType.V_INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
// Ignore-able events
.addTransition(VertexState.KILLED, VertexState.KILLED,
EnumSet.of(VertexEventType.V_TERMINATE,
VertexEventType.V_START,
- VertexEventType.V_TASK_ATTEMPT_FETCH_FAILURE,
VertexEventType.V_TASK_ATTEMPT_COMPLETED,
VertexEventType.V_TASK_COMPLETED))
@@ -330,8 +314,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
VertexEventType.V_TASK_ATTEMPT_COMPLETED,
VertexEventType.V_TASK_RESCHEDULED,
VertexEventType.V_DIAGNOSTIC_UPDATE,
- VertexEventType.V_TASK_ATTEMPT_FETCH_FAILURE,
- VertexEventType.INTERNAL_ERROR))
+ VertexEventType.V_INTERNAL_ERROR))
// create the topology tables
.installTopology();
@@ -550,32 +533,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
@Override
- public TezDependentTaskCompletionEvent[] getTaskAttemptCompletionEvents(
- TezTaskAttemptID attemptID, int fromEventId, int maxEvents) {
- TezDependentTaskCompletionEvent[] events = EMPTY_TASK_ATTEMPT_COMPLETION_EVENTS;
- readLock.lock();
- try {
- if (sourceTaskAttemptCompletionEvents.size() > fromEventId) {
- int actualMax = Math.min(maxEvents,
- (sourceTaskAttemptCompletionEvents.size() - fromEventId));
- events = sourceTaskAttemptCompletionEvents.subList(fromEventId,
- actualMax + fromEventId).toArray(events);
- // create a copy if user payload is different per task
- if(taskUserPayloads != null && events.length > 0) {
- int taskId = attemptID.getTaskID().getId();
- byte[] userPayload = taskUserPayloads.get(taskId);
- TezDependentTaskCompletionEvent event = events[0].clone();
- event.setUserPayload(userPayload);
- events[0] = event;
- }
- }
- return events;
- } finally {
- readLock.unlock();
- }
- }
-
- @Override
public List<String> getDiagnostics() {
readLock.lock();
try {
@@ -683,6 +640,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
}
+ // TODO Create InputReadyVertexManager that schedules when there is something
+ // to read and use that as default instead of ImmediateStart.TEZ-480
@Override
public void scheduleTasks(Collection<TezTaskID> taskIDs) {
readLock.lock();
@@ -808,7 +767,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
LOG.error("Can't handle " + message, e);
addDiagnostic(message);
eventHandler.handle(new VertexEvent(this.vertexId,
- VertexEventType.INTERNAL_ERROR));
+ VertexEventType.V_INTERNAL_ERROR));
}
if (oldState != getInternalState()) {
@@ -1028,10 +987,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
checkTaskLimits();
- // TODO should depend on source num tasks
- vertex.sourceTaskAttemptCompletionEvents =
- new ArrayList<TezDependentTaskCompletionEvent>(vertex.numTasks + 10);
-
// create the Tasks but don't start them yet
createTasks(vertex);
@@ -1269,98 +1224,43 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
SingleArcTransition<VertexImpl, VertexEvent> {
@Override
public void transition(VertexImpl vertex, VertexEvent event) {
- TezDependentTaskCompletionEvent tce =
+ VertexEventTaskAttemptCompleted completionEvent =
((VertexEventSourceTaskAttemptCompleted) event).getCompletionEvent();
- if(LOG.isDebugEnabled()) {
- LOG.debug("Adding completion event to vertex: " + vertex.getName()
- + " attempt: " + tce.getTaskAttemptID());
- }
- // Add the TaskAttemptCompletionEvent
- //eventId is equal to index in the arraylist
- tce.setEventId(vertex.sourceTaskAttemptCompletionEvents.size());
- vertex.sourceTaskAttemptCompletionEvents.add(tce);
- // TODO this needs to be ordered/grouped by source vertices or else
- // my tasks will not know which events are for which vertices' tasks. This
- // differentiation was not needed for MR because there was only 1 M stage.
- // if the tce is sent to the task then a solution could be to add vertex
- // name to the tce
- // need to send vertex name and task index in that vertex
-
- TezTaskAttemptID attemptId = tce.getTaskAttemptID();
- TezTaskID taskId = attemptId.getTaskID();
- //make the previous completion event as obsolete if it exists
- if (TezDependentTaskCompletionEvent.Status.SUCCEEDED.equals(tce.getStatus())) {
- vertex.vertexScheduler.onSourceTaskCompleted(attemptId, tce);
- Object successEventNo =
- vertex.successSourceAttemptCompletionEventNoMap.remove(taskId);
- if (successEventNo != null) {
- TezDependentTaskCompletionEvent successEvent =
- vertex.sourceTaskAttemptCompletionEvents.get((Integer) successEventNo);
- successEvent.setTaskStatus(TezDependentTaskCompletionEvent.Status.OBSOLETE);
- }
- vertex.successSourceAttemptCompletionEventNoMap.put(taskId, tce.getEventId());
+ LOG.info("Source task attempt completed for vertex: " + vertex.getVertexId()
+ + " attempt: " + completionEvent.getTaskAttemptId()
+ + " with state: " + completionEvent.getTaskAttemptState());
+
+ if (TaskAttemptStateInternal.SUCCEEDED.equals(completionEvent
+ .getTaskAttemptState())) {
+ vertex.numSuccessSourceAttemptCompletions++;
+ vertex.vertexScheduler.onSourceTaskCompleted(completionEvent
+ .getTaskAttemptId());
}
}
}
- // TODO Why is TA event coming directly to Vertex instead of TA -> Task -> Vertex
private static class TaskAttemptCompletedEventTransition implements
SingleArcTransition<VertexImpl, VertexEvent> {
@Override
public void transition(VertexImpl vertex, VertexEvent event) {
- TezDependentTaskCompletionEvent tce =
- ((VertexEventTaskAttemptCompleted) event).getCompletionEvent();
+ VertexEventTaskAttemptCompleted completionEvent =
+ ((VertexEventTaskAttemptCompleted) event);
- // TODO this should only be sent for successful events? looks like all
- // need to be sent in the existing shuffle code
+ // If different tasks were connected to different destination vertices
+ // then this would need to be sent via the edges
// Notify all target vertices
if (vertex.targetVertices != null) {
for (Vertex targetVertex : vertex.targetVertices.keySet()) {
vertex.eventHandler.handle(
new VertexEventSourceTaskAttemptCompleted(
- targetVertex.getVertexId(), tce)
+ targetVertex.getVertexId(), completionEvent)
);
}
}
}
}
- private static class TaskAttemptFetchFailureTransition implements
- SingleArcTransition<VertexImpl, VertexEvent> {
- @Override
- public void transition(VertexImpl vertex, VertexEvent event) {
- VertexEventTaskAttemptFetchFailure fetchfailureEvent =
- (VertexEventTaskAttemptFetchFailure) event;
- for (TezTaskAttemptID mapId : fetchfailureEvent.getSources()) {
- Integer fetchFailures = vertex.fetchFailuresMapping.get(mapId);
- fetchFailures = (fetchFailures == null) ? 1 : (fetchFailures+1);
- vertex.fetchFailuresMapping.put(mapId, fetchFailures);
-
- //get number of running reduces
- int runningReduceTasks = 0;
- for (TezTaskID taskId : vertex.tasks.keySet()) {
- if (TaskState.RUNNING.equals(vertex.tasks.get(taskId).getState())) {
- runningReduceTasks++;
- }
- }
-
- float failureRate = runningReduceTasks == 0 ? 1.0f :
- (float) fetchFailures / runningReduceTasks;
- // declare faulty if fetch-failures >= max-allowed-failures
- boolean isMapFaulty =
- (failureRate >= MAX_ALLOWED_FETCH_FAILURES_FRACTION);
- if (fetchFailures >= MAX_FETCH_FAILURES_NOTIFICATIONS && isMapFaulty) {
- LOG.info("Too many fetch-failures for output of task attempt: " +
- mapId + " ... raising fetch failure to source");
- vertex.eventHandler.handle(new TaskAttemptEvent(mapId,
- TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURES));
- vertex.fetchFailuresMapping.remove(mapId);
- }
- }
- }
- }
-
private static class TaskCompletedTransition implements
MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
@@ -1413,12 +1313,39 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
SingleArcTransition<VertexImpl, VertexEvent> {
@Override
public void transition(VertexImpl vertex, VertexEvent event) {
- //succeeded map task is restarted back
+ //succeeded task is restarted back
vertex.completedTaskCount--;
vertex.succeededTaskCount--;
}
}
+
+ private static class TaskRescheduledAfterVertexSuccessTransition implements
+ MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
+ @Override
+ public VertexState transition(VertexImpl vertex, VertexEvent event) {
+ if (vertex.committer instanceof NullVertexOutputCommitter) {
+ LOG.info(vertex.getVertexId() + " back to running due to rescheduling "
+ + ((VertexEventTaskReschedule)event).getTaskID());
+ (new TaskRescheduledTransition()).transition(vertex, event);
+ // inform the DAG that we are re-running
+ vertex.eventHandler.handle(new DAGEventVertexReRunning(vertex.getVertexId()));
+ return VertexState.RUNNING;
+ }
+
+ LOG.info(vertex.getVertexId() + " failed due to post-commit rescheduling of "
+ + ((VertexEventTaskReschedule)event).getTaskID());
+ // terminate any running tasks
+ vertex.enactKill(VertexTerminationCause.OWN_TASK_FAILURE,
+ TaskTerminationCause.OWN_TASK_FAILURE);
+ // since the DAG thinks this vertex is completed it must be notified of
+ // an error
+ vertex.eventHandler.handle(new DAGEvent(vertex.getDAGId(),
+ DAGEventType.INTERNAL_ERROR));
+ return VertexState.FAILED;
+ }
+ }
+
private void addDiagnostic(String diag) {
diagnostics.add(diag);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3749a18f/tez-dag/src/main/java/org/apache/tez/runtime/records/TezDependentTaskCompletionEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/runtime/records/TezDependentTaskCompletionEvent.java b/tez-dag/src/main/java/org/apache/tez/runtime/records/TezDependentTaskCompletionEvent.java
deleted file mode 100644
index fd4c1ee..0000000
--- a/tez-dag/src/main/java/org/apache/tez/runtime/records/TezDependentTaskCompletionEvent.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.runtime.records;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-
-/**
- * This is used to track task completion events on
- * job tracker.
- */
-@InterfaceAudience.Public
-@InterfaceStability.Stable
-// TODO TEZAM3 This needs to be more generic. Maybe some kind of a serialized
-// blob - which can be interpretted by the Input plugin.
-public class TezDependentTaskCompletionEvent implements Writable {
- @InterfaceAudience.Public
- @InterfaceStability.Evolving
- // TODO EVENTUALLY - Remove TIPFAILED state ?
- static public enum Status {FAILED, KILLED, SUCCEEDED, OBSOLETE, TIPFAILED};
-
- private int eventId;
- private int taskRunTime; // using int since runtime is the time difference
- private TezTaskAttemptID taskAttemptId;
- private long dataSize;
- Status status;
- byte[] userPayload;
- // TODO TEZAM2 Get rid of the isMap field. Job specific type information can be determined from TaskAttemptId.getTaskType
-// boolean isMap = false;
- public static final TezDependentTaskCompletionEvent[] EMPTY_ARRAY =
- new TezDependentTaskCompletionEvent[0];
-
- public TezDependentTaskCompletionEvent() {
- taskAttemptId = new TezTaskAttemptID();
- }
-
- /**
- * Constructor. eventId should be created externally and incremented
- * per event for each job.
- * @param eventId event id, event id should be unique and assigned in
- * incrementally, starting from 0.
- * @param taskAttemptId task id
- * @param status task's status
- * @param taskTrackerHttp task tracker's host:port for http.
- */
- public TezDependentTaskCompletionEvent(int eventId,
- TezTaskAttemptID taskAttemptId,
-// boolean isMap,
- Status status,
- int runTime,
- long dataSize){
-
- this.taskAttemptId = taskAttemptId;
-// this.isMap = isMap;
- this.eventId = eventId;
- this.status =status;
- this.taskRunTime = runTime;
- this.dataSize = dataSize;
- }
-
- public TezDependentTaskCompletionEvent clone() {
- TezDependentTaskCompletionEvent clone = new TezDependentTaskCompletionEvent(
- this.eventId, this.taskAttemptId, this.status,
- this.taskRunTime, this.dataSize);
-
- return clone;
- }
-
- /**
- * Returns event Id.
- * @return event id
- */
- public int getEventId() {
- return eventId;
- }
-
- /**
- * Returns task id.
- * @return task id
- */
- public TezTaskAttemptID getTaskAttemptID() {
- return taskAttemptId;
- }
-
- /**
- * Returns enum Status.SUCESS or Status.FAILURE.
- * @return task tracker status
- */
- public Status getStatus() {
- return status;
- }
-
- /**
- * Returns time (in millisec) the task took to complete.
- */
- public int getTaskRunTime() {
- return taskRunTime;
- }
-
- /**
- * Return size of output produced by the task
- */
- public long getDataSize() {
- return dataSize;
- }
-
- /**
- * @return user payload. Maybe null
- */
- public byte[] getUserPayload() {
- return userPayload;
- }
-
- /**
- * Set the task completion time
- * @param taskCompletionTime time (in millisec) the task took to complete
- */
- protected void setTaskRunTime(int taskCompletionTime) {
- this.taskRunTime = taskCompletionTime;
- }
-
- /**
- * set event Id. should be assigned incrementally starting from 0.
- * @param eventId
- */
- public void setEventId(int eventId) {
- this.eventId = eventId;
- }
-
- /**
- * Sets task id.
- * @param taskId
- */
- public void setTaskAttemptID(TezTaskAttemptID taskId) {
- this.taskAttemptId = taskId;
- }
-
- /**
- * Set task status.
- * @param status
- */
- public void setTaskStatus(Status status) {
- this.status = status;
- }
-
- /**
- * Set the user payload
- * @param userPayload
- */
- public void setUserPayload(byte[] userPayload) {
- this.userPayload = userPayload;
- }
-
- @Override
- public String toString(){
- StringBuffer buf = new StringBuffer();
- buf.append("Task Id : ");
- buf.append(taskAttemptId);
- buf.append(", Status : ");
- buf.append(status.name());
- return buf.toString();
- }
-
- @Override
- public boolean equals(Object o) {
- // not counting userPayload as that is a piggyback mechanism
- if(o == null)
- return false;
- if(o.getClass().equals(this.getClass())) {
- TezDependentTaskCompletionEvent event = (TezDependentTaskCompletionEvent) o;
- return this.eventId == event.getEventId()
- && this.status.equals(event.getStatus())
- && this.taskAttemptId.equals(event.getTaskAttemptID())
- && this.taskRunTime == event.getTaskRunTime()
- && this.dataSize == event.getDataSize();
- }
- return false;
- }
-
- @Override
- public int hashCode() {
- return toString().hashCode();
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- taskAttemptId.write(out);
-// out.writeBoolean(isMap);
- WritableUtils.writeEnum(out, status);
- WritableUtils.writeVInt(out, taskRunTime);
- WritableUtils.writeVInt(out, eventId);
- WritableUtils.writeCompressedByteArray(out, userPayload);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- taskAttemptId.readFields(in);
-// isMap = in.readBoolean();
- status = WritableUtils.readEnum(in, Status.class);
- taskRunTime = WritableUtils.readVInt(in);
- eventId = WritableUtils.readVInt(in);
- userPayload = WritableUtils.readCompressedByteArray(in);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3749a18f/tez-dag/src/main/java/org/apache/tez/runtime/records/TezTaskDependencyCompletionEventsUpdate.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/runtime/records/TezTaskDependencyCompletionEventsUpdate.java b/tez-dag/src/main/java/org/apache/tez/runtime/records/TezTaskDependencyCompletionEventsUpdate.java
deleted file mode 100644
index ff4f267..0000000
--- a/tez-dag/src/main/java/org/apache/tez/runtime/records/TezTaskDependencyCompletionEventsUpdate.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.runtime.records;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
-
-
-public class TezTaskDependencyCompletionEventsUpdate implements Writable {
- TezDependentTaskCompletionEvent[] events;
- boolean reset;
-
- public TezTaskDependencyCompletionEventsUpdate() { }
-
- public TezTaskDependencyCompletionEventsUpdate(
- TezDependentTaskCompletionEvent[] events, boolean reset) {
- this.events = events;
- this.reset = reset;
- }
-
- public boolean shouldReset() {
- return reset;
- }
-
- public TezDependentTaskCompletionEvent[] getDependentTaskCompletionEvents() {
- return events;
- }
-
- public void write(DataOutput out) throws IOException {
- out.writeBoolean(reset);
- out.writeInt(events.length);
- for (TezDependentTaskCompletionEvent event : events) {
- event.write(out);
- }
- }
-
- public void readFields(DataInput in) throws IOException {
- reset = in.readBoolean();
- events = new TezDependentTaskCompletionEvent[in.readInt()];
- for (int i = 0; i < events.length; ++i) {
- events[i] = new TezDependentTaskCompletionEvent();
- events[i].readFields(in);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3749a18f/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index f2717be..434a4b5 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -71,6 +71,7 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminated;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminating;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventKillRequest;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
@@ -82,7 +83,10 @@ import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.events.InputReadErrorEvent;
+import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.runtime.api.impl.TezEvent;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
@@ -583,7 +587,7 @@ public class TestTaskAttempt {
@Test
// Verifies that multiple TooManyFetchFailures are handled correctly by the
// TaskAttempt.
- public void testMultipleTooManyFetchFailures() throws Exception {
+ public void testMultipleOutputFailed() throws Exception {
ApplicationId appId = ApplicationId.newInstance(1, 2);
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
appId, 0);
@@ -641,9 +645,14 @@ public class TestTaskAttempt {
verify(eventHandler, times(expectedEventsTillSucceeded)).handle(arg.capture());
verifyEventType(arg.getAllValues(), TaskEventTAUpdate.class, 2);
- taImpl.handle(new TaskAttemptEvent(taskAttemptID,
- TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURES));
- int expectedEventsAfterFetchFailure = expectedEventsTillSucceeded + 3;
+ InputReadErrorEvent reEvent = new InputReadErrorEvent("", 0, 1);
+ EventMetaData mockMeta = mock(EventMetaData.class);
+ TezTaskAttemptID mockDestId = mock(TezTaskAttemptID.class);
+ when(mockMeta.getTaskAttemptID()).thenReturn(mockDestId);
+ TezEvent tzEvent = new TezEvent(reEvent, mockMeta);
+ taImpl.handle(new TaskAttemptEventOutputFailed(taskAttemptID, tzEvent, 1));
+ int expectedEventsAfterFetchFailure = expectedEventsTillSucceeded + 2;
+ arg.getAllValues().clear();
verify(eventHandler, times(expectedEventsAfterFetchFailure)).handle(arg.capture());
verifyEventType(
arg.getAllValues().subList(expectedEventsTillSucceeded,
@@ -651,8 +660,7 @@ public class TestTaskAttempt {
assertEquals("Task attempt is not in FAILED state", taImpl.getState(),
TaskAttemptState.FAILED);
- taImpl.handle(new TaskAttemptEvent(taskAttemptID,
- TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURES));
+ taImpl.handle(new TaskAttemptEventOutputFailed(taskAttemptID, tzEvent, 1));
assertEquals("Task attempt is not in FAILED state, still",
taImpl.getState(), TaskAttemptState.FAILED);
assertFalse(
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3749a18f/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index b524f6a..2cbf1fe 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -63,6 +63,7 @@ import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.EdgeManager;
import org.apache.tez.dag.app.dag.Task;
+import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.VertexState;
import org.apache.tez.dag.app.dag.VertexTerminationCause;
@@ -82,8 +83,6 @@ import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.runtime.records.TezDependentTaskCompletionEvent;
-import org.apache.tez.runtime.records.TezDependentTaskCompletionEvent.Status;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -977,7 +976,6 @@ public class TestVertexImpl {
TezTaskID t2_v4 = new TezTaskID(v4.getVertexId(), 1);
TezTaskID t1_v5 = new TezTaskID(v5.getVertexId(), 0);
TezTaskID t2_v5 = new TezTaskID(v5.getVertexId(), 1);
- TezTaskID t1_v6 = new TezTaskID(v6.getVertexId(), 0);
TezTaskAttemptID ta1_t1_v4 = new TezTaskAttemptID(t1_v4, 0);
TezTaskAttemptID ta2_t1_v4 = new TezTaskAttemptID(t1_v4, 0);
@@ -985,33 +983,13 @@ public class TestVertexImpl {
TezTaskAttemptID ta1_t1_v5 = new TezTaskAttemptID(t1_v5, 0);
TezTaskAttemptID ta1_t2_v5 = new TezTaskAttemptID(t2_v5, 0);
TezTaskAttemptID ta2_t2_v5 = new TezTaskAttemptID(t2_v5, 0);
- TezTaskAttemptID ta1_t1_v6 = new TezTaskAttemptID(t1_v6, 0);
-
- TezDependentTaskCompletionEvent cEvt1 =
- new TezDependentTaskCompletionEvent(1, ta1_t1_v4,
- Status.FAILED, 3, 0);
- TezDependentTaskCompletionEvent cEvt2 =
- new TezDependentTaskCompletionEvent(2, ta2_t1_v4,
- Status.SUCCEEDED, 4, 1);
- TezDependentTaskCompletionEvent cEvt3 =
- new TezDependentTaskCompletionEvent(2, ta1_t2_v4,
- Status.SUCCEEDED, 5, 2);
- TezDependentTaskCompletionEvent cEvt4 =
- new TezDependentTaskCompletionEvent(2, ta1_t1_v5,
- Status.SUCCEEDED, 5, 3);
- TezDependentTaskCompletionEvent cEvt5 =
- new TezDependentTaskCompletionEvent(1, ta1_t2_v5,
- Status.FAILED, 3, 4);
- TezDependentTaskCompletionEvent cEvt6 =
- new TezDependentTaskCompletionEvent(2, ta2_t2_v5,
- Status.SUCCEEDED, 4, 5);
-
- v4.handle(new VertexEventTaskAttemptCompleted(cEvt1));
- v4.handle(new VertexEventTaskAttemptCompleted(cEvt2));
- v4.handle(new VertexEventTaskAttemptCompleted(cEvt3));
- v5.handle(new VertexEventTaskAttemptCompleted(cEvt4));
- v5.handle(new VertexEventTaskAttemptCompleted(cEvt5));
- v5.handle(new VertexEventTaskAttemptCompleted(cEvt6));
+
+ v4.handle(new VertexEventTaskAttemptCompleted(ta1_t1_v4, TaskAttemptStateInternal.FAILED));
+ v4.handle(new VertexEventTaskAttemptCompleted(ta2_t1_v4, TaskAttemptStateInternal.SUCCEEDED));
+ v4.handle(new VertexEventTaskAttemptCompleted(ta1_t2_v4, TaskAttemptStateInternal.SUCCEEDED));
+ v5.handle(new VertexEventTaskAttemptCompleted(ta1_t1_v5, TaskAttemptStateInternal.SUCCEEDED));
+ v5.handle(new VertexEventTaskAttemptCompleted(ta1_t2_v5, TaskAttemptStateInternal.FAILED));
+ v5.handle(new VertexEventTaskAttemptCompleted(ta2_t2_v5, TaskAttemptStateInternal.SUCCEEDED));
v4.handle(new VertexEventTaskCompleted(t1_v4, TaskState.SUCCEEDED));
v4.handle(new VertexEventTaskCompleted(t2_v4, TaskState.SUCCEEDED));
@@ -1023,9 +1001,7 @@ public class TestVertexImpl {
Assert.assertEquals(VertexState.SUCCEEDED, v5.getState());
Assert.assertEquals(VertexState.RUNNING, v6.getState());
- Assert.assertEquals(4, v6.successSourceAttemptCompletionEventNoMap.size());
- Assert.assertEquals(6,
- v6.getTaskAttemptCompletionEvents(ta1_t1_v6, 0, 100).length);
+ Assert.assertEquals(4, v6.numSuccessSourceAttemptCompletions);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3749a18f/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java
index 81715bd..b2e13e2 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java
@@ -41,8 +41,8 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
-import org.apache.tez.runtime.records.TezDependentTaskCompletionEvent;
import org.junit.Assert;
+import org.junit.Ignore;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -53,6 +53,7 @@ public class TestVertexScheduler {
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test(timeout = 5000)
+ @Ignore // TODO TEZ-481
public void testShuffleVertexManagerAutoParallelism() throws IOException {
Configuration conf = new Configuration();
conf.setBoolean(
@@ -98,9 +99,6 @@ public class TestVertexScheduler {
when(mockManagedVertex.getInputVertices()).thenReturn(mockInputVertices);
- TezDependentTaskCompletionEvent mockEvent =
- mock(TezDependentTaskCompletionEvent.class);
-
mockInputVertices.put(mockSrcVertex1, new Edge(eProp1, mockEventHandler));
mockInputVertices.put(mockSrcVertex2, new Edge(eProp2, mockEventHandler));
mockInputVertices.put(mockSrcVertex3, new Edge(eProp3, mockEventHandler));
@@ -165,12 +163,12 @@ public class TestVertexScheduler {
new TezTaskAttemptID(new TezTaskID(mockSrcVertexId3, 0), 0);
// parallelism not change due to large data size
- when(mockEvent.getDataSize()).thenReturn(5000L);
+ //when(mockEvent.getDataSize()).thenReturn(5000L);
scheduler = createScheduler(conf, mockManagedVertex, 0.1f, 0.1f);
scheduler.onVertexStarted();
Assert.assertTrue(scheduler.pendingTasks.size() == 4); // no tasks scheduled
Assert.assertTrue(scheduler.numSourceTasks == 4);
- scheduler.onSourceTaskCompleted(mockSrcAttemptId11, mockEvent);
+ scheduler.onSourceTaskCompleted(mockSrcAttemptId11);
// managedVertex tasks reduced
verify(mockManagedVertex, times(0)).setParallelism(anyInt(), anyMap());
Assert.assertEquals(0, scheduler.pendingTasks.size()); // all tasks scheduled
@@ -179,7 +177,7 @@ public class TestVertexScheduler {
Assert.assertEquals(5000L, scheduler.completedSourceTasksOutputSize);
// parallelism changed due to small data size
- when(mockEvent.getDataSize()).thenReturn(500L);
+ //when(mockEvent.getDataSize()).thenReturn(500L);
scheduledTasks.clear();
Configuration procConf = new Configuration();
ProcessorDescriptor procDesc = new ProcessorDescriptor("REDUCE");
@@ -191,23 +189,23 @@ public class TestVertexScheduler {
Assert.assertEquals(4, scheduler.pendingTasks.size()); // no tasks scheduled
Assert.assertEquals(4, scheduler.numSourceTasks);
// task completion from non-bipartite stage does nothing
- scheduler.onSourceTaskCompleted(mockSrcAttemptId31, mockEvent);
+ scheduler.onSourceTaskCompleted(mockSrcAttemptId31);
Assert.assertEquals(4, scheduler.pendingTasks.size()); // no tasks scheduled
Assert.assertEquals(4, scheduler.numSourceTasks);
Assert.assertEquals(0, scheduler.numSourceTasksCompleted);
- scheduler.onSourceTaskCompleted(mockSrcAttemptId11, mockEvent);
+ scheduler.onSourceTaskCompleted(mockSrcAttemptId11);
Assert.assertEquals(4, scheduler.pendingTasks.size());
Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled
Assert.assertEquals(1, scheduler.numSourceTasksCompleted);
Assert.assertEquals(500L, scheduler.completedSourceTasksOutputSize);
// ignore duplicate completion
- scheduler.onSourceTaskCompleted(mockSrcAttemptId11, mockEvent);
+ scheduler.onSourceTaskCompleted(mockSrcAttemptId11);
Assert.assertEquals(4, scheduler.pendingTasks.size());
Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled
Assert.assertEquals(1, scheduler.numSourceTasksCompleted);
Assert.assertEquals(500L, scheduler.completedSourceTasksOutputSize);
- scheduler.onSourceTaskCompleted(mockSrcAttemptId12, mockEvent);
+ scheduler.onSourceTaskCompleted(mockSrcAttemptId12);
// managedVertex tasks reduced
verify(mockManagedVertex).setParallelism(eq(2), anyMap());
Assert.assertEquals(2, newEdgeManagers.size());
@@ -220,7 +218,7 @@ public class TestVertexScheduler {
Assert.assertEquals(1000L, scheduler.completedSourceTasksOutputSize);
// more completions dont cause recalculation of parallelism
- scheduler.onSourceTaskCompleted(mockSrcAttemptId21, mockEvent);
+ scheduler.onSourceTaskCompleted(mockSrcAttemptId21);
verify(mockManagedVertex).setParallelism(eq(2), anyMap());
}
@@ -266,9 +264,6 @@ public class TestVertexScheduler {
when(mockManagedVertex.getVertexId()).thenReturn(mockManagedVertexId);
when(mockManagedVertex.getInputVertices()).thenReturn(mockInputVertices);
- TezDependentTaskCompletionEvent mockEvent =
- mock(TezDependentTaskCompletionEvent.class);
-
// fail if there is no bipartite src vertex
mockInputVertices.put(mockSrcVertex3, new Edge(eProp3, mockEventHandler));
try {
@@ -362,11 +357,11 @@ public class TestVertexScheduler {
Assert.assertTrue(scheduler.pendingTasks.size() == 3); // no tasks scheduled
Assert.assertTrue(scheduler.numSourceTasks == 4);
// task completion from non-bipartite stage does nothing
- scheduler.onSourceTaskCompleted(mockSrcAttemptId31, mockEvent);
+ scheduler.onSourceTaskCompleted(mockSrcAttemptId31);
Assert.assertTrue(scheduler.pendingTasks.size() == 3); // no tasks scheduled
Assert.assertTrue(scheduler.numSourceTasks == 4);
Assert.assertTrue(scheduler.numSourceTasksCompleted == 0);
- scheduler.onSourceTaskCompleted(mockSrcAttemptId11, mockEvent);
+ scheduler.onSourceTaskCompleted(mockSrcAttemptId11);
Assert.assertTrue(scheduler.pendingTasks.isEmpty());
Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled
Assert.assertTrue(scheduler.numSourceTasksCompleted == 1);
@@ -377,20 +372,20 @@ public class TestVertexScheduler {
Assert.assertTrue(scheduler.pendingTasks.size() == 3); // no tasks scheduled
Assert.assertTrue(scheduler.numSourceTasks == 4);
// task completion from non-bipartite stage does nothing
- scheduler.onSourceTaskCompleted(mockSrcAttemptId31, mockEvent);
+ scheduler.onSourceTaskCompleted(mockSrcAttemptId31);
Assert.assertTrue(scheduler.pendingTasks.size() == 3); // no tasks scheduled
Assert.assertTrue(scheduler.numSourceTasks == 4);
Assert.assertTrue(scheduler.numSourceTasksCompleted == 0);
- scheduler.onSourceTaskCompleted(mockSrcAttemptId11, mockEvent);
+ scheduler.onSourceTaskCompleted(mockSrcAttemptId11);
Assert.assertTrue(scheduler.pendingTasks.size() == 3);
Assert.assertTrue(scheduler.numSourceTasksCompleted == 1);
- scheduler.onSourceTaskCompleted(mockSrcAttemptId12, mockEvent);
+ scheduler.onSourceTaskCompleted(mockSrcAttemptId12);
Assert.assertTrue(scheduler.pendingTasks.size() == 3);
Assert.assertTrue(scheduler.numSourceTasksCompleted == 2);
- scheduler.onSourceTaskCompleted(mockSrcAttemptId21, mockEvent);
+ scheduler.onSourceTaskCompleted(mockSrcAttemptId21);
Assert.assertTrue(scheduler.pendingTasks.size() == 3);
Assert.assertTrue(scheduler.numSourceTasksCompleted == 3);
- scheduler.onSourceTaskCompleted(mockSrcAttemptId22, mockEvent);
+ scheduler.onSourceTaskCompleted(mockSrcAttemptId22);
Assert.assertTrue(scheduler.pendingTasks.isEmpty());
Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled
Assert.assertTrue(scheduler.numSourceTasksCompleted == 4);
@@ -401,20 +396,20 @@ public class TestVertexScheduler {
Assert.assertTrue(scheduler.pendingTasks.size() == 3); // no tasks scheduled
Assert.assertTrue(scheduler.numSourceTasks == 4);
// task completion from non-bipartite stage does nothing
- scheduler.onSourceTaskCompleted(mockSrcAttemptId31, mockEvent);
+ scheduler.onSourceTaskCompleted(mockSrcAttemptId31);
Assert.assertTrue(scheduler.pendingTasks.size() == 3); // no tasks scheduled
Assert.assertTrue(scheduler.numSourceTasks == 4);
Assert.assertTrue(scheduler.numSourceTasksCompleted == 0);
- scheduler.onSourceTaskCompleted(mockSrcAttemptId11, mockEvent);
+ scheduler.onSourceTaskCompleted(mockSrcAttemptId11);
Assert.assertTrue(scheduler.pendingTasks.size() == 3);
Assert.assertTrue(scheduler.numSourceTasksCompleted == 1);
- scheduler.onSourceTaskCompleted(mockSrcAttemptId12, mockEvent);
+ scheduler.onSourceTaskCompleted(mockSrcAttemptId12);
Assert.assertTrue(scheduler.pendingTasks.size() == 3);
Assert.assertTrue(scheduler.numSourceTasksCompleted == 2);
- scheduler.onSourceTaskCompleted(mockSrcAttemptId21, mockEvent);
+ scheduler.onSourceTaskCompleted(mockSrcAttemptId21);
Assert.assertTrue(scheduler.pendingTasks.size() == 3);
Assert.assertTrue(scheduler.numSourceTasksCompleted == 3);
- scheduler.onSourceTaskCompleted(mockSrcAttemptId22, mockEvent);
+ scheduler.onSourceTaskCompleted(mockSrcAttemptId22);
Assert.assertTrue(scheduler.pendingTasks.isEmpty());
Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled
Assert.assertTrue(scheduler.numSourceTasksCompleted == 4);
@@ -424,22 +419,22 @@ public class TestVertexScheduler {
scheduler.onVertexStarted();
Assert.assertTrue(scheduler.pendingTasks.size() == 3); // no tasks scheduled
Assert.assertTrue(scheduler.numSourceTasks == 4);
- scheduler.onSourceTaskCompleted(mockSrcAttemptId11, mockEvent);
- scheduler.onSourceTaskCompleted(mockSrcAttemptId12, mockEvent);
+ scheduler.onSourceTaskCompleted(mockSrcAttemptId11);
+ scheduler.onSourceTaskCompleted(mockSrcAttemptId12);
Assert.assertTrue(scheduler.pendingTasks.size() == 2);
Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled
Assert.assertTrue(scheduler.numSourceTasksCompleted == 2);
// completion of same task again should not get counted
- scheduler.onSourceTaskCompleted(mockSrcAttemptId12, mockEvent);
+ scheduler.onSourceTaskCompleted(mockSrcAttemptId12);
Assert.assertTrue(scheduler.pendingTasks.size() == 2);
Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled
Assert.assertTrue(scheduler.numSourceTasksCompleted == 2);
- scheduler.onSourceTaskCompleted(mockSrcAttemptId21, mockEvent);
+ scheduler.onSourceTaskCompleted(mockSrcAttemptId21);
Assert.assertTrue(scheduler.pendingTasks.size() == 0);
Assert.assertTrue(scheduledTasks.size() == 2); // 2 tasks scheduled
Assert.assertTrue(scheduler.numSourceTasksCompleted == 3);
scheduledTasks.clear();
- scheduler.onSourceTaskCompleted(mockSrcAttemptId22, mockEvent); // we are done. no action
+ scheduler.onSourceTaskCompleted(mockSrcAttemptId22); // we are done. no action
Assert.assertTrue(scheduler.pendingTasks.size() == 0);
Assert.assertTrue(scheduledTasks.size() == 0); // no task scheduled
Assert.assertTrue(scheduler.numSourceTasksCompleted == 4);
@@ -449,16 +444,16 @@ public class TestVertexScheduler {
scheduler.onVertexStarted();
Assert.assertTrue(scheduler.pendingTasks.size() == 3); // no tasks scheduled
Assert.assertTrue(scheduler.numSourceTasks == 4);
- scheduler.onSourceTaskCompleted(mockSrcAttemptId11, mockEvent);
- scheduler.onSourceTaskCompleted(mockSrcAttemptId12, mockEvent);
+ scheduler.onSourceTaskCompleted(mockSrcAttemptId11);
+ scheduler.onSourceTaskCompleted(mockSrcAttemptId12);
Assert.assertTrue(scheduler.pendingTasks.size() == 2);
Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled
Assert.assertTrue(scheduler.numSourceTasksCompleted == 2);
- scheduler.onSourceTaskCompleted(mockSrcAttemptId21, mockEvent);
+ scheduler.onSourceTaskCompleted(mockSrcAttemptId21);
Assert.assertTrue(scheduler.pendingTasks.size() == 1);
Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled
Assert.assertTrue(scheduler.numSourceTasksCompleted == 3);
- scheduler.onSourceTaskCompleted(mockSrcAttemptId22, mockEvent);
+ scheduler.onSourceTaskCompleted(mockSrcAttemptId22);
Assert.assertTrue(scheduler.pendingTasks.size() == 0);
Assert.assertTrue(scheduledTasks.size() == 1); // no task scheduled
Assert.assertTrue(scheduler.numSourceTasksCompleted == 4);
[2/2] git commit: TEZ-431. Implement fault tolerance,
retries and event flow for dealing with failed inputs (bikas)
Posted by bi...@apache.org.
TEZ-431. Implement fault tolerance, retries and event flow for dealing with failed inputs (bikas)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/3749a18f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/3749a18f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/3749a18f
Branch: refs/heads/TEZ-398
Commit: 3749a18fafad2f0ebe661ff8979d8c6f794e295f
Parents: b212ca1
Author: Bikas Saha <bi...@apache.org>
Authored: Tue Sep 24 18:42:03 2013 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Tue Sep 24 18:42:03 2013 -0700
----------------------------------------------------------------------
.../tez/dag/app/dag/DAGTerminationCause.java | 4 +-
.../org/apache/tez/dag/app/dag/EdgeManager.java | 5 +-
.../tez/dag/app/dag/TaskTerminationCause.java | 7 +-
.../java/org/apache/tez/dag/app/dag/Vertex.java | 5 -
.../apache/tez/dag/app/dag/VertexScheduler.java | 4 +-
.../tez/dag/app/dag/VertexTerminationCause.java | 4 +-
.../tez/dag/app/dag/event/DAGEventType.java | 1 +
.../app/dag/event/DAGEventVertexReRunning.java | 37 +++
.../dag/event/TaskAttemptEventNodeFailed.java | 4 +-
.../dag/event/TaskAttemptEventOutputFailed.java | 44 ++++
.../dag/event/TaskAttemptEventStatusUpdate.java | 3 +-
.../dag/app/dag/event/TaskAttemptEventType.java | 5 +-
.../VertexEventSourceTaskAttemptCompleted.java | 7 +-
.../event/VertexEventTaskAttemptCompleted.java | 25 +-
.../VertexEventTaskAttemptFetchFailure.java | 46 ----
.../tez/dag/app/dag/event/VertexEventType.java | 5 +-
.../dag/app/dag/impl/BroadcastEdgeManager.java | 15 +-
.../apache/tez/dag/app/dag/impl/DAGImpl.java | 93 +++++---
.../org/apache/tez/dag/app/dag/impl/Edge.java | 42 ++--
.../dag/impl/ImmediateStartVertexScheduler.java | 4 +-
.../dag/app/dag/impl/OneToOneEdgeManager.java | 12 +-
.../app/dag/impl/ScatterGatherEdgeManager.java | 29 +--
.../dag/app/dag/impl/ShuffleVertexManager.java | 29 ++-
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 109 ++++++---
.../apache/tez/dag/app/dag/impl/TaskImpl.java | 78 +++----
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 201 ++++++----------
.../TezDependentTaskCompletionEvent.java | 228 -------------------
...TezTaskDependencyCompletionEventsUpdate.java | 64 ------
.../tez/dag/app/dag/impl/TestTaskAttempt.java | 20 +-
.../tez/dag/app/dag/impl/TestVertexImpl.java | 42 +---
.../dag/app/dag/impl/TestVertexScheduler.java | 67 +++---
31 files changed, 485 insertions(+), 754 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3749a18f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java
index 05f15f3..3b097eb 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java
@@ -34,5 +34,7 @@ public enum DAGTerminationCause {
ZERO_VERTICES,
/** DAG failed during init. */
- INIT_FAILURE,
+ INIT_FAILURE,
+
+ INTERNAL_ERROR
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3749a18f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/EdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/EdgeManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/EdgeManager.java
index 674d18e..86d155f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/EdgeManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/EdgeManager.java
@@ -26,10 +26,10 @@ import org.apache.tez.runtime.api.events.InputReadErrorEvent;
public abstract class EdgeManager {
- public abstract int getNumDestinationTaskInputs(Vertex sourceVertex,
+ public abstract int getNumDestinationTaskInputs(int numSourceTasks,
int destinationTaskIndex);
- public abstract int getNumSourceTaskOutputs(Vertex destinationVertex,
+ public abstract int getNumSourceTaskOutputs(int numDestinationTasks,
int sourceTaskIndex);
/**
@@ -41,6 +41,7 @@ public abstract class EdgeManager {
public abstract void routeEventToDestinationTasks(InputFailedEvent event,
int sourceTaskIndex, int numDestinationTasks, List<Integer> taskIndices);
+ public abstract int getDestinationConsumerTaskNumber(int sourceTaskIndex, int numDestTasks);
/**
* Return the source task index to which to send the event
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3749a18f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskTerminationCause.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskTerminationCause.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskTerminationCause.java
index 73741f0..6736d2a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskTerminationCause.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskTerminationCause.java
@@ -18,8 +18,6 @@
package org.apache.tez.dag.app.dag;
-import org.apache.tez.dag.app.dag.event.TaskEventType;
-
/**
* Represents proximate cause of a Task transition to FAILED or KILLED.
*/
@@ -31,6 +29,9 @@ public enum TaskTerminationCause {
/** Other vertex failed causing DAG to fail thus killing the parent vertex */
OTHER_VERTEX_FAILURE,
- /** One of the tasks for the parent vertex failed. */
+ /** One of the tasks for the source/destination vertex failed. */
OTHER_TASK_FAILURE,
+
+ /** One of the tasks of the destination vertex failed. */
+ OWN_TASK_FAILURE
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3749a18f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index 76964a3..adaa27f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -29,12 +29,10 @@ import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
import org.apache.tez.dag.api.client.ProgressBuilder;
import org.apache.tez.dag.api.client.VertexStatusBuilder;
import org.apache.tez.dag.app.dag.impl.Edge;
-import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.runtime.api.impl.InputSpec;
import org.apache.tez.runtime.api.impl.OutputSpec;
-import org.apache.tez.runtime.records.TezDependentTaskCompletionEvent;
/**
@@ -68,9 +66,6 @@ public interface Vertex extends Comparable<Vertex> {
void setParallelism(int parallelism,Map<Vertex, EdgeManager> sourceEdgeManagers);
- TezDependentTaskCompletionEvent[] getTaskAttemptCompletionEvents(
- TezTaskAttemptID attemptId, int fromEventId, int maxEvents);
-
// CHANGE THESE TO LISTS AND MAINTAIN ORDER?
void setInputVertices(Map<Vertex, Edge> inVertices);
void setOutputVertices(Map<Vertex, Edge> outVertices);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3749a18f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexScheduler.java
index 4a1a7a6..3789702 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexScheduler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexScheduler.java
@@ -20,12 +20,10 @@ package org.apache.tez.dag.app.dag;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.runtime.records.TezDependentTaskCompletionEvent;
// Rename to VertexManager TEZ-364 and move to DAG API. Make abstract class.
public interface VertexScheduler {
void initialize(Configuration conf);
void onVertexStarted();
- void onSourceTaskCompleted(TezTaskAttemptID attemptId,
- TezDependentTaskCompletionEvent event);
+ void onSourceTaskCompleted(TezTaskAttemptID attemptId);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3749a18f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java
index 138ee70..f675ace 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java
@@ -39,5 +39,7 @@ public enum VertexTerminationCause {
ZERO_TASKS,
/** This vertex failed during init. */
- INIT_FAILURE
+ INIT_FAILURE,
+
+ INTERNAL_ERROR
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3749a18f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java
index 14c2f30..476c688 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java
@@ -32,6 +32,7 @@ public enum DAGEventType {
//Producer: Vertex
DAG_VERTEX_COMPLETED,
+ DAG_VERTEX_RERUNNING,
//Producer: TaskImpl
DAG_SCHEDULER_UPDATE,
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3749a18f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventVertexReRunning.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventVertexReRunning.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventVertexReRunning.java
new file mode 100644
index 0000000..303d48d
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventVertexReRunning.java
@@ -0,0 +1,37 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.dag.records.TezVertexID;
+
+public class DAGEventVertexReRunning extends DAGEvent {
+
+ private TezVertexID vertexId;
+
+ public DAGEventVertexReRunning(TezVertexID vertexId) {
+ super(vertexId.getDAGId(), DAGEventType.DAG_VERTEX_RERUNNING);
+ this.vertexId = vertexId;
+ }
+
+ public TezVertexID getVertexId() {
+ return vertexId;
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3749a18f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventNodeFailed.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventNodeFailed.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventNodeFailed.java
index ee143bb..6d97466 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventNodeFailed.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventNodeFailed.java
@@ -19,7 +19,8 @@ package org.apache.tez.dag.app.dag.event;
import org.apache.tez.dag.records.TezTaskAttemptID;
-public class TaskAttemptEventNodeFailed extends TaskAttemptEvent {
+public class TaskAttemptEventNodeFailed extends TaskAttemptEvent
+ implements DiagnosableEvent{
private final String message;
@@ -29,6 +30,7 @@ public class TaskAttemptEventNodeFailed extends TaskAttemptEvent {
this.message = diagMessage;
}
+ @Override
public String getDiagnosticInfo() {
return this.message;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3749a18f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventOutputFailed.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventOutputFailed.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventOutputFailed.java
new file mode 100644
index 0000000..678e1e7
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventOutputFailed.java
@@ -0,0 +1,44 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.impl.TezEvent;
+
+public class TaskAttemptEventOutputFailed extends TaskAttemptEvent {
+
+ private TezEvent inputFailedEvent;
+ private int consumerTaskNumber;
+
+ public TaskAttemptEventOutputFailed(TezTaskAttemptID attemptId,
+ TezEvent tezEvent, int numConsumers) {
+ super(attemptId, TaskAttemptEventType.TA_OUTPUT_FAILED);
+ this.inputFailedEvent = tezEvent;
+ this.consumerTaskNumber = numConsumers;
+ }
+
+ public TezEvent getInputFailedEvent() {
+ return inputFailedEvent;
+ }
+
+ public int getConsumerTaskNumber() {
+ return consumerTaskNumber;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3749a18f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java
index 30aefde..13577c5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java
@@ -31,7 +31,8 @@ public class TaskAttemptEventStatusUpdate extends TaskAttemptEvent {
private TaskStatusUpdateEvent taskAttemptStatus;
- public TaskAttemptEventStatusUpdate(TezTaskAttemptID id, TaskStatusUpdateEvent statusEvent) {
+ public TaskAttemptEventStatusUpdate(TezTaskAttemptID id,
+ TaskStatusUpdateEvent statusEvent) {
super(id, TaskAttemptEventType.TA_STATUS_UPDATE);
this.taskAttemptStatus = statusEvent;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3749a18f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
index 16e4e3f..5210e33 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
@@ -55,6 +55,7 @@ public enum TaskAttemptEventType {
// The node running the task attempt failed.
TA_NODE_FAILED,
-//Producer: Job
- TA_TOO_MANY_FETCH_FAILURES,
+ // Producer: consumer destination vertex
+ TA_OUTPUT_FAILED,
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3749a18f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventSourceTaskAttemptCompleted.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventSourceTaskAttemptCompleted.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventSourceTaskAttemptCompleted.java
index 388beba..6cd38a9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventSourceTaskAttemptCompleted.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventSourceTaskAttemptCompleted.java
@@ -19,21 +19,20 @@
package org.apache.tez.dag.app.dag.event;
import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.runtime.records.TezDependentTaskCompletionEvent;
public class VertexEventSourceTaskAttemptCompleted extends VertexEvent {
- private TezDependentTaskCompletionEvent completionEvent;
+ private VertexEventTaskAttemptCompleted completionEvent;
public VertexEventSourceTaskAttemptCompleted(
TezVertexID targetVertexId,
- TezDependentTaskCompletionEvent completionEvent) {
+ VertexEventTaskAttemptCompleted completionEvent) {
super(targetVertexId,
VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED);
this.completionEvent = completionEvent;
}
- public TezDependentTaskCompletionEvent getCompletionEvent() {
+ public VertexEventTaskAttemptCompleted getCompletionEvent() {
return completionEvent;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3749a18f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventTaskAttemptCompleted.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventTaskAttemptCompleted.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventTaskAttemptCompleted.java
index c9d3f7d..5b07674 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventTaskAttemptCompleted.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventTaskAttemptCompleted.java
@@ -18,20 +18,27 @@
package org.apache.tez.dag.app.dag.event;
-import org.apache.tez.runtime.records.TezDependentTaskCompletionEvent;
+import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
+import org.apache.tez.dag.records.TezTaskAttemptID;
public class VertexEventTaskAttemptCompleted extends VertexEvent {
- private TezDependentTaskCompletionEvent completionEvent;
-
- public VertexEventTaskAttemptCompleted(
- TezDependentTaskCompletionEvent completionEvent) {
- super(completionEvent.getTaskAttemptID().getTaskID().getVertexID(),
+ private TezTaskAttemptID attemptId;
+ private TaskAttemptStateInternal attempState;
+
+ public VertexEventTaskAttemptCompleted(TezTaskAttemptID taskAttemptId,
+ TaskAttemptStateInternal state) {
+ super(taskAttemptId.getTaskID().getVertexID(),
VertexEventType.V_TASK_ATTEMPT_COMPLETED);
- this.completionEvent = completionEvent;
+ this.attemptId = taskAttemptId;
+ this.attempState = state;
}
- public TezDependentTaskCompletionEvent getCompletionEvent() {
- return completionEvent;
+ public TezTaskAttemptID getTaskAttemptId() {
+ return attemptId;
+ }
+
+ public TaskAttemptStateInternal getTaskAttemptState() {
+ return attempState;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3749a18f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventTaskAttemptFetchFailure.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventTaskAttemptFetchFailure.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventTaskAttemptFetchFailure.java
deleted file mode 100644
index 5b2b955..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventTaskAttemptFetchFailure.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.tez.dag.app.dag.event;
-
-import java.util.List;
-
-import org.apache.tez.dag.records.TezTaskAttemptID;
-
-public class VertexEventTaskAttemptFetchFailure extends VertexEvent {
-
- private final TezTaskAttemptID target;
- private final List<TezTaskAttemptID> sources;
-
- public VertexEventTaskAttemptFetchFailure(TezTaskAttemptID reduce,
- List<TezTaskAttemptID> maps) {
- super(
- reduce.getTaskID().getVertexID(),
- VertexEventType.V_TASK_ATTEMPT_FETCH_FAILURE);
- this.target = reduce;
- this.sources = maps;
- }
-
- public List<TezTaskAttemptID> getSources() {
- return sources;
- }
-
- public TezTaskAttemptID getTarget() {
- return target;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3749a18f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
index dc7e2dd..7d640af 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
@@ -49,12 +49,9 @@ public enum VertexEventType {
V_TASK_SUCCEEDED,
V_ATTEMPT_KILLED,
- //Producer:TaskAttemptListener
- V_TASK_ATTEMPT_FETCH_FAILURE,
-
//Producer:Any component
V_DIAGNOSTIC_UPDATE,
- INTERNAL_ERROR,
+ V_INTERNAL_ERROR,
V_COUNTER_UPDATE,
V_ROUTE_EVENT,
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3749a18f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java
index 55a2c86..21562e7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java
@@ -21,21 +21,20 @@ package org.apache.tez.dag.app.dag.impl;
import java.util.List;
import org.apache.tez.dag.app.dag.EdgeManager;
-import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.runtime.api.events.DataMovementEvent;
-import org.apache.tez.runtime.api.events.InputFailedEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
+import org.apache.tez.runtime.api.events.InputFailedEvent;
public class BroadcastEdgeManager extends EdgeManager {
@Override
- public int getNumDestinationTaskInputs(Vertex sourceVertex,
+ public int getNumDestinationTaskInputs(int numSourceTasks,
int destinationTaskIndex) {
- return sourceVertex.getTotalTasks();
+ return numSourceTasks;
}
@Override
- public int getNumSourceTaskOutputs(Vertex destinationVertex,
+ public int getNumSourceTaskOutputs(int numDestinationTasks,
int sourceTaskIndex) {
return 1;
}
@@ -66,4 +65,10 @@ public class BroadcastEdgeManager extends EdgeManager {
}
}
+ @Override
+ public int getDestinationConsumerTaskNumber(int sourceTaskIndex,
+ int numDestTasks) {
+ return numDestTasks;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3749a18f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index ee12221..fbf5e9d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -79,6 +79,7 @@ import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdateTAAssigned;
import org.apache.tez.dag.app.dag.event.DAGEventType;
import org.apache.tez.dag.app.dag.event.DAGEventVertexCompleted;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDAGFinished;
+import org.apache.tez.dag.app.dag.event.DAGEventVertexReRunning;
import org.apache.tez.dag.app.dag.event.VertexEvent;
import org.apache.tez.dag.app.dag.event.VertexEventType;
import org.apache.tez.dag.app.dag.event.VertexEventTermination;
@@ -196,6 +197,9 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
EnumSet.of(DAGState.RUNNING, DAGState.SUCCEEDED, DAGState.TERMINATING,DAGState.FAILED),
DAGEventType.DAG_VERTEX_COMPLETED,
new VertexCompletedTransition())
+ .addTransition(DAGState.RUNNING, DAGState.RUNNING,
+ DAGEventType.DAG_VERTEX_RERUNNING,
+ new VertexReRunningTransition())
.addTransition(DAGState.RUNNING, DAGState.TERMINATING,
DAGEventType.DAG_KILL, new DAGKilledTransition())
.addTransition(DAGState.RUNNING, DAGState.RUNNING,
@@ -230,6 +234,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
// Ignore-able events
.addTransition(DAGState.TERMINATING, DAGState.TERMINATING,
EnumSet.of(DAGEventType.DAG_KILL,
+ DAGEventType.DAG_VERTEX_RERUNNING,
DAGEventType.DAG_SCHEDULER_UPDATE))
// Transitions from SUCCEEDED state
@@ -260,6 +265,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
// Ignore-able events
.addTransition(DAGState.FAILED, DAGState.FAILED,
EnumSet.of(DAGEventType.DAG_KILL,
+ DAGEventType.DAG_VERTEX_RERUNNING,
DAGEventType.DAG_VERTEX_COMPLETED))
// Transitions from KILLED state
@@ -276,6 +282,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
.addTransition(DAGState.KILLED, DAGState.KILLED,
EnumSet.of(DAGEventType.DAG_KILL,
DAGEventType.DAG_START,
+ DAGEventType.DAG_VERTEX_RERUNNING,
DAGEventType.DAG_SCHEDULER_UPDATE,
DAGEventType.DAG_VERTEX_COMPLETED))
@@ -1117,27 +1124,25 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
Vertex vertex = job.vertices.get(vertexEvent.getVertexId());
job.numCompletedVertices++;
if (vertexEvent.getVertexState() == VertexState.SUCCEEDED) {
- vertexSucceeded(job, vertex);
+ job.vertexSucceeded(vertex);
job.dagScheduler.vertexCompleted(vertex);
}
else if (vertexEvent.getVertexState() == VertexState.FAILED) {
job.enactKill(DAGTerminationCause.VERTEX_FAILURE, VertexTerminationCause.OTHER_VERTEX_FAILURE);
- vertexFailed(job, vertex);
+ job.vertexFailed(vertex);
forceTransitionToKillWait = true;
}
else if (vertexEvent.getVertexState() == VertexState.KILLED) {
- vertexKilled(job, vertex);
+ job.vertexKilled(vertex);
forceTransitionToKillWait = true;
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("Vertex completed."
- + ", numCompletedVertices=" + job.numCompletedVertices
- + ", numSuccessfulVertices=" + job.numSuccessfulVertices
- + ", numFailedVertices=" + job.numFailedVertices
- + ", numKilledVertices=" + job.numKilledVertices
- + ", numVertices=" + job.numVertices);
- }
+ LOG.info("Vertex " + vertex.getVertexId() + " completed."
+ + ", numCompletedVertices=" + job.numCompletedVertices
+ + ", numSuccessfulVertices=" + job.numSuccessfulVertices
+ + ", numFailedVertices=" + job.numFailedVertices
+ + ", numKilledVertices=" + job.numKilledVertices
+ + ", numVertices=" + job.numVertices);
// if the job has not finished but a failure/kill occurred, then force the transition to KILL_WAIT.
DAGState state = checkJobForCompletion(job);
@@ -1149,34 +1154,58 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
}
}
- private void vertexSucceeded(DAGImpl job, Vertex vertex) {
- job.numSuccessfulVertices++;
- // TODO: Metrics
- //job.metrics.completedTask(task);
+ }
+
+ private static class VertexReRunningTransition implements
+ SingleArcTransition<DAGImpl, DAGEvent> {
+ @Override
+ public void transition(DAGImpl job, DAGEvent event) {
+ DAGEventVertexReRunning vertexEvent = (DAGEventVertexReRunning) event;
+ Vertex vertex = job.vertices.get(vertexEvent.getVertexId());
+ job.numCompletedVertices--;
+ job.vertexReRunning(vertex);
+
+
+ LOG.info("Vertex " + vertex.getVertexId() + " re-running."
+ + ", numCompletedVertices=" + job.numCompletedVertices
+ + ", numSuccessfulVertices=" + job.numSuccessfulVertices
+ + ", numFailedVertices=" + job.numFailedVertices
+ + ", numKilledVertices=" + job.numKilledVertices
+ + ", numVertices=" + job.numVertices);
}
+ }
+
+ private void vertexSucceeded(Vertex vertex) {
+ numSuccessfulVertices++;
+ // TODO: Metrics
+ //job.metrics.completedTask(task);
+ }
+
+ private void vertexReRunning(Vertex vertex) {
+ numSuccessfulVertices--;
+ addDiagnostic("Vertex re-running " + vertex.getVertexId());
+ // TODO: Metrics
+ //job.metrics.completedTask(task);
+ }
- private void vertexFailed(DAGImpl job, Vertex vertex) {
- job.numFailedVertices++;
- job.addDiagnostic("Vertex failed " + vertex.getVertexId());
- // TODO: Metrics
- //job.metrics.failedTask(task);
- }
+ private void vertexFailed(Vertex vertex) {
+ numFailedVertices++;
+ addDiagnostic("Vertex failed " + vertex.getVertexId());
+ // TODO: Metrics
+ //job.metrics.failedTask(task);
+ }
- private void vertexKilled(DAGImpl job, Vertex vertex) {
- job.numKilledVertices++;
- job.addDiagnostic("Vertex killed " + vertex.getVertexId());
- // TODO: Metrics
- //job.metrics.killedTask(task);
- }
+ private void vertexKilled(Vertex vertex) {
+ numKilledVertices++;
+ addDiagnostic("Vertex killed " + vertex.getVertexId());
+ // TODO: Metrics
+ //job.metrics.killedTask(task);
}
private void addDiagnostic(String diag) {
diagnostics.add(diag);
}
-
-
-
private static class DiagnosticsUpdateTransition implements
SingleArcTransition<DAGImpl, DAGEvent> {
@Override
@@ -1228,6 +1257,10 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
@Override
public void transition(DAGImpl job, DAGEvent event) {
//TODO Is this JH event required.
+ LOG.info(job.getID() + " terminating due to internal error");
+ // terminate all vertices
+ job.enactKill(DAGTerminationCause.INTERNAL_ERROR,
+ VertexTerminationCause.INTERNAL_ERROR);
job.setFinishTime();
job.logJobHistoryUnsuccesfulEvent(DAGStatus.State.FAILED);
job.finished(DAGState.ERROR);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3749a18f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
index aaca662..bcdb4af 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
@@ -28,6 +28,7 @@ import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.app.dag.EdgeManager;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed;
import org.apache.tez.dag.app.dag.event.TaskEventAddTezEvent;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
@@ -97,23 +98,25 @@ public class Edge {
}
public void setDestinationVertex(Vertex destinationVertex) {
- if (this.destinationVertex != null && this.destinationVertex != destinationVertex) {
+ if (this.destinationVertex != null
+ && this.destinationVertex != destinationVertex) {
throw new TezUncheckedException("Destination vertex exists: "
+ destinationVertex.getName());
}
this.destinationVertex = destinationVertex;
}
-
+
public InputSpec getDestinationSpec(int destinationTaskIndex) {
return new InputSpec(sourceVertex.getName(),
edgeProperty.getEdgeDestination(),
- edgeManager.getNumDestinationTaskInputs(sourceVertex, destinationTaskIndex));
- }
-
+ edgeManager.getNumDestinationTaskInputs(sourceVertex.getTotalTasks(),
+ destinationTaskIndex));
+ }
+
public OutputSpec getSourceSpec(int sourceTaskIndex) {
return new OutputSpec(destinationVertex.getName(),
- edgeProperty.getEdgeSource(),
- edgeManager.getNumSourceTaskOutputs(destinationVertex, sourceTaskIndex));
+ edgeProperty.getEdgeSource(), edgeManager.getNumSourceTaskOutputs(
+ destinationVertex.getTotalTasks(), sourceTaskIndex));
}
public void startEventBuffering() {
@@ -133,17 +136,25 @@ public class Edge {
sourceEventBuffer.clear();
}
+ @SuppressWarnings("unchecked")
public void sendTezEventToSourceTasks(TezEvent tezEvent) {
if (!bufferEvents.get()) {
switch (tezEvent.getEventType()) {
case INPUT_READ_ERROR_EVENT:
InputReadErrorEvent event = (InputReadErrorEvent) tezEvent.getEvent();
- TezTaskAttemptID destAttemptId = tezEvent.getSourceInfo().getTaskAttemptID();
+ TezTaskAttemptID destAttemptId = tezEvent.getSourceInfo()
+ .getTaskAttemptID();
int destTaskIndex = destAttemptId.getTaskID().getId();
- int srcTaskIndex = edgeManager.routeEventToSourceTasks(destTaskIndex, event);
- // TODO this is BROKEN. TEZ-431
-// TezTaskID srcTaskId = sourceVertex.getTask(srcTaskIndex).getTaskId();
-// sendEventToTask(srcTaskId, tezEvent);
+ int srcTaskIndex = edgeManager.routeEventToSourceTasks(destTaskIndex,
+ event);
+ int numConsumers = edgeManager.getDestinationConsumerTaskNumber(
+ srcTaskIndex, destinationVertex.getTotalTasks());
+ TezTaskID srcTaskId = sourceVertex.getTask(srcTaskIndex).getTaskId();
+ int taskAttemptIndex = event.getVersion();
+ TezTaskAttemptID srcTaskAttemptId = new TezTaskAttemptID(srcTaskId,
+ taskAttemptIndex);
+ eventHandler.handle(new TaskAttemptEventOutputFailed(srcTaskAttemptId,
+ tezEvent, numConsumers));
break;
default:
throw new TezUncheckedException("Unhandled tez event type: "
@@ -201,13 +212,6 @@ public class Edge {
}
}
- private void sendEventToDestination(List<Integer> destTaskIndeces, TezEvent tezEvent) {
- for(Integer destTaskIndex : destTaskIndeces) {
- TezTaskID destTaskId = destinationVertex.getTask(destTaskIndex).getTaskId();
- sendEventToTask(destTaskId, tezEvent);
- }
- }
-
@SuppressWarnings("unchecked")
private void sendEventToTask(TezTaskID taskId, TezEvent tezEvent) {
eventHandler.handle(new TaskEventAddTezEvent(taskId, tezEvent));
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3749a18f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexScheduler.java
index a4e5f3b..b79a426 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexScheduler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexScheduler.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.VertexScheduler;
import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.runtime.records.TezDependentTaskCompletionEvent;
/**
* Starts all tasks immediately on vertex start
@@ -40,8 +39,7 @@ public class ImmediateStartVertexScheduler implements VertexScheduler {
}
@Override
- public void onSourceTaskCompleted(TezTaskAttemptID attemptId,
- TezDependentTaskCompletionEvent event) {
+ public void onSourceTaskCompleted(TezTaskAttemptID attemptId) {
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3749a18f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
index 29abfac..1ec9451 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
@@ -21,21 +21,20 @@ package org.apache.tez.dag.app.dag.impl;
import java.util.List;
import org.apache.tez.dag.app.dag.EdgeManager;
-import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.runtime.api.events.DataMovementEvent;
-import org.apache.tez.runtime.api.events.InputFailedEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
+import org.apache.tez.runtime.api.events.InputFailedEvent;
public class OneToOneEdgeManager extends EdgeManager {
@Override
- public int getNumDestinationTaskInputs(Vertex sourceVertex,
+ public int getNumDestinationTaskInputs(int numDestinationTasks,
int destinationTaskIndex) {
return 1;
}
@Override
- public int getNumSourceTaskOutputs(Vertex destinationVertex,
+ public int getNumSourceTaskOutputs(int numDestinationTasks,
int sourceTaskIndex) {
return 1;
}
@@ -63,4 +62,9 @@ public class OneToOneEdgeManager extends EdgeManager {
void addDestinationTaskIndex(int sourceTaskIndex, List<Integer> taskIndeces) {
taskIndeces.add(new Integer(sourceTaskIndex));
}
+
+ @Override
+ public int getDestinationConsumerTaskNumber(int sourceTaskIndex, int numDestTasks) {
+ return 1;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3749a18f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java
index 3d1d289..b1dd475 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java
@@ -21,35 +21,21 @@ package org.apache.tez.dag.app.dag.impl;
import java.util.List;
import org.apache.tez.dag.app.dag.EdgeManager;
-import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.runtime.api.events.DataMovementEvent;
-import org.apache.tez.runtime.api.events.InputFailedEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
+import org.apache.tez.runtime.api.events.InputFailedEvent;
public class ScatterGatherEdgeManager extends EdgeManager {
- private int initialDestinationTaskNumber = -1;
-
@Override
- public int getNumDestinationTaskInputs(Vertex sourceVertex,
+ public int getNumDestinationTaskInputs(int numSourceTasks,
int destinationTaskIndex) {
- return sourceVertex.getTotalTasks();
+ return numSourceTasks;
}
@Override
- public int getNumSourceTaskOutputs(Vertex destinationVertex,
- int sourceTaskIndex) {
- if(initialDestinationTaskNumber == -1) {
- // the downstream vertex may not have started and so its number of tasks
- // may change. So save this initial count and provide a consistent view
- // to all source tasks, including late starters and retries.
- // When the number of destination tasks change then the routing will have
- // to be updated too.
- // This value may be obtained from config too if destination task initial
- // parallelism is not specified.
- initialDestinationTaskNumber = destinationVertex.getTotalTasks();
- }
- return initialDestinationTaskNumber;
+ public int getNumSourceTaskOutputs(int numDestinationTasks, int sourceTaskIndex) {
+ return numDestinationTasks;
}
@Override
@@ -73,5 +59,10 @@ public class ScatterGatherEdgeManager extends EdgeManager {
InputReadErrorEvent event) {
return event.getIndex();
}
+
+ @Override
+ public int getDestinationConsumerTaskNumber(int sourceTaskIndex, int numDestTasks) {
+ return numDestTasks;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3749a18f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java
index a4dd555..e039c72 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java
@@ -43,7 +43,6 @@ import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputFailedEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
-import org.apache.tez.runtime.records.TezDependentTaskCompletionEvent;
/**
* Starts scheduling tasks when number of completed source tasks crosses
@@ -91,13 +90,13 @@ public class ShuffleVertexManager implements VertexScheduler {
}
- public class CustomEdgeManager extends EdgeManager {
+ public class CustomShuffleEdgeManager extends EdgeManager {
int numSourceTaskOutputs;
int numDestinationTasks;
int basePartitionRange;
int remainderRangeForLastShuffler;
- CustomEdgeManager(int numSourceTaskOutputs, int numDestinationTasks,
+ CustomShuffleEdgeManager(int numSourceTaskOutputs, int numDestinationTasks,
int basePartitionRange, int remainderPartitionForLastShuffler) {
this.numSourceTaskOutputs = numSourceTaskOutputs;
this.numDestinationTasks = numDestinationTasks;
@@ -106,7 +105,7 @@ public class ShuffleVertexManager implements VertexScheduler {
}
@Override
- public int getNumDestinationTaskInputs(Vertex sourceVertex,
+ public int getNumDestinationTaskInputs(int numSourceTasks,
int destinationTaskIndex) {
int partitionRange = 1;
if(destinationTaskIndex < numDestinationTasks-1) {
@@ -114,11 +113,11 @@ public class ShuffleVertexManager implements VertexScheduler {
} else {
partitionRange = remainderRangeForLastShuffler;
}
- return sourceVertex.getTotalTasks() * partitionRange;
+ return numSourceTasks * partitionRange;
}
@Override
- public int getNumSourceTaskOutputs(Vertex destinationVertex,
+ public int getNumSourceTaskOutputs(int numDestinationTasks,
int sourceTaskIndex) {
return numSourceTaskOutputs;
}
@@ -163,6 +162,12 @@ public class ShuffleVertexManager implements VertexScheduler {
}
return event.getIndex()/partitionRange;
}
+
+ @Override
+ public int getDestinationConsumerTaskNumber(int sourceTaskIndex,
+ int numDestTasks) {
+ return numDestTasks;
+ }
}
@@ -182,8 +187,7 @@ public class ShuffleVertexManager implements VertexScheduler {
}
@Override
- public void onSourceTaskCompleted(TezTaskAttemptID srcAttemptId,
- TezDependentTaskCompletionEvent event) {
+ public void onSourceTaskCompleted(TezTaskAttemptID srcAttemptId) {
updateSourceTaskCount();
TezTaskID srcTaskId = srcAttemptId.getTaskID();
TezVertexID srcVertexId = srcTaskId.getVertexID();
@@ -194,9 +198,10 @@ public class ShuffleVertexManager implements VertexScheduler {
++numSourceTasksCompleted;
if (enableAutoParallelism) {
// save output size
- long sourceTaskOutputSize = event.getDataSize();
+ // TODO TEZ-481
+ long sourceTaskOutputSize = 100000000l;//sourceTaskAttempt.getDataSize();
if (LOG.isDebugEnabled()) {
- LOG.debug("Source task: " + event.getTaskAttemptID()
+ LOG.debug("Source task: " + srcAttemptId
+ " finished with output size: " + sourceTaskOutputSize);
}
completedSourceTasksOutputSize += sourceTaskOutputSize;
@@ -282,7 +287,9 @@ public class ShuffleVertexManager implements VertexScheduler {
Map<Vertex, EdgeManager> edgeManagers = new HashMap<Vertex, EdgeManager>(
bipartiteSources.size());
for(Vertex vertex : bipartiteSources.values()) {
- edgeManagers.put(vertex, new CustomEdgeManager(currentParallelism,
+ // use currentParallelism for numSourceTasks to maintain original state
+ // for the source tasks
+ edgeManagers.put(vertex, new CustomShuffleEdgeManager(currentParallelism,
finalTaskParallelism, basePartitionRange,
remainderRangeForLastShuffler));
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3749a18f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index b79f856..bfd14e2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
+import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
@@ -70,9 +71,8 @@ import org.apache.tez.dag.app.dag.event.DAGEventType;
import org.apache.tez.dag.app.dag.event.DiagnosableEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminated;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminating;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventDiagnosticsUpdate;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventNodeFailed;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
@@ -89,8 +89,10 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.dag.utils.TezBuilderUtils;
+import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.runtime.api.impl.TezEvent;
import com.google.common.annotations.VisibleForTesting;
@@ -132,6 +134,9 @@ public class TaskAttemptImpl implements TaskAttempt,
// Used to store locality information when
Set<String> taskHosts = new HashSet<String>();
Set<String> taskRacks = new HashSet<String>();
+
+ private Set<String> uniquefailedOutputReports = new HashSet<String>();
+ private static final double MAX_ALLOWED_OUTPUT_FAILURES_FRACTION = 0.5;
protected final TaskLocationHint locationHint;
protected final boolean isRescheduled;
@@ -184,7 +189,7 @@ public class TaskAttemptImpl implements TaskAttempt,
.addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_NODE_FAILED, new TerminatedWhileRunningTransition(KILLED_HELPER))
.addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_CONTAINER_TERMINATING, new TerminatedWhileRunningTransition(FAILED_HELPER))
.addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, new ContainerCompletedWhileRunningTransition())
- .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURES, new TerminatedWhileRunningTransition(FAILED_HELPER))
+ .addTransition(TaskAttemptStateInternal.RUNNING, EnumSet.of(TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptStateInternal.RUNNING), TaskAttemptEventType.TA_OUTPUT_FAILED, new OutputReportedFailedTransition())
.addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_STATUS_UPDATE, STATUS_UPDATER)
.addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
@@ -198,28 +203,28 @@ public class TaskAttemptImpl implements TaskAttempt,
.addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_NODE_FAILED, new TerminatedWhileRunningTransition(KILLED_HELPER))
.addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_CONTAINER_TERMINATING, new TerminatedWhileRunningTransition(FAILED_HELPER))
.addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, new ContainerCompletedBeforeRunningTransition())
- .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURES, new TerminatedWhileRunningTransition(FAILED_HELPER))
+ .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, EnumSet.of(TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptStateInternal.OUTPUT_CONSUMABLE), TaskAttemptEventType.TA_OUTPUT_FAILED, new OutputReportedFailedTransition())
.addTransition(TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, new ContainerCompletedWhileTerminating())
.addTransition(TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
- .addTransition(TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptStateInternal.KILL_IN_PROGRESS, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURES))
+ .addTransition(TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptStateInternal.KILL_IN_PROGRESS, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_OUTPUT_FAILED))
.addTransition(TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, new ContainerCompletedWhileTerminating())
.addTransition(TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
- .addTransition(TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptStateInternal.FAIL_IN_PROGRESS, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURES))
+ .addTransition(TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptStateInternal.FAIL_IN_PROGRESS, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_OUTPUT_FAILED))
.addTransition(TaskAttemptStateInternal.KILLED, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
- .addTransition(TaskAttemptStateInternal.KILLED, TaskAttemptStateInternal.KILLED, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_CONTAINER_TERMINATED, TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURES))
+ .addTransition(TaskAttemptStateInternal.KILLED, TaskAttemptStateInternal.KILLED, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_CONTAINER_TERMINATED, TaskAttemptEventType.TA_OUTPUT_FAILED))
.addTransition(TaskAttemptStateInternal.FAILED, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
- .addTransition(TaskAttemptStateInternal.FAILED, TaskAttemptStateInternal.FAILED, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_CONTAINER_TERMINATED, TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURES))
+ .addTransition(TaskAttemptStateInternal.FAILED, TaskAttemptStateInternal.FAILED, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_CONTAINER_TERMINATED, TaskAttemptEventType.TA_OUTPUT_FAILED))
// How will duplicate history events be handled ?
// TODO Maybe consider not failing REDUCE tasks in this case. Also, MAP_TASKS in case there's only one phase in the job.
.addTransition(TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.SUCCEEDED, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
.addTransition(TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_KILL_REQUEST, new TerminatedAfterSuccessTransition(KILLED_HELPER))
.addTransition(TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_NODE_FAILED, new TerminatedAfterSuccessTransition(KILLED_HELPER))
- .addTransition(TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURES, new TerminatedAfterSuccessTransition(FAILED_HELPER))
+ .addTransition(TaskAttemptStateInternal.SUCCEEDED, EnumSet.of(TaskAttemptStateInternal.FAILED, TaskAttemptStateInternal.SUCCEEDED), TaskAttemptEventType.TA_OUTPUT_FAILED, new OutputReportedFailedTransition())
.addTransition(TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.SUCCEEDED, EnumSet.of(TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_CONTAINER_TERMINATED))
@@ -551,7 +556,7 @@ public class TaskAttemptImpl implements TaskAttempt,
// always called in write lock
private void setFinishTime() {
// set the finish time only if launch time is set
- if (launchTime != 0) {
+ if (launchTime != 0 && finishTime != 0) {
finishTime = clock.getTime();
}
}
@@ -967,13 +972,19 @@ public class TaskAttemptImpl implements TaskAttempt,
TerminatedTransitionHelper helper) {
super(helper);
}
+
+ protected boolean sendSchedulerEvent() {
+ return true;
+ }
@Override
public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
super.transition(ta, event);
// Inform the scheduler
- ta.sendEvent(new AMSchedulerEventTAEnded(ta, ta.containerId, helper
- .getTaskAttemptState()));
+ if (sendSchedulerEvent()) {
+ ta.sendEvent(new AMSchedulerEventTAEnded(ta, ta.containerId, helper
+ .getTaskAttemptState()));
+ }
// Decrement speculator container request.
//ta.maybeSendSpeculatorContainerNoLongerRequired();
}
@@ -989,8 +1000,6 @@ public class TaskAttemptImpl implements TaskAttempt,
@Override
public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
super.transition(ta, event);
- TaskAttemptEventNodeFailed nfEvent = (TaskAttemptEventNodeFailed) event;
- ta.addDiagnosticInfo(nfEvent.getDiagnosticInfo());
}
}
@@ -1004,8 +1013,6 @@ public class TaskAttemptImpl implements TaskAttempt,
@Override
public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
super.transition(ta, event);
- TaskAttemptEventContainerTerminating tEvent = (TaskAttemptEventContainerTerminating) event;
- ta.addDiagnosticInfo(tEvent.getDiagnosticInfo());
}
}
@@ -1019,9 +1026,6 @@ public class TaskAttemptImpl implements TaskAttempt,
public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
super.transition(ta, event);
ta.sendTaskAttemptCleanupEvent();
-
- TaskAttemptEventContainerTerminated tEvent = (TaskAttemptEventContainerTerminated) event;
- ta.addDiagnosticInfo(tEvent.getDiagnosticInfo());
}
}
@@ -1041,15 +1045,6 @@ public class TaskAttemptImpl implements TaskAttempt,
ta.updateProgressSplits();
- // TODO TEZ-431
-// // Inform the job about fetch failures if they exist.
-// if (ta.reportedStatus.fetchFailedMaps != null
-// && ta.reportedStatus.fetchFailedMaps.size() > 0) {
-// ta.sendEvent(new VertexEventTaskAttemptFetchFailure(ta.attemptId,
-// ta.reportedStatus.fetchFailedMaps));
-// }
- // TODO at some point. Nodes may be interested in FetchFailure info.
- // Can be used to blacklist nodes.
}
}
@@ -1119,8 +1114,6 @@ public class TaskAttemptImpl implements TaskAttempt,
public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
super.transition(ta, event);
ta.sendTaskAttemptCleanupEvent();
- TaskAttemptEventContainerTerminated tEvent = (TaskAttemptEventContainerTerminated) event;
- ta.addDiagnosticInfo(tEvent.getDiagnosticInfo());
}
}
@@ -1139,6 +1132,13 @@ public class TaskAttemptImpl implements TaskAttempt,
protected static class TerminatedAfterSuccessTransition extends
TerminatedBeforeRunningTransition {
+ @Override
+ protected boolean sendSchedulerEvent() {
+ // since the success transition would have sent the event
+ // there is no need to send it again
+ return false;
+ }
+
public TerminatedAfterSuccessTransition(TerminatedTransitionHelper helper) {
super(helper);
}
@@ -1150,6 +1150,55 @@ public class TaskAttemptImpl implements TaskAttempt,
}
}
+
+ protected static class OutputReportedFailedTransition implements
+ MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent, TaskAttemptStateInternal> {
+
+ @Override
+ public TaskAttemptStateInternal transition(TaskAttemptImpl attempt,
+ TaskAttemptEvent event) {
+ TaskAttemptEventOutputFailed outputFailedEvent =
+ (TaskAttemptEventOutputFailed) event;
+ TezEvent tezEvent = outputFailedEvent.getInputFailedEvent();
+ TezTaskAttemptID failedDestTaId = tezEvent.getSourceInfo().getTaskAttemptID();
+ InputReadErrorEvent readErrorEvent = (InputReadErrorEvent)tezEvent.getEvent();
+ int failedInputIndexOnDestTa = readErrorEvent.getIndex();
+ if (readErrorEvent.getVersion() != attempt.getID().getId()) {
+ throw new TezUncheckedException(attempt.getID()
+ + " incorrectly blamed for read error from " + failedDestTaId
+ + " at inputIndex " + failedInputIndexOnDestTa + " version"
+ + readErrorEvent.getVersion());
+ }
+ LOG.info(attempt.getID()
+ + " blamed for read error from " + failedDestTaId
+ + " at inputIndex " + failedInputIndexOnDestTa);
+ String failedReportId = failedDestTaId + "_" + failedInputIndexOnDestTa;
+ attempt.uniquefailedOutputReports.add(failedReportId);
+ float failureFraction = ((float) attempt.uniquefailedOutputReports.size())
+ / outputFailedEvent.getConsumerTaskNumber();
+
+ // If needed we can also use the absolute number of reported output errors
+ // If needed we can launch a background task without failing this task
+ // to generate a copy of the output just in case.
+ if (failureFraction <= MAX_ALLOWED_OUTPUT_FAILURES_FRACTION) {
+ return attempt.getInternalState();
+ }
+ String message = attempt.getID() + " being failed for too many output errors";
+ LOG.info(message);
+ attempt.addDiagnosticInfo(message);
+ if (attempt.getInternalState() == TaskAttemptStateInternal.SUCCEEDED) {
+ (new TerminatedAfterSuccessTransition(FAILED_HELPER)).transition(
+ attempt, event);
+ return TaskAttemptStateInternal.FAILED;
+ } else {
+ (new TerminatedWhileRunningTransition(FAILED_HELPER)).transition(
+ attempt, event);
+ return TaskAttemptStateInternal.FAIL_IN_PROGRESS;
+ }
+ // TODO at some point. Nodes may be interested in FetchFailure info.
+ // Can be used to blacklist nodes.
+ }
+ }
private void initTaskAttemptStatus(TaskAttemptStatus result) {
result.progress = 0.0f;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3749a18f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index c1a9415..36d0abb 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -40,7 +40,6 @@ import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.Records;
-import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
@@ -55,6 +54,7 @@ import org.apache.tez.dag.app.TaskAttemptListener;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.TaskAttempt;
+import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
import org.apache.tez.dag.app.dag.TaskStateInternal;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.event.DAGEvent;
@@ -76,7 +76,6 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.runtime.api.impl.TezEvent;
-import org.apache.tez.runtime.records.TezDependentTaskCompletionEvent;
import com.google.common.annotations.VisibleForTesting;
@@ -209,16 +208,17 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
// the stages. i.e. Task would only SUCCEED after all output consumed.
.addTransition(TaskStateInternal.SUCCEEDED, //only possible for map tasks
EnumSet.of(TaskStateInternal.SCHEDULED, TaskStateInternal.SUCCEEDED, TaskStateInternal.FAILED),
- TaskEventType.T_ATTEMPT_FAILED, new MapRetroactiveFailureTransition())
+ TaskEventType.T_ATTEMPT_FAILED, new TaskRetroactiveFailureTransition())
.addTransition(TaskStateInternal.SUCCEEDED, //only possible for map tasks
EnumSet.of(TaskStateInternal.SCHEDULED, TaskStateInternal.SUCCEEDED),
- TaskEventType.T_ATTEMPT_KILLED, new MapRetroactiveKilledTransition())
+ TaskEventType.T_ATTEMPT_KILLED, new TaskRetroactiveKilledTransition())
.addTransition(TaskStateInternal.SUCCEEDED, TaskStateInternal.SUCCEEDED,
TaskEventType.T_ADD_TEZ_EVENT, ADD_TEZ_EVENT_TRANSITION)
// Ignore-able transitions.
.addTransition(
TaskStateInternal.SUCCEEDED, TaskStateInternal.SUCCEEDED,
EnumSet.of(TaskEventType.T_ADD_SPEC_ATTEMPT,
+ TaskEventType.T_TERMINATE,
TaskEventType.T_ATTEMPT_LAUNCHED))
// Transitions from FAILED state
@@ -749,41 +749,26 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
}
private void sendTaskAttemptCompletionEvent(TezTaskAttemptID attemptId,
- TezDependentTaskCompletionEvent.Status status) {
- TaskAttempt attempt = attempts.get(attemptId);
+ TaskAttemptStateInternal attemptState) {
// raise the completion event only if the container is assigned
// to nextAttemptNumber
if (needsWaitAfterOutputConsumable()) {
// An event may have been sent out during the OUTPUT_READY state itself.
// Make sure the same event is not being sent out again.
if (attemptId == outputConsumableAttempt
- && status == TezDependentTaskCompletionEvent.Status.SUCCEEDED) {
+ && attemptState == TaskAttemptStateInternal.SUCCEEDED) {
if (outputConsumableAttemptSuccessSent) {
return;
}
}
}
- if (attempt.getNodeHttpAddress() != null) {
-
- int runTime = 0;
- if (attempt.getFinishTime() != 0 && attempt.getLaunchTime() != 0)
- runTime = (int) (attempt.getFinishTime() - attempt.getLaunchTime());
-
- // TODO TEZ-347. Get this event from Task instead of generating here
- long dataSize = getCounters().findCounter(TaskCounter.MAP_OUTPUT_BYTES).getValue();
- TezDependentTaskCompletionEvent tce = new TezDependentTaskCompletionEvent(
- -1, attemptId, status, runTime, dataSize);
-
- // raise the event to job so that it adds the completion event to its
- // data structures
- eventHandler.handle(new VertexEventTaskAttemptCompleted(tce));
- }
+ eventHandler.handle(new VertexEventTaskAttemptCompleted(attemptId, attemptState));
}
// always called inside a transition, in turn inside the Write Lock
private void handleTaskAttemptCompletion(TezTaskAttemptID attemptId,
- TezDependentTaskCompletionEvent.Status status) {
- this.sendTaskAttemptCompletionEvent(attemptId, status);
+ TaskAttemptStateInternal attemptState) {
+ this.sendTaskAttemptCompletionEvent(attemptId, attemptState);
}
// TODO: Recovery
@@ -893,7 +878,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
if (task.outputConsumableAttempt == null) {
task.sendTaskAttemptCompletionEvent(attemptId,
- TezDependentTaskCompletionEvent.Status.SUCCEEDED);
+ TaskAttemptStateInternal.SUCCEEDED);
task.outputConsumableAttempt = attemptId;
task.outputConsumableAttemptSuccessSent = true;
if (LOG.isDebugEnabled()) {
@@ -932,7 +917,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
}
task.handleTaskAttemptCompletion(successTaId,
- TezDependentTaskCompletionEvent.Status.SUCCEEDED);
+ TaskAttemptStateInternal.SUCCEEDED);
task.finishedAttempts++;
--task.numberUncompletedAttempts;
task.successfulAttempt = successTaId;
@@ -974,7 +959,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
}
task.handleTaskAttemptCompletion(
castEvent.getTaskAttemptID(),
- TezDependentTaskCompletionEvent.Status.KILLED);
+ TaskAttemptStateInternal.KILLED);
task.finishedAttempts++;
// we don't need a new event if we already have a spare
if (--task.numberUncompletedAttempts == 0
@@ -994,7 +979,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
task.handleTaskAttemptCompletion(
((TaskEventTAUpdate) event).getTaskAttemptID(),
- TezDependentTaskCompletionEvent.Status.KILLED);
+ TaskAttemptStateInternal.KILLED);
task.finishedAttempts++;
// check whether all attempts are finished
if (task.finishedAttempts == task.attempts.size()) {
@@ -1027,7 +1012,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
if (castEvent.getTaskAttemptID().equals(task.outputConsumableAttempt)) {
task.outputConsumableAttempt = null;
task.handleTaskAttemptCompletion(castEvent.getTaskAttemptID(),
- TezDependentTaskCompletionEvent.Status.FAILED);
+ TaskAttemptStateInternal.FAILED);
}
// The attempt would have informed the scheduler about it's failure
@@ -1036,7 +1021,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
if (task.failedAttempts < task.maxAttempts) {
task.handleTaskAttemptCompletion(
((TaskEventTAUpdate) event).getTaskAttemptID(),
- TezDependentTaskCompletionEvent.Status.FAILED);
+ TaskAttemptStateInternal.FAILED);
// we don't need a new event if we already have a spare
if (--task.numberUncompletedAttempts == 0
&& task.successfulAttempt == null) {
@@ -1045,7 +1030,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
} else {
task.handleTaskAttemptCompletion(
((TaskEventTAUpdate) event).getTaskAttemptID(),
- TezDependentTaskCompletionEvent.Status.TIPFAILED);
+ TaskAttemptStateInternal.FAILED);
if (task.historyTaskStartGenerated) {
task.logJobHistoryTaskFailedEvent(TaskState.FAILED);
@@ -1065,36 +1050,35 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
}
}
- private static class MapRetroactiveFailureTransition
+ private static class TaskRetroactiveFailureTransition
extends AttemptFailedTransition {
@Override
public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
- if (event instanceof TaskEventTAUpdate) {
- TaskEventTAUpdate castEvent = (TaskEventTAUpdate) event;
- if (task.getInternalState() == TaskStateInternal.SUCCEEDED &&
- !castEvent.getTaskAttemptID().equals(task.successfulAttempt)) {
- // don't allow a different task attempt to override a previous
- // succeeded state
- return TaskStateInternal.SUCCEEDED;
- }
- }
-
if (task.leafVertex) {
LOG.error("Unexpected event for task of leaf vertex " + event.getType());
task.internalError(event.getType());
}
+ TaskEventTAUpdate castEvent = (TaskEventTAUpdate) event;
+ if (task.getInternalState() == TaskStateInternal.SUCCEEDED &&
+ !castEvent.getTaskAttemptID().equals(task.successfulAttempt)) {
+ // don't allow a different task attempt to override a previous
+ // succeeded state
+ return TaskStateInternal.SUCCEEDED;
+ }
+
// tell the job about the rescheduling
- task.eventHandler.handle(
- new VertexEventTaskReschedule(task.taskId));
+ task.eventHandler.handle(new VertexEventTaskReschedule(task.taskId));
// super.transition is mostly coded for the case where an
// UNcompleted task failed. When a COMPLETED task retroactively
// fails, we have to let AttemptFailedTransition.transition
// believe that there's no redundancy.
unSucceed(task);
- // fake increase in Uncomplete attempts for super.transition
+
+ // fake values for code for super.transition
++task.numberUncompletedAttempts;
+ task.finishedAttempts--;
return super.transition(task, event);
}
@@ -1104,7 +1088,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
}
}
- private static class MapRetroactiveKilledTransition implements
+ private static class TaskRetroactiveKilledTransition implements
MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {
@Override
@@ -1124,7 +1108,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
unSucceed(task);
task.handleTaskAttemptCompletion(
attemptId,
- TezDependentTaskCompletionEvent.Status.KILLED);
+ TaskAttemptStateInternal.KILLED);
task.eventHandler.handle(new VertexEventTaskReschedule(task.taskId));
// typically we are here because this map task was run on a bad node and
// we want to reschedule it on a different node.