You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2015/05/08 00:44:39 UTC
[2/3] tez git commit: TEZ-776. Reduce AM mem usage caused by storing
TezEvents (bikas)
http://git-wip-us.apache.org/repos/asf/tez/blob/05f77fe2/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 5d61642..a16ee0a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -91,6 +91,7 @@ import org.apache.tez.dag.api.records.DAGProtos.RootInputLeafOutputProto;
import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ContainerContext;
+import org.apache.tez.dag.app.TaskAttemptEventInfo;
import org.apache.tez.dag.app.TaskAttemptListener;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.DAG;
@@ -134,6 +135,7 @@ import org.apache.tez.dag.app.dag.event.VertexEventTermination;
import org.apache.tez.dag.app.dag.event.VertexEventType;
import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.impl.DAGImpl.VertexGroupInfo;
+import org.apache.tez.dag.app.dag.impl.Edge.PendingEventRouteMetadata;
import org.apache.tez.dag.app.dag.speculation.legacy.LegacySpeculator;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEvent;
@@ -202,7 +204,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
//final fields
private final Clock clock;
-
private final Lock readLock;
private final Lock writeLock;
private final TaskAttemptListener taskAttemptListener;
@@ -225,6 +226,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
private Configuration vertexConf;
private final boolean isSpeculationEnabled;
+
+ @VisibleForTesting
+ public boolean useOnDemandRouting = true;
//fields initialized in init
@@ -726,9 +730,18 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
private Set<String> inputsWithInitializers;
private int numInitializedInputs;
private boolean startSignalPending = false;
- private boolean tasksNotYetScheduled = true;
// We may always store task events in the vertex for scalability
List<TezEvent> pendingTaskEvents = Lists.newLinkedList();
+ private boolean tasksNotYetScheduled = true;
+ // onDemandRouteEvents must be a random access structure (it is read by index)
+
+ private final List<EventInfo> onDemandRouteEvents = Lists.newArrayListWithCapacity(1000);
+ private final ReadWriteLock onDemandRouteEventsReadWriteLock = new ReentrantReadWriteLock();
+ private final Lock onDemandRouteEventsReadLock = onDemandRouteEventsReadWriteLock.readLock();
+ private final Lock onDemandRouteEventsWriteLock = onDemandRouteEventsReadWriteLock.writeLock();
+
+ private static final List<TezEvent> EMPTY_TASK_ATTEMPT_TEZ_EVENTS =
+ new ArrayList(0);
List<TezEvent> pendingRouteEvents = new LinkedList<TezEvent>();
List<TezTaskAttemptID> pendingReportedSrcCompletions = Lists.newLinkedList();
@@ -771,6 +784,17 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
private VertexStats vertexStats = null;
private final TaskSpecificLaunchCmdOption taskSpecificLaunchCmdOpts;
+
+ static class EventInfo {
+ final TezEvent tezEvent;
+ final Edge eventEdge;
+ final int eventTaskIndex;
+ EventInfo(TezEvent tezEvent, Edge eventEdge, int eventTaskIndex) {
+ this.tezEvent = tezEvent;
+ this.eventEdge = eventEdge;
+ this.eventTaskIndex = eventTaskIndex;
+ }
+ }
private VertexStatisticsImpl finalStatistics;
@@ -1175,6 +1199,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
}
}
+ @VisibleForTesting
+ List<EventInfo> getOnDemandRouteEvents() {
+ return onDemandRouteEvents;
+ }
+
private void computeProgress() {
this.readLock.lock();
try {
@@ -1388,24 +1417,51 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
}
}
- @Override
- public void scheduleTasks(List<TaskWithLocationHint> tasksToSchedule) {
- writeLock.lock();
- try {
+ void setupEdgeRouting() throws AMUserCodeException {
+ for (Edge e : sourceVertices.values()) {
+ boolean edgeDoingOnDemand = e.routingToBegin();
+ if (useOnDemandRouting && !edgeDoingOnDemand) {
+ useOnDemandRouting = false;
+ LOG.info("Not using ondemand routing because of edge between " + e.getSourceVertexName()
+ + " and " + getLogIdentifier());
+ }
+ }
+ }
+
+ private void unsetTasksNotYetScheduled() throws AMUserCodeException {
+ if (tasksNotYetScheduled) {
+ setupEdgeRouting();
tasksNotYetScheduled = false;
+ // Only now can we be sure of the edge manager type, so until this point
+ // task events are accumulated in pendingTaskEvents in case legacy routing
+ // gets used. This is only needed to support mixed-mode routing. With pure
+ // on-demand routing, events could be added directly to taskEvents when
+ // they arrive in handleRoutedTezEvents instead of first being cached in
+ // pendingTaskEvents. When legacy routing is removed, pendingTaskEvents
+ // can be removed as well.
if (!pendingTaskEvents.isEmpty()) {
LOG.info("Routing pending task events for vertex: " + logIdentifier);
try {
- handleRoutedTezEvents(this, pendingTaskEvents, false, true);
+ handleRoutedTezEvents(pendingTaskEvents, false, true);
} catch (AMUserCodeException e) {
- String msg = "Exception in " + e.getSource() +", vertex=" + logIdentifier;
+ String msg = "Exception in " + e.getSource() + ", vertex=" + logIdentifier;
LOG.error(msg, e);
- addDiagnostic(msg + ", " + e.getMessage() + ", " + ExceptionUtils.getStackTrace(e.getCause()));
- eventHandler.handle(new VertexEventTermination(vertexId, VertexTerminationCause.AM_USERCODE_FAILURE));
+ addDiagnostic(msg + ", " + e.getMessage() + ", "
+ + ExceptionUtils.getStackTrace(e.getCause()));
+ eventHandler.handle(new VertexEventTermination(vertexId,
+ VertexTerminationCause.AM_USERCODE_FAILURE));
return;
}
pendingTaskEvents.clear();
}
+ }
+ }
+
+ @Override
+ public void scheduleTasks(List<TaskWithLocationHint> tasksToSchedule) {
+ writeLock.lock();
+ try {
+ unsetTasksNotYetScheduled();
for (TaskWithLocationHint task : tasksToSchedule) {
if (numTasks <= task.getTaskIndex().intValue()) {
throw new TezUncheckedException(
@@ -1422,6 +1478,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
TezTaskID.getInstance(vertexId, task.getTaskIndex().intValue()),
TaskEventType.T_SCHEDULE));
}
+ } catch (AMUserCodeException e) {
+ String msg = "Exception in " + e.getSource() + ", vertex=" + getLogIdentifier();
+ LOG.error(msg, e);
+ // send event to fail the vertex
+ eventHandler.handle(new VertexEventManagerUserCodeError(getVertexId(), e));
+ // throw an unchecked exception to stop the vertex manager that invoked this.
+ throw new TezUncheckedException(e);
} finally {
writeLock.unlock();
}
@@ -2497,7 +2560,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
}
break;
case RUNNING:
- vertex.tasksNotYetScheduled = false;
try {
vertex.initializeCommitters();
} catch (Exception e) {
@@ -2530,6 +2592,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
}
try {
vertex.recoveryCodeSimulatingStart();
+ vertex.unsetTasksNotYetScheduled();
endState = VertexState.RUNNING;
} catch (AMUserCodeException e) {
String msg = "Exception in " + e.getSource() + ", vertex:" + vertex.getLogIdentifier();
@@ -2560,7 +2623,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
VertexTerminationCause.COMMIT_FAILURE, msg);
endState = VertexState.FAILED;
} else {
- vertex.tasksNotYetScheduled = false;
// recover tasks
if (vertex.tasks != null && vertex.numTasks != 0) {
TaskState taskState = TaskState.KILLED;
@@ -2578,6 +2640,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
}
try {
vertex.recoveryCodeSimulatingStart();
+ vertex.unsetTasksNotYetScheduled();
endState = VertexState.RUNNING;
} catch (AMUserCodeException e) {
String msg = "Exception in " + e.getSource() +", vertex:" + vertex.getLogIdentifier();
@@ -2901,7 +2964,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
endState = VertexState.INITED;
break;
case RUNNING:
- vertex.tasksNotYetScheduled = false;
// if commit in progress and desired state is not a succeeded one,
// move to failed
if (vertex.recoveryCommitInProgress) {
@@ -2946,6 +3008,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
}
try {
vertex.recoveryCodeSimulatingStart();
+ vertex.unsetTasksNotYetScheduled();
endState = VertexState.RUNNING;
} catch (AMUserCodeException e) {
String msg = "Exception in " + e.getSource() + ", vertex=" + vertex.getLogIdentifier();
@@ -2962,7 +3025,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
case SUCCEEDED:
case FAILED:
case KILLED:
- vertex.tasksNotYetScheduled = false;
// recover tasks
assert vertex.tasks.size() == vertex.numTasks;
if (vertex.tasks != null && vertex.numTasks != 0) {
@@ -2982,6 +3044,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
// Wait for all tasks to recover and report back
try {
vertex.recoveryCodeSimulatingStart();
+ vertex.unsetTasksNotYetScheduled();
endState = VertexState.RUNNING;
} catch (AMUserCodeException e) {
String msg = "Exception in " + e.getSource() + ", vertex:" + vertex.getLogIdentifier();
@@ -3025,7 +3088,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
vertex.recoveredEvents.clear();
if (!vertex.pendingRouteEvents.isEmpty()) {
try {
- handleRoutedTezEvents(vertex, vertex.pendingRouteEvents, false, true);
+ vertex.handleRoutedTezEvents(vertex.pendingRouteEvents, false, true);
vertex.pendingRouteEvents.clear();
} catch (AMUserCodeException e) {
String msg = "Exception in " + e.getSource() + ", vertex=" + vertex.getLogIdentifier();
@@ -3284,7 +3347,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
List<TezEvent> inputInfoEvents = iEvent.getEvents();
try {
if (inputInfoEvents != null && !inputInfoEvents.isEmpty()) {
- VertexImpl.handleRoutedTezEvents(vertex, inputInfoEvents, false, false);
+ vertex.handleRoutedTezEvents(inputInfoEvents, false, false);
}
} catch (AMUserCodeException e) {
String msg = "Exception in " + e.getSource() + ", vertex:" + vertex.getLogIdentifier();
@@ -3941,7 +4004,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
boolean recovered = rEvent.isRecovered();
List<TezEvent> tezEvents = rEvent.getEvents();
try {
- VertexImpl.handleRoutedTezEvents(vertex, tezEvents, recovered, false);
+ vertex.handleRoutedTezEvents(tezEvents, recovered, false);
} catch (AMUserCodeException e) {
String msg = "Exception in " + e.getSource() + ", vertex=" + vertex.getLogIdentifier();
LOG.error(msg, e);
@@ -3959,16 +4022,105 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
return vertex.getState();
}
}
+
+ @Override
+ public TaskAttemptEventInfo getTaskAttemptTezEvents(TezTaskAttemptID attemptID,
+ int fromEventId, int maxEvents) {
+ if (!useOnDemandRouting) {
+ List<TezEvent> events = getTask(attemptID.getTaskID()).getTaskAttemptTezEvents(attemptID, fromEventId, maxEvents);
+ return new TaskAttemptEventInfo(fromEventId + events.size(), events);
+ }
- private static void handleRoutedTezEvents(VertexImpl vertex, List<TezEvent> tezEvents, boolean recovered, boolean isPendingEvents) throws AMUserCodeException {
- if (vertex.getAppContext().isRecoveryEnabled()
+ onDemandRouteEventsReadLock.lock();
+ try {
+ List<TezEvent> events = EMPTY_TASK_ATTEMPT_TEZ_EVENTS;
+ int nextFromEventId = fromEventId;
+ int currEventCount = onDemandRouteEvents.size();
+ try {
+ if (currEventCount > fromEventId) {
+ events = Lists.newArrayListWithCapacity(maxEvents);
+ int taskIndex = attemptID.getTaskID().getId();
+ Preconditions.checkState(taskIndex < tasks.size(), "Invalid task index for TA: " + attemptID
+ + " vertex: " + getLogIdentifier());
+ boolean isFirstEvent = true;
+ for (nextFromEventId = fromEventId; nextFromEventId < currEventCount; ++nextFromEventId) {
+ boolean earlyExit = false;
+ if (events.size() == maxEvents) {
+ break;
+ }
+ EventInfo eventInfo = onDemandRouteEvents.get(nextFromEventId);
+ TezEvent tezEvent = eventInfo.tezEvent;
+ switch(tezEvent.getEventType()) {
+ case INPUT_FAILED_EVENT:
+ case DATA_MOVEMENT_EVENT:
+ case COMPOSITE_DATA_MOVEMENT_EVENT:
+ {
+ int srcTaskIndex = eventInfo.eventTaskIndex;
+ Edge srcEdge = eventInfo.eventEdge;
+ PendingEventRouteMetadata pendingRoute = null;
+ if (isFirstEvent) {
+ // do this precondition check only for the first event
+ isFirstEvent = false;
+ pendingRoute = srcEdge.removePendingEvents(attemptID);
+ if (pendingRoute != null) {
+ Preconditions.checkState(tezEvent == pendingRoute.getTezEvent()); // same object
+ }
+ }
+ if (!srcEdge.maybeAddTezEventForDestinationTask(tezEvent, attemptID, srcTaskIndex,
+ events, maxEvents, pendingRoute)) {
+ // Not enough space left in events (capped at maxEvents) for this event.
+ // Exit now and resume from this same event on the next call.
+ earlyExit = true;
+ }
+ }
+ break;
+ case ROOT_INPUT_DATA_INFORMATION_EVENT:
+ {
+ InputDataInformationEvent riEvent = (InputDataInformationEvent) tezEvent.getEvent();
+ if (riEvent.getTargetIndex() == taskIndex) {
+ events.add(tezEvent);
+ }
+ }
+ break;
+ default:
+ throw new TezUncheckedException("Unexpected event type for task: "
+ + tezEvent.getEventType());
+ }
+ if (earlyExit) {
+ break;
+ }
+ }
+ }
+ } catch (AMUserCodeException e) {
+ String msg = "Exception in " + e.getSource() + ", vertex=" + getLogIdentifier();
+ LOG.error(msg, e);
+ eventHandler.handle(new VertexEventManagerUserCodeError(getVertexId(), e));
+ nextFromEventId = fromEventId;
+ events.clear();
+ }
+
+ if (events.size() > 0) {
+ StringBuilder builder = new StringBuilder();
+ builder.append("Sending ").append(attemptID).append(" numEvents: ").append(events.size())
+ .append(" from: ").append(fromEventId).append(" to: ").append(nextFromEventId)
+ .append(" out of ").append(currEventCount).append(" events in vertex: ").append(getLogIdentifier());
+ LOG.info(builder.toString());
+ }
+ return new TaskAttemptEventInfo(nextFromEventId, events);
+ } finally {
+ onDemandRouteEventsReadLock.unlock();
+ }
+ }
+
+ private void handleRoutedTezEvents(List<TezEvent> tezEvents, boolean recovered, boolean isPendingEvents) throws AMUserCodeException {
+ if (getAppContext().isRecoveryEnabled()
&& !recovered
&& !isPendingEvents
&& !tezEvents.isEmpty()) {
List<TezEvent> recoveryEvents =
Lists.newArrayList();
for (TezEvent tezEvent : tezEvents) {
- if (!isEventFromVertex(vertex, tezEvent.getSourceInfo())) {
+ if (!isEventFromVertex(this, tezEvent.getSourceInfo())) {
continue;
}
if (tezEvent.getEventType().equals(EventType.COMPOSITE_DATA_MOVEMENT_EVENT)
@@ -3980,15 +4132,15 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
}
if (!recoveryEvents.isEmpty()) {
VertexRecoverableEventsGeneratedEvent historyEvent =
- new VertexRecoverableEventsGeneratedEvent(vertex.vertexId,
+ new VertexRecoverableEventsGeneratedEvent(vertexId,
recoveryEvents);
- vertex.appContext.getHistoryHandler().handle(
- new DAGHistoryEvent(vertex.getDAGId(), historyEvent));
+ appContext.getHistoryHandler().handle(
+ new DAGHistoryEvent(getDAGId(), historyEvent));
}
}
for(TezEvent tezEvent : tezEvents) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Vertex: " + vertex.getName() + " routing event: "
+ LOG.debug("Vertex: " + getLogIdentifier() + " routing event: "
+ tezEvent.getEventType()
+ " Recovered:" + recovered);
}
@@ -3998,7 +4150,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
case DATA_MOVEMENT_EVENT:
case COMPOSITE_DATA_MOVEMENT_EVENT:
{
- if (isEventFromVertex(vertex, sourceMeta)) {
+ if (isEventFromVertex(this, sourceMeta)) {
// event from this vertex. send to destination vertex
TezTaskAttemptID srcTaId = sourceMeta.getTaskAttemptID();
if (tezEvent.getEventType() == EventType.DATA_MOVEMENT_EVENT) {
@@ -4008,56 +4160,86 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
} else {
((InputFailedEvent) tezEvent.getEvent()).setVersion(srcTaId.getId());
}
- Vertex destVertex = vertex.getDAG().getVertex(sourceMeta.getEdgeVertexName());
- Edge destEdge = vertex.targetVertices.get(destVertex);
+ Vertex destVertex = getDAG().getVertex(sourceMeta.getEdgeVertexName());
+ Edge destEdge = targetVertices.get(destVertex);
if (destEdge == null) {
throw new TezUncheckedException("Bad destination vertex: " +
sourceMeta.getEdgeVertexName() + " for event vertex: " +
- vertex.getLogIdentifier());
+ getLogIdentifier());
}
- vertex.eventHandler.handle(new VertexEventRouteEvent(destVertex
+ eventHandler.handle(new VertexEventRouteEvent(destVertex
.getVertexId(), Collections.singletonList(tezEvent)));
} else {
- // event not from this vertex. must have come from source vertex.
- // send to tasks
- if (vertex.tasksNotYetScheduled) {
- vertex.pendingTaskEvents.add(tezEvent);
+ if (tasksNotYetScheduled) {
+ // This is only needed to support mixed-mode routing. With pure
+ // on-demand routing, events could be added directly to taskEvents.
+ // When legacy routing is removed, pendingTaskEvents can be removed
+ // as well.
+ pendingTaskEvents.add(tezEvent);
} else {
- Edge srcEdge = vertex.sourceVertices.get(vertex.getDAG().getVertex(
- sourceMeta.getTaskVertexName()));
- if (srcEdge == null) {
- throw new TezUncheckedException("Bad source vertex: " +
- sourceMeta.getTaskVertexName() + " for destination vertex: " +
- vertex.getLogIdentifier());
+ // event not from this vertex. must have come from source vertex.
+ if (useOnDemandRouting) {
+ int srcTaskIndex = sourceMeta.getTaskAttemptID().getTaskID().getId();
+ Vertex edgeVertex = getDAG().getVertex(sourceMeta.getTaskVertexName());
+ Edge srcEdge = sourceVertices.get(edgeVertex);
+ if (srcEdge == null) {
+ throw new TezUncheckedException("Bad source vertex: " +
+ sourceMeta.getTaskVertexName() + " for destination vertex: " +
+ getLogIdentifier());
+ }
+ onDemandRouteEventsWriteLock.lock();
+ try {
+ onDemandRouteEvents.add(new EventInfo(tezEvent, srcEdge, srcTaskIndex));
+ } finally {
+ onDemandRouteEventsWriteLock.unlock();
+ }
+ } else {
+ // send to tasks
+ Edge srcEdge = sourceVertices.get(getDAG().getVertex(
+ sourceMeta.getTaskVertexName()));
+ if (srcEdge == null) {
+ throw new TezUncheckedException("Bad source vertex: "
+ + sourceMeta.getTaskVertexName() + " for destination vertex: "
+ + getLogIdentifier());
+ }
+ srcEdge.sendTezEventToDestinationTasks(tezEvent);
}
- srcEdge.sendTezEventToDestinationTasks(tezEvent);
}
}
}
break;
case ROOT_INPUT_DATA_INFORMATION_EVENT:
- if (vertex.tasksNotYetScheduled) {
- vertex.pendingTaskEvents.add(tezEvent);
+ {
+ checkEventSourceMetadata(this, sourceMeta);
+ if (tasksNotYetScheduled) {
+ // This is only needed to support mixed-mode routing. With pure
+ // on-demand routing, events could be added directly to taskEvents.
+ // When legacy routing is removed, pendingTaskEvents can be removed
+ // as well.
+ pendingTaskEvents.add(tezEvent);
} else {
- checkEventSourceMetadata(vertex, sourceMeta);
- InputDataInformationEvent riEvent = (InputDataInformationEvent) tezEvent
- .getEvent();
- Task targetTask = vertex.getTask(riEvent.getTargetIndex());
- targetTask.registerTezEvent(tezEvent);
+ if (useOnDemandRouting) {
+ onDemandRouteEvents.add(new EventInfo(tezEvent, null, -1));
+ } else {
+ InputDataInformationEvent riEvent = (InputDataInformationEvent) tezEvent.getEvent();
+ Task targetTask = getTask(riEvent.getTargetIndex());
+ targetTask.registerTezEvent(tezEvent);
+ }
}
+ }
break;
case VERTEX_MANAGER_EVENT:
{
// VM events on task success only can be changed as part of TEZ-1532
VertexManagerEvent vmEvent = (VertexManagerEvent) tezEvent.getEvent();
- Vertex target = vertex.getDAG().getVertex(vmEvent.getTargetVertexName());
+ Vertex target = getDAG().getVertex(vmEvent.getTargetVertexName());
Preconditions.checkArgument(target != null,
"Event sent to unkown vertex: " + vmEvent.getTargetVertexName());
- if (target == vertex) {
- vertex.vertexManager.onVertexManagerEventReceived(vmEvent);
+ if (target == this) {
+ vertexManager.onVertexManagerEventReceived(vmEvent);
} else {
- checkEventSourceMetadata(vertex, sourceMeta);
- vertex.eventHandler.handle(new VertexEventRouteEvent(target
+ checkEventSourceMetadata(this, sourceMeta);
+ eventHandler.handle(new VertexEventRouteEvent(target
.getVertexId(), Collections.singletonList(tezEvent)));
}
}
@@ -4065,45 +4247,46 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
case ROOT_INPUT_INITIALIZER_EVENT:
{
InputInitializerEvent riEvent = (InputInitializerEvent) tezEvent.getEvent();
- Vertex target = vertex.getDAG().getVertex(riEvent.getTargetVertexName());
+ Vertex target = getDAG().getVertex(riEvent.getTargetVertexName());
Preconditions.checkArgument(target != null,
"Event sent to unknown vertex: " + riEvent.getTargetVertexName());
riEvent.setSourceVertexName(tezEvent.getSourceInfo().getTaskVertexName());
- if (target == vertex) {
- if (vertex.rootInputDescriptors == null ||
- !vertex.rootInputDescriptors.containsKey(riEvent.getTargetInputName())) {
+ if (target == this) {
+ if (rootInputDescriptors == null ||
+ !rootInputDescriptors.containsKey(riEvent.getTargetInputName())) {
throw new TezUncheckedException(
"InputInitializerEvent targeted at unknown initializer on vertex " +
- vertex.logIdentifier + ", Event=" + riEvent);
+ logIdentifier + ", Event=" + riEvent);
}
- if (vertex.getState() == VertexState.NEW) {
- vertex.pendingInitializerEvents.add(tezEvent);
- } else if (vertex.getState() == VertexState.INITIALIZING) {
- vertex.rootInputInitializerManager.handleInitializerEvents(Collections.singletonList(tezEvent));
+ if (getState() == VertexState.NEW) {
+ pendingInitializerEvents.add(tezEvent);
+ } else if (getState() == VertexState.INITIALIZING) {
+ rootInputInitializerManager.handleInitializerEvents(Collections.singletonList(tezEvent));
} else {
// Currently, INITED and subsequent states means Initializer complete / failure
if (LOG.isDebugEnabled()) {
- LOG.debug("Dropping event" + tezEvent + " since state is not INITIALIZING in " + vertex.getLogIdentifier() + ", state=" + vertex.getState());
+ LOG.debug("Dropping event" + tezEvent + " since state is not INITIALIZING in "
+ + getLogIdentifier() + ", state=" + getState());
}
}
} else {
- checkEventSourceMetadata(vertex, sourceMeta);
- vertex.eventHandler.handle(new VertexEventRouteEvent(target.getVertexId(),
+ checkEventSourceMetadata(this, sourceMeta);
+ eventHandler.handle(new VertexEventRouteEvent(target.getVertexId(),
Collections.singletonList(tezEvent)));
}
}
break;
case INPUT_READ_ERROR_EVENT:
{
- checkEventSourceMetadata(vertex, sourceMeta);
- Edge srcEdge = vertex.sourceVertices.get(vertex.getDAG().getVertex(
+ checkEventSourceMetadata(this, sourceMeta);
+ Edge srcEdge = sourceVertices.get(this.getDAG().getVertex(
sourceMeta.getEdgeVertexName()));
srcEdge.sendTezEventToSourceTasks(tezEvent);
}
break;
case TASK_ATTEMPT_FAILED_EVENT:
{
- checkEventSourceMetadata(vertex, sourceMeta);
+ checkEventSourceMetadata(this, sourceMeta);
TaskAttemptTerminationCause errCause = null;
switch (sourceMeta.getEventGenerator()) {
case INPUT:
@@ -4124,7 +4307,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
}
TaskAttemptFailedEvent taskFailedEvent =
(TaskAttemptFailedEvent) tezEvent.getEvent();
- vertex.getEventHandler().handle(
+ getEventHandler().handle(
new TaskAttemptEventAttemptFailed(sourceMeta.getTaskAttemptID(),
TaskAttemptEventType.TA_FAILED,
"Error: " + taskFailedEvent.getDiagnostics(),
@@ -4134,8 +4317,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
break;
case TASK_ATTEMPT_COMPLETED_EVENT:
{
- checkEventSourceMetadata(vertex, sourceMeta);
- vertex.getEventHandler().handle(
+ checkEventSourceMetadata(this, sourceMeta);
+ getEventHandler().handle(
new TaskAttemptEvent(sourceMeta.getTaskAttemptID(), TaskAttemptEventType.TA_DONE));
}
break;
http://git-wip-us.apache.org/repos/asf/tez/blob/05f77fe2/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
index 18286b5..5cd487c 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
@@ -92,8 +92,8 @@ public class MockDAGAppMaster extends DAGAppMaster {
MockContainerLauncher containerLauncher;
boolean initFailFlag;
boolean startFailFlag;
- boolean sendDMEvents;
boolean recoveryFatalError = false;
+ EventsDelegate eventsDelegate;
CountersDelegate countersDelegate;
StatisticsDelegate statsDelegate;
long launcherSleepTime = 1;
@@ -112,6 +112,10 @@ public class MockDAGAppMaster extends DAGAppMaster {
public static interface CountersDelegate {
public TezCounters getCounters(TaskSpec taskSpec);
}
+
+ public static interface EventsDelegate {
+ public void getEvents(TaskSpec taskSpec, List<TezEvent> events);
+ }
// mock container launcher does not launch real tasks.
// Upon, launch of a container is simulates the container asking for tasks
@@ -334,7 +338,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
if (response.shouldDie()) {
cData.remove();
} else {
- cData.nextFromEventId += response.getEvents().size();
+ cData.nextFromEventId += response.getNextFromEventId();
if (!response.getEvents().isEmpty()) {
long stopTime = System.nanoTime();
long stopCpuTime = threadMxBean.getCurrentThreadCpuTime();
@@ -400,19 +404,8 @@ public class MockDAGAppMaster extends DAGAppMaster {
updatesToMake != null && cData.numUpdates < updatesToMake) {
List<TezEvent> events = Lists.newArrayListWithCapacity(
cData.taskSpec.getOutputs().size() + 1);
- if (sendDMEvents) {
- for (OutputSpec output : cData.taskSpec.getOutputs()) {
- if (output.getPhysicalEdgeCount() == 1) {
- events.add(new TezEvent(DataMovementEvent.create(0, 0, 0, null), new EventMetaData(
- EventProducerConsumerType.OUTPUT, cData.vName, output
- .getDestinationVertexName(), cData.taId)));
- } else {
- events.add(new TezEvent(CompositeDataMovementEvent.create(0,
- output.getPhysicalEdgeCount(), null), new EventMetaData(
- EventProducerConsumerType.OUTPUT, cData.vName, output
- .getDestinationVertexName(), cData.taId)));
- }
- }
+ if (cData.numUpdates == 0 && eventsDelegate != null) {
+ eventsDelegate.getEvents(cData.taskSpec, events);
}
TezCounters counters = null;
if (countersDelegate != null) {
@@ -428,7 +421,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
events.add(new TezEvent(new TaskStatusUpdateEvent(counters, progress, stats), new EventMetaData(
EventProducerConsumerType.SYSTEM, cData.vName, "", cData.taId)));
TezHeartbeatRequest request = new TezHeartbeatRequest(cData.numUpdates, events,
- cData.cIdStr, cData.taId, cData.nextFromEventId, 10000);
+ cData.cIdStr, cData.taId, cData.nextFromEventId, 50000);
doHeartbeat(request, cData);
} else if (version != null && cData.taId.getId() <= version.intValue()) {
preemptContainer(cData);
http://git-wip-us.apache.org/repos/asf/tez/blob/05f77fe2/tez-dag/src/test/java/org/apache/tez/dag/app/TestMemoryWithEvents.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMemoryWithEvents.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMemoryWithEvents.java
new file mode 100644
index 0000000..c277b38
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMemoryWithEvents.java
@@ -0,0 +1,219 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.DataSourceDescriptor;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.InputInitializerDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.InputInitializer;
+import org.apache.tez.runtime.api.InputInitializerContext;
+import org.apache.tez.runtime.api.events.InputDataInformationEvent;
+import org.apache.tez.runtime.api.events.InputInitializerEvent;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+// The objective of these tests is to make sure the large job simulations pass
+// within the memory limits set by the junit tests (1GB)
+// For large jobs please increase memory limits to account for memory used by the
+// simulation code itself
+public class TestMemoryWithEvents {
+ static Configuration defaultConf;
+ static FileSystem localFs;
+
+ static {
+ try {
+ defaultConf = new Configuration(false);
+ defaultConf.set("fs.defaultFS", "file:///");
+ defaultConf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+ localFs = FileSystem.getLocal(defaultConf);
+ String stagingDir = "target" + Path.SEPARATOR + TestMemoryWithEvents.class.getName() + "-tmpDir";
+ defaultConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDir);
+ Logger.getRootLogger().setLevel(Level.WARN);
+ } catch (IOException e) {
+ throw new RuntimeException("init failure", e);
+ }
+ }
+
+ final int numThreads = 30;
+ final int numTasks = 10000;
+
+ private void checkMemory(String name, MockDAGAppMaster mockApp) {
+ long mb = 1024*1024;
+ long microsPerMs = 1000; // NOTE(review): implies heartbeatTime/heartbeatCpu are in microseconds - confirm in MockDAGAppMaster
+ // Prints JVM heap figures (in MB) and mock AM heartbeat statistics, labelled with the given name
+ //Getting the runtime reference from system
+ Runtime runtime = Runtime.getRuntime();
+
+ System.out.println("##### Heap utilization statistics [MB] for " + name);
+ // Request a GC before sampling; presumably so collectable garbage does not inflate the used-memory figure
+ runtime.gc();
+
+ //Print used memory
+ System.out.println("##### Used Memory:"
+ + (runtime.totalMemory() - runtime.freeMemory()) / mb);
+
+ //Print free memory
+ System.out.println("##### Free Memory:"
+ + runtime.freeMemory() / mb);
+
+ //Print total available memory
+ System.out.println("##### Total Memory:" + runtime.totalMemory() / mb);
+
+ //Print Maximum available memory
+ System.out.println("##### Max Memory:" + runtime.maxMemory() / mb);
+
+ //Print heartbeat statistics: average latency, total cpu, average cpu and heartbeat count
+ long numHeartbeats = mockApp.numHearbeats.get();
+ if (numHeartbeats == 0) { // avoid dividing by zero in the averages below
+ numHeartbeats = 1;
+ }
+ System.out.println("##### Heartbeat (ms) :"
+ + " latency avg: " + ((mockApp.heartbeatTime.get() / numHeartbeats) / microsPerMs)
+ + " cpu total: " + (mockApp.heartbeatCpu.get() / microsPerMs)
+ + " cpu avg: " + ((mockApp.heartbeatCpu.get() / numHeartbeats) / microsPerMs)
+ + " numHeartbeats: " + mockApp.numHearbeats.get());
+ }
+
+ private void testMemory(DAG dag, boolean sendDMEvents) throws Exception {
+ TezConfiguration tezconf = new TezConfiguration(defaultConf);
+ // NOTE(review): sendDMEvents is never read in this method; the events delegate below is installed unconditionally
+ MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null,
+ null, false, false, numThreads, 1000);
+ tezClient.start();
+ // Configure the mock AM and launcher, run the DAG to completion, then report memory and heartbeat statistics
+ MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
+ MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
+ mockLauncher.startScheduling(false);
+ mockApp.eventsDelegate = new TestMockDAGAppMaster.TestEventsDelegate();
+ mockApp.doSleep = false;
+ DAGClient dagClient = tezClient.submitDAG(dag);
+ mockLauncher.waitTillContainersLaunched();
+ mockLauncher.startScheduling(true);
+ DAGStatus status = dagClient.waitForCompletion();
+ Assert.assertEquals(DAGStatus.State.SUCCEEDED, status.getState());
+ checkMemory(dag.getName(), mockApp);
+ // NOTE(review): stop() is not reached if the assertion above throws; consider try/finally
+ tezClient.stop();
+ }
+
+ public static class SimulationInitializer extends InputInitializer {
+ public SimulationInitializer(InputInitializerContext initializerContext) {
+ super(initializerContext);
+ }
+
+ @Override
+ public List<Event> initialize() throws Exception {
+ int numTasks = getContext().getNumTasks();
+ List<Event> events = Lists.newArrayListWithCapacity(numTasks);
+ for (int i=0; i<numTasks; ++i) {
+ events.add(InputDataInformationEvent.createWithSerializedPayload(i, null));
+ }
+ return events;
+ }
+
+ @Override
+ public void handleInputInitializerEvent(List<InputInitializerEvent> events) throws Exception {
+ }
+ }
+
+ @Ignore
+ @Test (timeout = 600000)
+ public void testMemoryRootInputEvents() throws Exception {
+ DAG dag = DAG.create("testMemoryRootInputEvents");
+ Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), numTasks);
+ Vertex vB = Vertex.create("B", ProcessorDescriptor.create("Proc.class"), numTasks);
+ vA.addDataSource(
+ "Input",
+ DataSourceDescriptor.create(InputDescriptor.create("In"),
+ InputInitializerDescriptor.create(SimulationInitializer.class.getName()), null));
+ dag.addVertex(vA).addVertex(vB);
+ testMemory(dag, false);
+ }
+
+ @Ignore
+ @Test (timeout = 600000)
+ public void testMemoryOneToOne() throws Exception {
+ DAG dag = DAG.create("testMemoryOneToOne");
+ Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), numTasks);
+ Vertex vB = Vertex.create("B", ProcessorDescriptor.create("Proc.class"), numTasks);
+ dag.addVertex(vA)
+ .addVertex(vB)
+ .addEdge(
+ Edge.create(vA, vB, EdgeProperty.create(DataMovementType.ONE_TO_ONE,
+ DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+ OutputDescriptor.create("Out"), InputDescriptor.create("In"))));
+ testMemory(dag, true);
+ }
+
+ @Ignore
+ @Test (timeout = 600000)
+ public void testMemoryBroadcast() throws Exception {
+ DAG dag = DAG.create("testMemoryBroadcast");
+ Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), numTasks);
+ Vertex vB = Vertex.create("B", ProcessorDescriptor.create("Proc.class"), numTasks);
+ dag.addVertex(vA)
+ .addVertex(vB)
+ .addEdge(
+ Edge.create(vA, vB, EdgeProperty.create(DataMovementType.BROADCAST,
+ DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+ OutputDescriptor.create("Out"), InputDescriptor.create("In"))));
+ testMemory(dag, true);
+ }
+
+ @Ignore
+ @Test (timeout = 600000)
+ public void testMemoryScatterGather() throws Exception {
+ DAG dag = DAG.create("testMemoryScatterGather");
+ Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), numTasks);
+ Vertex vB = Vertex.create("B", ProcessorDescriptor.create("Proc.class"), numTasks);
+ dag.addVertex(vA)
+ .addVertex(vB)
+ .addEdge(
+ Edge.create(vA, vB, EdgeProperty.create(DataMovementType.SCATTER_GATHER,
+ DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+ OutputDescriptor.create("Out"), InputDescriptor.create("In"))));
+ testMemory(dag, true);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/05f77fe2/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
index 87ffead..1e7faf9 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
@@ -23,6 +23,7 @@ import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -49,6 +50,9 @@ import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.DataSourceDescriptor;
import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeManagerPlugin;
+import org.apache.tez.dag.api.EdgeManagerPluginContext;
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
@@ -69,6 +73,7 @@ import org.apache.tez.dag.api.client.VertexStatus;
import org.apache.tez.dag.api.client.VertexStatus.State;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.app.MockDAGAppMaster.CountersDelegate;
+import org.apache.tez.dag.app.MockDAGAppMaster.EventsDelegate;
import org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher;
import org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher.ContainerData;
import org.apache.tez.dag.app.MockDAGAppMaster.StatisticsDelegate;
@@ -84,13 +89,17 @@ import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.runtime.api.VertexStatistics;
import org.apache.tez.runtime.api.OutputCommitter;
import org.apache.tez.runtime.api.OutputCommitterContext;
+import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.events.InputReadErrorEvent;
+import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.IOStatistics;
import org.apache.tez.runtime.api.impl.InputSpec;
import org.apache.tez.runtime.api.impl.OutputSpec;
import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.api.impl.TaskStatistics;
import org.apache.tez.runtime.api.impl.TezEvent;
+import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
@@ -101,8 +110,7 @@ import com.google.common.primitives.Ints;
public class TestMockDAGAppMaster {
private static final Log LOG = LogFactory.getLog(TestMockDAGAppMaster.class);
static Configuration defaultConf;
- static FileSystem localFs;
-
+ static FileSystem localFs;
static {
try {
defaultConf = new Configuration(false);
@@ -116,6 +124,24 @@ public class TestMockDAGAppMaster {
}
}
+ static class TestEventsDelegate implements EventsDelegate {
+ @Override
+ public void getEvents(TaskSpec taskSpec, List<TezEvent> events) {
+ for (OutputSpec output : taskSpec.getOutputs()) {
+ if (output.getPhysicalEdgeCount() == 1) {
+ events.add(new TezEvent(DataMovementEvent.create(0, 0, 0, null), new EventMetaData(
+ EventProducerConsumerType.OUTPUT, taskSpec.getVertexName(), output
+ .getDestinationVertexName(), taskSpec.getTaskAttemptID())));
+ } else {
+ events.add(new TezEvent(CompositeDataMovementEvent.create(0,
+ output.getPhysicalEdgeCount(), null), new EventMetaData(
+ EventProducerConsumerType.OUTPUT, taskSpec.getVertexName(), output
+ .getDestinationVertexName(), taskSpec.getTaskAttemptID())));
+ }
+ }
+ }
+ }
+
@Test (timeout = 5000)
public void testLocalResourceSetup() throws Exception {
TezConfiguration tezconf = new TezConfiguration(defaultConf);
@@ -196,7 +222,7 @@ public class TestMockDAGAppMaster {
MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
mockLauncher.startScheduling(false);
- mockApp.sendDMEvents = true;
+ mockApp.eventsDelegate = new TestEventsDelegate();
DAG dag = DAG.create("testBasicEvents");
Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 2);
Vertex vB = Vertex.create("B", ProcessorDescriptor.create("Proc.class"), 2);
@@ -227,7 +253,8 @@ public class TestMockDAGAppMaster {
Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState());
VertexImpl vImpl = (VertexImpl) dagImpl.getVertex(vB.getName());
TaskImpl tImpl = (TaskImpl) vImpl.getTask(1);
- List<TezEvent> tEvents = tImpl.getTaskEvents();
+ TezTaskAttemptID taId = TezTaskAttemptID.getInstance(tImpl.getTaskId(), 1);
+ List<TezEvent> tEvents = vImpl.getTaskAttemptTezEvents(taId, 0, 1000).getEvents();
Assert.assertEquals(2, tEvents.size()); // 2 from vA
Assert.assertEquals(vA.getName(), tEvents.get(0).getDestinationInfo().getEdgeVertexName());
Assert.assertEquals(0, ((DataMovementEvent)tEvents.get(0).getEvent()).getSourceIndex());
@@ -240,7 +267,8 @@ public class TestMockDAGAppMaster {
(targetIndex1 == 0 && targetIndex2 == 1) || (targetIndex1 == 1 && targetIndex2 == 0));
vImpl = (VertexImpl) dagImpl.getVertex(vC.getName());
tImpl = (TaskImpl) vImpl.getTask(1);
- tEvents = tImpl.getTaskEvents();
+ taId = TezTaskAttemptID.getInstance(tImpl.getTaskId(), 1);
+ tEvents = vImpl.getTaskAttemptTezEvents(taId, 0, 1000).getEvents();
Assert.assertEquals(2, tEvents.size()); // 2 from vA
Assert.assertEquals(vA.getName(), tEvents.get(0).getDestinationInfo().getEdgeVertexName());
Assert.assertEquals(1, ((DataMovementEvent)tEvents.get(0).getEvent()).getSourceIndex());
@@ -253,7 +281,8 @@ public class TestMockDAGAppMaster {
(targetIndex1 == 0 && targetIndex2 == 1) || (targetIndex1 == 1 && targetIndex2 == 0));
vImpl = (VertexImpl) dagImpl.getVertex(vD.getName());
tImpl = (TaskImpl) vImpl.getTask(1);
- tEvents = tImpl.getTaskEvents();
+ taId = TezTaskAttemptID.getInstance(tImpl.getTaskId(), 1);
+ tEvents = vImpl.getTaskAttemptTezEvents(taId, 0, 1000).getEvents();
Assert.assertEquals(1, tEvents.size()); // 1 from vA
Assert.assertEquals(vA.getName(), tEvents.get(0).getDestinationInfo().getEdgeVertexName());
Assert.assertEquals(0, ((DataMovementEvent)tEvents.get(0).getEvent()).getTargetIndex());
@@ -261,6 +290,125 @@ public class TestMockDAGAppMaster {
tezClient.stop();
}
+
+ public static class LegacyEdgeTestEdgeManager extends EdgeManagerPlugin {
+ List<Integer> destinationInputIndices =
+ Collections.unmodifiableList(Collections.singletonList(0));
+ public LegacyEdgeTestEdgeManager(EdgeManagerPluginContext context) {
+ super(context);
+ }
+
+ @Override
+ public void initialize() throws Exception {
+ }
+
+ @Override
+ public int getNumDestinationTaskPhysicalInputs(int destinationTaskIndex) throws Exception {
+ return 1;
+ }
+
+ @Override
+ public int getNumSourceTaskPhysicalOutputs(int sourceTaskIndex) throws Exception {
+ return 1;
+ }
+
+ @Override
+ public void routeDataMovementEventToDestination(DataMovementEvent event,
+ int sourceTaskIndex, int sourceOutputIndex,
+ Map<Integer, List<Integer>> destinationTaskAndInputIndices) {
+ destinationTaskAndInputIndices.put(sourceTaskIndex, destinationInputIndices);
+ }
+
+ @Override
+ public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex,
+ Map<Integer, List<Integer>> destinationTaskAndInputIndices) {
+ destinationTaskAndInputIndices.put(sourceTaskIndex, destinationInputIndices);
+ }
+
+ @Override
+ public int routeInputErrorEventToSource(InputReadErrorEvent event,
+ int destinationTaskIndex, int destinationFailedInputIndex) {
+ return destinationTaskIndex;
+ }
+
+ @Override
+ public int getNumDestinationConsumerTasks(int sourceTaskIndex) {
+ return 1;
+ }
+ }
+
+ @Test (timeout = 100000)
+ public void testMixedEdgeRouting() throws Exception {
+ TezConfiguration tezconf = new TezConfiguration(defaultConf);
+ // Verifies routing mode and stored task events for scatter-gather only (vC), mixed (vD) and legacy only (vE) inputs
+ MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null, null);
+ tezClient.start();
+
+ MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
+ MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
+ mockLauncher.startScheduling(false);
+ mockApp.eventsDelegate = new TestEventsDelegate();
+ DAG dag = DAG.create("testMixedEdgeRouting");
+ Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 1);
+ Vertex vB = Vertex.create("B", ProcessorDescriptor.create("Proc.class"), 1);
+ Vertex vC = Vertex.create("C", ProcessorDescriptor.create("Proc.class"), 1);
+ Vertex vD = Vertex.create("D", ProcessorDescriptor.create("Proc.class"), 1);
+ Vertex vE = Vertex.create("E", ProcessorDescriptor.create("Proc.class"), 1);
+ dag.addVertex(vA)
+ .addVertex(vB)
+ .addVertex(vC)
+ .addVertex(vD)
+ .addVertex(vE)
+ .addEdge(
+ Edge.create(vA, vC, EdgeProperty.create(DataMovementType.SCATTER_GATHER,
+ DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+ OutputDescriptor.create("Out"), InputDescriptor.create("In"))))
+ .addEdge(
+ Edge.create(vB, vC, EdgeProperty.create(DataMovementType.SCATTER_GATHER,
+ DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+ OutputDescriptor.create("Out"), InputDescriptor.create("In"))))
+ .addEdge(
+ Edge.create(vA, vD, EdgeProperty.create(DataMovementType.SCATTER_GATHER,
+ DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+ OutputDescriptor.create("Out"), InputDescriptor.create("In"))))
+ .addEdge(
+ Edge.create(vB, vD, EdgeProperty.create(
+ EdgeManagerPluginDescriptor.create(LegacyEdgeTestEdgeManager.class.getName()),
+ DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+ OutputDescriptor.create("Out"), InputDescriptor.create("In"))))
+ .addEdge(
+ Edge.create(vB, vE, EdgeProperty.create(
+ EdgeManagerPluginDescriptor.create(LegacyEdgeTestEdgeManager.class.getName()),
+ DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+ OutputDescriptor.create("Out"), InputDescriptor.create("In"))));
+
+ DAGClient dagClient = tezClient.submitDAG(dag);
+ mockLauncher.waitTillContainersLaunched();
+ DAGImpl dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG();
+ mockLauncher.startScheduling(true);
+ dagClient.waitForCompletion();
+ Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState());
+ // vC uses on demand routing and its task does not provide events
+ VertexImpl vImpl = (VertexImpl) dagImpl.getVertex(vC.getName());
+ Assert.assertEquals(true, vImpl.useOnDemandRouting);
+ TaskImpl tImpl = (TaskImpl) vImpl.getTask(0);
+ TezTaskAttemptID taId = TezTaskAttemptID.getInstance(tImpl.getTaskId(), 0);
+ Assert.assertEquals(0, tImpl.getTaskAttemptTezEvents(taId, 0, 1000).size());
+ // vD is mixed mode and does not use on demand routing and its task provides events (one each from vA and vB)
+ vImpl = (VertexImpl) dagImpl.getVertex(vD.getName());
+ Assert.assertEquals(false, vImpl.useOnDemandRouting);
+ tImpl = (TaskImpl) vImpl.getTask(0);
+ taId = TezTaskAttemptID.getInstance(tImpl.getTaskId(), 0);
+ Assert.assertEquals(2, tImpl.getTaskAttemptTezEvents(taId, 0, 1000).size());
+ // vE has single legacy edge and does not use on demand routing and its task provides events (one, from vB)
+ vImpl = (VertexImpl) dagImpl.getVertex(vE.getName());
+ Assert.assertEquals(false, vImpl.useOnDemandRouting);
+ tImpl = (TaskImpl) vImpl.getTask(0);
+ taId = TezTaskAttemptID.getInstance(tImpl.getTaskId(), 0);
+ Assert.assertEquals(1, tImpl.getTaskAttemptTezEvents(taId, 0, 1000).size());
+
+ tezClient.stop();
+ }
@Test (timeout = 10000)
public void testBasicCounters() throws Exception {
http://git-wip-us.apache.org/repos/asf/tez/blob/05f77fe2/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
index f974f40..db8eff1 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
@@ -42,7 +42,6 @@ import org.apache.tez.common.ContainerTask;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.app.dag.DAG;
-import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
@@ -62,6 +61,7 @@ import org.apache.tez.runtime.api.impl.EventType;
import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
+import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
@@ -186,7 +186,7 @@ public class TestTaskAttemptListenerImplTezDag {
new TezEvent(new TaskAttemptCompletedEvent(), null)
);
- EventHandler eventHandler = generateHeartbeat(events);
+ generateHeartbeat(events, 0, 1, 0, new ArrayList<TezEvent>());
ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
verify(eventHandler, times(2)).handle(arg.capture());
@@ -212,7 +212,7 @@ public class TestTaskAttemptListenerImplTezDag {
List<TezEvent> events = Arrays.asList(
new TezEvent(new TaskAttemptCompletedEvent(), null)
);
- final EventHandler eventHandler = generateHeartbeat(events);
+ generateHeartbeat(events, 0, 1, 0, new ArrayList<TezEvent>());
ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
verify(eventHandler, times(1)).handle(arg.capture());
@@ -222,18 +222,29 @@ public class TestTaskAttemptListenerImplTezDag {
assertEquals("only event should be route event", VertexEventType.V_ROUTE_EVENT,
event.getType());
}
+
+ @Test (timeout = 5000)
+ public void testTaskHeartbeatResponse() throws Exception {
+ List<TezEvent> events = new ArrayList<TezEvent>();
+ List<TezEvent> eventsToSend = new ArrayList<TezEvent>();
+ TezHeartbeatResponse response = generateHeartbeat(events, 0, 1, 2, eventsToSend);
+
+ assertEquals(2, response.getNextFromEventId());
+ assertEquals(1, response.getLastRequestId());
+ assertEquals(eventsToSend, response.getEvents());
+ }
- private EventHandler generateHeartbeat(List<TezEvent> events) throws IOException, TezException {
+ private TezHeartbeatResponse generateHeartbeat(List<TezEvent> events,
+ int fromEventId, int maxEvents, int nextFromEventId,
+ List<TezEvent> sendEvents) throws IOException, TezException {
ContainerId containerId = createContainerId(appId, 1);
long requestId = 0;
Vertex vertex = mock(Vertex.class);
- Task task = mock(Task.class);
doReturn(vertex).when(dag).getVertex(vertexID);
doReturn("test_vertex").when(vertex).getName();
- doReturn(task).when(vertex).getTask(taskID);
-
- doReturn(new ArrayList<TezEvent>()).when(task).getTaskAttemptTezEvents(taskAttemptID, 0, 1);
+ TaskAttemptEventInfo eventInfo = new TaskAttemptEventInfo(nextFromEventId, sendEvents);
+ doReturn(eventInfo).when(vertex).getTaskAttemptTezEvents(taskAttemptID, fromEventId, maxEvents);
taskAttemptListener.registerRunningContainer(containerId);
taskAttemptListener.registerTaskAttempt(amContainerTask, containerId);
@@ -243,10 +254,10 @@ public class TestTaskAttemptListenerImplTezDag {
doReturn(taskAttemptID).when(request).getCurrentTaskAttemptID();
doReturn(++requestId).when(request).getRequestId();
doReturn(events).when(request).getEvents();
+ doReturn(maxEvents).when(request).getMaxEvents();
+ doReturn(fromEventId).when(request).getStartIndex();
- taskAttemptListener.heartbeat(request);
-
- return eventHandler;
+ return taskAttemptListener.heartbeat(request);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/05f77fe2/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index ba40146..d2aa2d0 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -51,9 +51,9 @@ import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tez.common.MockDNSToSwitchMapping;
import org.apache.tez.dag.api.DataSinkDescriptor;
-import org.apache.tez.dag.api.EdgeManagerPlugin;
import org.apache.tez.dag.api.EdgeManagerPluginContext;
import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
+import org.apache.tez.dag.api.EdgeManagerPluginOnDemand;
import org.apache.tez.dag.api.GroupInputEdge;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.EdgeProperty;
@@ -1028,6 +1028,8 @@ public class TestDAGImpl {
new EventMetaData(EventProducerConsumerType.INPUT, "vertex1", "vertex2", ta1.getID()));
dispatcher.getEventHandler().handle(new VertexEventRouteEvent(v2.getVertexId(), Lists.newArrayList(tezEvent)));
dispatcher.await();
+ v2.getTaskAttemptTezEvents(ta1.getID(), 0, 1000);
+ dispatcher.await();
Assert.assertEquals(VertexState.FAILED, v2.getState());
Assert.assertEquals(VertexState.KILLED, v1.getState());
@@ -1037,7 +1039,40 @@ public class TestDAGImpl {
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
- public void testEdgeManager_RouteInputSourceTaskFailedEventToDestination() {
+ public void testEdgeManager_RouteDataMovementEventToDestinationWithLegacyRouting() {
+ // Remove after legacy routing is removed
+ setupDAGWithCustomEdge(ExceptionLocation.RouteDataMovementEventToDestination);
+ dispatcher.getEventHandler().handle(
+ new DAGEvent(dagWithCustomEdge.getID(), DAGEventType.DAG_INIT));
+ dispatcher.getEventHandler().handle(new DAGEventStartDag(dagWithCustomEdge.getID(),
+ null));
+ dispatcher.await();
+ Assert.assertEquals(DAGState.RUNNING, dagWithCustomEdge.getState());
+
+ VertexImpl v1 = (VertexImpl)dagWithCustomEdge.getVertex("vertex1");
+ VertexImpl v2 = (VertexImpl)dagWithCustomEdge.getVertex("vertex2");
+ v1.useOnDemandRouting = false;
+ v2.useOnDemandRouting = false;
+ dispatcher.await();
+ Task t1= v2.getTask(0);
+ TaskAttemptImpl ta1= (TaskAttemptImpl)t1.getAttempt(TezTaskAttemptID.getInstance(t1.getTaskId(), 0));
+
+ DataMovementEvent daEvent = DataMovementEvent.create(ByteBuffer.wrap(new byte[0]));
+ TezEvent tezEvent = new TezEvent(daEvent,
+ new EventMetaData(EventProducerConsumerType.INPUT, "vertex1", "vertex2", ta1.getID()));
+ dispatcher.getEventHandler().handle(new VertexEventRouteEvent(v2.getVertexId(), Lists.newArrayList(tezEvent)));
+ dispatcher.await();
+
+ Assert.assertEquals(VertexState.FAILED, v2.getState());
+ Assert.assertEquals(VertexState.KILLED, v1.getState());
+ String diag = StringUtils.join(v2.getDiagnostics(), ",");
+ Assert.assertTrue(diag.contains(ExceptionLocation.RouteDataMovementEventToDestination.name()));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test(timeout = 5000)
+ public void testEdgeManager_RouteInputSourceTaskFailedEventToDestinationLegacyRouting() {
+ // Remove after legacy routing is removed
setupDAGWithCustomEdge(ExceptionLocation.RouteInputSourceTaskFailedEventToDestination);
dispatcher.getEventHandler().handle(
new DAGEvent(dagWithCustomEdge.getID(), DAGEventType.DAG_INIT));
@@ -1048,6 +1083,8 @@ public class TestDAGImpl {
VertexImpl v1 = (VertexImpl)dagWithCustomEdge.getVertex("vertex1");
VertexImpl v2 = (VertexImpl)dagWithCustomEdge.getVertex("vertex2");
+ v1.useOnDemandRouting = false;
+ v2.useOnDemandRouting = false;
dispatcher.await();
Task t1= v2.getTask(0);
@@ -1057,13 +1094,15 @@ public class TestDAGImpl {
new EventMetaData(EventProducerConsumerType.INPUT,"vertex1", "vertex2", ta1.getID()));
dispatcher.getEventHandler().handle(new VertexEventRouteEvent(v2.getVertexId(), Lists.newArrayList(tezEvent)));
dispatcher.await();
- //
+ v2.getTaskAttemptTezEvents(ta1.getID(), 0, 1000);
+ dispatcher.await();
Assert.assertEquals(VertexState.FAILED, v2.getState());
+
Assert.assertEquals(VertexState.KILLED, v1.getState());
String diag = StringUtils.join(v2.getDiagnostics(), ",");
Assert.assertTrue(diag.contains(ExceptionLocation.RouteInputSourceTaskFailedEventToDestination.name()));
}
-
+
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testEdgeManager_GetNumDestinationConsumerTasks() {
@@ -1773,7 +1812,7 @@ public class TestDAGImpl {
TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS_DEFAULT));
}
- public static class CustomizedEdgeManager extends EdgeManagerPlugin {
+ public static class CustomizedEdgeManager extends EdgeManagerPluginOnDemand {
public static enum ExceptionLocation {
Initialize,
@@ -1861,6 +1900,47 @@ public class TestDAGImpl {
}
return 0;
}
+
+ @Override
+ public int routeInputErrorEventToSource(int destinationTaskIndex,
+ int destinationFailedInputIndex) throws Exception {
+ if (exLocation == ExceptionLocation.RouteInputErrorEventToSource) {
+ throw new Exception(exLocation.name());
+ }
+ return 0;
+ }
+
+ @Override
+ public EventRouteMetadata routeDataMovementEventToDestination(int sourceTaskIndex,
+ int sourceOutputIndex, int destinationTaskIndex) throws Exception {
+ if (exLocation == ExceptionLocation.RouteDataMovementEventToDestination) {
+ throw new Exception(exLocation.name());
+ }
+ return null;
+ }
+
+ @Override
+ public EventRouteMetadata routeCompositeDataMovementEventToDestination(
+ int sourceTaskIndex, int destinationTaskIndex)
+ throws Exception {
+ if (exLocation == ExceptionLocation.RouteDataMovementEventToDestination) {
+ throw new Exception(exLocation.name());
+ }
+ return null;
+ }
+
+ @Override
+ public EventRouteMetadata routeInputSourceTaskFailedEventToDestination(
+ int sourceTaskIndex, int destinationTaskIndex) throws Exception {
+ if (exLocation == ExceptionLocation.RouteInputSourceTaskFailedEventToDestination) {
+ throw new Exception(exLocation.name());
+ }
+ return null;
+ }
+
+ @Override
+ public void prepareForRouting() throws Exception {
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/05f77fe2/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 99ec6cf..a8eaca1 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -73,6 +73,8 @@ import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.EdgeManagerPluginOnDemand;
+import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.EventRouteMetadata;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.InputInitializerDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
@@ -107,6 +109,7 @@ import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ContainerHeartbeatHandler;
+import org.apache.tez.dag.app.TaskAttemptEventInfo;
import org.apache.tez.dag.app.TaskAttemptListener;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.DAG;
@@ -148,7 +151,6 @@ import org.apache.tez.dag.app.rm.container.ContainerContextMatcher;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEventHandler;
import org.apache.tez.dag.history.HistoryEventType;
-import org.apache.tez.dag.history.events.VertexRecoverableEventsGeneratedEvent;
import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager;
import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
import org.apache.tez.dag.records.TaskAttemptTerminationCause;
@@ -186,8 +188,6 @@ import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
-import org.mockito.ArgumentMatcher;
-import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.internal.util.collections.Sets;
@@ -2462,6 +2462,101 @@ public class TestVertexImpl {
VertexImpl v = vertices.get("vertex2");
startVertex(v);
}
+
+ @Test (timeout = 5000)
+ public void testVertexGetTAAttempts() throws Exception { // exercises paging of VertexImpl.getTaskAttemptTezEvents(attemptId, fromEventId, maxEvents)
+ initAllVertices(VertexState.INITED);
+ VertexImpl v1 = vertices.get("vertex1");
+ startVertex(v1);
+ VertexImpl v2 = vertices.get("vertex2");
+ startVertex(v2);
+ VertexImpl v3 = vertices.get("vertex3");
+ VertexImpl v4 = vertices.get("vertex4");
+
+ Assert.assertEquals(VertexState.RUNNING, v4.getState());
+ Assert.assertEquals(1, v4.sourceVertices.size());
+ Edge e = v4.sourceVertices.get(v3);
+ TezTaskAttemptID v3TaId = TezTaskAttemptID.getInstance(
+ TezTaskID.getInstance(v3.getVertexId(), 0), 0);
+ TezTaskAttemptID v4TaId = TezTaskAttemptID.getInstance(
+ TezTaskID.getInstance(v4.getVertexId(), 0), 0);
+
+ for (int i=0; i<5; ++i) {
+ v4.handle(new VertexEventRouteEvent(v4.getVertexId(), Collections.singletonList(
+ new TezEvent(DataMovementEvent.create(0, null),
+ new EventMetaData(EventProducerConsumerType.OUTPUT, v3.getName(), v3.getName(), v3TaId)))));
+ }
+ dispatcher.await();
+ // verify all 5 events sent before scheduling have been put in pendingTaskEvents.
+ // this check will not be necessary once legacy routing has been removed
+ Assert.assertEquals(5, v4.pendingTaskEvents.size());
+ v4.scheduleTasks(new LinkedList<VertexManagerPluginContext.TaskWithLocationHint>());
+ // verify all pending events have been moved to the on-demand route event list
+ Assert.assertEquals(5, v4.getOnDemandRouteEvents().size());
+ for (int i=5; i<11; ++i) {
+ v4.handle(new VertexEventRouteEvent(v4.getVertexId(), Collections.singletonList(
+ new TezEvent(DataMovementEvent.create(0, null),
+ new EventMetaData(EventProducerConsumerType.OUTPUT, v3.getName(), v3.getName(), v3TaId)))));
+ }
+ dispatcher.await();
+ // verify events sent after scheduling are added directly to the on-demand route event list (5 + 6 = 11)
+ Assert.assertEquals(11, v4.getOnDemandRouteEvents().size());
+
+ TaskAttemptEventInfo eventInfo;
+ EdgeManagerPluginOnDemand mockPlugin = mock(EdgeManagerPluginOnDemand.class);
+ EventRouteMetadata mockRoute = EventRouteMetadata.create(1, new int[]{0});
+ e.edgeManager = mockPlugin; // swap in the mock so routing results are controlled by the stubs below
+ // stub only matches source task index 1, but every event came from task 0 of v3, so the unstubbed mock returns null for all of them
+ when(mockPlugin.routeDataMovementEventToDestination(1, 0, 0)).thenReturn(mockRoute);
+ eventInfo = v4.getTaskAttemptTezEvents(v4TaId, 0, 1);
+ Assert.assertEquals(11, eventInfo.getNextFromEventId()); // all events traversed
+ Assert.assertEquals(0, eventInfo.getEvents().size()); // no events
+
+ int fromEventId = 0;
+ // stub now matches any arguments, so every event is routed and returned.
+ // max events (1 per call) is respected.
+ when(
+ mockPlugin.routeDataMovementEventToDestination(anyInt(),
+ anyInt(), anyInt())).thenReturn(mockRoute);
+ for (int i=0; i<11; ++i) {
+ eventInfo = v4.getTaskAttemptTezEvents(v4TaId, fromEventId, 1);
+ fromEventId = eventInfo.getNextFromEventId();
+ Assert.assertEquals((i+1), fromEventId);
+ Assert.assertEquals(1, eventInfo.getEvents().size());
+ }
+ eventInfo = v4.getTaskAttemptTezEvents(v4TaId, fromEventId, 1);
+ Assert.assertEquals(11, eventInfo.getNextFromEventId()); // all events traversed
+ Assert.assertEquals(0, eventInfo.getEvents().size()); // no events
+
+ // change max events to a larger value (5) that does not evenly divide the 11 total events
+ fromEventId = 0;
+ for (int i=1; i<=2; ++i) {
+ eventInfo = v4.getTaskAttemptTezEvents(v4TaId, fromEventId, 5);
+ fromEventId = eventInfo.getNextFromEventId();
+ Assert.assertEquals((i*5), fromEventId);
+ Assert.assertEquals(5, eventInfo.getEvents().size());
+ }
+ eventInfo = v4.getTaskAttemptTezEvents(v4TaId, fromEventId, 5);
+ Assert.assertEquals(11, eventInfo.getNextFromEventId()); // all events traversed
+ Assert.assertEquals(1, eventInfo.getEvents().size()); // remainder events
+
+ // route each stored event to 2 target indices, so the routed events don't evenly fit in the max size of 5
+ mockRoute = EventRouteMetadata.create(2, new int[]{0, 0});
+ when(
+ mockPlugin.routeDataMovementEventToDestination(anyInt(),
+ anyInt(), anyInt())).thenReturn(mockRoute);
+ fromEventId = 0;
+ int lastFromEventId = 0;
+ for (int i=1; i<=4; ++i) {
+ eventInfo = v4.getTaskAttemptTezEvents(v4TaId, fromEventId, 5);
+ fromEventId = eventInfo.getNextFromEventId();
+ Assert.assertEquals((i%2 > 0 ? (lastFromEventId+=2) : (lastFromEventId+=3)), fromEventId); // expected cursor advances alternately by 2 (odd i) and 3 (even i)
+ Assert.assertEquals(5, eventInfo.getEvents().size());
+ }
+ eventInfo = v4.getTaskAttemptTezEvents(v4TaId, fromEventId, 5);
+ Assert.assertEquals(11, eventInfo.getNextFromEventId()); // all events traversed
+ Assert.assertEquals(2, eventInfo.getEvents().size()); // remainder events
+ }
@Test(timeout = 5000)
public void testVertexReconfigurePlannedAfterInit() throws Exception {
@@ -2632,11 +2727,15 @@ public class TestVertexImpl {
}
@Test(timeout = 5000)
- public void testVertexPendingTaskEvents() {
+ public void testVertexPendingTaskEventsLegacyRouting() {
+ // Remove after bulk routing API is removed
initAllVertices(VertexState.INITED);
VertexImpl v3 = vertices.get("vertex3");
VertexImpl v2 = vertices.get("vertex2");
VertexImpl v1 = vertices.get("vertex1");
+ v1.useOnDemandRouting = false;
+ v2.useOnDemandRouting = false;
+ v3.useOnDemandRouting = false;
startVertex(v1);
@@ -4595,8 +4694,6 @@ public class TestVertexImpl {
dispatcher.await();
Assert.assertEquals(VertexState.INITED, v1.getState());
Assert.assertEquals(5, v1.getTotalTasks());
- // task events get buffered
- Assert.assertEquals(5, v1.pendingTaskEvents.size());
Assert.assertEquals(RootInputVertexManager.class.getName(), v1
.getVertexManager().getPlugin().getClass().getName());
for (int i=0; i < v1Hints.size(); ++i) {
@@ -4609,6 +4706,16 @@ public class TestVertexImpl {
Assert.assertEquals(1, inputSpecs.get(0).getPhysicalEdgeCount());
}
+ // fake scheduling start to trigger edge routing to begin
+ v1.scheduleTasks(new LinkedList<VertexManagerPluginContext.TaskWithLocationHint>());
+ // check all tasks get their events
+ for (int i=0; i<v1.getTotalTasks(); ++i) {
+ Assert.assertEquals(
+ 1,
+ v1.getTaskAttemptTezEvents(TezTaskAttemptID.getInstance(v1.getTask(i).getTaskId(), 0),
+ 0, 100).getEvents().size());
+ }
+
VertexImplWithControlledInitializerManager v2 = (VertexImplWithControlledInitializerManager) vertices.get("vertex2");
Assert.assertEquals(VertexState.INITIALIZING, v2.getState());
@@ -4627,7 +4734,6 @@ public class TestVertexImpl {
dispatcher.getEventHandler().handle(
new VertexEventRouteEvent(v2.getVertexId(), events));
dispatcher.await();
- Assert.assertEquals(1, v2.pendingTaskEvents.size());
RootInputInitializerManagerControlled initializerManager2 = v2.getRootInputInitializerManager();
List<TaskLocationHint> v2Hints = createTaskLocationHints(10);
@@ -4635,14 +4741,99 @@ public class TestVertexImpl {
dispatcher.await();
Assert.assertEquals(VertexState.INITED, v2.getState());
Assert.assertEquals(10, v2.getTotalTasks());
+ Assert.assertEquals(RootInputVertexManager.class.getName(), v2
+ .getVertexManager().getPlugin().getClass().getName());
+ for (int i=0; i < v2Hints.size(); ++i) {
+ Assert.assertEquals(v2Hints.get(i), v2.getTaskLocationHints()[i]);
+ }
+ Assert.assertEquals(true, initializerManager2.hasShutDown);
+
+ // fake scheduling start to trigger edge routing to begin
+ v2.scheduleTasks(new LinkedList<VertexManagerPluginContext.TaskWithLocationHint>());
+ // check all tasks get their events
+ for (int i=0; i<v2.getTotalTasks(); ++i) {
+ Assert.assertEquals(
+ ((i==0) ? 2 : 1),
+ v2.getTaskAttemptTezEvents(TezTaskAttemptID.getInstance(v2.getTask(i).getTaskId(), 0),
+ 0, 100).getEvents().size());
+ }
+ for (int i = 0; i < 10; i++) {
+ List<InputSpec> inputSpecs = v1.getInputSpecList(i);
+ Assert.assertEquals(1, inputSpecs.size());
+ Assert.assertEquals(1, inputSpecs.get(0).getPhysicalEdgeCount());
+ }
+ }
+
+ @Test(timeout = 5000)
+ public void testVertexWithInitializerSuccessLegacyRouting() throws Exception {
+ // Remove after legacy routing is removed
+ useCustomInitializer = true;
+ setupPreDagCreation();
+ dagPlan = createDAGPlanWithInputInitializer("TestInputInitializer");
+ setupPostDagCreation();
+
+ VertexImplWithControlledInitializerManager v1 = (VertexImplWithControlledInitializerManager) vertices
+ .get("vertex1");
+ v1.useOnDemandRouting = false;
+ dispatcher.getEventHandler().handle(
+ new VertexEvent(v1.getVertexId(), VertexEventType.V_INIT));
+ dispatcher.await();
+ Assert.assertEquals(VertexState.INITIALIZING, v1.getState());
+ RootInputInitializerManagerControlled initializerManager1 = v1.getRootInputInitializerManager();
+ List<TaskLocationHint> v1Hints = createTaskLocationHints(5);
+ initializerManager1.completeInputInitialization(0, 5, v1Hints);
+ dispatcher.await();
+ Assert.assertEquals(VertexState.INITED, v1.getState());
+ Assert.assertEquals(5, v1.getTotalTasks());
+ Assert.assertEquals(RootInputVertexManager.class.getName(), v1
+ .getVertexManager().getPlugin().getClass().getName());
+ for (int i=0; i < v1Hints.size(); ++i) {
+ Assert.assertEquals(v1Hints.get(i), v1.getTaskLocationHints()[i]);
+ }
+ Assert.assertEquals(true, initializerManager1.hasShutDown);
+ for (int i = 0; i < 5; i++) {
+ List<InputSpec> inputSpecs = v1.getInputSpecList(i);
+ Assert.assertEquals(1, inputSpecs.size());
+ Assert.assertEquals(1, inputSpecs.get(0).getPhysicalEdgeCount());
+ }
// task events get buffered
- Assert.assertEquals(11, v2.pendingTaskEvents.size());
+ Assert.assertEquals(5, v1.pendingTaskEvents.size());
+
+ VertexImplWithControlledInitializerManager v2 = (VertexImplWithControlledInitializerManager) vertices.get("vertex2");
+ Assert.assertEquals(VertexState.INITIALIZING, v2.getState());
+ v2.useOnDemandRouting = false;
+
+ // non-task events don't get buffered
+ List<TezEvent> events = Lists.newLinkedList();
+ TezTaskID t0_v1 = TezTaskID.getInstance(v1.getVertexId(), 0);
+ TezTaskAttemptID ta0_t0_v1 = TezTaskAttemptID.getInstance(t0_v1, 0);
+ events.add(new TezEvent(
+ VertexManagerEvent.create("vertex2", ByteBuffer.wrap(new byte[0])), new EventMetaData(
+ EventProducerConsumerType.PROCESSOR, "vertex1", "vertex2",
+ ta0_t0_v1)));
+ events.add(new TezEvent(InputDataInformationEvent.createWithSerializedPayload(0,
+ ByteBuffer.wrap(new byte[0])),
+ new EventMetaData(EventProducerConsumerType.INPUT, "vertex2",
+ "NULL_VERTEX", null)));
+ dispatcher.getEventHandler().handle(
+ new VertexEventRouteEvent(v2.getVertexId(), events));
+ dispatcher.await();
+ Assert.assertEquals(1, v2.pendingTaskEvents.size());
+
+ RootInputInitializerManagerControlled initializerManager2 = v2.getRootInputInitializerManager();
+ List<TaskLocationHint> v2Hints = createTaskLocationHints(10);
+ initializerManager2.completeInputInitialization(0, 10, v2Hints);
+ dispatcher.await();
+ Assert.assertEquals(VertexState.INITED, v2.getState());
+ Assert.assertEquals(10, v2.getTotalTasks());
Assert.assertEquals(RootInputVertexManager.class.getName(), v2
.getVertexManager().getPlugin().getClass().getName());
for (int i=0; i < v2Hints.size(); ++i) {
Assert.assertEquals(v2Hints.get(i), v2.getTaskLocationHints()[i]);
}
Assert.assertEquals(true, initializerManager2.hasShutDown);
+ // task events get buffered
+ Assert.assertEquals(11, v2.pendingTaskEvents.size());
for (int i = 0; i < 10; i++) {
List<InputSpec> inputSpecs = v1.getInputSpecList(i);
Assert.assertEquals(1, inputSpecs.size());
@@ -4650,6 +4841,7 @@ public class TestVertexImpl {
}
}
+
@Test(timeout = 5000)
public void testVertexWithInputDistributor() throws Exception {
useCustomInitializer = true;
http://git-wip-us.apache.org/repos/asf/tez/blob/05f77fe2/tez-dag/src/test/java/org/apache/tez/test/EdgeManagerForTest.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/test/EdgeManagerForTest.java b/tez-dag/src/test/java/org/apache/tez/test/EdgeManagerForTest.java
index 09f9a20..9cb914f 100644
--- a/tez-dag/src/test/java/org/apache/tez/test/EdgeManagerForTest.java
+++ b/tez-dag/src/test/java/org/apache/tez/test/EdgeManagerForTest.java
@@ -21,13 +21,13 @@ package org.apache.tez.test;
import java.util.List;
import java.util.Map;
-import org.apache.tez.dag.api.EdgeManagerPlugin;
import org.apache.tez.dag.api.EdgeManagerPluginContext;
+import org.apache.tez.dag.api.EdgeManagerPluginOnDemand;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
-public class EdgeManagerForTest extends EdgeManagerPlugin {
+public class EdgeManagerForTest extends EdgeManagerPluginOnDemand {
private UserPayload userPayload;
@@ -78,6 +78,35 @@ public class EdgeManagerForTest extends EdgeManagerPlugin {
public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex,
Map<Integer, List<Integer>> destinationTaskAndInputIndices) {
}
+
+ @Override
+ public EventRouteMetadata routeDataMovementEventToDestination(int sourceTaskIndex,
+ int sourceOutputIndex, int destinationTaskIndex) throws Exception {
+ return null; // test stub: always returns null regardless of the indices passed in
+ }
+
+ @Override
+ public EventRouteMetadata routeCompositeDataMovementEventToDestination(
+ int sourceTaskIndex, int destinationTaskIndex)
+ throws Exception {
+ return null; // test stub: always returns null regardless of the indices passed in
+ }
+
+ @Override
+ public EventRouteMetadata routeInputSourceTaskFailedEventToDestination(
+ int sourceTaskIndex, int destinationTaskIndex) throws Exception {
+ return null; // test stub: always returns null regardless of the indices passed in
+ }
+
+ @Override
+ public void prepareForRouting() throws Exception { // no-op: empty body in this test edge manager
+ }
+
+ @Override
+ public int routeInputErrorEventToSource(int destinationTaskIndex, int destinationFailedInputIndex)
+ throws Exception {
+ return 0; // test stub: always answers 0, ignoring both arguments
+ }
// End of overridden methods
http://git-wip-us.apache.org/repos/asf/tez/blob/05f77fe2/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
index f8b8621..921095c 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
@@ -49,6 +49,7 @@ public abstract class RuntimeTask {
protected final Configuration tezConf;
protected final TezUmbilical tezUmbilical;
protected final AtomicInteger eventCounter;
+ protected final AtomicInteger nextFromEventId;
private final AtomicBoolean taskDone;
private final TaskCounterUpdater counterUpdater;
private final TaskStatistics statistics;
@@ -60,6 +61,7 @@ public abstract class RuntimeTask {
this.tezUmbilical = tezUmbilical;
this.tezCounters = new TezCounters();
this.eventCounter = new AtomicInteger(0);
+ this.nextFromEventId = new AtomicInteger(0);
this.progress = 0.0f;
this.taskDone = new AtomicBoolean(false);
this.statistics = new TaskStatistics();
@@ -130,6 +132,14 @@ public abstract class RuntimeTask {
public int getEventCounter() {
return eventCounter.get(); // current value of the eventCounter AtomicInteger
}
+
+ public int getNextFromEventId() {
+ return nextFromEventId.get(); // current value of the nextFromEventId AtomicInteger (initialized to 0 in the constructor)
+ }
+
+ public void setNextFromEventId(int nextFromEventId) {
+ this.nextFromEventId.set(nextFromEventId); // unconditionally overwrites the stored value; no validation of the argument
+ }
public boolean isTaskDone() {
return taskDone.get();
http://git-wip-us.apache.org/repos/asf/tez/blob/05f77fe2/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezHeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezHeartbeatResponse.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezHeartbeatResponse.java
index 10699ac..cecc706 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezHeartbeatResponse.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezHeartbeatResponse.java
@@ -32,6 +32,7 @@ public class TezHeartbeatResponse implements Writable {
private long lastRequestId;
private boolean shouldDie = false;
private List<TezEvent> events;
+ private int nextFromEventId;
public TezHeartbeatResponse() {
}
@@ -51,6 +52,10 @@ public class TezHeartbeatResponse implements Writable {
public long getLastRequestId() {
return lastRequestId; // plain field read; value is written/read by write()/readFields()
}
+
+ public int getNextFromEventId() {
+ return nextFromEventId; // plain field read; the field is serialized as an int by write()/readFields()
+ }
public void setEvents(List<TezEvent> events) {
this.events = Collections.unmodifiableList(events);
@@ -63,11 +68,16 @@ public class TezHeartbeatResponse implements Writable {
public void setShouldDie() {
this.shouldDie = true; // one-way flag: this setter can only turn it on (field defaults to false)
}
+
+ public void setNextFromEventId(int nextFromEventId) {
+ this.nextFromEventId = nextFromEventId; // stored as-is; no validation of the argument
+ }
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(lastRequestId);
out.writeBoolean(shouldDie);
+ out.writeInt(nextFromEventId);
if(events != null) {
out.writeBoolean(true);
out.writeInt(events.size());
@@ -83,6 +93,7 @@ public class TezHeartbeatResponse implements Writable {
public void readFields(DataInput in) throws IOException {
lastRequestId = in.readLong();
shouldDie = in.readBoolean();
+ nextFromEventId = in.readInt();
if(in.readBoolean()) {
int eventCount = in.readInt();
events = new ArrayList<TezEvent>(eventCount);
@@ -99,6 +110,7 @@ public class TezHeartbeatResponse implements Writable {
return "{ "
+ " lastRequestId=" + lastRequestId
+ ", shouldDie=" + shouldDie
+ + ", nextFromEventId=" + nextFromEventId
+ ", eventCount=" + (events != null ? events.size() : 0)
+ " }";
}
http://git-wip-us.apache.org/repos/asf/tez/blob/05f77fe2/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
index 7324abd..3d1d1a2 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
@@ -240,8 +240,9 @@ public class TaskReporter {
}
long requestId = requestCounter.incrementAndGet();
+ int fromEventId = task.getNextFromEventId();
TezHeartbeatRequest request = new TezHeartbeatRequest(requestId, events, containerIdStr,
- task.getTaskAttemptID(), task.getEventCounter(), maxEventsToGet);
+ task.getTaskAttemptID(), fromEventId, maxEventsToGet);
if (LOG.isDebugEnabled()) {
LOG.debug("Sending heartbeat to AM, request=" + request);
}
@@ -271,11 +272,12 @@ public class TaskReporter {
+ " heartbeat response, eventCount=" + response.getEvents().size());
}
} else {
+ task.setNextFromEventId(response.getNextFromEventId());
if (response.getEvents() != null && !response.getEvents().isEmpty()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Routing events from heartbeat response to task" + ", currentTaskAttemptId="
- + task.getTaskAttemptID() + ", eventCount=" + response.getEvents().size());
- }
+ LOG.info("Routing events from heartbeat response to task" + ", currentTaskAttemptId="
+ + task.getTaskAttemptID() + ", eventCount=" + response.getEvents().size()
+ + " fromEventId=" + fromEventId
+ + " nextFromEventId=" + response.getNextFromEventId());
// This should ideally happen in a separate thread
numEventsReceived = response.getEvents().size();
task.handleEvents(response.getEvents());