You are viewing a plain-text version of this content. The canonical link for it (a hyperlink on the original archive page) is not preserved in this plain-text copy.
Posted to commits@tez.apache.org by bi...@apache.org on 2013/11/12 06:03:38 UTC
git commit: TEZ-608. Fix 1-1 edge connection when parallelism is
determined at runtime (bikas)
Updated Branches:
refs/heads/master 18540ca92 -> 8aac5ba45
TEZ-608. Fix 1-1 edge connection when parallelism is determined at runtime (bikas)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/8aac5ba4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/8aac5ba4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/8aac5ba4
Branch: refs/heads/master
Commit: 8aac5ba45c6f1a786659410bbf217328f777f719
Parents: 18540ca
Author: Bikas Saha <bi...@apache.org>
Authored: Mon Nov 11 21:00:20 2013 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Mon Nov 11 21:00:20 2013 -0800
----------------------------------------------------------------------
.../event/VertexEventOneToOneSourceSplit.java | 50 +++
.../tez/dag/app/dag/event/VertexEventType.java | 1 +
.../apache/tez/dag/app/dag/impl/DAGImpl.java | 28 +-
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 366 ++++++++++++-------
.../tez/dag/app/dag/impl/TestVertexImpl.java | 255 ++++++++++---
.../tez/mapreduce/examples/ExampleDriver.java | 5 +-
.../examples/FilterLinesByWordOneToOne.java | 275 ++++++++++++++
.../mapreduce/examples/OrderedWordCount.java | 1 -
.../processor/FilterByWordInputProcessor.java | 4 +-
9 files changed, 795 insertions(+), 190 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8aac5ba4/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventOneToOneSourceSplit.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventOneToOneSourceSplit.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventOneToOneSourceSplit.java
new file mode 100644
index 0000000..a7e580e
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventOneToOneSourceSplit.java
@@ -0,0 +1,50 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.dag.records.TezVertexID;
+
+public class VertexEventOneToOneSourceSplit extends VertexEvent {
+ final int numTasks;
+ final TezVertexID originalSplitVertex;
+ final TezVertexID senderVertex;
+
+ public VertexEventOneToOneSourceSplit(TezVertexID vertexId,
+ TezVertexID senderVertex,
+ TezVertexID originalSplitVertex,
+ int numTasks) {
+ super(vertexId, VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT);
+ this.numTasks = numTasks;
+ this.senderVertex = senderVertex;
+ this.originalSplitVertex = originalSplitVertex;
+ }
+
+ public int getNumTasks() {
+ return numTasks;
+ }
+
+ public TezVertexID getOriginalSplitSource() {
+ return originalSplitVertex;
+ }
+
+ public TezVertexID getSenderVertex() {
+ return senderVertex;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8aac5ba4/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
index 9d62ede..fccfe91 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
@@ -54,6 +54,7 @@ public enum VertexEventType {
V_COUNTER_UPDATE,
V_ROUTE_EVENT,
+ V_ONE_TO_ONE_SOURCE_SPLIT,
//Producer: VertexInputInitializer
V_ROOT_INPUT_INITIALIZED,
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8aac5ba4/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index d16086b..ee6b832 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -557,23 +557,24 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
return vertex.getVertexStatus(statusOptions);
}
-
- protected void startRootVertices() {
+ protected void initializeVerticesAndStart() {
for (Vertex v : vertices.values()) {
if (v.getInputVerticesCount() == 0) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Starting root vertex " + v.getName());
+ LOG.debug("Initing root vertex " + v.getName());
}
eventHandler.handle(new VertexEvent(v.getVertexId(),
- VertexEventType.V_START));
+ VertexEventType.V_INIT));
}
}
- }
-
- protected void initializeVertices() {
for (Vertex v : vertices.values()) {
- eventHandler.handle(new VertexEvent(v.getVertexId(),
- VertexEventType.V_INIT));
+ if (v.getInputVerticesCount() == 0) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Starting root vertex " + v.getName());
+ }
+ eventHandler.handle(new VertexEvent(v.getVertexId(),
+ VertexEventType.V_START));
+ }
}
}
@@ -986,15 +987,14 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
* triggered in MRAppMaster's startJobs() method.
*/
@Override
- public void transition(DAGImpl job, DAGEvent event) {
- job.startTime = job.clock.getTime();
- job.initializeVertices();
- job.logJobHistoryInitedEvent();
+ public void transition(DAGImpl dag, DAGEvent event) {
+ dag.startTime = dag.clock.getTime();
+ dag.logJobHistoryInitedEvent();
// TODO Metrics
//job.metrics.runningJob(job);
// Start all vertices with no incoming edges when job starts
- job.startRootVertices();
+ dag.initializeVerticesAndStart();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8aac5ba4/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index e0a4ecf..cce2043 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -108,6 +108,7 @@ import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted;
import org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule;
import org.apache.tez.dag.app.dag.event.VertexEventTermination;
import org.apache.tez.dag.app.dag.event.VertexEventType;
+import org.apache.tez.dag.app.dag.event.VertexEventOneToOneSourceSplit;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.events.VertexFinishedEvent;
import org.apache.tez.dag.history.events.VertexStartedEvent;
@@ -173,10 +174,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
//fields initialized in init
private int numStartedSourceVertices = 0;
+ private int numInitedSourceVertices = 0;
private int distanceFromRoot = 0;
private final List<String> diagnostics = new ArrayList<String>();
-
+
//task/attempt related datastructures
@VisibleForTesting
int numSuccessSourceAttemptCompletions = 0;
@@ -204,8 +206,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
// Transitions from NEW state
.addTransition
(VertexState.NEW,
- EnumSet.of(VertexState.INITED, VertexState.INITIALIZING,
- VertexState.FAILED),
+ EnumSet.of(VertexState.NEW, VertexState.INITED,
+ VertexState.INITIALIZING, VertexState.FAILED),
VertexEventType.V_INIT,
new InitTransition())
.addTransition(VertexState.NEW, VertexState.KILLED,
@@ -217,15 +219,20 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
// Transitions from INITIALIZING state
.addTransition(VertexState.INITIALIZING,
- EnumSet.of(VertexState.INITIALIZING, VertexState.INITED, VertexState.RUNNING),
+ EnumSet.of(VertexState.INITIALIZING, VertexState.INITED,
+ VertexState.RUNNING, VertexState.FAILED),
VertexEventType.V_ROOT_INPUT_INITIALIZED,
new RootInputInitializedTransition())
+ .addTransition(VertexState.INITIALIZING,
+ EnumSet.of(VertexState.FAILED, VertexState.INITED),
+ VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT,
+ new OneToOneSourceSplitTransition())
.addTransition(VertexState.INITIALIZING, VertexState.FAILED,
VertexEventType.V_ROOT_INPUT_FAILED,
new RootInputInitFailedTransition())
.addTransition(VertexState.INITIALIZING, VertexState.INITIALIZING,
VertexEventType.V_START,
- new StartWhileInitingTransition())
+ new StartWhileInitializingTransition())
.addTransition(VertexState.INITIALIZING, VertexState.INITIALIZING,
VertexEventType.V_SOURCE_VERTEX_STARTED,
new SourceVertexStartedTransition())
@@ -243,10 +250,15 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
INTERNAL_ERROR_TRANSITION)
// Transitions from INITED state
- // SOURCE_VERTEX_STARTED - for srces which detemrine parallelism, they must complete before this vertex can start.
+ // SOURCE_VERTEX_STARTED - for sources which determine parallelism,
+ // they must complete before this vertex can start.
.addTransition(VertexState.INITED, VertexState.INITED,
VertexEventType.V_SOURCE_VERTEX_STARTED,
new SourceVertexStartedTransition())
+ .addTransition(VertexState.INITED,
+ EnumSet.of(VertexState.INITED),
+ VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT,
+ new OneToOneSourceSplitTransition())
.addTransition(VertexState.INITED, VertexState.INITED,
VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
SOURCE_TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION)
@@ -346,9 +358,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
.addTransition(VertexState.FAILED, VertexState.FAILED,
EnumSet.of(VertexEventType.V_TERMINATE,
VertexEventType.V_TASK_RESCHEDULED,
+ VertexEventType.V_START,
VertexEventType.V_ROUTE_EVENT,
VertexEventType.V_TASK_ATTEMPT_COMPLETED,
VertexEventType.V_TASK_COMPLETED,
+ VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT,
VertexEventType.V_ROOT_INPUT_INITIALIZED,
VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
VertexEventType.V_ROOT_INPUT_FAILED))
@@ -361,11 +375,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
// Ignore-able events
.addTransition(VertexState.KILLED, VertexState.KILLED,
EnumSet.of(VertexEventType.V_TERMINATE,
+ VertexEventType.V_INIT,
VertexEventType.V_SOURCE_VERTEX_STARTED,
VertexEventType.V_START,
VertexEventType.V_ROUTE_EVENT,
VertexEventType.V_TASK_RESCHEDULED,
VertexEventType.V_TASK_ATTEMPT_COMPLETED,
+ VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT,
VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
VertexEventType.V_TASK_COMPLETED,
VertexEventType.V_ROOT_INPUT_INITIALIZED,
@@ -382,6 +398,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
VertexEventType.V_TERMINATE,
VertexEventType.V_TASK_COMPLETED,
VertexEventType.V_TASK_ATTEMPT_COMPLETED,
+ VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT,
VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
VertexEventType.V_TASK_RESCHEDULED,
VertexEventType.V_INTERNAL_ERROR,
@@ -437,6 +454,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
private VertexScheduler vertexScheduler;
private boolean parallelismSet = false;
+ private TezVertexID originalOneToOneSplitSource = null;
private VertexOutputCommitter committer;
private AtomicBoolean committed = new AtomicBoolean(false);
@@ -788,89 +806,105 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
"SourceEdge managers cannot be set when determining initial parallelism");
this.numTasks = parallelism;
this.createTasks();
- LOG.info("Parallelism set to : " + this.numTasks);
+ LOG.info("Vertex " + getVertexId() +
+ " parallelism set to " + parallelism);
// Pending task event management, which follows, is not required.
// Vertex event buffering is happening elsewhere - while in the Vertex
// INITIALIZING state.
- return;
- }
-
- if (parallelism >= numTasks) {
- // not that hard to support perhaps. but checking right now since there
- // is no use case for it and checking may catch other bugs.
- throw new TezUncheckedException(
- "Increasing parallelism is not supported");
- }
- if (parallelism == numTasks) {
- LOG.info("Ingoring setParallelism to current value: " + parallelism);
- return;
- }
-
- // start buffering incoming events so that we can re-route existing events
- for (Edge edge : sourceVertices.values()) {
- edge.startEventBuffering();
- }
-
- // Use a set since the same event may have been sent to multiple tasks
- // and we want to avoid duplicates
- Set<TezEvent> pendingEvents = new HashSet<TezEvent>();
-
- LOG.info("Vertex " + getVertexId() + " parallelism set to " + parallelism);
- // assign to local variable of LinkedHashMap to make sure that changing
- // type of task causes compile error. We depend on LinkedHashMap for order
- LinkedHashMap<TezTaskID, Task> currentTasks = this.tasks;
- Iterator<Map.Entry<TezTaskID, Task>> iter = currentTasks.entrySet()
- .iterator();
- int i = 0;
- while (iter.hasNext()) {
- i++;
- Map.Entry<TezTaskID, Task> entry = iter.next();
- Task task = entry.getValue();
- if (task.getState() != TaskState.NEW) {
+ } else {
+ if (parallelism >= numTasks) {
+ // not that hard to support perhaps. but checking right now since there
+ // is no use case for it and checking may catch other bugs.
throw new TezUncheckedException(
- "All tasks must be in initial state when changing parallelism"
- + " for vertex: " + getVertexId() + " name: " + getName());
+ "Increasing parallelism is not supported");
}
- pendingEvents.addAll(task.getAndClearTaskTezEvents());
- if (i <= parallelism) {
- continue;
+ if (parallelism == numTasks) {
+ LOG.info("Ingoring setParallelism to current value: " + parallelism);
+ return;
}
- LOG.info("Removing task: " + entry.getKey());
- iter.remove();
- }
- this.numTasks = parallelism;
- assert tasks.size() == numTasks;
-
- // set new edge managers
- if(sourceEdgeManagers != null) {
- for(Map.Entry<Vertex, EdgeManager> entry : sourceEdgeManagers.entrySet()) {
- Vertex sourceVertex = entry.getKey();
- EdgeManager edgeManager = entry.getValue();
- Edge edge = sourceVertices.get(sourceVertex);
- LOG.info("Replacing edge manager for source:"
- + sourceVertex.getVertexId() + " destination: " + getVertexId());
- edge.setEdgeManager(edgeManager);
+
+ // start buffering incoming events so that we can re-route existing events
+ for (Edge edge : sourceVertices.values()) {
+ edge.startEventBuffering();
+ }
+
+ // Use a set since the same event may have been sent to multiple tasks
+ // and we want to avoid duplicates
+ Set<TezEvent> pendingEvents = new HashSet<TezEvent>();
+
+ LOG.info("Vertex " + getVertexId() +
+ " parallelism set to " + parallelism + " from " + numTasks);
+ // assign to local variable of LinkedHashMap to make sure that changing
+ // type of task causes compile error. We depend on LinkedHashMap for order
+ LinkedHashMap<TezTaskID, Task> currentTasks = this.tasks;
+ Iterator<Map.Entry<TezTaskID, Task>> iter = currentTasks.entrySet()
+ .iterator();
+ int i = 0;
+ while (iter.hasNext()) {
+ i++;
+ Map.Entry<TezTaskID, Task> entry = iter.next();
+ Task task = entry.getValue();
+ if (task.getState() != TaskState.NEW) {
+ throw new TezUncheckedException(
+ "All tasks must be in initial state when changing parallelism"
+ + " for vertex: " + getVertexId() + " name: " + getName());
+ }
+ pendingEvents.addAll(task.getAndClearTaskTezEvents());
+ if (i <= parallelism) {
+ continue;
+ }
+ LOG.info("Removing task: " + entry.getKey());
+ iter.remove();
+ }
+ this.numTasks = parallelism;
+ assert tasks.size() == numTasks;
+
+ // set new edge managers
+ if(sourceEdgeManagers != null) {
+ for(Map.Entry<Vertex, EdgeManager> entry : sourceEdgeManagers.entrySet()) {
+ Vertex sourceVertex = entry.getKey();
+ EdgeManager edgeManager = entry.getValue();
+ Edge edge = sourceVertices.get(sourceVertex);
+ LOG.info("Replacing edge manager for source:"
+ + sourceVertex.getVertexId() + " destination: " + getVertexId());
+ edge.setEdgeManager(edgeManager);
+ }
+ }
+
+ // Re-route all existing TezEvents according to new routing table
+ // At this point only events attributed to source task attempts can be
+ // re-routed. e.g. DataMovement or InputFailed events.
+ // This assumption is fine for now since these tasks haven't been started.
+ // So they can only get events generated from source task attempts that
+ // have already been started.
+ DAG dag = getDAG();
+ for(TezEvent event : pendingEvents) {
+ TezVertexID sourceVertexId = event.getSourceInfo().getTaskAttemptID()
+ .getTaskID().getVertexID();
+ Vertex sourceVertex = dag.getVertex(sourceVertexId);
+ Edge sourceEdge = sourceVertices.get(sourceVertex);
+ sourceEdge.sendTezEventToDestinationTasks(event);
+ }
+
+ // stop buffering events
+ for (Edge edge : sourceVertices.values()) {
+ edge.stopEventBuffering();
}
}
-
- // Re-route all existing TezEvents according to new routing table
- // At this point only events attributed to source task attempts can be
- // re-routed. e.g. DataMovement or InputFailed events.
- // This assumption is fine for now since these tasks haven't been started.
- // So they can only get events generated from source task attempts that
- // have already been started.
- DAG dag = getDAG();
- for(TezEvent event : pendingEvents) {
- TezVertexID sourceVertexId = event.getSourceInfo().getTaskAttemptID()
- .getTaskID().getVertexID();
- Vertex sourceVertex = dag.getVertex(sourceVertexId);
- Edge sourceEdge = sourceVertices.get(sourceVertex);
- sourceEdge.sendTezEventToDestinationTasks(event);
- }
-
- // stop buffering events
- for (Edge edge : sourceVertices.values()) {
- edge.stopEventBuffering();
+
+ for (Map.Entry<Vertex, Edge> entry : targetVertices.entrySet()) {
+ Edge edge = entry.getValue();
+ if (edge.getEdgeProperty().getDataMovementType()
+ == DataMovementType.ONE_TO_ONE) {
+ // inform these target vertices that we have changed parallelism
+ VertexEventOneToOneSourceSplit event =
+ new VertexEventOneToOneSourceSplit(entry.getKey().getVertexId(),
+ getVertexId(),
+ ((originalOneToOneSplitSource!=null) ?
+ originalOneToOneSplitSource : getVertexId()),
+ numTasks);
+ getEventHandler().handle(event);
+ }
}
} finally {
@@ -1192,7 +1226,24 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
@Override
public VertexState transition(VertexImpl vertex, VertexEvent event) {
+ VertexState vertexState = VertexState.NEW;
+ vertex.numInitedSourceVertices++;
+ if (vertex.sourceVertices == null || vertex.sourceVertices.isEmpty() ||
+ vertex.numInitedSourceVertices == vertex.sourceVertices.size()) {
+ vertexState = handleInitEvent(vertex, event);
+ if (vertexState != VertexState.FAILED) {
+ if (vertex.targetVertices != null && !vertex.targetVertices.isEmpty()) {
+ for (Vertex target : vertex.targetVertices.keySet()) {
+ vertex.getEventHandler().handle(new VertexEvent(target.getVertexId(),
+ VertexEventType.V_INIT));
+ }
+ }
+ }
+ }
+ return vertexState;
+ }
+ private VertexState handleInitEvent(VertexImpl vertex, VertexEvent event) {
vertex.initTimeRequested = vertex.clock.getTime();
// VertexManager needs to be setup before attempting to Initialize any
@@ -1269,40 +1320,59 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
if (vertex.numTasks == -1) {
LOG.info("Num tasks is -1. Expecting VertexManager/InputInitializers"
+ " to set #tasks for the vertex " + vertex.getVertexId());
- } else {
- vertex.createTasks();
- }
- if (vertex.inputsWithInitializers != null) {
- // Use DAGScheduler to arbitrate resources among vertices later
- // Ask for 1.5 the number of tasks we can fit in one wave
- int totalResource = vertex.appContext.getTaskScheduler()
- .getTotalResources().getMemory();
- int taskResource = vertex.getTaskResource().getMemory();
- float waves = vertex.conf.getFloat(
- TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES,
- TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES_DEFAULT);
-
- int numTasks = (int)((totalResource*waves)/taskResource);
-
- LOG.info("Vertex " + vertex.getVertexId() + " asking for " + numTasks
- + " tasks. Headroom: " + totalResource + " Task Resource: "
- + taskResource + " waves: " + waves);
-
- vertex.rootInputInitializer = vertex.createRootInputInitializerRunner(
- vertex.getDAG().getName(), vertex.getName(), vertex.getVertexId(),
- vertex.eventHandler, numTasks);
- List<RootInputLeafOutputDescriptor<InputDescriptor>> inputList = Lists
- .newArrayListWithCapacity(vertex.inputsWithInitializers.size());
- for (String inputName : vertex.inputsWithInitializers) {
- inputList.add(vertex.additionalInputs.get(inputName));
+ if (vertex.inputsWithInitializers != null) {
+ // Use DAGScheduler to arbitrate resources among vertices later
+ // Ask for 1.5 the number of tasks we can fit in one wave
+ int totalResource = vertex.appContext.getTaskScheduler()
+ .getTotalResources().getMemory();
+ int taskResource = vertex.getTaskResource().getMemory();
+ float waves = vertex.conf.getFloat(
+ TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES,
+ TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES_DEFAULT);
+
+ int numTasks = (int)((totalResource*waves)/taskResource);
+
+ LOG.info("Vertex " + vertex.getVertexId() + " asking for " + numTasks
+ + " tasks. Headroom: " + totalResource + " Task Resource: "
+ + taskResource + " waves: " + waves);
+
+ vertex.rootInputInitializer = vertex.createRootInputInitializerRunner(
+ vertex.getDAG().getName(), vertex.getName(), vertex.getVertexId(),
+ vertex.eventHandler, numTasks);
+ List<RootInputLeafOutputDescriptor<InputDescriptor>> inputList = Lists
+ .newArrayListWithCapacity(vertex.inputsWithInitializers.size());
+ for (String inputName : vertex.inputsWithInitializers) {
+ inputList.add(vertex.additionalInputs.get(inputName));
+ }
+ LOG.info("Starting root input initializers: "
+ + vertex.inputsWithInitializers.size());
+ vertex.rootInputInitializer.runInputInitializers(inputList);
+ } else {
+ // no input initializers. At this moment, only other case is 1-1 edge
+ // with uninitialized sources
+ boolean hasOneToOneUninitedSource = false;
+ for (Map.Entry<Vertex, Edge> entry : vertex.sourceVertices.entrySet()) {
+ if (entry.getValue().getEdgeProperty().getDataMovementType() ==
+ DataMovementType.ONE_TO_ONE) {
+ if (entry.getKey().getTotalTasks() == -1) {
+ hasOneToOneUninitedSource = true;
+ break;
+ }
+ }
+ }
+ if (!hasOneToOneUninitedSource) {
+ throw new TezUncheckedException(vertex.getVertexId() +
+ " has -1 tasks but neither input initializers nor 1-1 uninited sources");
+ }
}
- LOG.info("Starting root input initializers: "
- + vertex.inputsWithInitializers.size());
- vertex.rootInputInitializer.runInputInitializers(inputList);
+
return VertexState.INITIALIZING;
+ } else {
+ vertex.createTasks();
}
+
return vertex.initializeVertex();
}
} // end of InitTransition
@@ -1314,6 +1384,22 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
return new RootInputInitializerRunner(dagName, vertexName, vertexID,
eventHandler, numTasks);
}
+
+ private VertexState initializeVertexInInitializingState() {
+ VertexState vertexState = initializeVertex();
+ if (vertexState == VertexState.FAILED) {
+ // Don't bother starting if the vertex state is failed.
+ return vertexState;
+ }
+
+ // Vertex will be moving to INITED state, safe to process pending route events.
+ if (pendingRouteEvents != null) {
+ VertexImpl.ROUTE_EVENT_TRANSITION.transition(this,
+ new VertexEventRouteEvent(getVertexId(), pendingRouteEvents));
+ pendingRouteEvents = null;
+ }
+ return vertexState;
+ }
public static class RootInputInitializedTransition implements
MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
@@ -1334,18 +1420,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
// If RootInputs are determining parallelism, it should have been set by
// this point, so it's safe to checkTaskLimits and createTasks
- VertexState vertexState = vertex.initializeVertex();
+ VertexState vertexState = vertex.initializeVertexInInitializingState();
if (vertexState == VertexState.FAILED) {
- // Don't bother starting if the vertex state is failed.
- return vertexState;
- }
-
- // Vertex will be moving to INITED state, safe to process pending route events.
- if (vertex.pendingRouteEvents != null) {
- VertexImpl.ROUTE_EVENT_TRANSITION.transition(vertex,
- new VertexEventRouteEvent(vertex.getVertexId(),
- vertex.pendingRouteEvents));
- vertex.pendingRouteEvents = null;
+ return VertexState.FAILED;
}
if (vertex.startSignalPending) {
@@ -1360,6 +1437,50 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
}
+ public static class OneToOneSourceSplitTransition implements
+ MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
+
+ @Override
+ public VertexState transition(VertexImpl vertex, VertexEvent event) {
+ VertexEventOneToOneSourceSplit splitEvent =
+ (VertexEventOneToOneSourceSplit)event;
+ TezVertexID originalSplitSource = splitEvent.getOriginalSplitSource();
+ if (vertex.originalOneToOneSplitSource != null) {
+ Preconditions.checkState(vertex.getState() == VertexState.INITED,
+ " Unexpected 1-1 split for vertex " + vertex.getVertexId() +
+ " in state " + vertex.getState() +
+ " . Split in vertex " + originalSplitSource +
+ " sent by vertex " + splitEvent.getSenderVertex() +
+ " numTasks " + splitEvent.getNumTasks());
+ if (vertex.originalOneToOneSplitSource.equals(originalSplitSource)) {
+ // ignore another split event that may have come from a different
+ // path in the DAG. We have already split because of that source
+ LOG.info("Ignoring split of vertex " + vertex.getVertexId() +
+ " because of split in vertex " + originalSplitSource +
+ " sent by vertex " + splitEvent.getSenderVertex() +
+ " numTasks " + splitEvent.getNumTasks());
+ return VertexState.INITED;
+ }
+ // cannot split from multiple sources
+ throw new TezUncheckedException("Vertex: " + vertex.getVertexId() +
+ " asked to split by: " + originalSplitSource +
+ " but was already split by:" + vertex.originalOneToOneSplitSource);
+ }
+ Preconditions.checkState(vertex.getState() == VertexState.INITIALIZING,
+ " Unexpected 1-1 split for vertex " + vertex.getVertexId() +
+ " in state " + vertex.getState() +
+ " . Split in vertex " + originalSplitSource +
+ " sent by vertex " + splitEvent.getSenderVertex() +
+ " numTasks " + splitEvent.getNumTasks());
+ LOG.info("Splitting vertex " + vertex.getVertexId() +
+ " because of split in vertex " + originalSplitSource +
+ " sent by vertex " + splitEvent.getSenderVertex() +
+ " numTasks " + splitEvent.getNumTasks());
+ vertex.originalOneToOneSplitSource = originalSplitSource;
+ vertex.setParallelism(splitEvent.getNumTasks(), null);
+ return vertex.initializeVertexInInitializingState();
+ }
+ }
// Temporary to maintain topological order while starting vertices. Not useful
// since there's not much difference between the INIT and RUNNING states.
@@ -1386,7 +1507,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
}
- public static class StartWhileInitingTransition implements SingleArcTransition<VertexImpl, VertexEvent> {
+ public static class StartWhileInitializingTransition implements
+ SingleArcTransition<VertexImpl, VertexEvent> {
@Override
public void transition(VertexImpl vertex, VertexEvent event) {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8aac5ba4/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 4a479e3..abb0d53 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -319,6 +319,140 @@ public class TestVertexImpl {
.build();
return dag;
}
+
+ private DAGPlan createDAGPlanForOneToOneSplit(String initializerClassName) {
+ LOG.info("Setting up one to one dag plan");
+ DAGPlan dag = DAGPlan.newBuilder()
+ .setName("testVertexOneToOneSplit")
+ .addVertex(
+ VertexPlan.newBuilder()
+ .setName("vertex1")
+ .setType(PlanVertexType.NORMAL)
+ .addInputs(
+ RootInputLeafOutputProto.newBuilder()
+ .setInitializerClassName(initializerClassName)
+ .setName("input1")
+ .setEntityDescriptor(
+ TezEntityDescriptorProto.newBuilder()
+ .setClassName("InputClazz")
+ .build()
+ ).build()
+ )
+ .setTaskConfig(
+ PlanTaskConfiguration.newBuilder()
+ .setNumTasks(-1)
+ .setVirtualCores(4)
+ .setMemoryMb(1024)
+ .setJavaOpts("")
+ .setTaskModule("x1.y1")
+ .build()
+ )
+ .addOutEdgeId("e1")
+ .addOutEdgeId("e2")
+ .build()
+ )
+ .addVertex(
+ VertexPlan.newBuilder()
+ .setName("vertex2")
+ .setType(PlanVertexType.NORMAL)
+ .setTaskConfig(
+ PlanTaskConfiguration.newBuilder()
+ .setNumTasks(-1)
+ .setVirtualCores(4)
+ .setMemoryMb(1024)
+ .setJavaOpts("")
+ .setTaskModule("x2.y2")
+ .build()
+ )
+ .addInEdgeId("e1")
+ .addOutEdgeId("e3")
+ .build()
+ )
+ .addVertex(
+ VertexPlan.newBuilder()
+ .setName("vertex3")
+ .setType(PlanVertexType.NORMAL)
+ .setTaskConfig(
+ PlanTaskConfiguration.newBuilder()
+ .setNumTasks(-1)
+ .setVirtualCores(4)
+ .setMemoryMb(1024)
+ .setJavaOpts("")
+ .setTaskModule("x3.y3")
+ .build()
+ )
+ .addInEdgeId("e2")
+ .addOutEdgeId("e4")
+ .build()
+ )
+ .addVertex(
+ VertexPlan.newBuilder()
+ .setName("vertex4")
+ .setType(PlanVertexType.NORMAL)
+ .setTaskConfig(
+ PlanTaskConfiguration.newBuilder()
+ .setNumTasks(-1)
+ .setVirtualCores(4)
+ .setMemoryMb(1024)
+ .setJavaOpts("")
+ .setTaskModule("x4.y4")
+ .build()
+ )
+ .addInEdgeId("e3")
+ .addInEdgeId("e4")
+ .build()
+ )
+ .addEdge(
+ EdgePlan.newBuilder()
+ .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("v1_v2"))
+ .setInputVertexName("vertex1")
+ .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o2"))
+ .setOutputVertexName("vertex2")
+ .setDataMovementType(PlanEdgeDataMovementType.ONE_TO_ONE)
+ .setId("e1")
+ .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+ .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+ .build()
+ )
+ .addEdge(
+ EdgePlan.newBuilder()
+ .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("v1_v3"))
+ .setInputVertexName("vertex1")
+ .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o2"))
+ .setOutputVertexName("vertex3")
+ .setDataMovementType(PlanEdgeDataMovementType.ONE_TO_ONE)
+ .setId("e2")
+ .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+ .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+ .build()
+ )
+ .addEdge(
+ EdgePlan.newBuilder()
+ .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("v2_v4"))
+ .setInputVertexName("vertex2")
+ .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o2"))
+ .setOutputVertexName("vertex4")
+ .setDataMovementType(PlanEdgeDataMovementType.ONE_TO_ONE)
+ .setId("e3")
+ .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+ .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+ .build()
+ )
+ .addEdge(
+ EdgePlan.newBuilder()
+ .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("v3_v4"))
+ .setInputVertexName("vertex3")
+ .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o2"))
+ .setOutputVertexName("vertex4")
+ .setDataMovementType(PlanEdgeDataMovementType.ONE_TO_ONE)
+ .setId("e4")
+ .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+ .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+ .build()
+ )
+ .build();
+ return dag;
+ }
private DAGPlan createTestDAGPlan() {
LOG.info("Setting up dag plan");
@@ -680,10 +814,16 @@ public class TestVertexImpl {
this.vertexIdMap = null;
}
- private void initAllVertices() {
- for (int i = 1; i <= 6; ++i) {
+ /**
+ * Initializes the DAG by calling initVertex only on root vertices (those
+ * whose sourceVertices is null or empty), then asserts that every vertex is
+ * in {@code expectedState}. Non-root vertices are not passed to initVertex
+ * here, so their state depends on what the roots propagate to them.
+ * NOTE(review): assumes the vertices map is keyed exactly "vertex1".."vertexN"
+ * (N = vertices.size()); any other key layout makes vertices.get(...) return
+ * null and the sourceVertices check throw a NullPointerException.
+ */
+ private void initAllVertices(VertexState expectedState) {
+ for (int i = 1; i <= vertices.size(); ++i) {
+ VertexImpl v = vertices.get("vertex" + i);
+ if (v.sourceVertices == null || v.sourceVertices.isEmpty()) {
+ initVertex(v);
+ }
+ }
+ // Second pass: check the resulting state of every vertex, roots included.
+ for (int i = 1; i <= vertices.size(); ++i) {
VertexImpl v = vertices.get("vertex" + i);
- initVertex(v);
+ Assert.assertEquals(expectedState, v.getState());
}
}
@@ -694,7 +834,6 @@ public class TestVertexImpl {
dispatcher.getEventHandler().handle(new VertexEvent(v.getVertexId(),
VertexEventType.V_INIT));
dispatcher.await();
- Assert.assertEquals(VertexState.INITED, v.getState());
}
private void startVertex(VertexImpl v) {
@@ -724,11 +863,9 @@ public class TestVertexImpl {
@Test(timeout = 5000)
public void testVertexInit() {
- VertexImpl v = vertices.get("vertex2");
- initVertex(v);
+ initAllVertices(VertexState.INITED);
VertexImpl v3 = vertices.get("vertex3");
- initVertex(v3);
Assert.assertEquals("x3.y3", v3.getProcessorName());
Assert.assertEquals("foo", v3.getJavaOpts());
@@ -775,7 +912,7 @@ public class TestVertexImpl {
@Test(timeout = 5000)
public void testVertexStart() {
- initAllVertices();
+ initAllVertices(VertexState.INITED);
VertexImpl v = vertices.get("vertex2");
startVertex(v);
@@ -783,8 +920,8 @@ public class TestVertexImpl {
@Test(timeout = 5000)
public void testVertexSetParallelism() {
+ initAllVertices(VertexState.INITED);
VertexImpl v3 = vertices.get("vertex3");
- initVertex(v3);
Assert.assertEquals(2, v3.getTotalTasks());
Map<TezTaskID, Task> tasks = v3.getTasks();
Assert.assertEquals(2, tasks.size());
@@ -808,7 +945,7 @@ public class TestVertexImpl {
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testBasicVertexCompletion() {
- initAllVertices();
+ initAllVertices(VertexState.INITED);
VertexImpl v = vertices.get("vertex2");
startVertex(v);
@@ -833,7 +970,7 @@ public class TestVertexImpl {
@Test(timeout = 5000)
@Ignore // FIXME fix verteximpl for this test to work
public void testDuplicateTaskCompletion() {
- initAllVertices();
+ initAllVertices(VertexState.INITED);
VertexImpl v = vertices.get("vertex2");
startVertex(v);
@@ -861,7 +998,7 @@ public class TestVertexImpl {
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testVertexFailure() {
- initAllVertices();
+ initAllVertices(VertexState.INITED);
VertexImpl v = vertices.get("vertex2");
startVertex(v);
@@ -881,30 +1018,16 @@ public class TestVertexImpl {
@Test(timeout = 5000)
public void testVertexKillDiagnostics() {
- VertexImpl v1 = vertices.get("vertex1");
- killVertex(v1);
- String diagnostics =
- StringUtils.join(",", v1.getDiagnostics()).toLowerCase();
- Assert.assertTrue(diagnostics.contains(
- "vertex received kill in new state"));
-
+ initAllVertices(VertexState.INITED);
VertexImpl v2 = vertices.get("vertex2");
- initVertex(v2);
killVertex(v2);
- diagnostics =
+ String diagnostics =
StringUtils.join(",", v2.getDiagnostics()).toLowerCase();
LOG.info("diagnostics v2: " + diagnostics);
Assert.assertTrue(diagnostics.contains(
"vertex received kill in inited state"));
VertexImpl v3 = vertices.get("vertex3");
- VertexImpl v4 = vertices.get("vertex4");
- VertexImpl v5 = vertices.get("vertex5");
- VertexImpl v6 = vertices.get("vertex6");
- initVertex(v3);
- initVertex(v4);
- initVertex(v5);
- initVertex(v6);
startVertex(v3);
killVertex(v3);
@@ -917,7 +1040,7 @@ public class TestVertexImpl {
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testVertexKillPending() {
- initAllVertices();
+ initAllVertices(VertexState.INITED);
VertexImpl v = vertices.get("vertex2");
startVertex(v);
@@ -943,7 +1066,7 @@ public class TestVertexImpl {
@SuppressWarnings("unchecked")
@Test
public void testVertexKill() {
- initAllVertices();
+ initAllVertices(VertexState.INITED);
VertexImpl v = vertices.get("vertex2");
startVertex(v);
@@ -969,7 +1092,7 @@ public class TestVertexImpl {
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testKilledTasksHandling() {
- initAllVertices();
+ initAllVertices(VertexState.INITED);
VertexImpl v = vertices.get("vertex2");
startVertex(v);
@@ -987,26 +1110,24 @@ public class TestVertexImpl {
+ /**
+ * Checks the output committer chosen for vertex2 (NullVertexOutputCommitter)
+ * and for vertex6 (MRVertexOutputCommitter) once the DAG has been initialized.
+ */
@Test(timeout = 5000)
public void testVertexCommitterInit() {
+ // Initialize through the root vertices instead of calling initVertex on
+ // individual vertices; initAllVertices asserts every vertex reaches INITED.
+ initAllVertices(VertexState.INITED);
VertexImpl v2 = vertices.get("vertex2");
- initVertex(v2);
Assert.assertTrue(v2.getVertexOutputCommitter()
instanceof NullVertexOutputCommitter);
VertexImpl v6 = vertices.get("vertex6");
- initVertex(v6);
Assert.assertTrue(v6.getVertexOutputCommitter()
instanceof MRVertexOutputCommitter);
}
+ /**
+ * Checks the vertex scheduler chosen for vertex2 (ImmediateStartVertexScheduler)
+ * and for vertex6 (ShuffleVertexManager) once the DAG has been initialized.
+ */
@Test(timeout = 5000)
public void testVertexSchedulerInit() {
+ // Initialize through the root vertices instead of calling initVertex on
+ // individual vertices; initAllVertices asserts every vertex reaches INITED.
+ initAllVertices(VertexState.INITED);
VertexImpl v2 = vertices.get("vertex2");
- initVertex(v2);
Assert.assertTrue(v2.getVertexScheduler()
instanceof ImmediateStartVertexScheduler);
VertexImpl v6 = vertices.get("vertex6");
- initVertex(v6);
Assert.assertTrue(v6.getVertexScheduler()
instanceof ShuffleVertexManager);
}
@@ -1014,7 +1135,7 @@ public class TestVertexImpl {
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testVertexTaskFailure() {
- initAllVertices();
+ initAllVertices(VertexState.INITED);
VertexImpl v = vertices.get("vertex2");
@@ -1045,7 +1166,7 @@ public class TestVertexImpl {
@Test(timeout = 5000)
public void testSourceVertexStartHandling() {
LOG.info("Testing testSourceVertexStartHandling");
- initAllVertices();
+ initAllVertices(VertexState.INITED);
VertexImpl v4 = vertices.get("vertex4");
VertexImpl v5 = vertices.get("vertex5");
@@ -1077,7 +1198,7 @@ public class TestVertexImpl {
@Test(timeout = 5000)
public void testSourceTaskAttemptCompletionEvents() {
LOG.info("Testing testSourceTaskAttemptCompletionEvents");
- initAllVertices();
+ initAllVertices(VertexState.INITED);
VertexImpl v4 = vertices.get("vertex4");
VertexImpl v5 = vertices.get("vertex5");
@@ -1125,7 +1246,7 @@ public class TestVertexImpl {
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testDAGEventGeneration() {
- initAllVertices();
+ initAllVertices(VertexState.INITED);
VertexImpl v = vertices.get("vertex2");
startVertex(v);
@@ -1148,7 +1269,7 @@ public class TestVertexImpl {
@Test(timeout = 5000)
public void testTaskReschedule() {
// For downstream failures
- initAllVertices();
+ initAllVertices(VertexState.INITED);
VertexImpl v = vertices.get("vertex2");
@@ -1185,7 +1306,7 @@ public class TestVertexImpl {
@Test(timeout = 5000)
public void testVertexSuccessToRunningAfterTaskScheduler() {
// For downstream failures
- initAllVertices();
+ initAllVertices(VertexState.INITED);
VertexImpl v = vertices.get("vertex2");
@@ -1228,7 +1349,7 @@ public class TestVertexImpl {
@Test(timeout = 5000)
public void testVertexSuccessToFailedAfterTaskScheduler() {
// For downstream failures
- initAllVertices();
+ initAllVertices(VertexState.INITED);
VertexImpl v = vertices.get("vertex2");
@@ -1262,7 +1383,7 @@ public class TestVertexImpl {
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testVertexCommit() {
- initAllVertices();
+ initAllVertices(VertexState.INITED);
VertexImpl v = vertices.get("vertex2");
@@ -1303,7 +1424,7 @@ public class TestVertexImpl {
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testBadCommitter() {
- initAllVertices();
+ initAllVertices(VertexState.INITED);
VertexImpl v = vertices.get("vertex2");
@@ -1330,6 +1451,46 @@ public class TestVertexImpl {
Assert.assertEquals(0, committer.initCounter); // already done in init
Assert.assertEquals(0, committer.setupCounter); // already done in init
}
+
+  /**
+   * Builds a diamond-shaped DAG (vertex1 -> vertex2 and vertex3 -> vertex4)
+   * whose edges are all ONE_TO_ONE. When vertex1's root input initializer
+   * reports its task count, vertex2, vertex3 and vertex4 must all take on that
+   * same count; vertex4, whose two one-to-one inputs both trace back to
+   * vertex1, is expected to be resized only once. Starting vertex1 must then
+   * bring all four vertices to RUNNING.
+   */
+  @Test(timeout = 5000)
+  public void testVertexWithOneToOneSplit() {
+    useCustomInitializer = true;
+    setupPreDagCreation();
+    dagPlan = createDAGPlanForOneToOneSplit("TestInputInitializer");
+    setupPostDagCreation();
+    // vertex1 is the only root. Until its input initializer completes, every
+    // vertex, including the downstream 1-1 consumers, is expected to be
+    // INITIALIZING.
+    initAllVertices(VertexState.INITIALIZING);
+
+    int numTasks = 5;
+    VertexImplWithCustomInitializer v1 =
+        (VertexImplWithCustomInitializer) vertices.get("vertex1");
+    Assert.assertEquals(VertexState.INITIALIZING, v1.getState());
+    RootInputInitializerRunnerControlled runner1 = v1.getRootInputInitializerRunner();
+    List<TaskLocationHint> v1Hints = createTaskLocationHints(numTasks);
+    runner1.completeInputInitialization(numTasks, v1Hints);
+
+    Assert.assertEquals(VertexState.INITED, v1.getState());
+    Assert.assertEquals(numTasks, v1.getTotalTasks());
+    Assert.assertEquals(RootInputVertexManager.class.getName(),
+        v1.getVertexScheduler().getClass().getName());
+    Assert.assertEquals(v1Hints, v1.getVertexLocationHint().getTaskLocationHints());
+    Assert.assertTrue(runner1.hasShutDown);
+
+    // vertex1's runtime parallelism must have propagated over every 1-1 edge.
+    // The vertex name is used as the assertion message to pinpoint failures.
+    for (String name : new String[] { "vertex2", "vertex3", "vertex4" }) {
+      VertexImpl v = vertices.get(name);
+      Assert.assertEquals(name, numTasks, v.getTotalTasks());
+      Assert.assertEquals(name, VertexState.INITED, v.getState());
+    }
+
+    startVertex(v1);
+    for (String name : new String[] { "vertex1", "vertex2", "vertex3", "vertex4" }) {
+      Assert.assertEquals(name, VertexState.RUNNING, vertices.get(name).getState());
+    }
+  }
@Test(timeout = 5000)
public void testHistoryEventGeneration() {
@@ -1371,9 +1532,6 @@ public class TestVertexImpl {
Assert.assertEquals(true, runner1.hasShutDown);
VertexImplWithCustomInitializer v2 = (VertexImplWithCustomInitializer) vertices.get("vertex2");
- dispatcher.getEventHandler().handle(
- new VertexEvent(v2.getVertexId(), VertexEventType.V_INIT));
- dispatcher.await();
Assert.assertEquals(VertexState.INITIALIZING, v2.getState());
RootInputInitializerRunnerControlled runner2 = v2.getRootInputInitializerRunner();
runner2.failInputInitialization();
@@ -1410,9 +1568,6 @@ public class TestVertexImpl {
Assert.assertEquals(true, runner1.hasShutDown);
VertexImplWithCustomInitializer v2 = (VertexImplWithCustomInitializer) vertices.get("vertex2");
- dispatcher.getEventHandler().handle(
- new VertexEvent(v2.getVertexId(), VertexEventType.V_INIT));
- dispatcher.await();
Assert.assertEquals(VertexState.INITIALIZING, v2.getState());
RootInputInitializerRunnerControlled runner2 = v2.getRootInputInitializerRunner();
List<TaskLocationHint> v2Hints = createTaskLocationHints(10);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8aac5ba4/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
index 23f5c72..ff2a804 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
@@ -20,7 +20,6 @@ package org.apache.tez.mapreduce.examples;
import java.io.IOException;
import java.text.DecimalFormat;
-import java.util.Collections;
import java.util.EnumSet;
import java.util.Set;
@@ -84,7 +83,9 @@ public class ExampleDriver {
pgd.addClass("orderedwordcount", OrderedWordCount.class,
"Word Count with words sorted on frequency");
pgd.addClass("filterLinesByWord", FilterLinesByWord.class,
- "Filters lines by the specified word");
+ "Filters lines by the specified word using broadcast edge");
+ pgd.addClass("filterLinesByWordOneToOne", FilterLinesByWordOneToOne.class,
+ "Filters lines by the specified word using OneToOne edge");
exitCode = pgd.run(argv);
}
catch(Throwable e){
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8aac5ba4/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
new file mode 100644
index 0000000..9c05599
--- /dev/null
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
@@ -0,0 +1,275 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.examples;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.ClassUtil;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.client.AMConfiguration;
+import org.apache.tez.client.TezClientUtils;
+import org.apache.tez.client.TezSession;
+import org.apache.tez.client.TezSessionConfiguration;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
+import org.apache.tez.mapreduce.examples.FilterLinesByWord.TextLongPair;
+import org.apache.tez.mapreduce.examples.helpers.SplitsInClientOptionParser;
+import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
+import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
+import org.apache.tez.mapreduce.input.MRInputLegacy;
+import org.apache.tez.mapreduce.output.MROutput;
+import org.apache.tez.processor.FilterByWordInputProcessor;
+import org.apache.tez.processor.FilterByWordOutputProcessor;
+import org.apache.tez.runtime.api.TezRootInputInitializer;
+import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput;
+import org.apache.tez.runtime.library.output.OnFileUnorderedKVOutput;
+
+public class FilterLinesByWordOneToOne {
+
+  private static final Log LOG = LogFactory.getLog(FilterLinesByWordOneToOne.class);
+
+  /**
+   * Configuration key for the word to filter on. It is set on the stage1
+   * configuration from which the stage1 processor payload is built.
+   */
+  public static final String FILTER_PARAM_NAME = "tez.runtime.examples.filterbyword.word";
+
+  /** Names of the DAG's vertices, used when printing progress. */
+  private static final String[] VERTEX_NAMES = { "stage1", "stage2" };
+
+  private static void printUsage() {
+    System.err.println("Usage filterLinesByWordOneToOne <in> <out> <filter_word>"
+        + " [-generateSplitsInClient true/<false>]");
+  }
+
+  /**
+   * Runs a two-vertex DAG in a Tez session.
+   *
+   * <p>stage1 runs FilterByWordInputProcessor over {@code <in>}, with the
+   * filter word in its configuration. stage2 runs FilterByWordOutputProcessor
+   * and writes through MROutput to {@code <out>}. The two vertices are joined
+   * by a ONE_TO_ONE edge. Unless {@code -generateSplitsInClient true} is
+   * given, both vertices are created with -1 tasks, so their parallelism is
+   * determined at runtime.
+   *
+   * <p>Exit codes:
+   * <ul>
+   *   <li>0 if the DAG succeeds;</li>
+   *   <li>1 if the DAG ends in any other state;</li>
+   *   <li>2 on bad arguments or an already-existing output directory;</li>
+   *   <li>-1 if DAG progress can no longer be retrieved.</li>
+   * </ul>
+   * The staging directory is always deleted and a started session is always
+   * stopped before the DAG outcome is reported.
+   */
+  public static void main(String[] args) throws IOException, InterruptedException,
+      ClassNotFoundException, TezException {
+    Configuration conf = new Configuration();
+    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
+
+    boolean generateSplitsInClient = false;
+    SplitsInClientOptionParser splitCmdLineParser = new SplitsInClientOptionParser();
+    try {
+      generateSplitsInClient = splitCmdLineParser.parse(otherArgs, false);
+      otherArgs = splitCmdLineParser.getRemainingArgs();
+    } catch (ParseException e1) {
+      System.err.println("Invalid options");
+      printUsage();
+      System.exit(2);
+    }
+
+    if (otherArgs.length != 3) {
+      printUsage();
+      System.exit(2);
+    }
+
+    String inputPath = otherArgs[0];
+    String outputPath = otherArgs[1];
+    String filterWord = otherArgs[2];
+
+    FileSystem fs = FileSystem.get(conf);
+    if (fs.exists(new Path(outputPath))) {
+      System.err.println("Output directory : " + outputPath + " already exists");
+      System.exit(2);
+    }
+
+    TezConfiguration tezConf = new TezConfiguration(conf);
+
+    Path stagingDir = new Path(fs.getWorkingDirectory(), UUID.randomUUID().toString());
+    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDir.toString());
+    TezClientUtils.ensureStagingDirExists(tezConf, stagingDir);
+
+    tezConf.set(TezConfiguration.TEZ_AM_JAVA_OPTS, MRHelpers.getMRAMJavaOpts(tezConf));
+
+    TezSession tezSession = null;
+    DAGClient dagClient;
+    DAGStatus dagStatus;
+    // Everything after the staging dir exists runs under try/finally. This
+    // removes the staging dir, and stops a started session, even when jar
+    // lookup, DAG construction, submission or monitoring fails. Exiting the
+    // JVM is deferred until after cleanup, because System.exit() inside the
+    // try would skip the finally block.
+    try {
+      Map<String, LocalResource> commonLocalResources = uploadDagJar(fs, stagingDir);
+
+      AMConfiguration amConf = new AMConfiguration(null, commonLocalResources, tezConf, null);
+      TezSessionConfiguration sessionConf = new TezSessionConfiguration(amConf, tezConf);
+      TezSession session = new TezSession("FilterLinesByWordSession", sessionConf);
+      // The session is started before any DAG is submitted to it. It is
+      // published to tezSession only once start() succeeded, so the finally
+      // block stops only a session that actually started.
+      session.start();
+      tezSession = session;
+
+      DAG dag = createDAG(fs, conf, stagingDir, commonLocalResources, inputPath, outputPath,
+          filterWord, generateSplitsInClient);
+
+      LOG.info("Submitting DAG to Tez Session");
+      dagClient = tezSession.submitDAG(dag);
+      LOG.info("Submitted DAG to Tez Session");
+
+      dagStatus = waitForCompletion(dagClient);
+    } finally {
+      try {
+        fs.delete(stagingDir, true);
+      } finally {
+        if (tezSession != null) {
+          tezSession.stop();
+        }
+      }
+    }
+
+    if (dagStatus == null) {
+      // Progress could not be retrieved; cleanup above has already run.
+      System.exit(-1);
+    }
+
+    ExampleDriver.printDAGStatus(dagClient, VERTEX_NAMES);
+    LOG.info("Application completed. " + "FinalState=" + dagStatus.getState());
+    System.exit(dagStatus.getState() == DAGStatus.State.SUCCEEDED ? 0 : 1);
+  }
+
+  /**
+   * Copies the jar containing this class into {@code stagingDir} as
+   * dag_job.jar.
+   *
+   * @return a map with a single APPLICATION-visible FILE resource for that
+   *     jar, keyed by "dag_job.jar"
+   * @throws TezUncheckedException if no jar on the classpath contains this class
+   */
+  private static Map<String, LocalResource> uploadDagJar(FileSystem fs, Path stagingDir)
+      throws IOException {
+    String jarPath = ClassUtil.findContainingJar(FilterLinesByWordOneToOne.class);
+    if (jarPath == null) {
+      throw new TezUncheckedException("Could not find any jar containing "
+          + FilterLinesByWordOneToOne.class.getName() + " in the classpath");
+    }
+
+    Path remoteJarPath = fs.makeQualified(new Path(stagingDir, "dag_job.jar"));
+    fs.copyFromLocalFile(new Path(jarPath), remoteJarPath);
+    FileStatus remoteJarStatus = fs.getFileStatus(remoteJarPath);
+
+    Map<String, LocalResource> commonLocalResources = new TreeMap<String, LocalResource>();
+    LocalResource dagJarLocalRsrc = LocalResource.newInstance(
+        ConverterUtils.getYarnUrlFromPath(remoteJarPath),
+        LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
+        remoteJarStatus.getLen(), remoteJarStatus.getModificationTime());
+    commonLocalResources.put("dag_job.jar", dagJarLocalRsrc);
+    return commonLocalResources;
+  }
+
+  /**
+   * Builds the stage1 -> stage2 DAG joined by a ONE_TO_ONE, PERSISTED,
+   * SEQUENTIAL edge.
+   *
+   * <p>If {@code generateSplitsInClient} is true, input splits are computed
+   * here and fix stage1's task count. Otherwise both vertices get -1 tasks and
+   * stage1's input uses MRInputAMSplitGenerator as its initializer.
+   */
+  private static DAG createDAG(FileSystem fs, Configuration conf, Path stagingDir,
+      Map<String, LocalResource> commonLocalResources, String inputPath, String outputPath,
+      String filterWord, boolean generateSplitsInClient)
+      throws IOException, InterruptedException, ClassNotFoundException, TezException {
+    Configuration stage1Conf = new JobConf(conf);
+    stage1Conf.set(FileInputFormat.INPUT_DIR, inputPath);
+    stage1Conf.setBoolean("mapred.mapper.new-api", false);
+    stage1Conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS, Text.class.getName());
+    stage1Conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS,
+        TextLongPair.class.getName());
+    stage1Conf.set(FILTER_PARAM_NAME, filterWord);
+
+    InputSplitInfo inputSplitInfo = null;
+    if (generateSplitsInClient) {
+      inputSplitInfo = MRHelpers.generateInputSplits(stage1Conf, stagingDir);
+    }
+    MultiStageMRConfToTezTranslator.translateVertexConfToTez(stage1Conf, null);
+
+    Configuration stage2Conf = new JobConf(conf);
+    stage2Conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS, Text.class.getName());
+    stage2Conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_VALUE_CLASS,
+        TextLongPair.class.getName());
+    stage2Conf.set(FileOutputFormat.OUTDIR, outputPath);
+    stage2Conf.setBoolean("mapred.mapper.new-api", false);
+    MultiStageMRConfToTezTranslator.translateVertexConfToTez(stage2Conf, stage1Conf);
+
+    MRHelpers.doJobClientMagic(stage1Conf);
+    MRHelpers.doJobClientMagic(stage2Conf);
+
+    byte[] stage1Payload = MRHelpers.createUserPayloadFromConf(stage1Conf);
+    // -1 leaves the task count undetermined here so it can be decided at runtime.
+    int stage1NumTasks = generateSplitsInClient ? inputSplitInfo.getNumTasks() : -1;
+    Vertex stage1Vertex = new Vertex("stage1", new ProcessorDescriptor(
+        FilterByWordInputProcessor.class.getName()).setUserPayload(stage1Payload),
+        stage1NumTasks, MRHelpers.getMapResource(stage1Conf));
+    stage1Vertex.setJavaOpts(MRHelpers.getMapJavaOpts(stage1Conf));
+    if (generateSplitsInClient) {
+      stage1Vertex.setTaskLocationsHint(inputSplitInfo.getTaskLocationHints());
+      Map<String, LocalResource> stage1LocalResources = new HashMap<String, LocalResource>();
+      stage1LocalResources.putAll(commonLocalResources);
+      MRHelpers.updateLocalResourcesForInputSplits(fs, inputSplitInfo, stage1LocalResources);
+      stage1Vertex.setTaskLocalResources(stage1LocalResources);
+    } else {
+      stage1Vertex.setTaskLocalResources(commonLocalResources);
+    }
+    Map<String, String> stage1Env = new HashMap<String, String>();
+    MRHelpers.updateEnvironmentForMRTasks(stage1Conf, stage1Env, true);
+    stage1Vertex.setTaskEnvironment(stage1Env);
+
+    // Without client-side splits, MRInputAMSplitGenerator (presumably running
+    // in the AM) is responsible for producing them.
+    Class<? extends TezRootInputInitializer> initializerClazz = generateSplitsInClient ? null
+        : MRInputAMSplitGenerator.class;
+    stage1Vertex.addInput("MRInput",
+        new InputDescriptor(MRInputLegacy.class.getName())
+            .setUserPayload(MRHelpers.createMRInputPayload(stage1Payload, null)),
+        initializerClazz);
+
+    // stage2 is given stage1's task count to match the ONE_TO_ONE edge.
+    Vertex stage2Vertex = new Vertex("stage2", new ProcessorDescriptor(
+        FilterByWordOutputProcessor.class.getName()).setUserPayload(MRHelpers
+        .createUserPayloadFromConf(stage2Conf)), stage1NumTasks,
+        MRHelpers.getMapResource(stage2Conf));
+    stage2Vertex.setJavaOpts(MRHelpers.getReduceJavaOpts(stage2Conf))
+        .setTaskLocalResources(commonLocalResources);
+    Map<String, String> stage2Env = new HashMap<String, String>();
+    MRHelpers.updateEnvironmentForMRTasks(stage2Conf, stage2Env, false);
+    stage2Vertex.setTaskEnvironment(stage2Env);
+
+    stage2Vertex.addOutput("MROutput",
+        new OutputDescriptor(MROutput.class.getName()).setUserPayload(MRHelpers
+            .createUserPayloadFromConf(stage2Conf)));
+
+    DAG dag = new DAG("FilterLinesByWord");
+    Edge edge = new Edge(stage1Vertex, stage2Vertex, new EdgeProperty(
+        DataMovementType.ONE_TO_ONE, DataSourceType.PERSISTED,
+        SchedulingType.SEQUENTIAL, new OutputDescriptor(
+            OnFileUnorderedKVOutput.class.getName()), new InputDescriptor(
+            ShuffledUnorderedKVInput.class.getName())));
+    dag.addVertex(stage1Vertex).addVertex(stage2Vertex).addEdge(edge);
+    return dag;
+  }
+
+  /**
+   * Polls every 500 ms until the DAG is RUNNING or in one of the states
+   * SUCCEEDED, FAILED, KILLED or ERROR. While it is RUNNING, prints progress
+   * and re-polls every second.
+   *
+   * <p>An interrupt while sleeping propagates as InterruptedException instead
+   * of being ignored.
+   *
+   * @return the last observed status, or null if a TezException prevented
+   *     reading progress while the DAG was running
+   */
+  private static DAGStatus waitForCompletion(DAGClient dagClient)
+      throws IOException, InterruptedException, TezException {
+    DAGStatus dagStatus;
+    while (true) {
+      dagStatus = dagClient.getDAGStatus(null);
+      DAGStatus.State state = dagStatus.getState();
+      if (state == DAGStatus.State.RUNNING
+          || state == DAGStatus.State.SUCCEEDED
+          || state == DAGStatus.State.FAILED
+          || state == DAGStatus.State.KILLED
+          || state == DAGStatus.State.ERROR) {
+        break;
+      }
+      Thread.sleep(500);
+    }
+
+    while (dagStatus.getState() == DAGStatus.State.RUNNING) {
+      try {
+        ExampleDriver.printDAGStatus(dagClient, VERTEX_NAMES);
+        Thread.sleep(1000);
+        dagStatus = dagClient.getDAGStatus(null);
+      } catch (TezException e) {
+        LOG.fatal("Failed to get application progress. Exiting", e);
+        return null;
+      }
+    }
+    return dagStatus;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8aac5ba4/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
index 65df726..3ffd076 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
@@ -20,7 +20,6 @@ package org.apache.tez.mapreduce.examples;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8aac5ba4/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java
index 2a2395b..423cd1e 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java
@@ -31,6 +31,7 @@ import org.apache.tez.mapreduce.examples.FilterLinesByWord;
import org.apache.tez.mapreduce.examples.FilterLinesByWord.TextLongPair;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
import org.apache.tez.mapreduce.input.MRInput;
+import org.apache.tez.mapreduce.input.MRInputLegacy;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.LogicalIOProcessor;
import org.apache.tez.runtime.api.LogicalInput;
@@ -94,7 +95,8 @@ public class FilterByWordInputProcessor implements LogicalIOProcessor {
- MRInput mrInput = (MRInput) li;
+ MRInputLegacy mrInput = (MRInputLegacy) li;
+ mrInput.init();
OnFileUnorderedKVOutput kvOutput = (OnFileUnorderedKVOutput) lo;
Configuration updatedConf = mrInput.getConfigUpdates();