You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/02/27 00:07:58 UTC
git commit: TEZ-715. Auto Reduce Parallelism can rarely trigger NPE in AM at DAGAppMaster.handle(DAGAppMaster.java:1268) (bikas)
Repository: incubator-tez
Updated Branches:
refs/heads/master f2f31e0ef -> a9bace052
TEZ-715. Auto Reduce Parallelism can rarely trigger NPE in AM at DAGAppMaster.handle(DAGAppMaster.java:1268) (bikas)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/a9bace05
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/a9bace05
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/a9bace05
Branch: refs/heads/master
Commit: a9bace052a9dba4b1aa08c2e9310df73928dad84
Parents: f2f31e0
Author: Bikas Saha <bi...@apache.org>
Authored: Wed Feb 26 15:07:30 2014 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Wed Feb 26 15:07:30 2014 -0800
----------------------------------------------------------------------
.../java/org/apache/tez/dag/app/dag/Task.java | 2 -
.../apache/tez/dag/app/dag/impl/TaskImpl.java | 12 ----
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 62 +++++++++-----------
.../tez/dag/app/dag/impl/TestVertexImpl.java | 42 +++++++++++++
4 files changed, 69 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a9bace05/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
index 1d25930..8243b70 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
@@ -78,8 +78,6 @@ public interface Task {
public List<TezEvent> getTaskAttemptTezEvents(TezTaskAttemptID attemptID,
int fromEventId, int maxEvents);
- public List<TezEvent> getAndClearTaskTezEvents();
-
public List<String> getDiagnostics();
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a9bace05/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 368406a..793c12a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -492,18 +492,6 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
}
@Override
- public List<TezEvent> getAndClearTaskTezEvents() {
- readLock.lock();
- try {
- List<TezEvent> events = tezEventsForTaskAttempts;
- tezEventsForTaskAttempts = new ArrayList<TezEvent>();
- return events;
- } finally {
- readLock.unlock();
- }
- }
-
- @Override
public List<String> getDiagnostics() {
List<String> diagnostics = new ArrayList<String>(attempts.size());
readLock.lock();
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a9bace05/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index a9139ab..d3e07cb 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -462,7 +462,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
private Set<String> inputsWithInitializers;
private int numInitializedInputs;
private boolean startSignalPending = false;
- List<TezEvent> pendingRouteEvents = null;
+ private boolean tasksNotYetScheduled = true;
+ // We may always store task events in the vertex for scalability
+ List<TezEvent> pendingTaskEvents = Lists.newLinkedList();
+ List<TezEvent> pendingRouteEventsWhileIniting = null;
List<TezTaskAttemptID> pendingReportedSrcCompletions = Lists.newLinkedList();
private RootInputInitializerRunner rootInputInitializer;
@@ -803,6 +806,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
public void scheduleTasks(List<Integer> taskIDs) {
readLock.lock();
try {
+ tasksNotYetScheduled = false;
+ if (!pendingTaskEvents.isEmpty()) {
+ VertexImpl.ROUTE_EVENT_TRANSITION.transition(this,
+ new VertexEventRouteEvent(getVertexId(), pendingTaskEvents));
+ pendingTaskEvents.clear();
+ }
for (Integer taskID : taskIDs) {
if (tasks.size() <= taskID.intValue()) {
throw new TezUncheckedException(
@@ -874,10 +883,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
edge.startEventBuffering();
}
- // Use a set since the same event may have been sent to multiple tasks
- // and we want to avoid duplicates
- Set<TezEvent> pendingEvents = new HashSet<TezEvent>();
-
LOG.info("Vertex " + getVertexId() +
" parallelism set to " + parallelism + " from " + numTasks);
// assign to local variable of LinkedHashMap to make sure that changing
@@ -896,7 +901,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
+ " for vertex: " + getVertexId() + " name: " + getName());
return false;
}
- pendingEvents.addAll(task.getAndClearTaskTezEvents());
if (i <= parallelism) {
continue;
}
@@ -913,7 +917,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
+ entry.getKey() + " destination: " + getVertexId());
Vertex sourceVertex = appContext.getCurrentDAG().getVertex(entry.getKey());
Edge edge = sourceVertices.get(sourceVertex);
- EdgeProperty edgeProperty = edge.getEdgeProperty();
try {
edge.setCustomEdgeManager(entry.getValue());
} catch (Exception e) {
@@ -926,21 +929,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
}
- // Re-route all existing TezEvents according to new routing table
- // At this point only events attributed to source task attempts can be
- // re-routed. e.g. DataMovement or InputFailed events.
- // This assumption is fine for now since these tasks haven't been started.
- // So they can only get events generated from source task attempts that
- // have already been started.
- DAG dag = getDAG();
- for(TezEvent event : pendingEvents) {
- TezVertexID sourceVertexId = event.getSourceInfo().getTaskAttemptID()
- .getTaskID().getVertexID();
- Vertex sourceVertex = dag.getVertex(sourceVertexId);
- Edge sourceEdge = sourceVertices.get(sourceVertex);
- sourceEdge.sendTezEventToDestinationTasks(event);
- }
-
// stop buffering events
for (Edge edge : sourceVertices.values()) {
edge.stopEventBuffering();
@@ -1550,10 +1538,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
// Vertex will be moving to INITED state, safe to process pending route events.
- if (pendingRouteEvents != null) {
+ if (pendingRouteEventsWhileIniting != null) {
VertexImpl.ROUTE_EVENT_TRANSITION.transition(this,
- new VertexEventRouteEvent(getVertexId(), pendingRouteEvents));
- pendingRouteEvents = null;
+ new VertexEventRouteEvent(getVertexId(), pendingRouteEventsWhileIniting));
+ pendingRouteEventsWhileIniting = null;
}
return vertexState;
}
@@ -2032,12 +2020,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
@Override
public void transition(VertexImpl vertex, VertexEvent event) {
VertexEventRouteEvent re = (VertexEventRouteEvent) event;
- if (vertex.pendingRouteEvents == null) {
- vertex.pendingRouteEvents = Lists.newLinkedList();
+ if (vertex.pendingRouteEventsWhileIniting == null) {
+ vertex.pendingRouteEventsWhileIniting = Lists.newLinkedList();
}
// Store the events for post-init routing, since INIT state is when
// initial task parallelism will be set
- vertex.pendingRouteEvents.addAll(re.getEvents());
+ vertex.pendingRouteEventsWhileIniting.addAll(re.getEvents());
}
}
@@ -2103,14 +2091,18 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
} else {
// event not from this vertex. must have come from source vertex.
// send to tasks
- Edge srcEdge = vertex.sourceVertices.get(vertex.getDAG().getVertex(
- sourceMeta.getTaskVertexName()));
- if (srcEdge == null) {
- throw new TezUncheckedException("Bad source vertex: " +
- sourceMeta.getTaskVertexName() + " for destination vertex: " +
- vertex.getVertexId());
+ if (vertex.tasksNotYetScheduled) {
+ vertex.pendingTaskEvents.add(tezEvent);
+ } else {
+ Edge srcEdge = vertex.sourceVertices.get(vertex.getDAG().getVertex(
+ sourceMeta.getTaskVertexName()));
+ if (srcEdge == null) {
+ throw new TezUncheckedException("Bad source vertex: " +
+ sourceMeta.getTaskVertexName() + " for destination vertex: " +
+ vertex.getVertexId());
+ }
+ srcEdge.sendTezEventToDestinationTasks(tezEvent);
}
- srcEdge.sendTezEventToDestinationTasks(tezEvent);
}
}
break;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a9bace05/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 0bf2b8c..1a14b15 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -98,6 +98,7 @@ import org.apache.tez.dag.app.dag.event.TaskEventType;
import org.apache.tez.dag.app.dag.event.VertexEvent;
import org.apache.tez.dag.app.dag.event.VertexEventRootInputFailed;
import org.apache.tez.dag.app.dag.event.VertexEventRootInputInitialized;
+import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptCompleted;
import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted;
import org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule;
@@ -114,12 +115,16 @@ import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.OutputCommitter;
import org.apache.tez.runtime.api.OutputCommitterContext;
+import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.api.events.RootInputConfigureVertexTasksEvent;
import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
import org.apache.tez.test.EdgeManagerForTest;
+import org.apache.tez.runtime.api.impl.EventMetaData;
+import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
import org.apache.tez.runtime.api.impl.GroupInputSpec;
+import org.apache.tez.runtime.api.impl.TezEvent;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -1419,6 +1424,43 @@ public class TestVertexImpl {
Assert.assertTrue(tasks.keySet().iterator().next().equals(firstTask));
}
+
+ @SuppressWarnings("unchecked")
+ @Test(timeout = 5000)
+ public void testVertexPendingTaskEvents() {
+ initAllVertices(VertexState.INITED);
+ VertexImpl v3 = vertices.get("vertex3");
+ VertexImpl v2 = vertices.get("vertex2");
+
+ startVertex(v2);
+ startVertex(v3);
+
+ TezTaskID t0_v2 = TezTaskID.getInstance(v2.getVertexId(), 0);
+ TezTaskAttemptID ta0_t0_v2 = TezTaskAttemptID.getInstance(t0_v2, 0);
+
+ List<TezEvent> taskEvents = Lists.newLinkedList();
+ TezEvent tezEvent1 = new TezEvent(
+ new CompositeDataMovementEvent(0, 1, new byte[0]),
+ new EventMetaData(EventProducerConsumerType.OUTPUT, "vertex2", "vertex3", ta0_t0_v2));
+ TezEvent tezEvent2 = new TezEvent(
+ new DataMovementEvent(0, new byte[0]),
+ new EventMetaData(EventProducerConsumerType.OUTPUT, "vertex2", "vertex3", ta0_t0_v2));
+ taskEvents.add(tezEvent1);
+ taskEvents.add(tezEvent2);
+ // send events and test that they are buffered until some task is scheduled
+ dispatcher.getEventHandler().handle(
+ new VertexEventRouteEvent(v3.getVertexId(), taskEvents));
+ dispatcher.await();
+ Assert.assertEquals(2, v3.pendingTaskEvents.size());
+ v3.scheduleTasks(Collections.singletonList(new Integer(0)));
+ dispatcher.await();
+ Assert.assertEquals(0, v3.pendingTaskEvents.size());
+ // send events and test that they are not buffered anymore
+ dispatcher.getEventHandler().handle(
+ new VertexEventRouteEvent(v3.getVertexId(), taskEvents));
+ dispatcher.await();
+ Assert.assertEquals(0, v3.pendingTaskEvents.size());
+ }
@Test(timeout = 5000)
public void testSetCustomEdgeManager() throws UnsupportedEncodingException {