You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2013/09/11 21:57:58 UTC
[2/2] git commit: TEZ-407. Support multiple inputs and connection
patterns in Tez (bikas)
TEZ-407. Support multiple inputs and connection patterns in Tez (bikas)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/8d89485f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/8d89485f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/8d89485f
Branch: refs/heads/TEZ-398
Commit: 8d89485fd0c2d76894894447bb826b367ee2be7a
Parents: 0f5298a
Author: Bikas Saha <bi...@apache.org>
Authored: Wed Sep 11 12:56:44 2013 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Wed Sep 11 12:56:44 2013 -0700
----------------------------------------------------------------------
.../apache/tez/dag/api/DagTypeConverters.java | 25 +--
.../main/java/org/apache/tez/dag/api/Edge.java | 1 +
.../org/apache/tez/dag/api/TestDAGPlan.java | 7 +-
.../dag/app/TaskAttemptListenerImpTezDag.java | 74 +++++--
.../java/org/apache/tez/dag/app/dag/DAG.java | 4 +
.../org/apache/tez/dag/app/dag/EdgeManager.java | 51 +++++
.../java/org/apache/tez/dag/app/dag/Task.java | 7 +
.../java/org/apache/tez/dag/app/dag/Vertex.java | 24 ++-
.../apache/tez/dag/app/dag/VertexScheduler.java | 2 +-
.../dag/app/dag/event/TaskEventAddTezEvent.java | 36 ++++
.../tez/dag/app/dag/event/TaskEventType.java | 3 +
.../app/dag/event/VertexEventRouteEvent.java | 39 ++++
.../tez/dag/app/dag/event/VertexEventType.java | 2 +
.../dag/app/dag/impl/BroadcastEdgeManager.java | 69 ++++++
.../apache/tez/dag/app/dag/impl/DAGImpl.java | 45 ++--
.../org/apache/tez/dag/app/dag/impl/Edge.java | 214 +++++++++++++++++++
.../dag/app/dag/impl/OneToOneEdgeManager.java | 66 ++++++
.../app/dag/impl/ScatterGatherEdgeManager.java | 77 +++++++
.../dag/app/dag/impl/ShuffleVertexManager.java | 100 ++++++++-
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 5 +-
.../apache/tez/dag/app/dag/impl/TaskImpl.java | 76 +++++++
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 178 ++++++++++++---
.../tez/dag/app/dag/impl/TestVertexImpl.java | 112 +++++-----
.../dag/app/dag/impl/TestVertexScheduler.java | 54 +++--
.../engine/newapi/events/DataMovementEvent.java | 2 +-
.../engine/newapi/events/InputFailedEvent.java | 2 +-
.../tez/engine/newapi/impl/EventMetaData.java | 14 +-
.../newapi/impl/TezHeartbeatResponse.java | 33 ++-
28 files changed, 1122 insertions(+), 200 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d89485f/tez-dag-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
index fd07b5b..1fd78f1 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
@@ -209,23 +209,23 @@ public class DagTypeConverters {
return edgePlanMap;
}
- public static Map<String, EdgeProperty> createEdgePropertyMapFromDAGPlan(
- List<EdgePlan> edgeList) {
-
- Map<String, EdgeProperty> map = new HashMap<String, EdgeProperty>();
- for(EdgePlan edge: edgeList){
- map.put(edge.getId(),
- new EdgeProperty(
- convertFromDAGPlan(edge.getDataMovementType()),
- convertFromDAGPlan(edge.getDataSourceType()),
- convertFromDAGPlan(edge.getSchedulingType()),
- convertOutputDescriptorFromDAGPlan(edge.getEdgeSource()),
- convertInputDescriptorFromDAGPlan(edge.getEdgeDestination())
- )
- );
- }
-
- return map;
+ /**
+ * Converts a single {@link EdgePlan} from the serialized DAG plan into an
+ * {@link EdgeProperty}. The "Map" in the name is left over from the earlier
+ * version that converted the whole edge list into a map keyed by edge id;
+ * this version handles exactly one edge.
+ *
+ * @param edge the edge plan to convert
+ * @return a new edge property built from the plan's fields
+ */
+ public static EdgeProperty createEdgePropertyMapFromDAGPlan(EdgePlan edge) {
+ return new EdgeProperty(
+ convertFromDAGPlan(edge.getDataMovementType()),
+ convertFromDAGPlan(edge.getDataSourceType()),
+ convertFromDAGPlan(edge.getSchedulingType()),
+ convertOutputDescriptorFromDAGPlan(edge.getEdgeSource()),
+ convertInputDescriptorFromDAGPlan(edge.getEdgeDestination())
+ );
}
public static Resource createResourceRequestFromTaskConfig(
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d89485f/tez-dag-api/src/main/java/org/apache/tez/dag/api/Edge.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/Edge.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/Edge.java
index 71c90fa..a893bc3 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/Edge.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/Edge.java
@@ -31,6 +31,7 @@ public class Edge{
this.edgeProperty = edgeProperty;
}
+ // RENAME to source and destination
public Vertex getInputVertex() {
return inputVertex;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d89485f/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java b/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
index c8d09d0..53ec357 100644
--- a/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
+++ b/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
@@ -25,7 +25,6 @@ import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.HashMap;
-import java.util.Map;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -142,10 +141,8 @@ public class TestDAGPlan {
.getUserPayload().toByteArray()));
assertEquals("output", edgeProto.getEdgeSource().getClassName());
- Map<String, EdgeProperty> edgePropertyMap = DagTypeConverters
- .createEdgePropertyMapFromDAGPlan(dagProto.getEdgeList());
- assertEquals(1, edgePropertyMap.size());
- EdgeProperty edgeProperty = edgePropertyMap.values().iterator().next();
+ EdgeProperty edgeProperty = DagTypeConverters
+ .createEdgePropertyMapFromDAGPlan(dagProto.getEdgeList().get(0));
byte[] ib = edgeProperty.getEdgeDestination().getUserPayload();
assertEquals("inputBytes", new String(ib));
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d89485f/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index 2c242de..cc99af2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -54,10 +55,12 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatus;
+import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
import org.apache.tez.dag.app.rm.container.AMContainerImpl;
import org.apache.tez.dag.app.rm.container.AMContainerTask;
import org.apache.tez.dag.app.security.authorize.MRAMPolicyProvider;
import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.engine.common.security.JobTokenSecretManager;
import org.apache.tez.engine.newapi.events.TaskAttemptFailedEvent;
import org.apache.tez.engine.newapi.impl.TezEvent;
@@ -89,9 +92,30 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
private Server server;
- // TODO Use this to figure out whether an incoming ping is valid.
- private ConcurrentMap<TezTaskAttemptID, ContainerId> attemptToContainerIdMap =
- new ConcurrentHashMap<TezTaskAttemptID, ContainerId>();
+ /**
+ * Heartbeat bookkeeping for one registered task attempt. Static because it
+ * never touches the enclosing listener, so it need not hold a reference to it.
+ */
+ static class AttemptInfo {
+ // Container the attempt was registered on; fixed at registration.
+ final ContainerId containerId;
+ // Id of the last heartbeat processed, or -1 before the first one.
+ // Read and written while holding this AttemptInfo's monitor.
+ long lastRequestId;
+ // Response sent for lastRequestId, replayed if that id is seen again.
+ // Read and written while holding this AttemptInfo's monitor.
+ TezHeartbeatResponse lastReponse;
+
+ AttemptInfo(ContainerId containerId) {
+ this.containerId = containerId;
+ this.lastReponse = null;
+ this.lastRequestId = -1;
+ }
+ }
+ // Registered attempts; entries are added in registerTaskAttempt and
+ // removed in unregisterTaskAttempt.
+ private ConcurrentMap<TezTaskAttemptID, AttemptInfo> attemptToInfoMap =
+ new ConcurrentHashMap<TezTaskAttemptID, AttemptInfo>();
private Set<ContainerId> registeredContainers = Collections
.newSetFromMap(new ConcurrentHashMap<ContainerId, Boolean>());
@@ -479,9 +491,13 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
// between polls (MRTask) implies tasks end up wasting upto 1 second doing
// nothing. Similarly for CA_COMMIT.
+ /*
+ DAG job = context.getCurrentDAG();
+ Task task =
+ job.getVertex(taskAttemptId.getTaskID().getVertexID()).
+ getTask(taskAttemptId.getTaskID());
// TODO In-Memory Shuffle
- /*
if (task.needsWaitAfterOutputConsumable()) {
TezTaskAttemptID outputReadyAttempt = task.getOutputConsumableAttempt();
if (outputReadyAttempt != null) {
@@ -518,7 +534,8 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
@Override
public void unregisterTaskAttempt(TezTaskAttemptID attemptId) {
- attemptToContainerIdMap.remove(attemptId);
+ // Once removed, heartbeats from this attempt are rejected as unrecognized.
+ attemptToInfoMap.remove(attemptId);
}
public AMContainerTask pullTaskAttemptContext(ContainerId containerId) {
@@ -539,7 +555,8 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
@Override
public void registerTaskAttempt(TezTaskAttemptID attemptId,
ContainerId containerId) {
- attemptToContainerIdMap.put(attemptId, containerId);
+ // Start with empty heartbeat state: no request processed yet.
+ attemptToInfoMap.put(attemptId, new AttemptInfo(containerId));
}
@Override
@@ -556,7 +573,11 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
}
private void pingContainerHeartbeatHandler(TezTaskAttemptID taskAttemptId) {
- ContainerId containerId = attemptToContainerIdMap.get(taskAttemptId);
+ // The attempt may no longer be registered (unregisterTaskAttempt can remove
+ // it after the caller's own lookup); map that to a null containerId so the
+ // existing null branch handles it instead of throwing NullPointerException.
+ AttemptInfo attemptInfo = attemptToInfoMap.get(taskAttemptId);
+ ContainerId containerId = (attemptInfo == null) ? null : attemptInfo.containerId;
if (containerId != null) {
containerHeartbeatHandler.pinged(containerId);
} else {
@@ -568,12 +585,51 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
@Override
public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request)
throws IOException, TezException {
- // TODO TODONEWTEZ Auto-generated method stub
+ // Request ids for an attempt must advance by exactly one per heartbeat.
+ // Repeating the last id (presumably a retried call) returns the cached
+ // response without re-processing, so piggy-backed events are routed once.
+ long requestId = request.getRequestId();
TezTaskAttemptID taskAttemptID = request.getCurrentTaskAttemptID();
- LOG.info("Ping from " + taskAttemptID.toString());
- taskHeartbeatHandler.pinged(taskAttemptID);
- pingContainerHeartbeatHandler(taskAttemptID);
- return null;
+ AttemptInfo attemptInfo = attemptToInfoMap.get(taskAttemptID);
+ if(attemptInfo == null) {
+ throw new TezException("Attempt " + taskAttemptID
+ + " is not recognized for heartbeat");
+ }
+ synchronized (attemptInfo) {
+ if(attemptInfo.lastRequestId == requestId) {
+ return attemptInfo.lastReponse;
+ }
+ // Reject both gaps and stale ids: re-processing an older request would
+ // route its events a second time and move lastRequestId backwards.
+ long expectedRequestId = attemptInfo.lastRequestId + 1;
+ if(requestId != expectedRequestId) {
+ throw new TezException("Attempt " + taskAttemptID
+ + " has invalid request id. Expected: " + expectedRequestId
+ + " and actual: " + requestId);
+ }
+
+ // Side effects below (event routing, pings) run at most once per request id.
+ LOG.info("Ping from " + taskAttemptID.toString());
+ List<TezEvent> inEvents = request.getEvents();
+ if(inEvents!=null && inEvents.size()>0) {
+ TezVertexID vertexId = taskAttemptID.getTaskID().getVertexID();
+ context.getEventHandler().handle(new VertexEventRouteEvent(vertexId, inEvents));
+ }
+ taskHeartbeatHandler.pinged(taskAttemptID);
+ pingContainerHeartbeatHandler(taskAttemptID);
+ TezHeartbeatResponse response = new TezHeartbeatResponse();
+ response.setLastRequestId(requestId);
+ List<TezEvent> outEvents = context
+ .getCurrentDAG()
+ .getVertex(taskAttemptID.getTaskID().getVertexID())
+ .getTask(taskAttemptID.getTaskID())
+ .getTaskAttemptTezEvents(taskAttemptID, request.getStartIndex(),
+ request.getMaxEvents());
+ response.setEvents(outEvents);
+ attemptInfo.lastRequestId = requestId;
+ attemptInfo.lastReponse = response;
+ return response;
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d89485f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
index 455b583..ce1ee89 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
@@ -49,6 +49,10 @@ public interface DAG {
*/
TezCounters getAllCounters();
+ /**
+ * Get Vertex by vertex name
+ */
+ Vertex getVertex(String vertexName);
Map<TezVertexID,Vertex> getVertices();
Vertex getVertex(TezVertexID vertexId);
List<String> getDiagnostics();
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d89485f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/EdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/EdgeManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/EdgeManager.java
new file mode 100644
index 0000000..c476966
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/EdgeManager.java
@@ -0,0 +1,73 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag;
+
+import java.util.List;
+
+import org.apache.tez.engine.newapi.events.DataMovementEvent;
+import org.apache.tez.engine.newapi.events.InputReadErrorEvent;
+import org.apache.tez.engine.newapi.events.InputFailedEvent;
+
+/**
+ * Per-edge routing policy: reports how many inputs and outputs tasks on
+ * either side of an edge have, and decides which tasks on the other side of
+ * the edge an event must be delivered to.
+ */
+public abstract class EdgeManager {
+
+ /**
+ * Returns the number of inputs the destination task at
+ * {@code destinationTaskIndex} has for data from {@code sourceVertex}.
+ */
+ public abstract int getNumDestinationTaskInputs(Vertex sourceVertex,
+ int destinationTaskIndex);
+
+ /**
+ * Returns the number of outputs the source task at {@code sourceTaskIndex}
+ * produces for {@code destinationVertex}.
+ */
+ public abstract int getNumSourceTaskOutputs(Vertex destinationVertex,
+ int sourceTaskIndex);
+
+ /**
+ * Adds to {@code taskIndices} the indices of the destination tasks that
+ * must receive a data movement event from the source task at
+ * {@code sourceTaskIndex}. Implementations may also update the event
+ * itself (e.g. its target index) before it is delivered.
+ */
+ public abstract void routeEventToDestinationTasks(DataMovementEvent event,
+ int sourceTaskIndex, int numDestinationTasks, List<Integer> taskIndices);
+
+ /**
+ * Same contract as the {@link DataMovementEvent} overload, for an
+ * {@link InputFailedEvent}.
+ */
+ public abstract void routeEventToDestinationTasks(InputFailedEvent event,
+ int sourceTaskIndex, int numDestinationTasks, List<Integer> taskIndices);
+
+
+ /**
+ * Returns the index of the source task to which an input read error
+ * seen by the destination task at {@code destinationTaskIndex} should be
+ * sent.
+ */
+ public abstract int routeEventToSourceTasks(int destinationTaskIndex,
+ InputReadErrorEvent event);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d89485f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
index f3ca831..0947a41 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
@@ -18,6 +18,7 @@
package org.apache.tez.dag.app.dag;
+import java.util.List;
import java.util.Map;
import org.apache.tez.common.counters.TezCounters;
@@ -25,6 +26,7 @@ import org.apache.tez.dag.api.oldrecords.TaskReport;
import org.apache.tez.dag.api.oldrecords.TaskState;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.engine.newapi.impl.TezEvent;
/**
* Read only view of Task.
@@ -71,4 +73,9 @@ public interface Task {
TezTaskAttemptID getOutputConsumableAttempt();
public Vertex getVertex();
+
+ public List<TezEvent> getTaskAttemptTezEvents(TezTaskAttemptID attemptID,
+ int fromEventId, int maxEvents);
+
+ public List<TezEvent> getAndClearTaskTezEvents();
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d89485f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index 48c9993..85240e7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -24,11 +24,11 @@ import java.util.Map;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
import org.apache.tez.dag.api.client.ProgressBuilder;
import org.apache.tez.dag.api.client.VertexStatusBuilder;
+import org.apache.tez.dag.app.dag.impl.Edge;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
@@ -57,6 +57,7 @@ public interface Vertex extends Comparable<Vertex> {
Map<TezTaskID, Task> getTasks();
Task getTask(TezTaskID taskID);
+ Task getTask(int taskIndex);
List<String> getDiagnostics();
int getTotalTasks();
int getCompletedTasks();
@@ -65,19 +66,20 @@ public interface Vertex extends Comparable<Vertex> {
ProgressBuilder getVertexProgress();
VertexStatusBuilder getVertexStatus();
- void setParallelism(int parallelism, List<byte[]> taskUserPayloads);
+ void setParallelism(int parallelism,Map<Vertex, EdgeManager> sourceEdgeManagers);
TezDependentTaskCompletionEvent[] getTaskAttemptCompletionEvents(
TezTaskAttemptID attemptId, int fromEventId, int maxEvents);
-
- void setInputVertices(Map<Vertex, EdgeProperty> inVertices);
- void setOutputVertices(Map<Vertex, EdgeProperty> outVertices);
-
- Map<Vertex, EdgeProperty> getInputVertices();
- Map<Vertex, EdgeProperty> getOutputVertices();
-
- List<InputSpec> getInputSpecList();
- List<OutputSpec> getOutputSpecList();
+
+ // CHANGE THESE TO LISTS AND MAINTAIN ORDER?
+ void setInputVertices(Map<Vertex, Edge> inVertices);
+ void setOutputVertices(Map<Vertex, Edge> outVertices);
+
+ Map<Vertex, Edge> getInputVertices();
+ Map<Vertex, Edge> getOutputVertices();
+
+ List<InputSpec> getInputSpecList(int taskIndex);
+ List<OutputSpec> getOutputSpecList(int taskIndex);
int getInputVerticesCount();
int getOutputVerticesCount();
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d89485f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexScheduler.java
index 7a85eb1..4c79712 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexScheduler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexScheduler.java
@@ -22,7 +22,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.engine.records.TezDependentTaskCompletionEvent;
-// Rename to VertexManager TEZ-364
+// Rename to VertexManager TEZ-364 and move to DAG API. Make abstract class.
public interface VertexScheduler {
void initialize(Configuration conf);
void onVertexStarted();
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d89485f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventAddTezEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventAddTezEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventAddTezEvent.java
new file mode 100644
index 0000000..51f6d53
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventAddTezEvent.java
@@ -0,0 +1,42 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.engine.newapi.impl.TezEvent;
+
+/**
+ * {@link TaskEvent} of type {@link TaskEventType#T_ADD_TEZ_EVENT} that
+ * delivers a single {@link TezEvent} to the task identified by its task id.
+ */
+public class TaskEventAddTezEvent extends TaskEvent {
+
+ // Set once at construction; the event payload is never reassigned.
+ private final TezEvent tezEvent;
+
+ public TaskEventAddTezEvent(TezTaskID taskId, TezEvent tezEvent) {
+ super(taskId, TaskEventType.T_ADD_TEZ_EVENT);
+ this.tezEvent = tezEvent;
+ }
+
+ /** Returns the Tez event carried by this task event. */
+ public TezEvent getTezEvent() {
+ return tezEvent;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d89485f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java
index bc7f3ff..d0ad8a0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java
@@ -31,6 +31,9 @@ public enum TaskEventType {
//Producer:Speculator
T_ADD_SPEC_ATTEMPT,
+
+ //Producer:Edge
+ T_ADD_TEZ_EVENT,
//Producer:TaskAttempt
T_ATTEMPT_LAUNCHED,
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d89485f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRouteEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRouteEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRouteEvent.java
new file mode 100644
index 0000000..c851ae0
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRouteEvent.java
@@ -0,0 +1,45 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.event;
+
+import java.util.List;
+
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.engine.newapi.impl.TezEvent;
+
+/**
+ * {@link VertexEvent} of type {@link VertexEventType#V_ROUTE_EVENT} carrying a
+ * batch of {@link TezEvent}s for the vertex to route.
+ */
+public class VertexEventRouteEvent extends VertexEvent {
+
+ // Stored as given by the caller; the list is not copied.
+ final List<TezEvent> events;
+
+ public VertexEventRouteEvent(TezVertexID vertexId, List<TezEvent> events) {
+ super(vertexId, VertexEventType.V_ROUTE_EVENT);
+ this.events = events;
+ }
+
+ /** Returns the events to route (the same list passed to the constructor). */
+ public List<TezEvent> getEvents() {
+ return events;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d89485f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
index 43cffe6..dc7e2dd 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
@@ -57,4 +57,6 @@ public enum VertexEventType {
INTERNAL_ERROR,
V_COUNTER_UPDATE,
+ V_ROUTE_EVENT,
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d89485f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java
new file mode 100644
index 0000000..71f17ac
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java
@@ -0,0 +1,88 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.impl;
+
+import java.util.List;
+
+import org.apache.tez.dag.app.dag.EdgeManager;
+import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.engine.newapi.events.DataMovementEvent;
+import org.apache.tez.engine.newapi.events.InputReadErrorEvent;
+import org.apache.tez.engine.newapi.events.InputFailedEvent;
+
+/**
+ * Edge manager for broadcast edges: every source task has a single output,
+ * and every destination task reads that output from every source task.
+ */
+public class BroadcastEdgeManager extends EdgeManager {
+
+ /** Each destination task has one input per source task. */
+ @Override
+ public int getNumDestinationTaskInputs(Vertex sourceVertex,
+ int destinationTaskIndex) {
+ return sourceVertex.getTotalTasks();
+ }
+
+ /** Each source task has exactly one output, shared by all destinations. */
+ @Override
+ public int getNumSourceTaskOutputs(Vertex destinationVertex,
+ int sourceTaskIndex) {
+ return 1;
+ }
+
+ /**
+ * Delivers the event to every destination task and sets its target index
+ * to the index of the source task that produced it.
+ */
+ @Override
+ public void routeEventToDestinationTasks(DataMovementEvent event,
+ int sourceTaskIndex, int numDestinationTasks, List<Integer> taskIndices) {
+ event.setTargetIndex(sourceTaskIndex);
+ addAllDestinationTaskIndices(numDestinationTasks, taskIndices);
+ }
+
+ /** Same routing as for data movement events. */
+ @Override
+ public void routeEventToDestinationTasks(InputFailedEvent event,
+ int sourceTaskIndex, int numDestinationTasks, List<Integer> taskIndices) {
+ event.setTargetIndex(sourceTaskIndex);
+ addAllDestinationTaskIndices(numDestinationTasks, taskIndices);
+ }
+
+ // NOTE(review): this returns the destination task index unchanged. On a
+ // broadcast edge every destination reads from every source task, so the
+ // failed source is presumably the one identified by the input index in the
+ // read-error event rather than destinationTaskIndex -- confirm against
+ // InputReadErrorEvent before relying on this.
+ @Override
+ public int routeEventToSourceTasks(int destinationTaskIndex,
+ InputReadErrorEvent event) {
+ return destinationTaskIndex;
+ }
+
+ /** Appends the indices 0 .. numDestinationTasks-1 to {@code taskIndices}. */
+ void addAllDestinationTaskIndices(int numDestinationTasks, List<Integer> taskIndices) {
+ for(int i=0; i<numDestinationTasks; ++i) {
+ // Autoboxing goes through Integer.valueOf; the Integer(int) constructor
+ // is deprecated and always allocates a new object.
+ taskIndices.add(i);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d89485f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 2ddcdd7..d30d178 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -131,7 +131,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
private final AppContext appContext;
volatile Map<TezVertexID, Vertex> vertices = new HashMap<TezVertexID, Vertex>();
- private Map<String, EdgeProperty> edges = new HashMap<String, EdgeProperty>();
+ private Map<String, Edge> edges = new HashMap<String, Edge>();
private TezCounters dagCounters = new TezCounters();
private Object fullCountersLock = new Object();
private TezCounters fullCounters = null;
@@ -836,7 +836,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
dag.addVertex(v);
}
- dag.edges = DagTypeConverters.createEdgePropertyMapFromDAGPlan(dag.getJobPlan().getEdgeList());
+ createDAGEdges(dag);
Map<String,EdgePlan> edgePlans = DagTypeConverters.createEdgePlanMapFromDAGPlan(dag.getJobPlan().getEdgeList());
// setup the dag
@@ -861,6 +861,21 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
return dag.finished(DAGState.FAILED);
}
}
+
+ /**
+ * Builds one runtime {@link Edge} for every edge in the DAG plan and stores
+ * it in {@code dag.edges} under the plan's edge id. The edges' source and
+ * destination vertices are filled in separately (see parseVertexEdges).
+ */
+ private void createDAGEdges(DAGImpl dag) {
+ for (EdgePlan plan : dag.getJobPlan().getEdgeList()) {
+ // The edge manager may also be set through the API for a custom edge type.
+ Edge runtimeEdge = new Edge(
+ DagTypeConverters.createEdgePropertyMapFromDAGPlan(plan),
+ dag.getEventHandler());
+ dag.edges.put(plan.getId(), runtimeEdge);
+ }
+ }
private void assignDAGScheduler(DAGImpl dag) {
if (dag.conf.getBoolean(TezConfiguration.TEZ_AM_AGGRESSIVE_SCHEDULING,
@@ -870,17 +880,17 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
} else {
boolean isMRR = true;
for (Vertex vertex : dag.vertices.values()) {
- Map<Vertex, EdgeProperty> outVertices = vertex.getOutputVertices();
- Map<Vertex, EdgeProperty> inVertices = vertex.getInputVertices();
+ Map<Vertex, Edge> outVertices = vertex.getOutputVertices();
+ Map<Vertex, Edge> inVertices = vertex.getInputVertices();
if (!(outVertices == null || outVertices.isEmpty() || (outVertices
- .size() == 1 && outVertices.values().iterator().next()
+ .size() == 1 && outVertices.values().iterator().next().getEdgeProperty()
.getDataMovementType() == EdgeProperty.DataMovementType.SCATTER_GATHER))) {
// more than 1 output OR single output is not bipartite
isMRR = false;
break;
}
if (!(inVertices == null || inVertices.isEmpty() || (inVertices
- .size() == 1 && inVertices.values().iterator().next()
+ .size() == 1 && inVertices.values().iterator().next().getEdgeProperty()
.getDataMovementType() == EdgeProperty.DataMovementType.SCATTER_GATHER))) {
// more than 1 output OR single output is not bipartite
isMRR = false;
@@ -924,24 +934,28 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
private void parseVertexEdges(DAGImpl dag, Map<String, EdgePlan> edgePlans, Vertex vertex) {
VertexPlan vertexPlan = vertex.getVertexPlan();
- Map<Vertex, EdgeProperty> inVertices =
- new HashMap<Vertex, EdgeProperty>();
+ Map<Vertex, Edge> inVertices =
+ new HashMap<Vertex, Edge>();
- Map<Vertex, EdgeProperty> outVertices =
- new HashMap<Vertex, EdgeProperty>();
+ Map<Vertex, Edge> outVertices =
+ new HashMap<Vertex, Edge>();
for(String inEdgeId : vertexPlan.getInEdgeIdList()){
EdgePlan edgePlan = edgePlans.get(inEdgeId);
Vertex inVertex = dag.vertexMap.get(edgePlan.getInputVertexName());
- EdgeProperty edgeProp = dag.edges.get(inEdgeId);
- inVertices.put(inVertex, edgeProp);
+ Edge edge = dag.edges.get(inEdgeId);
+ edge.setSourceVertex(inVertex);
+ edge.setDestinationVertex(vertex);
+ inVertices.put(inVertex, edge);
}
for(String outEdgeId : vertexPlan.getOutEdgeIdList()){
EdgePlan edgePlan = edgePlans.get(outEdgeId);
Vertex outVertex = dag.vertexMap.get(edgePlan.getOutputVertexName());
- EdgeProperty edgeProp = dag.edges.get(outEdgeId);
- outVertices.put(outVertex, edgeProp);
+ Edge edge = dag.edges.get(outEdgeId);
+ edge.setSourceVertex(vertex);
+ edge.setDestinationVertex(outVertex);
+ outVertices.put(outVertex, edge);
}
vertex.setInputVertices(inVertices);
@@ -1008,7 +1022,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
vertexMap.put(v.getName(), v);
}
- Vertex getVertex(String vertexName) {
+  /** Looks up a vertex of this DAG by name in {@code vertexMap}. */
+ @Override
+ public Vertex getVertex(String vertexName) {
return vertexMap.get(vertexName);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d89485f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
new file mode 100644
index 0000000..5a8d4f6
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
@@ -0,0 +1,214 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.app.dag.EdgeManager;
+import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.event.TaskEventAddTezEvent;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.engine.newapi.events.DataMovementEvent;
+import org.apache.tez.engine.newapi.events.InputFailedEvent;
+import org.apache.tez.engine.newapi.events.InputReadErrorEvent;
+import org.apache.tez.engine.newapi.impl.EventMetaData;
+import org.apache.tez.engine.newapi.impl.EventMetaData.EventProducerConsumerType;
+import org.apache.tez.engine.newapi.impl.InputSpec;
+import org.apache.tez.engine.newapi.impl.OutputSpec;
+import org.apache.tez.engine.newapi.impl.TezEvent;
+
+/**
+ * Runtime representation of a DAG edge between a source and a destination
+ * vertex. Routes TezEvents between the tasks of the two vertices using an
+ * {@link EdgeManager}, optionally buffering them until routing is enabled.
+ */
+public class Edge {
+
+  private final EdgeProperty edgeProperty;
+  private EdgeManager edgeManager;
+  @SuppressWarnings("rawtypes")
+  private final EventHandler eventHandler;
+  // While true, incoming events are parked in the buffers below instead of
+  // being routed; stopEventBuffering() flushes them.
+  private final AtomicBoolean bufferEvents = new AtomicBoolean(false);
+  private final List<TezEvent> destinationEventBuffer = new ArrayList<TezEvent>();
+  private final List<TezEvent> sourceEventBuffer = new ArrayList<TezEvent>();
+  private Vertex sourceVertex;
+  private Vertex destinationVertex; // this may end up being a list for shared edge
+
+  /**
+   * Creates an edge and selects the built-in {@link EdgeManager} matching the
+   * edge's data movement type.
+   *
+   * @param edgeProperty properties describing this edge
+   * @param eventHandler handler that receives the per-task routed events
+   * @throws TezUncheckedException if the data movement type is not supported
+   */
+  @SuppressWarnings("rawtypes")
+  public Edge(EdgeProperty edgeProperty, EventHandler eventHandler) {
+    this.edgeProperty = edgeProperty;
+    this.eventHandler = eventHandler;
+    switch (edgeProperty.getDataMovementType()) {
+    case ONE_TO_ONE:
+      edgeManager = new OneToOneEdgeManager();
+      break;
+    case BROADCAST:
+      edgeManager = new BroadcastEdgeManager();
+      break;
+    case SCATTER_GATHER:
+      edgeManager = new ScatterGatherEdgeManager();
+      break;
+    default:
+      String message = "Unknown edge data movement type: "
+          + edgeProperty.getDataMovementType();
+      throw new TezUncheckedException(message);
+    }
+  }
+
+  public EdgeProperty getEdgeProperty() {
+    return this.edgeProperty;
+  }
+
+  public EdgeManager getEdgeManager() {
+    return this.edgeManager;
+  }
+
+  /**
+   * Replaces the edge manager (e.g. with a custom one).
+   *
+   * @throws TezUncheckedException if {@code edgeManager} is null
+   */
+  public void setEdgeManager(EdgeManager edgeManager) {
+    if (edgeManager == null) {
+      throw new TezUncheckedException("Edge manager cannot be null");
+    }
+    this.edgeManager = edgeManager;
+  }
+
+  /**
+   * Sets the source vertex. Re-setting the same vertex is allowed.
+   *
+   * @throws TezUncheckedException if a different source vertex is already set
+   */
+  public void setSourceVertex(Vertex sourceVertex) {
+    if (this.sourceVertex != null && this.sourceVertex != sourceVertex) {
+      // report the vertex that is already attached, not the rejected one
+      throw new TezUncheckedException("Source vertex exists: "
+          + this.sourceVertex.getName());
+    }
+    this.sourceVertex = sourceVertex;
+  }
+
+  /**
+   * Sets the destination vertex. Re-setting the same vertex is allowed.
+   *
+   * @throws TezUncheckedException if a different destination vertex is already set
+   */
+  public void setDestinationVertex(Vertex destinationVertex) {
+    if (this.destinationVertex != null && this.destinationVertex != destinationVertex) {
+      // report the vertex that is already attached, not the rejected one
+      throw new TezUncheckedException("Destination vertex exists: "
+          + this.destinationVertex.getName());
+    }
+    this.destinationVertex = destinationVertex;
+  }
+
+  /**
+   * Builds the input spec of the destination task at the given index. The
+   * input is named after the source vertex.
+   */
+  public InputSpec getDestinationSpec(int destinationTaskIndex) {
+    return new InputSpec(sourceVertex.getName(),
+        edgeProperty.getEdgeDestination(),
+        edgeManager.getNumDestinationTaskInputs(sourceVertex, destinationTaskIndex));
+  }
+
+  /**
+   * Builds the output spec of the source task at the given index. The output
+   * is named after the destination vertex.
+   */
+  public OutputSpec getSourceSpec(int sourceTaskIndex) {
+    return new OutputSpec(destinationVertex.getName(),
+        edgeProperty.getEdgeSource(),
+        edgeManager.getNumSourceTaskOutputs(destinationVertex, sourceTaskIndex));
+  }
+
+  /** Starts holding events instead of routing them. */
+  public void startEventBuffering() {
+    bufferEvents.set(true);
+  }
+
+  /**
+   * Stops buffering and routes every held event: destination-bound events
+   * first, then source-bound events.
+   */
+  public void stopEventBuffering() {
+    // assume only 1 entity will start and stop event buffering
+    bufferEvents.set(false);
+    // buffering is now off, so the send methods route instead of re-buffering
+    for (TezEvent event : destinationEventBuffer) {
+      sendTezEventToDestinationTasks(event);
+    }
+    destinationEventBuffer.clear();
+    for (TezEvent event : sourceEventBuffer) {
+      sendTezEventToSourceTasks(event);
+    }
+    sourceEventBuffer.clear();
+  }
+
+  /**
+   * Routes an event produced by a destination task back to source tasks, or
+   * buffers it while buffering is on.
+   *
+   * @throws TezUncheckedException for event types this edge cannot route
+   */
+  public void sendTezEventToSourceTasks(TezEvent tezEvent) {
+    if (bufferEvents.get()) {
+      sourceEventBuffer.add(tezEvent);
+      return;
+    }
+    switch (tezEvent.getEventType()) {
+    case INPUT_READ_ERROR_EVENT:
+      InputReadErrorEvent event = (InputReadErrorEvent) tezEvent.getEvent();
+      int destTaskIndex = getProducerTaskIndex(tezEvent);
+      int srcTaskIndex = edgeManager.routeEventToSourceTasks(destTaskIndex, event);
+      // TODO this is BROKEN. TEZ-431
+//      TezTaskID srcTaskId = sourceVertex.getTask(srcTaskIndex).getTaskId();
+//      sendEventToTask(srcTaskId, tezEvent);
+      break;
+    default:
+      throw new TezUncheckedException("Unhandled tez event type: "
+          + tezEvent.getEventType());
+    }
+  }
+
+  /**
+   * Routes an event produced by a source task to the destination tasks
+   * selected by the edge manager, or buffers it while buffering is on.
+   *
+   * @throws TezUncheckedException for event types this edge cannot route
+   */
+  public void sendTezEventToDestinationTasks(TezEvent tezEvent) {
+    if (bufferEvents.get()) {
+      destinationEventBuffer.add(tezEvent);
+      return;
+    }
+    List<Integer> destTaskIndices = new ArrayList<Integer>();
+    switch (tezEvent.getEventType()) {
+    case DATA_MOVEMENT_EVENT: {
+      DataMovementEvent dmEvent = (DataMovementEvent) tezEvent.getEvent();
+      edgeManager.routeEventToDestinationTasks(dmEvent,
+          getProducerTaskIndex(tezEvent), destinationVertex.getTotalTasks(),
+          destTaskIndices);
+      // routing sets the target index, so read it only afterwards
+      sendToDestinationTasks(tezEvent, dmEvent.getTargetIndex(), destTaskIndices);
+      break;
+    }
+    case INPUT_FAILED_EVENT: {
+      InputFailedEvent ifEvent = (InputFailedEvent) tezEvent.getEvent();
+      edgeManager.routeEventToDestinationTasks(ifEvent,
+          getProducerTaskIndex(tezEvent), destinationVertex.getTotalTasks(),
+          destTaskIndices);
+      sendToDestinationTasks(tezEvent, ifEvent.getTargetIndex(), destTaskIndices);
+      break;
+    }
+    default:
+      throw new TezUncheckedException("Unhandled tez event type: "
+          + tezEvent.getEventType());
+    }
+  }
+
+  /** Index, within its vertex, of the task whose attempt produced the event. */
+  private static int getProducerTaskIndex(TezEvent tezEvent) {
+    TezTaskAttemptID attemptId = tezEvent.getSourceInfo().getTaskAttemptID();
+    return attemptId.getTaskID().getId();
+  }
+
+  /**
+   * Stamps destination metadata on the event and sends it to every listed
+   * destination task. The metadata does not depend on the task, so it is set
+   * once. Nothing is set when there are no destinations.
+   */
+  private void sendToDestinationTasks(TezEvent tezEvent, int targetIndex,
+      List<Integer> destTaskIndices) {
+    if (destTaskIndices.isEmpty()) {
+      return;
+    }
+    EventMetaData destMeta = new EventMetaData(EventProducerConsumerType.INPUT,
+        destinationVertex.getName(),
+        sourceVertex.getName(),
+        null); // will be filled by Task when sending the event. Is it needed?
+    destMeta.setIndex(targetIndex);
+    tezEvent.setDestinationInfo(destMeta);
+    sendEventToDestination(destTaskIndices, tezEvent);
+  }
+
+  private void sendEventToDestination(List<Integer> destTaskIndices, TezEvent tezEvent) {
+    for (Integer destTaskIndex : destTaskIndices) {
+      TezTaskID destTaskId = destinationVertex.getTask(destTaskIndex).getTaskId();
+      sendEventToTask(destTaskId, tezEvent);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private void sendEventToTask(TezTaskID taskId, TezEvent tezEvent) {
+    eventHandler.handle(new TaskEventAddTezEvent(taskId, tezEvent));
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d89485f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
new file mode 100644
index 0000000..7c4743e
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
@@ -0,0 +1,66 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.impl;
+
+import java.util.List;
+
+import org.apache.tez.dag.app.dag.EdgeManager;
+import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.engine.newapi.events.DataMovementEvent;
+import org.apache.tez.engine.newapi.events.InputReadErrorEvent;
+import org.apache.tez.engine.newapi.events.InputFailedEvent;
+
+/**
+ * Edge manager for one-to-one edges: destination task i consumes exactly the
+ * single output of source task i.
+ */
+public class OneToOneEdgeManager extends EdgeManager {
+
+  /** Every destination task has exactly one input. */
+  @Override
+  public int getNumDestinationTaskInputs(Vertex sourceVertex,
+      int destinationTaskIndex) {
+    return 1;
+  }
+
+  /** Every source task has exactly one output. */
+  @Override
+  public int getNumSourceTaskOutputs(Vertex destinationVertex,
+      int sourceTaskIndex) {
+    return 1;
+  }
+
+  /** Routes to the destination task with the same index, on its only input. */
+  @Override
+  public void routeEventToDestinationTasks(DataMovementEvent event,
+      int sourceTaskIndex, int numDestinationTasks, List<Integer> taskIndices) {
+    event.setTargetIndex(0);
+    addDestinationTaskIndex(sourceTaskIndex, taskIndices);
+  }
+
+  /** Routes to the destination task with the same index, on its only input. */
+  @Override
+  public void routeEventToDestinationTasks(InputFailedEvent event,
+      int sourceTaskIndex, int numDestinationTasks, List<Integer> taskIndices) {
+    event.setTargetIndex(0);
+    addDestinationTaskIndex(sourceTaskIndex, taskIndices);
+  }
+
+  /** The producing source task has the same index as the destination task. */
+  @Override
+  public int routeEventToSourceTasks(int destinationTaskIndex,
+      InputReadErrorEvent event) {
+    return destinationTaskIndex;
+  }
+
+  void addDestinationTaskIndex(int sourceTaskIndex, List<Integer> taskIndices) {
+    // Integer.valueOf instead of the deprecated boxing constructor
+    taskIndices.add(Integer.valueOf(sourceTaskIndex));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d89485f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java
new file mode 100644
index 0000000..380b6b6
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java
@@ -0,0 +1,77 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.impl;
+
+import java.util.List;
+
+import org.apache.tez.dag.app.dag.EdgeManager;
+import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.engine.newapi.events.DataMovementEvent;
+import org.apache.tez.engine.newapi.events.InputReadErrorEvent;
+import org.apache.tez.engine.newapi.events.InputFailedEvent;
+
+/**
+ * Edge manager for scatter-gather edges. Output k of every source task goes to
+ * destination task k. Each destination task has one input per source task,
+ * indexed by the source task index.
+ */
+public class ScatterGatherEdgeManager extends EdgeManager {
+
+  // destination task count captured on first use; -1 until then
+  private int initialDestinationTaskNumber = -1;
+
+  /** One input per task of the source vertex. */
+  @Override
+  public int getNumDestinationTaskInputs(Vertex sourceVertex,
+      int destinationTaskIndex) {
+    return sourceVertex.getTotalTasks();
+  }
+
+  /** One output per destination task, as counted the first time this is asked. */
+  @Override
+  public int getNumSourceTaskOutputs(Vertex destinationVertex,
+      int sourceTaskIndex) {
+    if (initialDestinationTaskNumber == -1) {
+      // the downstream vertex may not have started and so its number of tasks
+      // may change. So save this initial count and provide a consistent view
+      // to all source tasks, including late starters and retries.
+      // When the number of destination tasks change then the routing will have
+      // to be updated too.
+      // This value may be obtained from config too if destination task initial
+      // parallelism is not specified.
+      initialDestinationTaskNumber = destinationVertex.getTotalTasks();
+    }
+    return initialDestinationTaskNumber;
+  }
+
+  /** Output index selects the destination task; source task index selects its input. */
+  @Override
+  public void routeEventToDestinationTasks(DataMovementEvent event,
+      int sourceTaskIndex, int numDestinationTasks, List<Integer> taskIndices) {
+    int destinationTaskIndex = event.getSourceIndex();
+    event.setTargetIndex(sourceTaskIndex);
+    taskIndices.add(Integer.valueOf(destinationTaskIndex));
+  }
+
+  /** Output index selects the destination task; source task index selects its input. */
+  @Override
+  public void routeEventToDestinationTasks(InputFailedEvent event,
+      int sourceTaskIndex, int numDestinationTasks, List<Integer> taskIndices) {
+    int destinationTaskIndex = event.getSourceIndex();
+    event.setTargetIndex(sourceTaskIndex);
+    taskIndices.add(Integer.valueOf(destinationTaskIndex));
+  }
+
+  /** A destination input index equals the index of the source task that produced it. */
+  @Override
+  public int routeEventToSourceTasks(int destinationTaskIndex,
+      InputReadErrorEvent event) {
+    return event.getIndex();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d89485f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java
index bb6e2ee..b854a43 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java
@@ -30,15 +30,18 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.app.dag.EdgeManager;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.VertexScheduler;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.engine.newapi.events.DataMovementEvent;
+import org.apache.tez.engine.newapi.events.InputFailedEvent;
+import org.apache.tez.engine.newapi.events.InputReadErrorEvent;
import org.apache.tez.engine.records.TezDependentTaskCompletionEvent;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
@@ -72,9 +75,10 @@ public class ShuffleVertexManager implements VertexScheduler {
public ShuffleVertexManager(Vertex managedVertex) {
this.managedVertex = managedVertex;
- Map<Vertex, EdgeProperty> inputs = managedVertex.getInputVertices();
- for(Map.Entry<Vertex, EdgeProperty> entry : inputs.entrySet()) {
- if(entry.getValue().getDataMovementType() == DataMovementType.SCATTER_GATHER) {
+ Map<Vertex, Edge> inputs = managedVertex.getInputVertices();
+ for(Map.Entry<Vertex, Edge> entry : inputs.entrySet()) {
+ if (entry.getValue().getEdgeProperty().getDataMovementType() ==
+ DataMovementType.SCATTER_GATHER) {
Vertex vertex = entry.getKey();
bipartiteSources.put(vertex.getVertexId(), vertex);
}
@@ -86,6 +90,82 @@ public class ShuffleVertexManager implements VertexScheduler {
// dynamically changed as the DAG progresses.
}
+
+  /**
+   * Scatter-gather edge manager used after auto-reducing parallelism. Each
+   * destination task reads a contiguous range of {@code basePartitionRange}
+   * source partitions; the last destination task reads
+   * {@code remainderRangeForLastShuffler} partitions.
+   */
+  public class CustomEdgeManager extends EdgeManager {
+    final int numSourceTaskOutputs;
+    final int numDestinationTasks;
+    final int basePartitionRange;
+    final int remainderRangeForLastShuffler;
+
+    CustomEdgeManager(int numSourceTaskOutputs, int numDestinationTasks,
+        int basePartitionRange, int remainderPartitionForLastShuffler) {
+      this.numSourceTaskOutputs = numSourceTaskOutputs;
+      this.numDestinationTasks = numDestinationTasks;
+      this.basePartitionRange = basePartitionRange;
+      this.remainderRangeForLastShuffler = remainderPartitionForLastShuffler;
+    }
+
+    /** Number of source partitions consumed by the given destination task. */
+    private int getPartitionRange(int destinationTaskIndex) {
+      if (destinationTaskIndex < numDestinationTasks - 1) {
+        return basePartitionRange;
+      }
+      // NOTE(review): a non-positive remainder is taken to mean the last task
+      // has a full base range; previously it produced 0 inputs and a division
+      // by zero in routeEventToSourceTasks.
+      return remainderRangeForLastShuffler > 0
+          ? remainderRangeForLastShuffler : basePartitionRange;
+    }
+
+    /** Destination task that consumes the given source partition. */
+    private int getDestinationTaskIndex(int sourceIndex) {
+      return sourceIndex / basePartitionRange;
+    }
+
+    /**
+     * Input index on the destination task for a partition of a source task.
+     * All inputs from a source task are next to each other in original order.
+     * The stride is the destination's own range, so indices stay below its
+     * input count and routeEventToSourceTasks can invert the mapping.
+     */
+    private int getTargetIndex(int sourceTaskIndex, int sourceIndex,
+        int destinationTaskIndex) {
+      return sourceTaskIndex * getPartitionRange(destinationTaskIndex)
+          + sourceIndex % basePartitionRange;
+    }
+
+    @Override
+    public int getNumDestinationTaskInputs(Vertex sourceVertex,
+        int destinationTaskIndex) {
+      return sourceVertex.getTotalTasks() * getPartitionRange(destinationTaskIndex);
+    }
+
+    @Override
+    public int getNumSourceTaskOutputs(Vertex destinationVertex,
+        int sourceTaskIndex) {
+      return numSourceTaskOutputs;
+    }
+
+    @Override
+    public void routeEventToDestinationTasks(DataMovementEvent event,
+        int sourceTaskIndex, int numDestinationTasks, List<Integer> taskIndices) {
+      int sourceIndex = event.getSourceIndex();
+      int destinationTaskIndex = getDestinationTaskIndex(sourceIndex);
+      event.setTargetIndex(
+          getTargetIndex(sourceTaskIndex, sourceIndex, destinationTaskIndex));
+      taskIndices.add(Integer.valueOf(destinationTaskIndex));
+    }
+
+    @Override
+    public void routeEventToDestinationTasks(InputFailedEvent event,
+        int sourceTaskIndex, int numDestinationTasks, List<Integer> taskIndices) {
+      int sourceIndex = event.getSourceIndex();
+      int destinationTaskIndex = getDestinationTaskIndex(sourceIndex);
+      event.setTargetIndex(
+          getTargetIndex(sourceTaskIndex, sourceIndex, destinationTaskIndex));
+      taskIndices.add(Integer.valueOf(destinationTaskIndex));
+    }
+
+    @Override
+    public int routeEventToSourceTasks(int destinationTaskIndex,
+        InputReadErrorEvent event) {
+      // inverse of getTargetIndex: inputs are grouped per source task
+      return event.getIndex() / getPartitionRange(destinationTaskIndex);
+    }
+  }
+
+
@Override
public void onVertexStarted() {
pendingTasks = new ArrayList<TezTaskID>(managedVertex.getTotalTasks());
@@ -140,7 +220,7 @@ public class ShuffleVertexManager implements VertexScheduler {
}
numSourceTasks = numSrcTasks;
}
-
+
void determineParallelismAndApply() {
if(numSourceTasksCompleted == 0) {
return;
@@ -199,7 +279,15 @@ public class ShuffleVertexManager implements VertexScheduler {
throw new TezUncheckedException(e);
}
- managedVertex.setParallelism(finalTaskParallelism, taskConfs);
+ Map<Vertex, EdgeManager> edgeManagers = new HashMap<Vertex, EdgeManager>(
+ bipartiteSources.size());
+ for(Vertex vertex : bipartiteSources.values()) {
+ edgeManagers.put(vertex, new CustomEdgeManager(currentParallelism,
+ finalTaskParallelism, basePartitionRange,
+ remainderRangeForLastShuffler));
+ }
+
+ managedVertex.setParallelism(finalTaskParallelism, edgeManagers);
updatePendingTasks();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d89485f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 00ef9e5..30bb1eb 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -297,9 +297,10 @@ public class TaskAttemptImpl implements TaskAttempt,
Vertex vertex = getVertex();
ProcessorDescriptor procDesc = vertex.getProcessorDescriptor();
DAG dag = vertex.getDAG();
+ int taskId = getTaskID().getId();
return new TaskSpec(getID(), dag.getUserName(),
- vertex.getName(), procDesc, vertex.getInputSpecList(),
- vertex.getOutputSpecList());
+ vertex.getName(), procDesc, vertex.getInputSpecList(taskId),
+ vertex.getOutputSpecList(taskId));
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d89485f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index a43453c..b66760c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -18,9 +18,11 @@
package org.apache.tez.dag.app.dag.impl;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
@@ -60,6 +62,7 @@ import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
import org.apache.tez.dag.app.dag.event.DAGEventType;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventKillRequest;
import org.apache.tez.dag.app.dag.event.TaskEvent;
+import org.apache.tez.dag.app.dag.event.TaskEventAddTezEvent;
import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
import org.apache.tez.dag.app.dag.event.TaskEventType;
import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptCompleted;
@@ -71,6 +74,7 @@ import org.apache.tez.dag.history.events.TaskStartedEvent;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.engine.newapi.impl.TezEvent;
import org.apache.tez.engine.records.TezDependentTaskCompletionEvent;
import com.google.common.annotations.VisibleForTesting;
@@ -102,6 +106,11 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
protected boolean encryptedShuffle;
protected TaskLocationHint locationHint;
+
+  // TezEvents routed to this task, served to its attempts by index.
+  private List<TezEvent> tezEventsForTaskAttempts = new ArrayList<TezEvent>();
+  // Shared by every task, so it must be immutable (was a raw mutable ArrayList).
+  private static final List<TezEvent> EMPTY_TASK_ATTEMPT_TEZ_EVENTS =
+      Collections.<TezEvent>emptyList();
+
// counts the number of attempts that are either running or in a state where
// they will come to be running when they get a Container
@@ -113,6 +122,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
ATTEMPT_KILLED_TRANSITION = new AttemptKilledTransition();
private static final SingleArcTransition<TaskImpl, TaskEvent>
KILL_TRANSITION = new KillTransition();
+ private static final SingleArcTransition<TaskImpl, TaskEvent>
+ ADD_TEZ_EVENT_TRANSITION = new AddTezEventTransition();
private static final StateMachineFactory
<TaskImpl, TaskStateInternal, TaskEventType, TaskEvent>
@@ -128,6 +139,9 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
.addTransition(TaskStateInternal.NEW, TaskStateInternal.KILLED,
TaskEventType.T_TERMINATE,
new KillNewTransition())
+ .addTransition(TaskStateInternal.NEW, TaskStateInternal.NEW,
+ TaskEventType.T_ADD_TEZ_EVENT, ADD_TEZ_EVENT_TRANSITION)
+
// Transitions from SCHEDULED state
//when the first attempt is launched, the task state is set to RUNNING
@@ -142,6 +156,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
EnumSet.of(TaskStateInternal.SCHEDULED, TaskStateInternal.FAILED),
TaskEventType.T_ATTEMPT_FAILED,
new AttemptFailedTransition())
+ .addTransition(TaskStateInternal.SCHEDULED, TaskStateInternal.SCHEDULED,
+ TaskEventType.T_ADD_TEZ_EVENT, ADD_TEZ_EVENT_TRANSITION)
// Transitions from RUNNING state
.addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING,
@@ -168,12 +184,17 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
.addTransition(TaskStateInternal.RUNNING, TaskStateInternal.KILL_WAIT,
TaskEventType.T_TERMINATE,
KILL_TRANSITION)
+ .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING,
+ TaskEventType.T_ADD_TEZ_EVENT, ADD_TEZ_EVENT_TRANSITION)
+
// Transitions from KILL_WAIT state
.addTransition(TaskStateInternal.KILL_WAIT,
EnumSet.of(TaskStateInternal.KILL_WAIT, TaskStateInternal.KILLED),
TaskEventType.T_ATTEMPT_KILLED,
new KillWaitAttemptKilledTransition())
+ .addTransition(TaskStateInternal.KILL_WAIT, TaskStateInternal.KILL_WAIT,
+ TaskEventType.T_ADD_TEZ_EVENT, ADD_TEZ_EVENT_TRANSITION)
// Ignore-able transitions.
.addTransition(
TaskStateInternal.KILL_WAIT,
@@ -196,6 +217,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
.addTransition(TaskStateInternal.SUCCEEDED, //only possible for map tasks
EnumSet.of(TaskStateInternal.SCHEDULED, TaskStateInternal.SUCCEEDED),
TaskEventType.T_ATTEMPT_KILLED, new MapRetroactiveKilledTransition())
+ .addTransition(TaskStateInternal.SUCCEEDED, TaskStateInternal.SUCCEEDED,
+ TaskEventType.T_ADD_TEZ_EVENT, ADD_TEZ_EVENT_TRANSITION)
// Ignore-able transitions.
.addTransition(
TaskStateInternal.SUCCEEDED, TaskStateInternal.SUCCEEDED,
@@ -204,12 +227,16 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
// Transitions from FAILED state
.addTransition(TaskStateInternal.FAILED, TaskStateInternal.FAILED,
+ TaskEventType.T_ADD_TEZ_EVENT, ADD_TEZ_EVENT_TRANSITION)
+ .addTransition(TaskStateInternal.FAILED, TaskStateInternal.FAILED,
EnumSet.of(
TaskEventType.T_TERMINATE,
TaskEventType.T_ADD_SPEC_ATTEMPT))
// Transitions from KILLED state
.addTransition(TaskStateInternal.KILLED, TaskStateInternal.KILLED,
+ TaskEventType.T_ADD_TEZ_EVENT, ADD_TEZ_EVENT_TRANSITION)
+ .addTransition(TaskStateInternal.KILLED, TaskStateInternal.KILLED,
EnumSet.of(
TaskEventType.T_TERMINATE,
TaskEventType.T_ADD_SPEC_ATTEMPT))
@@ -415,6 +442,46 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
readLock.unlock();
}
}
+
+  /**
+   * Returns up to {@code maxEvents} events starting at {@code fromEventId}, or
+   * an empty list when no event exists at that position. The returned list is
+   * an unmodifiable snapshot.
+   */
+  @Override
+  public List<TezEvent> getTaskAttemptTezEvents(TezTaskAttemptID attemptID,
+      int fromEventId, int maxEvents) {
+    List<TezEvent> events = EMPTY_TASK_ATTEMPT_TEZ_EVENTS;
+    readLock.lock();
+    try {
+      int numEvents = tezEventsForTaskAttempts.size();
+      if (numEvents > fromEventId) {
+        int toEventId = fromEventId + Math.min(maxEvents, numEvents - fromEventId);
+        // Copy the range: a subList view is invalidated (and throws
+        // ConcurrentModificationException on use) once another event is
+        // appended to the backing list after the lock is released. Only the
+        // list is copied; the TezEvent objects themselves are shared.
+        events = Collections.unmodifiableList(new ArrayList<TezEvent>(
+            tezEventsForTaskAttempts.subList(fromEventId, toEventId)));
+        // Currently the events are not modified, so we don't have to create
+        // copies of events. e.g. if we have to set taskAttemptId into the
+        // TezEvent destination metadata then we will need to create a copy of
+        // the TezEvent and then modify the metadata and then send the copy on
+        // the RPC. This is important because TezEvents are only routed in the
+        // AM and not copied during routing. So e.g. a broadcast edge will send
+        // the same event to all consumers (like it should). If copies were
+        // created then re-routing the events on parallelism changes would be
+        // difficult. We would have to buffer the events in the Vertex until
+        // the parallelism was set and then route the events.
+      }
+      return events;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Returns the events accumulated so far and replaces them with a new empty
+   * list.
+   */
+  @Override
+  public List<TezEvent> getAndClearTaskTezEvents() {
+    // NOTE(review): the field is swapped while holding only the read lock, so
+    // concurrent callers could receive the same list; the write lock was
+    // presumably intended - confirm.
+    readLock.lock();
+    try {
+      final List<TezEvent> drained = tezEventsForTaskAttempts;
+      tezEventsForTaskAttempts = new ArrayList<TezEvent>();
+      return drained;
+    } finally {
+      readLock.unlock();
+    }
+  }
@VisibleForTesting
public TaskStateInternal getInternalState() {
@@ -1078,6 +1145,15 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
logMsg));
}
}
+
+  /** Appends the TezEvent carried by a T_ADD_TEZ_EVENT to the task's event list. */
+  private static class AddTezEventTransition
+      implements SingleArcTransition<TaskImpl, TaskEvent> {
+    @Override
+    public void transition(TaskImpl task, TaskEvent event) {
+      TezEvent carried = ((TaskEventAddTezEvent) event).getTezEvent();
+      task.tezEventsForTaskAttempts.add(carried);
+    }
+  }
private static class KillTransition
implements SingleArcTransition<TaskImpl, TaskEvent> {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d89485f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index a040ff2..1186caa 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -23,11 +23,13 @@ import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
@@ -52,7 +54,6 @@ import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DagTypeConverters;
-import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezUncheckedException;
@@ -71,6 +72,7 @@ import org.apache.tez.dag.app.ContainerContext;
import org.apache.tez.dag.app.TaskAttemptListener;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.dag.EdgeManager;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.TaskTerminationCause;
import org.apache.tez.dag.app.dag.Vertex;
@@ -87,6 +89,7 @@ import org.apache.tez.dag.app.dag.event.TaskEvent;
import org.apache.tez.dag.app.dag.event.TaskEventTermination;
import org.apache.tez.dag.app.dag.event.TaskEventType;
import org.apache.tez.dag.app.dag.event.VertexEvent;
+import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
import org.apache.tez.dag.app.dag.event.VertexEventSourceTaskAttemptCompleted;
import org.apache.tez.dag.app.dag.event.VertexEventSourceVertexStarted;
import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptCompleted;
@@ -101,12 +104,15 @@ import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.engine.newapi.events.DataMovementEvent;
+import org.apache.tez.engine.newapi.events.InputFailedEvent;
+import org.apache.tez.engine.newapi.impl.EventMetaData;
import org.apache.tez.engine.newapi.impl.InputSpec;
import org.apache.tez.engine.newapi.impl.OutputSpec;
+import org.apache.tez.engine.newapi.impl.TezEvent;
import org.apache.tez.engine.records.TezDependentTaskCompletionEvent;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.Multiset;
@@ -177,6 +183,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
private static final InternalErrorTransition
INTERNAL_ERROR_TRANSITION = new InternalErrorTransition();
+ private static final RouteEventTransition
+ ROUTE_EVENT_TRANSITION = new RouteEventTransition();
private static final TaskAttemptCompletedEventTransition
TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION =
new TaskAttemptCompletedEventTransition();
@@ -244,6 +252,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
VertexState.RUNNING,
VertexState.ERROR, VertexEventType.INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
+ .addTransition(
+ VertexState.RUNNING,
+ VertexState.RUNNING, VertexEventType.V_ROUTE_EVENT,
+ ROUTE_EVENT_TRANSITION)
// Transitions from TERMINATING state.
.addTransition
@@ -345,8 +357,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
// For committer
private final VertexContext vertexContext;
- private Map<Vertex, EdgeProperty> sourceVertices;
- private Map<Vertex, EdgeProperty> targetVertices;
+ @VisibleForTesting
+ Map<Vertex, Edge> sourceVertices;
+ private Map<Vertex, Edge> targetVertices;
private VertexScheduler vertexScheduler;
@@ -457,6 +470,27 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
@Override
+  public Task getTask(int taskIndex) {
+    // Returns the task at position taskIndex in iteration order of `tasks`,
+    // or null when taskIndex is out of range. Runs under the read lock.
+    readLock.lock();
+    try {
+      // Assigning to a LinkedHashMap-typed local makes a change of the type of
+      // `tasks` a compile error: this lookup relies on insertion-order
+      // iteration. NOTE(review): assumes tasks were inserted in task-index
+      // order - confirm against createTasks.
+      LinkedHashMap<TezTaskID, Task> taskList = tasks;
+      if (taskIndex < 0 || taskIndex >= taskList.size()) {
+        return null;
+      }
+      // Linear walk, advancing the position on every element; O(number of tasks).
+      int position = 0;
+      for (Task task : taskList.values()) {
+        if (position == taskIndex) {
+          return task;
+        }
+        position++;
+      }
+      return null;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+ @Override
public int getTotalTasks() {
+ // Reports numTasks, which setParallelism() overwrites with the new parallelism.
return numTasks;
}
@@ -658,12 +692,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
@Override
- public void setParallelism(int parallelism, List<byte[]> taskUserPayloads) {
+ public void setParallelism(int parallelism,
+ Map<Vertex, EdgeManager> sourceEdgeManagers) {
+ // Reduces this vertex to `parallelism` tasks, optionally installs the given
+ // EdgeManagers on the matching incoming edges, and re-routes TezEvents that
+ // were already queued on this vertex's tasks through those edges.
writeLock.lock();
try {
- Preconditions.checkArgument(
- taskUserPayloads == null || taskUserPayloads.size() == parallelism,
- "Userpayload must be set for all tasks or set to null");
if (parallelism >= numTasks) {
// not that hard to support perhaps. but checking right now since there
// is no use case for it and checking may catch other bugs.
@@ -674,7 +706,16 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
LOG.info("Ingoring setParallelism to current value: " + parallelism);
return;
}
-
+
+ // start buffering incoming events so that we can re-route existing events
+ for (Edge edge : sourceVertices.values()) {
+ edge.startEventBuffering();
+ }
+ // NOTE(review): stopEventBuffering() is only reached on the success path at
+ // the end of this try block. If anything in between throws (the task-state
+ // check below appears to, as may a missing-edge NPE), the source edges seem
+ // to stay in buffering mode. Consider stopping buffering in the finally block.
+
+ // Use a set since the same event may have been sent to multiple tasks
+ // and we want to avoid duplicates
+ Set<TezEvent> pendingEvents = new HashSet<TezEvent>();
+
LOG.info("Vertex " + getVertexId() + " parallelism set to " + parallelism);
// assign to local variable of LinkedHashMap to make sure that changing
// type of task causes compile error. We depend on LinkedHashMap for order
@@ -691,6 +732,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
"All tasks must be in initial state when changing parallelism"
+ " for vertex: " + getVertexId() + " name: " + getName());
}
+ // Drain events from every task, including ones about to be removed, so they
+ // can be re-routed under the new task count below.
+ pendingEvents.addAll(task.getAndClearTaskTezEvents());
if (i <= parallelism) {
continue;
}
@@ -698,13 +740,44 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
iter.remove();
}
this.numTasks = parallelism;
- if (taskUserPayloads != null) {
- this.taskUserPayloads = new ArrayList<byte[]>(taskUserPayloads);
- }
assert tasks.size() == numTasks;
+
+ // set new edge managers
+ if(sourceEdgeManagers != null) {
+ for(Map.Entry<Vertex, EdgeManager> entry : sourceEdgeManagers.entrySet()) {
+ Vertex sourceVertex = entry.getKey();
+ EdgeManager edgeManager = entry.getValue();
+ Edge edge = sourceVertices.get(sourceVertex);
+ // NOTE(review): get() returns null when sourceVertex is not an input of
+ // this vertex, which would NPE on setEdgeManager below - consider validating.
+ LOG.info("Replacing edge manager for source:"
+ + sourceVertex.getVertexId() + " destination: " + getVertexId());
+ edge.setEdgeManager(edgeManager);
+ }
+ }
+
+ // Re-route all existing TezEvents according to new routing table
+ // At this point only events attributed to source task attempts can be
+ // re-routed. e.g. DataMovement or InputFailed events.
+ // This assumption is fine for now since these tasks haven't been started.
+ // So they can only get events generated from source task attempts that
+ // have already been started.
+ DAG dag = getDAG();
+ for(TezEvent event : pendingEvents) {
+ TezVertexID sourceVertexId = event.getSourceInfo().getTaskAttemptID()
+ .getTaskID().getVertexID();
+ Vertex sourceVertex = dag.getVertex(sourceVertexId);
+ Edge sourceEdge = sourceVertices.get(sourceVertex);
+ // NOTE(review): sourceEdge is null if the event's producer is not an input
+ // of this vertex; the call below would then NPE.
+ sourceEdge.sendTezEventToDestinationTasks(event);
+ }
+
+ // stop buffering events
+ for (Edge edge : sourceVertices.values()) {
+ edge.stopEventBuffering();
+ }
+
} finally {
writeLock.unlock();
}
+
}
@Override
@@ -957,10 +1030,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
// create the Tasks but don't start them yet
createTasks(vertex);
+ // TODO get this from API
boolean hasBipartite = false;
if (vertex.sourceVertices != null) {
- for (EdgeProperty edgeProperty : vertex.sourceVertices.values()) {
- if (edgeProperty.getDataMovementType() == DataMovementType.SCATTER_GATHER) {
+ for (Edge edge : vertex.sourceVertices.values()) {
+ if (edge.getEdgeProperty().getDataMovementType() ==
+ DataMovementType.SCATTER_GATHER) {
hasBipartite = true;
break;
}
@@ -1344,6 +1419,55 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
diagnostics.add(diag);
}
+  /**
+   * Routes the TezEvents carried by a VertexEventRouteEvent over this vertex's
+   * edges.
+   *
+   * <ul>
+   * <li>DATA_MOVEMENT_EVENT and INPUT_FAILED_EVENT are versioned with the
+   * producing task attempt's id. They are then sent over the outgoing edge to
+   * the vertex named by the event's edge-vertex name.</li>
+   * <li>INPUT_READ_ERROR_EVENT is sent over the incoming edge from that vertex
+   * towards its source tasks.</li>
+   * <li>Any other event type is rejected with a TezUncheckedException.</li>
+   * </ul>
+   */
+  private static class RouteEventTransition implements
+      SingleArcTransition<VertexImpl, VertexEvent> {
+    @Override
+    public void transition(VertexImpl vertex, VertexEvent event) {
+      VertexEventRouteEvent rEvent = (VertexEventRouteEvent) event;
+      List<TezEvent> tezEvents = rEvent.getEvents();
+      for (TezEvent tezEvent : tezEvents) {
+        switch (tezEvent.getEventType()) {
+        case DATA_MOVEMENT_EVENT:
+          {
+            EventMetaData sourceMeta = tezEvent.getSourceInfo();
+            TezTaskAttemptID srcTaId = sourceMeta.getTaskAttemptID();
+            DataMovementEvent dmEvent = (DataMovementEvent) tezEvent.getEvent();
+            dmEvent.setVersion(srcTaId.getId());
+            findEdge(vertex, vertex.targetVertices, sourceMeta)
+                .sendTezEventToDestinationTasks(tezEvent);
+          }
+          break;
+        case INPUT_FAILED_EVENT:
+          {
+            EventMetaData sourceMeta = tezEvent.getSourceInfo();
+            TezTaskAttemptID srcTaId = sourceMeta.getTaskAttemptID();
+            InputFailedEvent ifEvent = (InputFailedEvent) tezEvent.getEvent();
+            ifEvent.setVersion(srcTaId.getId());
+            findEdge(vertex, vertex.targetVertices, sourceMeta)
+                .sendTezEventToDestinationTasks(tezEvent);
+          }
+          break;
+        case INPUT_READ_ERROR_EVENT:
+          {
+            EventMetaData sourceMeta = tezEvent.getSourceInfo();
+            findEdge(vertex, vertex.sourceVertices, sourceMeta)
+                .sendTezEventToSourceTasks(tezEvent);
+          }
+          break;
+        default:
+          throw new TezUncheckedException("Unhandled tez event type: "
+              + tezEvent.getEventType());
+        }
+      }
+    }
+
+    /**
+     * Looks up, in {@code edges}, the edge keyed by the DAG vertex named by
+     * {@code sourceMeta.getEdgeVertexName()}.
+     *
+     * @param vertex the vertex handling the route event
+     * @param edges the vertex's incoming or outgoing edge map; may be null
+     * @param sourceMeta metadata of the event being routed
+     * @return the matching edge, never null
+     * @throws TezUncheckedException if no such edge exists. This replaces a
+     *     later, context-free NullPointerException.
+     */
+    private static Edge findEdge(VertexImpl vertex, Map<Vertex, Edge> edges,
+        EventMetaData sourceMeta) {
+      // The event is expected to have been generated by a task of this vertex.
+      assert sourceMeta.getTaskVertexName().equals(vertex.getName());
+      // The edge maps are not guaranteed to be set (see the null check on
+      // sourceVertices during task creation), so guard against a null map too.
+      Edge edge = (edges == null) ? null : edges.get(
+          vertex.getDAG().getVertex(sourceMeta.getEdgeVertexName()));
+      if (edge == null) {
+        throw new TezUncheckedException("No edge between vertex "
+            + vertex.getName() + " and vertex " + sourceMeta.getEdgeVertexName()
+            + " to route event from " + sourceMeta.getTaskAttemptID());
+      }
+      return edge;
+    }
+  }
+
private static class InternalErrorTransition implements
SingleArcTransition<VertexImpl, VertexEvent> {
@Override
@@ -1359,12 +1483,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
@Override
- public void setInputVertices(Map<Vertex, EdgeProperty> inVertices) {
+ public void setInputVertices(Map<Vertex, Edge> inVertices) {
this.sourceVertices = inVertices;
}
@Override
- public void setOutputVertices(Map<Vertex, EdgeProperty> outVertices) {
+ public void setOutputVertices(Map<Vertex, Edge> outVertices) {
this.targetVertices = outVertices;
}
@@ -1395,12 +1519,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
@Override
- public Map<Vertex, EdgeProperty> getInputVertices() {
+ public Map<Vertex, Edge> getInputVertices() {
return Collections.unmodifiableMap(this.sourceVertices);
}
@Override
- public Map<Vertex, EdgeProperty> getOutputVertices() {
+ public Map<Vertex, Edge> getOutputVertices() {
return Collections.unmodifiableMap(this.targetVertices);
}
@@ -1448,12 +1572,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
// TODO Eventually remove synchronization.
@Override
- public synchronized List<InputSpec> getInputSpecList() {
+ public synchronized List<InputSpec> getInputSpecList(int taskIndex) {
inputSpecList = new ArrayList<InputSpec>(
this.getInputVerticesCount());
- for (Entry<Vertex, EdgeProperty> entry : this.getInputVertices().entrySet()) {
- InputSpec inputSpec = new InputSpec(entry.getKey().getName(),
- entry.getValue().getEdgeDestination(), entry.getKey().getTotalTasks());
+ for (Entry<Vertex, Edge> entry : this.getInputVertices().entrySet()) {
+ InputSpec inputSpec = entry.getValue().getDestinationSpec(taskIndex);
if (LOG.isDebugEnabled()) {
LOG.debug("For vertex : " + this.getName()
+ ", Using InputSpec : " + inputSpec);
@@ -1466,16 +1589,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
// TODO Eventually remove synchronization.
@Override
- public synchronized List<OutputSpec> getOutputSpecList() {
+ public synchronized List<OutputSpec> getOutputSpecList(int taskIndex) {
if (this.outputSpecList == null) {
outputSpecList = new ArrayList<OutputSpec>(this.getOutputVerticesCount());
- for (Entry<Vertex, EdgeProperty> entry : this.getOutputVertices().entrySet()) {
- OutputSpec outputSpec = new OutputSpec(entry.getKey().getName(),
- entry.getValue().getEdgeSource(), entry.getKey().getTotalTasks());
- if (LOG.isDebugEnabled()) {
- LOG.debug("For vertex : " + this.getName()
- + ", Using OutputSpec : " + outputSpec);
- }
+ for (Entry<Vertex, Edge> entry : this.getOutputVertices().entrySet()) {
+ OutputSpec outputSpec = entry.getValue().getSourceSpec(taskIndex);
// TODO DAGAM This should be based on the edge type.
outputSpecList.add(outputSpec);
}