You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2015/05/22 08:29:35 UTC
tez git commit: TEZ-2409. Allow different edges to have different
routing plugins (bikas) (cherry picked from commit
41173aa0a0cc60924902ce9e78a452e2069a2a31)
Repository: tez
Updated Branches:
refs/heads/branch-0.7 d5c5b7177 -> 533979600
TEZ-2409. Allow different edges to have different routing plugins (bikas)
(cherry picked from commit 41173aa0a0cc60924902ce9e78a452e2069a2a31)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/53397960
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/53397960
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/53397960
Branch: refs/heads/branch-0.7
Commit: 533979600ec4bd4e11a84a72385d14f36ae2dd6c
Parents: d5c5b71
Author: Bikas Saha <bi...@apache.org>
Authored: Thu May 21 23:26:25 2015 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Thu May 21 23:28:39 2015 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../tez/dag/app/TaskAttemptEventInfo.java | 8 +-
.../dag/app/TaskAttemptListenerImpTezDag.java | 3 +-
.../java/org/apache/tez/dag/app/dag/Task.java | 3 +-
.../java/org/apache/tez/dag/app/dag/Vertex.java | 2 +-
.../org/apache/tez/dag/app/dag/impl/Edge.java | 8 +-
.../apache/tez/dag/app/dag/impl/TaskImpl.java | 9 +-
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 93 +++++++----------
.../apache/tez/dag/app/MockDAGAppMaster.java | 9 +-
.../tez/dag/app/TestMockDAGAppMaster.java | 23 ++---
.../app/TestTaskAttemptListenerImplTezDag.java | 4 +-
.../tez/dag/app/dag/impl/TestDAGImpl.java | 100 ++++++++++++++-----
.../tez/dag/app/dag/impl/TestVertexImpl.java | 52 ++++++----
.../org/apache/tez/runtime/RuntimeTask.java | 10 ++
.../runtime/api/impl/TezHeartbeatRequest.java | 13 ++-
.../runtime/api/impl/TezHeartbeatResponse.java | 12 +++
.../apache/tez/runtime/task/TaskReporter.java | 6 +-
17 files changed, 221 insertions(+), 135 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/53397960/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d847608..0bf195a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -12,6 +12,7 @@ ALL CHANGES:
TEZ-2455. Tez UI: Dag view caching, error handling and minor layout changes
TEZ-2453. Tez UI: show the dagInfo if the application has set the same.
TEZ-2447. Tez UI: Generic changes based on feedbacks.
+ TEZ-2409. Allow different edges to have different routing plugins
Release 0.7.0: 2015-05-18
http://git-wip-us.apache.org/repos/asf/tez/blob/53397960/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptEventInfo.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptEventInfo.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptEventInfo.java
index 49ff044..d6b03e4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptEventInfo.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptEventInfo.java
@@ -23,10 +23,12 @@ import org.apache.tez.runtime.api.impl.TezEvent;
public class TaskAttemptEventInfo {
private final int nextFromEventId;
+ private final int nextPreRoutedFromEventId;
private final List<TezEvent> events;
- public TaskAttemptEventInfo(int nextFromEventId, List<TezEvent> events) {
+ public TaskAttemptEventInfo(int nextFromEventId, List<TezEvent> events, int nextPreRoutedEventId) {
this.nextFromEventId = nextFromEventId;
+ this.nextPreRoutedFromEventId = nextPreRoutedEventId;
this.events = events;
}
@@ -34,6 +36,10 @@ public class TaskAttemptEventInfo {
return nextFromEventId;
}
+ public int getNextPreRoutedFromEventId() {
+ return nextPreRoutedFromEventId;
+ }
+
public List<TezEvent> getEvents() {
return events;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/53397960/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index 970489d..2bf7de3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -448,9 +448,10 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
.getCurrentDAG()
.getVertex(taskAttemptID.getTaskID().getVertexID())
.getTaskAttemptTezEvents(taskAttemptID, request.getStartIndex(),
- request.getMaxEvents());
+ request.getPreRoutedStartIndex(), request.getMaxEvents());
response.setEvents(eventInfo.getEvents());
response.setNextFromEventId(eventInfo.getNextFromEventId());
+ response.setNextPreRoutedEventId(eventInfo.getNextPreRoutedFromEventId());
}
containerInfo.lastRequestId = requestId;
containerInfo.lastReponse = response;
http://git-wip-us.apache.org/repos/asf/tez/blob/53397960/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
index 177ee8a..47b56f2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
@@ -18,6 +18,7 @@
package org.apache.tez.dag.app.dag;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -59,7 +60,7 @@ public interface Task {
public Vertex getVertex();
- public List<TezEvent> getTaskAttemptTezEvents(TezTaskAttemptID attemptID,
+ public ArrayList<TezEvent> getTaskAttemptTezEvents(TezTaskAttemptID attemptID,
int fromEventId, int maxEvents);
public List<String> getDiagnostics();
http://git-wip-us.apache.org/repos/asf/tez/blob/53397960/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index bb42392..bb3548d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -147,7 +147,7 @@ public interface Vertex extends Comparable<Vertex> {
Resource getTaskResource();
public TaskAttemptEventInfo getTaskAttemptTezEvents(TezTaskAttemptID attemptID,
- int fromEventId, int maxEvents);
+ int fromEventId, int nextPreRoutedFromEventId, int maxEvents);
void handleSpeculatorEvent(SpeculatorEvent event);
http://git-wip-us.apache.org/repos/asf/tez/blob/53397960/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
index 78bab05..f9cbede 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
@@ -197,7 +197,7 @@ public class Edge {
setEdgeProperty(modifiedEdgeProperty);
}
- public synchronized boolean routingToBegin() throws AMUserCodeException {
+ public synchronized void routingToBegin() throws AMUserCodeException {
if (edgeManagerContext.getDestinationVertexNumTasks() == 0) {
routingNeeded = false;
} else if (edgeManagerContext.getDestinationVertexNumTasks() < 0) {
@@ -216,7 +216,11 @@ public class Edge {
}
}
- LOG.info("Routing to begin for edge: " + getEdgeInfo() + ". EdgeProperty: " + edgeProperty);
+ LOG.info("Routing to begin for edge: " + getEdgeInfo() + ". EdgeProperty: " + edgeProperty +
+ " onDemandRouting: " + hasOnDemandRouting());
+ }
+
+ public synchronized boolean hasOnDemandRouting() {
return onDemandRouting;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/53397960/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index de5ab2a..b2eb81e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -138,7 +138,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
final StateChangeNotifier stateChangeNotifier;
private final List<TezEvent> tezEventsForTaskAttempts = new ArrayList<TezEvent>();
- private static final List<TezEvent> EMPTY_TASK_ATTEMPT_TEZ_EVENTS =
+ static final ArrayList<TezEvent> EMPTY_TASK_ATTEMPT_TEZ_EVENTS =
new ArrayList(0);
// track the status of TaskAttempt (true mean completed, false mean uncompleted)
@@ -485,9 +485,9 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
}
@Override
- public List<TezEvent> getTaskAttemptTezEvents(TezTaskAttemptID attemptID,
+ public ArrayList<TezEvent> getTaskAttemptTezEvents(TezTaskAttemptID attemptID,
int fromEventId, int maxEvents) {
- List<TezEvent> events = EMPTY_TASK_ATTEMPT_TEZ_EVENTS;
+ ArrayList<TezEvent> events = EMPTY_TASK_ATTEMPT_TEZ_EVENTS;
readLock.lock();
try {
@@ -500,8 +500,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
int actualMax = Math.min(maxEvents,
(tezEventsForTaskAttempts.size() - fromEventId));
int toEventId = actualMax + fromEventId;
- events = Collections.unmodifiableList(new ArrayList<TezEvent>(
- tezEventsForTaskAttempts.subList(fromEventId, toEventId)));
+ events = new ArrayList<TezEvent>(tezEventsForTaskAttempts.subList(fromEventId, toEventId));
LOG.info("TaskAttempt:" + attemptID + " sent events: (" + fromEventId
+ "-" + toEventId + ")");
// currently not modifying the events so that we don't have to create
http://git-wip-us.apache.org/repos/asf/tez/blob/53397960/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index e20ee6e..e909c9f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -228,9 +228,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
private Configuration vertexConf;
private final boolean isSpeculationEnabled;
-
- @VisibleForTesting
- public boolean useOnDemandRouting = true;
//fields initialized in init
@@ -742,8 +739,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
private final Lock onDemandRouteEventsReadLock = onDemandRouteEventsReadWriteLock.readLock();
private final Lock onDemandRouteEventsWriteLock = onDemandRouteEventsReadWriteLock.writeLock();
- private static final List<TezEvent> EMPTY_TASK_ATTEMPT_TEZ_EVENTS =
- new ArrayList(0);
List<TezEvent> pendingRouteEvents = new LinkedList<TezEvent>();
List<TezTaskAttemptID> pendingReportedSrcCompletions = Lists.newLinkedList();
@@ -1419,26 +1414,18 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
}
}
- boolean setupEdgeRouting() throws AMUserCodeException {
- boolean doOnDemand = useOnDemandRouting;
+ void setupEdgeRouting() throws AMUserCodeException {
for (Edge e : sourceVertices.values()) {
- boolean edgeDoingOnDemand = e.routingToBegin();
- if (doOnDemand && !edgeDoingOnDemand) {
- doOnDemand = false;
- LOG.info("Not using ondemand routing because of edge between " + e.getSourceVertexName()
- + " and " + getLogIdentifier());
- }
+ e.routingToBegin();
}
- return doOnDemand;
}
private void unsetTasksNotYetScheduled() throws AMUserCodeException {
if (tasksNotYetScheduled) {
- boolean doOnDemand = setupEdgeRouting();
+ setupEdgeRouting();
// change state under lock
writeLock.lock();
try {
- useOnDemandRouting = doOnDemand;
tasksNotYetScheduled = false;
// only now can we be sure of the edge manager type. so until now
// we will accumulate pending tasks in case legacy routing gets used.
@@ -4053,20 +4040,22 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
@Override
public TaskAttemptEventInfo getTaskAttemptTezEvents(TezTaskAttemptID attemptID,
- int fromEventId, int maxEvents) {
- if (!useOnDemandRouting) {
- List<TezEvent> events = getTask(attemptID.getTaskID()).getTaskAttemptTezEvents(attemptID, fromEventId, maxEvents);
- return new TaskAttemptEventInfo(fromEventId + events.size(), events);
- }
-
+ int fromEventId, int preRoutedFromEventId, int maxEvents) {
+ ArrayList<TezEvent> events = getTask(attemptID.getTaskID()).getTaskAttemptTezEvents(
+ attemptID, preRoutedFromEventId, maxEvents);
+ int nextPreRoutedFromEventId = preRoutedFromEventId + events.size();
+ int nextFromEventId = fromEventId;
onDemandRouteEventsReadLock.lock();
try {
- List<TezEvent> events = EMPTY_TASK_ATTEMPT_TEZ_EVENTS;
- int nextFromEventId = fromEventId;
int currEventCount = onDemandRouteEvents.size();
try {
if (currEventCount > fromEventId) {
- events = Lists.newArrayListWithCapacity(maxEvents);
+ if (events != TaskImpl.EMPTY_TASK_ATTEMPT_TEZ_EVENTS) {
+ events.ensureCapacity(maxEvents);
+ } else {
+ events = Lists.newArrayListWithCapacity(maxEvents);
+ }
+ int numPreRoutedEvents = events.size();
int taskIndex = attemptID.getTaskID().getId();
Preconditions.checkState(taskIndex < tasks.size(), "Invalid task index for TA: " + attemptID
+ " vertex: " + getLogIdentifier());
@@ -4118,6 +4107,15 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
break;
}
}
+ int numEventsSent = events.size() - numPreRoutedEvents;
+ if (numEventsSent > 0) {
+ StringBuilder builder = new StringBuilder();
+ builder.append("Sending ").append(attemptID).append(" numEvents: ").append(numEventsSent)
+ .append(" from: ").append(fromEventId).append(" to: ").append(nextFromEventId)
+ .append(" out of ").append(currEventCount).append(" on-demand events in vertex: ")
+ .append(getLogIdentifier());
+ LOG.info(builder.toString());
+ }
}
} catch (AMUserCodeException e) {
String msg = "Exception in " + e.getSource() + ", vertex=" + getLogIdentifier();
@@ -4126,18 +4124,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
nextFromEventId = fromEventId;
events.clear();
}
-
- if (events.size() > 0) {
- StringBuilder builder = new StringBuilder();
- builder.append("Sending ").append(attemptID).append(" numEvents: ").append(events.size())
- .append(" from: ").append(fromEventId).append(" to: ").append(nextFromEventId)
- .append(" out of ").append(currEventCount).append(" events in vertex: ").append(getLogIdentifier());
- LOG.info(builder.toString());
- }
- return new TaskAttemptEventInfo(nextFromEventId, events);
} finally {
onDemandRouteEventsReadLock.unlock();
}
+ return new TaskAttemptEventInfo(nextFromEventId, events, nextPreRoutedFromEventId);
}
private void handleRoutedTezEvents(List<TezEvent> tezEvents, boolean recovered, boolean isPendingEvents) throws AMUserCodeException {
@@ -4206,15 +4196,15 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
pendingTaskEvents.add(tezEvent);
} else {
// event not from this vertex. must have come from source vertex.
- if (useOnDemandRouting) {
- int srcTaskIndex = sourceMeta.getTaskAttemptID().getTaskID().getId();
- Vertex edgeVertex = getDAG().getVertex(sourceMeta.getTaskVertexName());
- Edge srcEdge = sourceVertices.get(edgeVertex);
- if (srcEdge == null) {
- throw new TezUncheckedException("Bad source vertex: " +
- sourceMeta.getTaskVertexName() + " for destination vertex: " +
- getLogIdentifier());
- }
+ int srcTaskIndex = sourceMeta.getTaskAttemptID().getTaskID().getId();
+ Vertex edgeVertex = getDAG().getVertex(sourceMeta.getTaskVertexName());
+ Edge srcEdge = sourceVertices.get(edgeVertex);
+ if (srcEdge == null) {
+ throw new TezUncheckedException("Bad source vertex: " +
+ sourceMeta.getTaskVertexName() + " for destination vertex: " +
+ getLogIdentifier());
+ }
+ if (srcEdge.hasOnDemandRouting()) {
onDemandRouteEventsWriteLock.lock();
try {
onDemandRouteEvents.add(new EventInfo(tezEvent, srcEdge, srcTaskIndex));
@@ -4223,13 +4213,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
}
} else {
// send to tasks
- Edge srcEdge = sourceVertices.get(getDAG().getVertex(
- sourceMeta.getTaskVertexName()));
- if (srcEdge == null) {
- throw new TezUncheckedException("Bad source vertex: "
- + sourceMeta.getTaskVertexName() + " for destination vertex: "
- + getLogIdentifier());
- }
srcEdge.sendTezEventToDestinationTasks(tezEvent);
}
}
@@ -4246,13 +4229,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
// removed.
pendingTaskEvents.add(tezEvent);
} else {
- if (useOnDemandRouting) {
- onDemandRouteEvents.add(new EventInfo(tezEvent, null, -1));
- } else {
- InputDataInformationEvent riEvent = (InputDataInformationEvent) tezEvent.getEvent();
- Task targetTask = getTask(riEvent.getTargetIndex());
- targetTask.registerTezEvent(tezEvent);
- }
+ InputDataInformationEvent riEvent = (InputDataInformationEvent) tezEvent.getEvent();
+ Task targetTask = getTask(riEvent.getTargetIndex());
+ targetTask.registerTezEvent(tezEvent);
}
}
break;
http://git-wip-us.apache.org/repos/asf/tez/blob/53397960/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
index 5cd487c..08f6ff6 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
@@ -162,6 +162,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
ContainerLaunchContext launchContext;
int numUpdates = 0;
int nextFromEventId = 0;
+ int nextPreRoutedFromEventId = 0;
boolean completed;
String cIdStr;
AtomicBoolean remove = new AtomicBoolean(false);
@@ -184,6 +185,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
launchContext = null;
numUpdates = 0;
nextFromEventId = 0;
+ nextPreRoutedFromEventId = 0;
cIdStr = null;
remove.set(false);
}
@@ -338,7 +340,8 @@ public class MockDAGAppMaster extends DAGAppMaster {
if (response.shouldDie()) {
cData.remove();
} else {
- cData.nextFromEventId += response.getNextFromEventId();
+ cData.nextFromEventId = response.getNextFromEventId();
+ cData.nextPreRoutedFromEventId = response.getNextPreRoutedEventId();
if (!response.getEvents().isEmpty()) {
long stopTime = System.nanoTime();
long stopCpuTime = threadMxBean.getCurrentThreadCpuTime();
@@ -421,7 +424,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
events.add(new TezEvent(new TaskStatusUpdateEvent(counters, progress, stats), new EventMetaData(
EventProducerConsumerType.SYSTEM, cData.vName, "", cData.taId)));
TezHeartbeatRequest request = new TezHeartbeatRequest(cData.numUpdates, events,
- cData.cIdStr, cData.taId, cData.nextFromEventId, 50000);
+ cData.nextPreRoutedFromEventId, cData.cIdStr, cData.taId, cData.nextFromEventId, 50000);
doHeartbeat(request, cData);
} else if (version != null && cData.taId.getId() <= version.intValue()) {
preemptContainer(cData);
@@ -432,7 +435,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
new TaskAttemptCompletedEvent(), new EventMetaData(
EventProducerConsumerType.SYSTEM, cData.vName, "", cData.taId)));
TezHeartbeatRequest request = new TezHeartbeatRequest(++cData.numUpdates, events,
- cData.cIdStr, cData.taId, cData.nextFromEventId, 10000);
+ cData.nextPreRoutedFromEventId, cData.cIdStr, cData.taId, cData.nextFromEventId, 10000);
doHeartbeat(request, cData);
cData.clear();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/53397960/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
index 1e7faf9..4137d42 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
@@ -253,8 +253,8 @@ public class TestMockDAGAppMaster {
Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState());
VertexImpl vImpl = (VertexImpl) dagImpl.getVertex(vB.getName());
TaskImpl tImpl = (TaskImpl) vImpl.getTask(1);
- TezTaskAttemptID taId = TezTaskAttemptID.getInstance(tImpl.getTaskId(), 1);
- List<TezEvent> tEvents = vImpl.getTaskAttemptTezEvents(taId, 0, 1000).getEvents();
+ TezTaskAttemptID taId = TezTaskAttemptID.getInstance(tImpl.getTaskId(), 0);
+ List<TezEvent> tEvents = vImpl.getTaskAttemptTezEvents(taId, 0, 0, 1000).getEvents();
Assert.assertEquals(2, tEvents.size()); // 2 from vA
Assert.assertEquals(vA.getName(), tEvents.get(0).getDestinationInfo().getEdgeVertexName());
Assert.assertEquals(0, ((DataMovementEvent)tEvents.get(0).getEvent()).getSourceIndex());
@@ -267,8 +267,8 @@ public class TestMockDAGAppMaster {
(targetIndex1 == 0 && targetIndex2 == 1) || (targetIndex1 == 1 && targetIndex2 == 0));
vImpl = (VertexImpl) dagImpl.getVertex(vC.getName());
tImpl = (TaskImpl) vImpl.getTask(1);
- taId = TezTaskAttemptID.getInstance(tImpl.getTaskId(), 1);
- tEvents = vImpl.getTaskAttemptTezEvents(taId, 0, 1000).getEvents();
+ taId = TezTaskAttemptID.getInstance(tImpl.getTaskId(), 0);
+ tEvents = vImpl.getTaskAttemptTezEvents(taId, 0, 0, 1000).getEvents();
Assert.assertEquals(2, tEvents.size()); // 2 from vA
Assert.assertEquals(vA.getName(), tEvents.get(0).getDestinationInfo().getEdgeVertexName());
Assert.assertEquals(1, ((DataMovementEvent)tEvents.get(0).getEvent()).getSourceIndex());
@@ -281,8 +281,8 @@ public class TestMockDAGAppMaster {
(targetIndex1 == 0 && targetIndex2 == 1) || (targetIndex1 == 1 && targetIndex2 == 0));
vImpl = (VertexImpl) dagImpl.getVertex(vD.getName());
tImpl = (TaskImpl) vImpl.getTask(1);
- taId = TezTaskAttemptID.getInstance(tImpl.getTaskId(), 1);
- tEvents = vImpl.getTaskAttemptTezEvents(taId, 0, 1000).getEvents();
+ taId = TezTaskAttemptID.getInstance(tImpl.getTaskId(), 0);
+ tEvents = vImpl.getTaskAttemptTezEvents(taId, 0, 0, 1000).getEvents();
Assert.assertEquals(1, tEvents.size()); // 1 from vA
Assert.assertEquals(vA.getName(), tEvents.get(0).getDestinationInfo().getEdgeVertexName());
Assert.assertEquals(0, ((DataMovementEvent)tEvents.get(0).getEvent()).getTargetIndex());
@@ -390,22 +390,19 @@ public class TestMockDAGAppMaster {
Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState());
// vC uses on demand routing and its task does not provide events
VertexImpl vImpl = (VertexImpl) dagImpl.getVertex(vC.getName());
- Assert.assertEquals(true, vImpl.useOnDemandRouting);
TaskImpl tImpl = (TaskImpl) vImpl.getTask(0);
TezTaskAttemptID taId = TezTaskAttemptID.getInstance(tImpl.getTaskId(), 0);
Assert.assertEquals(0, tImpl.getTaskAttemptTezEvents(taId, 0, 1000).size());
- // vD is mixed more and does not use on demand routing and its task provides events
+ // vD is mixed mode and only 1 out of 2 edges does legacy routing with task providing events
vImpl = (VertexImpl) dagImpl.getVertex(vD.getName());
- Assert.assertEquals(false, vImpl.useOnDemandRouting);
tImpl = (TaskImpl) vImpl.getTask(0);
taId = TezTaskAttemptID.getInstance(tImpl.getTaskId(), 0);
- Assert.assertEquals(2, tImpl.getTaskAttemptTezEvents(taId, 0, 1000).size());
+ Assert.assertEquals(1, tImpl.getTaskAttemptTezEvents(taId, 0, 1000).size());
// vE has single legacy edge and does not use on demand routing and its task provides events
- vImpl = (VertexImpl) dagImpl.getVertex(vD.getName());
- Assert.assertEquals(false, vImpl.useOnDemandRouting);
+ vImpl = (VertexImpl) dagImpl.getVertex(vE.getName());
tImpl = (TaskImpl) vImpl.getTask(0);
taId = TezTaskAttemptID.getInstance(tImpl.getTaskId(), 0);
- Assert.assertEquals(2, tImpl.getTaskAttemptTezEvents(taId, 0, 1000).size());
+ Assert.assertEquals(1, tImpl.getTaskAttemptTezEvents(taId, 0, 1000).size());
tezClient.stop();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/53397960/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
index db8eff1..ac816f4 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
@@ -243,8 +243,8 @@ public class TestTaskAttemptListenerImplTezDag {
doReturn(vertex).when(dag).getVertex(vertexID);
doReturn("test_vertex").when(vertex).getName();
- TaskAttemptEventInfo eventInfo = new TaskAttemptEventInfo(nextFromEventId, sendEvents);
- doReturn(eventInfo).when(vertex).getTaskAttemptTezEvents(taskAttemptID, fromEventId, maxEvents);
+ TaskAttemptEventInfo eventInfo = new TaskAttemptEventInfo(nextFromEventId, sendEvents, 0);
+ doReturn(eventInfo).when(vertex).getTaskAttemptTezEvents(taskAttemptID, fromEventId, 0, maxEvents);
taskAttemptListener.registerRunningContainer(containerId);
taskAttemptListener.registerTaskAttempt(amContainerTask, containerId);
http://git-wip-us.apache.org/repos/asf/tez/blob/53397960/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index fff95b5..4787247 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tez.common.MockDNSToSwitchMapping;
import org.apache.tez.dag.api.DataSinkDescriptor;
+import org.apache.tez.dag.api.EdgeManagerPlugin;
import org.apache.tez.dag.api.EdgeManagerPluginContext;
import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
import org.apache.tez.dag.api.EdgeManagerPluginOnDemand;
@@ -115,7 +116,6 @@ import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted;
import org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule;
import org.apache.tez.dag.app.dag.event.VertexEventType;
-import org.apache.tez.dag.app.dag.impl.TestDAGImpl.CustomizedEdgeManager.ExceptionLocation;
import org.apache.tez.dag.app.dag.impl.TestVertexImpl.CountingOutputCommitter;
import org.apache.tez.dag.app.rm.AMSchedulerEvent;
import org.apache.tez.dag.app.rm.AMSchedulerEventType;
@@ -670,8 +670,8 @@ public class TestDAGImpl {
}
// v1 -> v2
- private DAGPlan createDAGWithCustomEdge(ExceptionLocation exLocation) {
- LOG.info("Setting up dag plan");
+ private DAGPlan createDAGWithCustomEdge(ExceptionLocation exLocation, boolean useLegacy) {
+ LOG.info("Setting up custome edge dag plan " + exLocation + " " + useLegacy);
DAGPlan dag = DAGPlan.newBuilder()
.setName("testverteximpl")
.addVertex(
@@ -723,7 +723,8 @@ public class TestDAGImpl {
.addEdge(
EdgePlan.newBuilder()
.setEdgeManager(TezEntityDescriptorProto.newBuilder()
- .setClassName(CustomizedEdgeManager.class.getName())
+ .setClassName(useLegacy ? CustomizedEdgeManagerLegacy.class.getName() :
+ CustomizedEdgeManager.class.getName())
.setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder()
.setUserPayload(ByteString.copyFromUtf8(exLocation.name())))
)
@@ -866,8 +867,12 @@ public class TestDAGImpl {
}
private void setupDAGWithCustomEdge(ExceptionLocation exLocation) {
+ setupDAGWithCustomEdge(exLocation, false);
+ }
+
+ private void setupDAGWithCustomEdge(ExceptionLocation exLocation, boolean useLegacy) {
dagWithCustomEdgeId = TezDAGID.getInstance(appAttemptId.getApplicationId(), 4);
- dagPlanWithCustomEdge = createDAGWithCustomEdge(exLocation);
+ dagPlanWithCustomEdge = createDAGWithCustomEdge(exLocation, useLegacy);
dagWithCustomEdgeAppContext = mock(AppContext.class);
doReturn(aclManager).when(dagWithCustomEdgeAppContext).getAMACLManager();
dagWithCustomEdge = new DAGImpl(dagWithCustomEdgeId, conf, dagPlanWithCustomEdge,
@@ -1018,7 +1023,7 @@ public class TestDAGImpl {
new EventMetaData(EventProducerConsumerType.INPUT, "vertex1", "vertex2", ta1.getID()));
dispatcher.getEventHandler().handle(new VertexEventRouteEvent(v2.getVertexId(), Lists.newArrayList(tezEvent)));
dispatcher.await();
- v2.getTaskAttemptTezEvents(ta1.getID(), 0, 1000);
+ v2.getTaskAttemptTezEvents(ta1.getID(), 0, 0, 1000);
dispatcher.await();
Assert.assertEquals(VertexState.FAILED, v2.getState());
@@ -1031,7 +1036,7 @@ public class TestDAGImpl {
@Test(timeout = 5000)
public void testEdgeManager_RouteDataMovementEventToDestinationWithLegacyRouting() {
// Remove after legacy routing is removed
- setupDAGWithCustomEdge(ExceptionLocation.RouteDataMovementEventToDestination);
+ setupDAGWithCustomEdge(ExceptionLocation.RouteDataMovementEventToDestination, true);
dispatcher.getEventHandler().handle(
new DAGEvent(dagWithCustomEdge.getID(), DAGEventType.DAG_INIT));
dispatcher.getEventHandler().handle(new DAGEventStartDag(dagWithCustomEdge.getID(),
@@ -1041,8 +1046,7 @@ public class TestDAGImpl {
VertexImpl v1 = (VertexImpl)dagWithCustomEdge.getVertex("vertex1");
VertexImpl v2 = (VertexImpl)dagWithCustomEdge.getVertex("vertex2");
- v1.useOnDemandRouting = false;
- v2.useOnDemandRouting = false;
+
dispatcher.await();
Task t1= v2.getTask(0);
TaskAttemptImpl ta1= (TaskAttemptImpl)t1.getAttempt(TezTaskAttemptID.getInstance(t1.getTaskId(), 0));
@@ -1063,7 +1067,7 @@ public class TestDAGImpl {
@Test(timeout = 5000)
public void testEdgeManager_RouteInputSourceTaskFailedEventToDestinationLegacyRouting() {
// Remove after legacy routing is removed
- setupDAGWithCustomEdge(ExceptionLocation.RouteInputSourceTaskFailedEventToDestination);
+ setupDAGWithCustomEdge(ExceptionLocation.RouteInputSourceTaskFailedEventToDestination, true);
dispatcher.getEventHandler().handle(
new DAGEvent(dagWithCustomEdge.getID(), DAGEventType.DAG_INIT));
dispatcher.getEventHandler().handle(new DAGEventStartDag(dagWithCustomEdge.getID(),
@@ -1073,8 +1077,6 @@ public class TestDAGImpl {
VertexImpl v1 = (VertexImpl)dagWithCustomEdge.getVertex("vertex1");
VertexImpl v2 = (VertexImpl)dagWithCustomEdge.getVertex("vertex2");
- v1.useOnDemandRouting = false;
- v2.useOnDemandRouting = false;
dispatcher.await();
Task t1= v2.getTask(0);
@@ -1084,7 +1086,7 @@ public class TestDAGImpl {
new EventMetaData(EventProducerConsumerType.INPUT,"vertex1", "vertex2", ta1.getID()));
dispatcher.getEventHandler().handle(new VertexEventRouteEvent(v2.getVertexId(), Lists.newArrayList(tezEvent)));
dispatcher.await();
- v2.getTaskAttemptTezEvents(ta1.getID(), 0, 1000);
+ v2.getTaskAttemptTezEvents(ta1.getID(), 0, 0, 1000);
dispatcher.await();
Assert.assertEquals(VertexState.FAILED, v2.getState());
@@ -1802,18 +1804,17 @@ public class TestDAGImpl {
TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS_DEFAULT));
}
- public static class CustomizedEdgeManager extends EdgeManagerPluginOnDemand {
-
- public static enum ExceptionLocation {
- Initialize,
- GetNumDestinationTaskPhysicalInputs,
- GetNumSourceTaskPhysicalOutputs,
- RouteDataMovementEventToDestination,
- RouteInputSourceTaskFailedEventToDestination,
- GetNumDestinationConsumerTasks,
- RouteInputErrorEventToSource
- }
+ public static enum ExceptionLocation {
+ Initialize,
+ GetNumDestinationTaskPhysicalInputs,
+ GetNumSourceTaskPhysicalOutputs,
+ RouteDataMovementEventToDestination,
+ RouteInputSourceTaskFailedEventToDestination,
+ GetNumDestinationConsumerTasks,
+ RouteInputErrorEventToSource
+ }
+ public static class CustomizedEdgeManagerLegacy extends EdgeManagerPlugin {
private ExceptionLocation exLocation;
public static EdgeManagerPluginDescriptor getUserPayload(ExceptionLocation exLocation) {
@@ -1821,7 +1822,7 @@ public class TestDAGImpl {
.setUserPayload(UserPayload.create(ByteBuffer.wrap(exLocation.name().getBytes())));
}
- public CustomizedEdgeManager(EdgeManagerPluginContext context) {
+ public CustomizedEdgeManagerLegacy(EdgeManagerPluginContext context) {
super(context);
this.exLocation = ExceptionLocation.valueOf(
new String(context.getUserPayload().deepCopyAsArray()));
@@ -1890,6 +1891,55 @@ public class TestDAGImpl {
}
return 0;
}
+ }
+
+ public static class CustomizedEdgeManager extends EdgeManagerPluginOnDemand {
+ private ExceptionLocation exLocation;
+
+ public static EdgeManagerPluginDescriptor getUserPayload(ExceptionLocation exLocation) {
+ return EdgeManagerPluginDescriptor.create(CustomizedEdgeManager.class.getName())
+ .setUserPayload(UserPayload.create(ByteBuffer.wrap(exLocation.name().getBytes())));
+ }
+
+ public CustomizedEdgeManager(EdgeManagerPluginContext context) {
+ super(context);
+ this.exLocation = ExceptionLocation.valueOf(
+ new String(context.getUserPayload().deepCopyAsArray()));
+ }
+
+ @Override
+ public void initialize() throws Exception {
+ if (exLocation == ExceptionLocation.Initialize) {
+ throw new Exception(exLocation.name());
+ }
+ }
+
+ @Override
+ public int getNumDestinationTaskPhysicalInputs(int destinationTaskIndex)
+ throws Exception {
+ if (exLocation == ExceptionLocation.GetNumDestinationTaskPhysicalInputs) {
+ throw new Exception(exLocation.name());
+ }
+ return 0;
+ }
+
+ @Override
+ public int getNumSourceTaskPhysicalOutputs(int sourceTaskIndex)
+ throws Exception {
+ if (exLocation == ExceptionLocation.GetNumSourceTaskPhysicalOutputs) {
+ throw new Exception(exLocation.name());
+ }
+ return 0;
+ }
+
+ @Override
+ public int getNumDestinationConsumerTasks(int sourceTaskIndex)
+ throws Exception {
+ if (exLocation == ExceptionLocation.GetNumDestinationConsumerTasks) {
+ throw new Exception(exLocation.name());
+ }
+ return 0;
+ }
@Override
public int routeInputErrorEventToSource(int destinationTaskIndex,
http://git-wip-us.apache.org/repos/asf/tez/blob/53397960/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 6c94465..e569949 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -2493,7 +2493,13 @@ public class TestVertexImpl {
// verify all events have been put in pending.
// this is not necessary after legacy routing has been removed
Assert.assertEquals(5, v4.pendingTaskEvents.size());
- v4.scheduleTasks(new LinkedList<VertexManagerPluginContext.TaskWithLocationHint>());
+ List<TaskWithLocationHint> taskList = new LinkedList<VertexManagerPluginContext.TaskWithLocationHint>();
+ // scheduling start to trigger edge routing to begin
+ for (int i=0; i<v4.getTotalTasks(); ++i) {
+ taskList.add(new TaskWithLocationHint(i, null));
+ }
+ v4.scheduleTasks(taskList);
+ dispatcher.await();
// verify all events have been moved to taskEvents
Assert.assertEquals(5, v4.getOnDemandRouteEvents().size());
for (int i=5; i<11; ++i) {
@@ -2511,7 +2517,7 @@ public class TestVertexImpl {
e.edgeManager = mockPlugin;
// source task id will not match. all events will return null
when(mockPlugin.routeDataMovementEventToDestination(1, 0, 0)).thenReturn(mockRoute);
- eventInfo = v4.getTaskAttemptTezEvents(v4TaId, 0, 1);
+ eventInfo = v4.getTaskAttemptTezEvents(v4TaId, 0, 0, 1);
Assert.assertEquals(11, eventInfo.getNextFromEventId()); // all events traversed
Assert.assertEquals(0, eventInfo.getEvents().size()); // no events
@@ -2522,24 +2528,24 @@ public class TestVertexImpl {
mockPlugin.routeDataMovementEventToDestination(anyInt(),
anyInt(), anyInt())).thenReturn(mockRoute);
for (int i=0; i<11; ++i) {
- eventInfo = v4.getTaskAttemptTezEvents(v4TaId, fromEventId, 1);
+ eventInfo = v4.getTaskAttemptTezEvents(v4TaId, fromEventId, 0, 1);
fromEventId = eventInfo.getNextFromEventId();
Assert.assertEquals((i+1), fromEventId);
Assert.assertEquals(1, eventInfo.getEvents().size());
}
- eventInfo = v4.getTaskAttemptTezEvents(v4TaId, fromEventId, 1);
+ eventInfo = v4.getTaskAttemptTezEvents(v4TaId, fromEventId, 0, 1);
Assert.assertEquals(11, eventInfo.getNextFromEventId()); // all events traversed
Assert.assertEquals(0, eventInfo.getEvents().size()); // no events
// change max events to larger value. max events does not evenly divide total events
fromEventId = 0;
for (int i=1; i<=2; ++i) {
- eventInfo = v4.getTaskAttemptTezEvents(v4TaId, fromEventId, 5);
+ eventInfo = v4.getTaskAttemptTezEvents(v4TaId, fromEventId, 0, 5);
fromEventId = eventInfo.getNextFromEventId();
Assert.assertEquals((i*5), fromEventId);
Assert.assertEquals(5, eventInfo.getEvents().size());
}
- eventInfo = v4.getTaskAttemptTezEvents(v4TaId, fromEventId, 5);
+ eventInfo = v4.getTaskAttemptTezEvents(v4TaId, fromEventId, 0, 5);
Assert.assertEquals(11, eventInfo.getNextFromEventId()); // all events traversed
Assert.assertEquals(1, eventInfo.getEvents().size()); // remainder events
@@ -2551,12 +2557,12 @@ public class TestVertexImpl {
fromEventId = 0;
int lastFromEventId = 0;
for (int i=1; i<=4; ++i) {
- eventInfo = v4.getTaskAttemptTezEvents(v4TaId, fromEventId, 5);
+ eventInfo = v4.getTaskAttemptTezEvents(v4TaId, fromEventId, 0, 5);
fromEventId = eventInfo.getNextFromEventId();
Assert.assertEquals((i%2 > 0 ? (lastFromEventId+=2) : (lastFromEventId+=3)), fromEventId);
Assert.assertEquals(5, eventInfo.getEvents().size());
}
- eventInfo = v4.getTaskAttemptTezEvents(v4TaId, fromEventId, 5);
+ eventInfo = v4.getTaskAttemptTezEvents(v4TaId, fromEventId, 0, 5);
Assert.assertEquals(11, eventInfo.getNextFromEventId()); // all events traversed
Assert.assertEquals(2, eventInfo.getEvents().size()); // remainder events
}
@@ -2754,15 +2760,12 @@ public class TestVertexImpl {
}
@Test(timeout = 5000)
- public void testVertexPendingTaskEventsLegacyRouting() {
+ public void testVertexPendingTaskEvents() {
// Remove after bulk routing API is removed
initAllVertices(VertexState.INITED);
VertexImpl v3 = vertices.get("vertex3");
VertexImpl v2 = vertices.get("vertex2");
VertexImpl v1 = vertices.get("vertex1");
- v1.useOnDemandRouting = false;
- v2.useOnDemandRouting = false;
- v3.useOnDemandRouting = false;
startVertex(v1);
@@ -4733,14 +4736,19 @@ public class TestVertexImpl {
Assert.assertEquals(1, inputSpecs.get(0).getPhysicalEdgeCount());
}
- // fake scheduling start to trigger edge routing to begin
- v1.scheduleTasks(new LinkedList<VertexManagerPluginContext.TaskWithLocationHint>());
+ List<TaskWithLocationHint> taskList = new LinkedList<VertexManagerPluginContext.TaskWithLocationHint>();
+ // scheduling start to trigger edge routing to begin
+ for (int i=0; i<v1.getTotalTasks(); ++i) {
+ taskList.add(new TaskWithLocationHint(i, null));
+ }
+ v1.scheduleTasks(taskList);
+ dispatcher.await();
// check all tasks get their events
for (int i=0; i<v1.getTotalTasks(); ++i) {
Assert.assertEquals(
1,
v1.getTaskAttemptTezEvents(TezTaskAttemptID.getInstance(v1.getTask(i).getTaskId(), 0),
- 0, 100).getEvents().size());
+ 0, 0, 100).getEvents().size());
}
VertexImplWithControlledInitializerManager v2 = (VertexImplWithControlledInitializerManager) vertices.get("vertex2");
@@ -4775,14 +4783,20 @@ public class TestVertexImpl {
}
Assert.assertEquals(true, initializerManager2.hasShutDown);
- // fake scheduling start to trigger edge routing to begin
- v2.scheduleTasks(new LinkedList<VertexManagerPluginContext.TaskWithLocationHint>());
+ // scheduling start to trigger edge routing to begin
+ taskList = new LinkedList<VertexManagerPluginContext.TaskWithLocationHint>();
+ // scheduling start to trigger edge routing to begin
+ for (int i=0; i<v2.getTotalTasks(); ++i) {
+ taskList.add(new TaskWithLocationHint(i, null));
+ }
+ v2.scheduleTasks(taskList);
+ dispatcher.await();
// check all tasks get their events
for (int i=0; i<v2.getTotalTasks(); ++i) {
Assert.assertEquals(
((i==0) ? 2 : 1),
v2.getTaskAttemptTezEvents(TezTaskAttemptID.getInstance(v2.getTask(i).getTaskId(), 0),
- 0, 100).getEvents().size());
+ 0, 0, 100).getEvents().size());
}
for (int i = 0; i < 10; i++) {
List<InputSpec> inputSpecs = v1.getInputSpecList(i);
@@ -4801,7 +4815,6 @@ public class TestVertexImpl {
VertexImplWithControlledInitializerManager v1 = (VertexImplWithControlledInitializerManager) vertices
.get("vertex1");
- v1.useOnDemandRouting = false;
dispatcher.getEventHandler().handle(
new VertexEvent(v1.getVertexId(), VertexEventType.V_INIT));
dispatcher.await();
@@ -4828,7 +4841,6 @@ public class TestVertexImpl {
VertexImplWithControlledInitializerManager v2 = (VertexImplWithControlledInitializerManager) vertices.get("vertex2");
Assert.assertEquals(VertexState.INITIALIZING, v2.getState());
- v2.useOnDemandRouting = false;
// non-task events don't get buffered
List<TezEvent> events = Lists.newLinkedList();
http://git-wip-us.apache.org/repos/asf/tez/blob/53397960/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
index 921095c..17d7053 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
@@ -50,6 +50,7 @@ public abstract class RuntimeTask {
protected final TezUmbilical tezUmbilical;
protected final AtomicInteger eventCounter;
protected final AtomicInteger nextFromEventId;
+ protected final AtomicInteger nextPreRoutedEventId;
private final AtomicBoolean taskDone;
private final TaskCounterUpdater counterUpdater;
private final TaskStatistics statistics;
@@ -62,6 +63,7 @@ public abstract class RuntimeTask {
this.tezCounters = new TezCounters();
this.eventCounter = new AtomicInteger(0);
this.nextFromEventId = new AtomicInteger(0);
+ this.nextPreRoutedEventId = new AtomicInteger(0);
this.progress = 0.0f;
this.taskDone = new AtomicBoolean(false);
this.statistics = new TaskStatistics();
@@ -137,9 +139,17 @@ public abstract class RuntimeTask {
return nextFromEventId.get();
}
+ public int getNextPreRoutedEventId() {
+ return nextPreRoutedEventId.get();
+ }
+
public void setNextFromEventId(int nextFromEventId) {
this.nextFromEventId.set(nextFromEventId);
}
+
+ public void setNextPreRoutedEventId(int nextPreRoutedEventId) {
+ this.nextPreRoutedEventId.set(nextPreRoutedEventId);
+ }
public boolean isTaskDone() {
return taskDone.get();
http://git-wip-us.apache.org/repos/asf/tez/blob/53397960/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezHeartbeatRequest.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezHeartbeatRequest.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezHeartbeatRequest.java
index 3baef93..7ed89f8 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezHeartbeatRequest.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezHeartbeatRequest.java
@@ -36,6 +36,7 @@ public class TezHeartbeatRequest implements Writable {
private List<TezEvent> events;
private TezTaskAttemptID currentTaskAttemptID;
private int startIndex;
+ private int preRoutedStartIndex;
private int maxEvents;
private long requestId;
@@ -43,12 +44,13 @@ public class TezHeartbeatRequest implements Writable {
}
public TezHeartbeatRequest(long requestId, List<TezEvent> events,
- String containerIdentifier, TezTaskAttemptID taskAttemptID,
- int startIndex, int maxEvents) {
+ int preRoutedStartIndex, String containerIdentifier,
+ TezTaskAttemptID taskAttemptID, int startIndex, int maxEvents) {
this.containerIdentifier = containerIdentifier;
this.requestId = requestId;
this.events = Collections.unmodifiableList(events);
this.startIndex = startIndex;
+ this.preRoutedStartIndex = preRoutedStartIndex;
this.maxEvents = maxEvents;
this.currentTaskAttemptID = taskAttemptID;
}
@@ -65,6 +67,10 @@ public class TezHeartbeatRequest implements Writable {
return startIndex;
}
+ public int getPreRoutedStartIndex() {
+ return preRoutedStartIndex;
+ }
+
public int getMaxEvents() {
return maxEvents;
}
@@ -95,6 +101,7 @@ public class TezHeartbeatRequest implements Writable {
out.writeBoolean(false);
}
out.writeInt(startIndex);
+ out.writeInt(preRoutedStartIndex);
out.writeInt(maxEvents);
out.writeLong(requestId);
Text.writeString(out, containerIdentifier);
@@ -117,6 +124,7 @@ public class TezHeartbeatRequest implements Writable {
currentTaskAttemptID = null;
}
startIndex = in.readInt();
+ preRoutedStartIndex = in.readInt();
maxEvents = in.readInt();
requestId = in.readLong();
containerIdentifier = Text.readString(in);
@@ -128,6 +136,7 @@ public class TezHeartbeatRequest implements Writable {
+ " containerId=" + containerIdentifier
+ ", requestId=" + requestId
+ ", startIndex=" + startIndex
+ + ", preRoutedStartIndex=" + preRoutedStartIndex
+ ", maxEventsToGet=" + maxEvents
+ ", taskAttemptId=" + currentTaskAttemptID
+ ", eventCount=" + (events != null ? events.size() : 0)
http://git-wip-us.apache.org/repos/asf/tez/blob/53397960/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezHeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezHeartbeatResponse.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezHeartbeatResponse.java
index cecc706..0aa4db4 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezHeartbeatResponse.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezHeartbeatResponse.java
@@ -33,6 +33,7 @@ public class TezHeartbeatResponse implements Writable {
private boolean shouldDie = false;
private List<TezEvent> events;
private int nextFromEventId;
+ private int nextPreRoutedEventId;
public TezHeartbeatResponse() {
}
@@ -57,6 +58,10 @@ public class TezHeartbeatResponse implements Writable {
return nextFromEventId;
}
+ public int getNextPreRoutedEventId() {
+ return nextPreRoutedEventId;
+ }
+
public void setEvents(List<TezEvent> events) {
this.events = Collections.unmodifiableList(events);
}
@@ -73,11 +78,16 @@ public class TezHeartbeatResponse implements Writable {
this.nextFromEventId = nextFromEventId;
}
+ public void setNextPreRoutedEventId(int nextPreRoutedEventId) {
+ this.nextPreRoutedEventId = nextPreRoutedEventId;
+ }
+
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(lastRequestId);
out.writeBoolean(shouldDie);
out.writeInt(nextFromEventId);
+ out.writeInt(nextPreRoutedEventId);
if(events != null) {
out.writeBoolean(true);
out.writeInt(events.size());
@@ -94,6 +104,7 @@ public class TezHeartbeatResponse implements Writable {
lastRequestId = in.readLong();
shouldDie = in.readBoolean();
nextFromEventId = in.readInt();
+ nextPreRoutedEventId = in.readInt();
if(in.readBoolean()) {
int eventCount = in.readInt();
events = new ArrayList<TezEvent>(eventCount);
@@ -111,6 +122,7 @@ public class TezHeartbeatResponse implements Writable {
+ " lastRequestId=" + lastRequestId
+ ", shouldDie=" + shouldDie
+ ", nextFromEventId=" + nextFromEventId
+ + ", nextPreRoutedEventId=" + nextPreRoutedEventId
+ ", eventCount=" + (events != null ? events.size() : 0)
+ " }";
}
http://git-wip-us.apache.org/repos/asf/tez/blob/53397960/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
index 8b9db16..d9a7786 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
@@ -249,8 +249,9 @@ public class TaskReporter {
long requestId = requestCounter.incrementAndGet();
int fromEventId = task.getNextFromEventId();
- TezHeartbeatRequest request = new TezHeartbeatRequest(requestId, events, containerIdStr,
- task.getTaskAttemptID(), fromEventId, maxEventsToGet);
+ int fromPreRoutedEventId = task.getNextPreRoutedEventId();
+ TezHeartbeatRequest request = new TezHeartbeatRequest(requestId, events, fromPreRoutedEventId,
+ containerIdStr, task.getTaskAttemptID(), fromEventId, maxEventsToGet);
if (LOG.isDebugEnabled()) {
LOG.debug("Sending heartbeat to AM, request=" + request);
}
@@ -282,6 +283,7 @@ public class TaskReporter {
}
} else {
task.setNextFromEventId(response.getNextFromEventId());
+ task.setNextPreRoutedEventId(response.getNextPreRoutedEventId());
if (response.getEvents() != null && !response.getEvents().isEmpty()) {
LOG.info("Routing events from heartbeat response to task" + ", currentTaskAttemptId="
+ task.getTaskAttemptID() + ", eventCount=" + response.getEvents().size()