You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/02/18 07:03:52 UTC
git commit: TEZ-801. Support routing of event to multiple destination
physical inputs (bikas)
Repository: incubator-tez
Updated Branches:
refs/heads/master 4fd601e59 -> 06bc6eabe
TEZ-801. Support routing of event to multiple destination physical inputs (bikas)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/06bc6eab
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/06bc6eab
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/06bc6eab
Branch: refs/heads/master
Commit: 06bc6eabe48e26e0195864b7672adf2cab9311aa
Parents: 4fd601e
Author: Bikas Saha <bi...@apache.org>
Authored: Mon Feb 17 22:03:41 2014 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Mon Feb 17 22:03:41 2014 -0800
----------------------------------------------------------------------
.../org/apache/tez/dag/api/EdgeManager.java | 75 ++++++++------
.../runtime/api/events/InputFailedEvent.java | 10 +-
.../dag/app/dag/impl/BroadcastEdgeManager.java | 33 +++---
.../org/apache/tez/dag/app/dag/impl/Edge.java | 103 +++++++++++++------
.../tez/dag/app/dag/impl/NullEdgeManager.java | 19 ++--
.../dag/app/dag/impl/OneToOneEdgeManager.java | 38 +++----
.../app/dag/impl/ScatterGatherEdgeManager.java | 32 +++---
.../org/apache/tez/test/EdgeManagerForTest.java | 21 ++--
.../vertexmanager/ShuffleVertexManager.java | 65 +++++++-----
.../vertexmanager/TestShuffleVertexManager.java | 31 ++++--
10 files changed, 268 insertions(+), 159 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/06bc6eab/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManager.java b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManager.java
index d855f6c..e7beec1 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManager.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManager.java
@@ -19,9 +19,10 @@
package org.apache.tez.dag.api;
import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.tez.runtime.api.events.DataMovementEvent;
-import org.apache.tez.runtime.api.events.InputFailedEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
/**
@@ -31,6 +32,7 @@ import org.apache.tez.runtime.api.events.InputReadErrorEvent;
*
* Implementations must provide a 0 argument public constructor.
*/
+@Evolving
public interface EdgeManager {
/**
@@ -49,63 +51,78 @@ public interface EdgeManager {
public void initialize(EdgeManagerContext edgeManagerContext);
/**
- * Get the number of inputs on the destination task
+ * Get the number of physical inputs on the destination task
* @param numSourceTasks Total number of source tasks
* @param destinationTaskIndex Index of destination task for which number of
* inputs is needed
- * @return Number of inputs on the destination task
+ * @return Number of physical inputs on the destination task
*/
- public int getNumDestinationTaskInputs(int numSourceTasks,
+ public int getNumDestinationTaskPhysicalInputs(int numSourceTasks,
int destinationTaskIndex);
/**
- * Get the number of outputs on the source task
+ * Get the number of physical outputs on the source task
* @param numDestinationTasks Total number of destination tasks
* @param sourceTaskIndex Index of the source task for which number of outputs
* is needed
- * @return Number of outputs on the source task
+ * @return Number of physical outputs on the source task
*/
- public int getNumSourceTaskOutputs(int numDestinationTasks,
+ public int getNumSourceTaskPhysicalOutputs(int numDestinationTasks,
int sourceTaskIndex);
/**
- * Return the destination task indeces that need to be sent an input available
- * event because the source task output is now available
- * @param event Data movement event
- * @param sourceTaskIndex Source task
- * @param numDestinationTasks Total number of destination tasks
- * @param taskIndices List into which the destination task indices is to be
- * returned
+ * Return the routing information to inform consumers about the source task
+ * output that is now available. The returned Map has the routing information.
+ * Key is the destination task physical input index and the value is the list
+ * of destination task indices for which the key input index will receive the
+ * data movement event.
+ * @param event
+ * Data movement event
+ * @param sourceTaskIndex
+ * Source task
+ * @param numDestinationTasks
+ * Total number of destination tasks
+ * @param inputIndicesToTaskIndices
+ * Map via which the routing information is returned
*/
- public void routeEventToDestinationTasks(DataMovementEvent event,
- int sourceTaskIndex, int numDestinationTasks, List<Integer> taskIndices);
+ public void routeDataMovementEventToDestination(DataMovementEvent event,
+ int sourceTaskIndex, int numDestinationTasks,
+ Map<Integer, List<Integer>> inputIndicesToTaskIndices);
/**
- * Return the destination task indeces that need to be sent an input failed
- * event because the source task output is no longer available
- * @param event Input failed event
- * @param sourceTaskIndex Failed source task
- * @param numDestinationTasks Total number of destination tasks
- * @param taskIndices List into which the destination task indices is to be
- * returned
+ * Return the routing information to inform consumers about the failure of a
+ * source task whose outputs have been potentially lost. The returned Map has
+ * the routing information. Key is the destination task physical input index
+ * and the value is the list of destination task indices for which the key
+ * input index will receive the input failure notification. This method will
+ * be called once for every source task failure and information for all
+ * affected destinations must be provided in that invocation.
+ *
+ * @param sourceTaskIndex
+ * Source task
+ * @param numDestinationTasks
+ * Total number of destination tasks
+ * @param inputIndicesToTaskIndices
+ * Map via which the routing information is returned
*/
- public void routeEventToDestinationTasks(InputFailedEvent event,
- int sourceTaskIndex, int numDestinationTasks, List<Integer> taskIndices);
+ public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex,
+ int numDestinationTasks,
+ Map<Integer, List<Integer>> inputIndicesToTaskIndices);
/**
* Get the number of destination tasks that consume data from the source task
* @param sourceTaskIndex Source task index
* @param numDestinationTasks Total number of destination tasks
*/
- public int getDestinationConsumerTaskNumber(int sourceTaskIndex, int numDestinationTasks);
+ public int getNumDestinationConsumerTasks(int sourceTaskIndex, int numDestinationTasks);
/**
* Return the source task index to which to send the input error event
- * @param destinationTaskIndex Destination task that reported the error
* @param event Input read error event. Has more information about the error
+ * @param destinationTaskIndex Destination task that reported the error
* @return Index of the source task that created the unavailable input
*/
- public int routeEventToSourceTasks(int destinationTaskIndex,
- InputReadErrorEvent event);
+ public int routeInputErrorEventToSource(InputReadErrorEvent event,
+ int destinationTaskIndex);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/06bc6eab/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputFailedEvent.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputFailedEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputFailedEvent.java
index fb44462..a4648f3 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputFailedEvent.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputFailedEvent.java
@@ -46,11 +46,11 @@ public class InputFailedEvent extends Event{
*/
private int version;
- @Private
+ @Private // for Writable
public InputFailedEvent() {
}
- @Private // for Writable
+ @Private
public InputFailedEvent(int sourceIndex,
int targetIndex,
int version) {
@@ -58,6 +58,12 @@ public class InputFailedEvent extends Event{
this.targetIndex = targetIndex;
this.version = version;
}
+
+ @Private
+ public InputFailedEvent(int targetIndex, int version) {
+ this.targetIndex = targetIndex;
+ this.version = version;
+ }
public int getSourceIndex() {
return sourceIndex;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/06bc6eab/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java
index 5e5314e..305e085 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java
@@ -19,12 +19,14 @@
package org.apache.tez.dag.app.dag.impl;
import java.util.List;
+import java.util.Map;
import org.apache.tez.dag.api.EdgeManager;
import org.apache.tez.dag.api.EdgeManagerContext;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
-import org.apache.tez.runtime.api.events.InputFailedEvent;
+
+import com.google.common.collect.Lists;
public class BroadcastEdgeManager implements EdgeManager {
@@ -34,34 +36,37 @@ public class BroadcastEdgeManager implements EdgeManager {
}
@Override
- public int getNumDestinationTaskInputs(int numSourceTasks,
+ public int getNumDestinationTaskPhysicalInputs(int numSourceTasks,
int destinationTaskIndex) {
return numSourceTasks;
}
@Override
- public int getNumSourceTaskOutputs(int numDestinationTasks,
+ public int getNumSourceTaskPhysicalOutputs(int numDestinationTasks,
int sourceTaskIndex) {
return 1;
}
@Override
- public void routeEventToDestinationTasks(DataMovementEvent event,
- int sourceTaskIndex, int numDestinationTasks, List<Integer> taskIndices) {
- event.setTargetIndex(sourceTaskIndex);
+ public void routeDataMovementEventToDestination(DataMovementEvent event,
+ int sourceTaskIndex, int numDestinationTasks, Map<Integer, List<Integer>> inputIndicesToTaskIndices) {
+ List<Integer> taskIndices = Lists.newArrayListWithCapacity(numDestinationTasks);
addAllDestinationTaskIndices(numDestinationTasks, taskIndices);
+ inputIndicesToTaskIndices.put(new Integer(sourceTaskIndex), taskIndices);
}
@Override
- public void routeEventToDestinationTasks(InputFailedEvent event,
- int sourceTaskIndex, int numDestinationTasks , List<Integer> taskIndices) {
- event.setTargetIndex(sourceTaskIndex);
- addAllDestinationTaskIndices(numDestinationTasks, taskIndices);
+ public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex,
+ int numDestinationTasks,
+ Map<Integer, List<Integer>> inputIndicesToTaskIndices) {
+ List<Integer> taskIndices = Lists.newArrayListWithCapacity(numDestinationTasks);
+ addAllDestinationTaskIndices(numDestinationTasks, taskIndices);
+ inputIndicesToTaskIndices.put(new Integer(sourceTaskIndex), taskIndices);
}
-
+
@Override
- public int routeEventToSourceTasks(int destinationTaskIndex,
- InputReadErrorEvent event) {
+ public int routeInputErrorEventToSource(InputReadErrorEvent event,
+ int destinationTaskIndex) {
return event.getIndex();
}
@@ -72,7 +77,7 @@ public class BroadcastEdgeManager implements EdgeManager {
}
@Override
- public int getDestinationConsumerTaskNumber(int sourceTaskIndex,
+ public int getNumDestinationConsumerTasks(int sourceTaskIndex,
int numDestTasks) {
return numDestTasks;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/06bc6eab/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
index 288a417..55ab86f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
@@ -20,6 +20,7 @@ package org.apache.tez.dag.app.dag.impl;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -46,6 +47,8 @@ import org.apache.tez.runtime.api.impl.OutputSpec;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
+import com.google.common.collect.Maps;
+
public class Edge {
static class EdgeManagerContextImpl implements EdgeManagerContext {
@@ -163,13 +166,13 @@ public class Edge {
public InputSpec getDestinationSpec(int destinationTaskIndex) {
return new InputSpec(sourceVertex.getName(),
edgeProperty.getEdgeDestination(),
- edgeManager.getNumDestinationTaskInputs(sourceVertex.getTotalTasks(),
+ edgeManager.getNumDestinationTaskPhysicalInputs(sourceVertex.getTotalTasks(),
destinationTaskIndex));
}
public OutputSpec getSourceSpec(int sourceTaskIndex) {
return new OutputSpec(destinationVertex.getName(),
- edgeProperty.getEdgeSource(), edgeManager.getNumSourceTaskOutputs(
+ edgeProperty.getEdgeSource(), edgeManager.getNumSourceTaskPhysicalOutputs(
destinationVertex.getTotalTasks(), sourceTaskIndex));
}
@@ -199,9 +202,9 @@ public class Edge {
TezTaskAttemptID destAttemptId = tezEvent.getSourceInfo()
.getTaskAttemptID();
int destTaskIndex = destAttemptId.getTaskID().getId();
- int srcTaskIndex = edgeManager.routeEventToSourceTasks(destTaskIndex,
- event);
- int numConsumers = edgeManager.getDestinationConsumerTaskNumber(
+ int srcTaskIndex = edgeManager.routeInputErrorEventToSource(event,
+ destTaskIndex);
+ int numConsumers = edgeManager.getNumDestinationConsumerTasks(
srcTaskIndex, destinationVertex.getTotalTasks());
Task srcTask = sourceVertex.getTask(srcTaskIndex);
if (srcTask == null) {
@@ -239,9 +242,52 @@ public class Edge {
}
}
+ void sendDmEventOrIfEventToTasks(TezEvent tezEvent, int srcTaskIndex,
+ boolean isDataMovementEvent,
+ Map<Integer, List<Integer>> ifInputIndicesToTaskIndices) {
+ int num = 0;
+ Event event = tezEvent.getEvent();
+ for (Map.Entry<Integer, List<Integer>> entry : ifInputIndicesToTaskIndices.entrySet()) {
+ ++num;
+ TezEvent tezEventToSend = null;
+ if (num == ifInputIndicesToTaskIndices.size()) {
+ if (isDataMovementEvent) {
+ ((DataMovementEvent) event).setTargetIndex(entry.getKey().intValue());
+ } else {
+ ((InputFailedEvent) event).setTargetIndex(entry.getKey().intValue());
+ }
+ tezEventToSend = tezEvent;
+ } else {
+ Event e;
+ if (isDataMovementEvent) {
+ DataMovementEvent dmEvent = (DataMovementEvent) event;
+ e = new DataMovementEvent(dmEvent.getSourceIndex(),
+ entry.getKey().intValue(), dmEvent.getVersion(), dmEvent.getUserPayload());
+ } else {
+ InputFailedEvent ifEvent = ((InputFailedEvent) event);
+ e = new InputFailedEvent(entry.getKey().intValue(), ifEvent.getVersion());
+ }
+ tezEventToSend = new TezEvent(e, tezEvent.getSourceInfo());
+ }
+ tezEventToSend.setDestinationInfo(destinationMetaInfo);
+ for(Integer destTaskIndex : entry.getValue()) {
+ Task destTask = destinationVertex.getTask(destTaskIndex);
+ if (destTask == null) {
+ throw new TezUncheckedException("Unexpected null task." +
+ " sourceVertex=" + sourceVertex.getVertexId() +
+ " srcIndex = " + srcTaskIndex +
+ " destAttemptId=" + destinationVertex.getVertexId() +
+ " destIndex=" + destTaskIndex +
+ " edgeManager=" + edgeManager.getClass().getName());
+ }
+ TezTaskID destTaskId = destTask.getTaskId();
+ sendEventToTask(destTaskId, tezEventToSend);
+ }
+ }
+ }
+
public void sendTezEventToDestinationTasks(TezEvent tezEvent) {
if (!bufferEvents.get()) {
- List<Integer> destTaskIndices = new ArrayList<Integer>();
boolean isDataMovementEvent = true;
switch (tezEvent.getEventType()) {
case COMPOSITE_DATA_MOVEMENT_EVENT:
@@ -249,33 +295,32 @@ public class Edge {
break;
case INPUT_FAILED_EVENT:
isDataMovementEvent = false;
+ // fall through
case DATA_MOVEMENT_EVENT:
- Event event = tezEvent.getEvent();
- TezTaskAttemptID sourceAttemptId = tezEvent.getSourceInfo().getTaskAttemptID();
- int sourceTaskIndex = sourceAttemptId.getTaskID().getId();
+ Map<Integer, List<Integer>> inputIndicesToTaskIndices = Maps
+ .newHashMap();
+ TezTaskAttemptID srcAttemptId = tezEvent.getSourceInfo()
+ .getTaskAttemptID();
+ int srcTaskIndex = srcAttemptId.getTaskID().getId();
if (isDataMovementEvent) {
- edgeManager.routeEventToDestinationTasks((DataMovementEvent) event,
- sourceTaskIndex, destinationVertex.getTotalTasks(),
- destTaskIndices);
+ DataMovementEvent dmEvent = (DataMovementEvent)tezEvent.getEvent();
+ edgeManager.routeDataMovementEventToDestination(dmEvent,
+ srcTaskIndex, destinationVertex.getTotalTasks(),
+ inputIndicesToTaskIndices);
+ } else {
+ edgeManager.routeInputSourceTaskFailedEventToDestination(srcTaskIndex,
+ destinationVertex.getTotalTasks(), inputIndicesToTaskIndices);
+ }
+ if (!inputIndicesToTaskIndices.isEmpty()) {
+ sendDmEventOrIfEventToTasks(tezEvent, srcTaskIndex, isDataMovementEvent, inputIndicesToTaskIndices);
} else {
- edgeManager.routeEventToDestinationTasks((InputFailedEvent) event,
- sourceTaskIndex, destinationVertex.getTotalTasks(),
- destTaskIndices);
+ throw new TezUncheckedException("Event must be routed." +
+ " sourceVertex=" + sourceVertex.getVertexId() +
+ " srcIndex = " + srcTaskIndex +
+ " destAttemptId=" + destinationVertex.getVertexId() +
+ " edgeManager=" + edgeManager.getClass().getName() +
+ " Event type=" + tezEvent.getEventType());
}
- tezEvent.setDestinationInfo(destinationMetaInfo);
- for(Integer destTaskIndex : destTaskIndices) {
- Task destTask = destinationVertex.getTask(destTaskIndex);
- if (destTask == null) {
- throw new TezUncheckedException("Unexpected null task." +
- " sourceVertex=" + sourceVertex.getVertexId() +
- " srcIndex = " + sourceTaskIndex +
- " destAttemptId=" + destinationVertex.getVertexId() +
- " destIndex=" + destTaskIndex +
- " edgeManager=" + edgeManager.getClass().getName());
- }
- TezTaskID destTaskId = destTask.getTaskId();
- sendEventToTask(destTaskId, tezEvent);
- }
break;
default:
throw new TezUncheckedException("Unhandled tez event type: "
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/06bc6eab/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/NullEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/NullEdgeManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/NullEdgeManager.java
index 7cde07b..02f5154 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/NullEdgeManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/NullEdgeManager.java
@@ -19,11 +19,11 @@
package org.apache.tez.dag.app.dag.impl;
import java.util.List;
+import java.util.Map;
import org.apache.tez.dag.api.EdgeManager;
import org.apache.tez.dag.api.EdgeManagerContext;
import org.apache.tez.runtime.api.events.DataMovementEvent;
-import org.apache.tez.runtime.api.events.InputFailedEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
public class NullEdgeManager implements EdgeManager {
@@ -36,39 +36,40 @@ public class NullEdgeManager implements EdgeManager {
}
@Override
- public int getNumDestinationTaskInputs(int numSourceTasks, int destinationTaskIndex) {
+ public int getNumDestinationTaskPhysicalInputs(int numSourceTasks, int destinationTaskIndex) {
throw new UnsupportedOperationException(
"Cannot route events. EdgeManager should have been replaced at runtime");
}
@Override
- public int getNumSourceTaskOutputs(int numDestinationTasks, int sourceTaskIndex) {
+ public int getNumSourceTaskPhysicalOutputs(int numDestinationTasks, int sourceTaskIndex) {
throw new UnsupportedOperationException(
"Cannot route events. EdgeManager should have been replaced at runtime");
}
@Override
- public void routeEventToDestinationTasks(DataMovementEvent event, int sourceTaskIndex,
- int numDestinationTasks, List<Integer> taskIndices) {
+ public void routeDataMovementEventToDestination(DataMovementEvent event, int sourceTaskIndex,
+ int numDestinationTasks, Map<Integer, List<Integer>> inputIndicesToTaskIndices) {
throw new UnsupportedOperationException(
"Cannot route events. EdgeManager should have been replaced at runtime");
}
@Override
- public void routeEventToDestinationTasks(InputFailedEvent event, int sourceTaskIndex,
- int numDestinationTasks, List<Integer> taskIndices) {
+ public int getNumDestinationConsumerTasks(int sourceTaskIndex, int numDestinationTasks) {
throw new UnsupportedOperationException(
"Cannot route events. EdgeManager should have been replaced at runtime");
}
@Override
- public int getDestinationConsumerTaskNumber(int sourceTaskIndex, int numDestinationTasks) {
+ public int routeInputErrorEventToSource(InputReadErrorEvent event, int destinationTaskIndex) {
throw new UnsupportedOperationException(
"Cannot route events. EdgeManager should have been replaced at runtime");
}
@Override
- public int routeEventToSourceTasks(int destinationTaskIndex, InputReadErrorEvent event) {
+ public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex,
+ int numDestinationTasks,
+ Map<Integer, List<Integer>> inputIndicesToTaskIndices) {
throw new UnsupportedOperationException(
"Cannot route events. EdgeManager should have been replaced at runtime");
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/06bc6eab/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
index e026db9..1de25b9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
@@ -18,13 +18,14 @@
package org.apache.tez.dag.app.dag.impl;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
import org.apache.tez.dag.api.EdgeManager;
import org.apache.tez.dag.api.EdgeManagerContext;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
-import org.apache.tez.runtime.api.events.InputFailedEvent;
public class OneToOneEdgeManager implements EdgeManager {
@@ -34,43 +35,42 @@ public class OneToOneEdgeManager implements EdgeManager {
}
@Override
- public int getNumDestinationTaskInputs(int numDestinationTasks,
+ public int getNumDestinationTaskPhysicalInputs(int numDestinationTasks,
int destinationTaskIndex) {
return 1;
}
@Override
- public int getNumSourceTaskOutputs(int numDestinationTasks,
+ public int getNumSourceTaskPhysicalOutputs(int numDestinationTasks,
int sourceTaskIndex) {
return 1;
}
@Override
- public void routeEventToDestinationTasks(DataMovementEvent event,
- int sourceTaskIndex, int numDestinationTasks, List<Integer> taskIndices) {
- event.setTargetIndex(0);
- addDestinationTaskIndex(sourceTaskIndex, taskIndices);
+ public void routeDataMovementEventToDestination(DataMovementEvent event,
+ int sourceTaskIndex, int numDestinationTasks,
+ Map<Integer, List<Integer>> inputIndicesToTaskIndices) {
+ inputIndicesToTaskIndices.put(new Integer(0),
+ Collections.singletonList(new Integer(sourceTaskIndex)));
}
@Override
- public void routeEventToDestinationTasks(InputFailedEvent event,
- int sourceTaskIndex, int numDestinationTasks, List<Integer> taskIndices) {
- event.setTargetIndex(0);
- addDestinationTaskIndex(sourceTaskIndex, taskIndices);
+ public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex,
+ int numDestinationTasks,
+ Map<Integer, List<Integer>> inputIndicesToTaskIndices) {
+ inputIndicesToTaskIndices.put(new Integer(0),
+ Collections.singletonList(new Integer(sourceTaskIndex)));
}
-
+
@Override
- public int routeEventToSourceTasks(int destinationTaskIndex,
- InputReadErrorEvent event) {
+ public int routeInputErrorEventToSource(InputReadErrorEvent event,
+ int destinationTaskIndex) {
return destinationTaskIndex;
}
- void addDestinationTaskIndex(int sourceTaskIndex, List<Integer> taskIndeces) {
- taskIndeces.add(new Integer(sourceTaskIndex));
- }
-
@Override
- public int getDestinationConsumerTaskNumber(int sourceTaskIndex, int numDestTasks) {
+ public int getNumDestinationConsumerTasks(int sourceTaskIndex, int numDestTasks) {
return 1;
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/06bc6eab/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java
index c825a0a..9f4d61e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java
@@ -18,13 +18,16 @@
package org.apache.tez.dag.app.dag.impl;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
import org.apache.tez.dag.api.EdgeManager;
import org.apache.tez.dag.api.EdgeManagerContext;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
-import org.apache.tez.runtime.api.events.InputFailedEvent;
+
+import com.google.common.collect.Lists;
public class ScatterGatherEdgeManager implements EdgeManager {
@@ -34,39 +37,40 @@ public class ScatterGatherEdgeManager implements EdgeManager {
}
@Override
- public int getNumDestinationTaskInputs(int numSourceTasks,
+ public int getNumDestinationTaskPhysicalInputs(int numSourceTasks,
int destinationTaskIndex) {
return numSourceTasks;
}
@Override
- public int getNumSourceTaskOutputs(int numDestinationTasks, int sourceTaskIndex) {
+ public int getNumSourceTaskPhysicalOutputs(int numDestinationTasks, int sourceTaskIndex) {
return numDestinationTasks;
}
@Override
- public void routeEventToDestinationTasks(DataMovementEvent event,
- int sourceTaskIndex, int numDestinationTasks, List<Integer> taskIndices) {
- int destinationTaskIndex = event.getSourceIndex();
- event.setTargetIndex(sourceTaskIndex);
- taskIndices.add(new Integer(destinationTaskIndex));
+ public void routeDataMovementEventToDestination(DataMovementEvent event,
+ int sourceTaskIndex, int numDestinationTasks, Map<Integer, List<Integer>> inputIndicesToTaskIndices) {
+ inputIndicesToTaskIndices.put(new Integer(sourceTaskIndex),
+ Collections.singletonList(new Integer(event.getSourceIndex())));
}
@Override
- public void routeEventToDestinationTasks(InputFailedEvent event,
- int sourceTaskIndex, int numDestinationTasks, List<Integer> taskIndices) {
- event.setTargetIndex(sourceTaskIndex);
+ public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex,
+ int numDestinationTasks,
+ Map<Integer, List<Integer>> inputIndicesToTaskIndices) {
+ List<Integer> taskIndices = Lists.newArrayListWithCapacity(numDestinationTasks);
addAllDestinationTaskIndices(numDestinationTasks, taskIndices);
+ inputIndicesToTaskIndices.put(new Integer(sourceTaskIndex), taskIndices);
}
@Override
- public int routeEventToSourceTasks(int destinationTaskIndex,
- InputReadErrorEvent event) {
+ public int routeInputErrorEventToSource(InputReadErrorEvent event,
+ int destinationTaskIndex) {
return event.getIndex();
}
@Override
- public int getDestinationConsumerTaskNumber(int sourceTaskIndex, int numDestTasks) {
+ public int getNumDestinationConsumerTasks(int sourceTaskIndex, int numDestTasks) {
return numDestTasks;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/06bc6eab/tez-dag/src/test/java/org/apache/tez/test/EdgeManagerForTest.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/test/EdgeManagerForTest.java b/tez-dag/src/test/java/org/apache/tez/test/EdgeManagerForTest.java
index ff8fdbe..7fbd4d8 100644
--- a/tez-dag/src/test/java/org/apache/tez/test/EdgeManagerForTest.java
+++ b/tez-dag/src/test/java/org/apache/tez/test/EdgeManagerForTest.java
@@ -19,11 +19,11 @@
package org.apache.tez.test;
import java.util.List;
+import java.util.Map;
import org.apache.tez.dag.api.EdgeManager;
import org.apache.tez.dag.api.EdgeManagerContext;
import org.apache.tez.runtime.api.events.DataMovementEvent;
-import org.apache.tez.runtime.api.events.InputFailedEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
public class EdgeManagerForTest implements EdgeManager {
@@ -58,33 +58,34 @@ public class EdgeManagerForTest implements EdgeManager {
}
@Override
- public int getNumDestinationTaskInputs(int numSourceTasks, int destinationTaskIndex) {
+ public int getNumDestinationTaskPhysicalInputs(int numSourceTasks, int destinationTaskIndex) {
return 0;
}
@Override
- public int getNumSourceTaskOutputs(int numDestinationTasks, int sourceTaskIndex) {
+ public int getNumSourceTaskPhysicalOutputs(int numDestinationTasks, int sourceTaskIndex) {
return 0;
}
@Override
- public void routeEventToDestinationTasks(DataMovementEvent event, int sourceTaskIndex,
- int numDestinationTasks, List<Integer> taskIndices) {
+ public void routeDataMovementEventToDestination(DataMovementEvent event, int sourceTaskIndex,
+ int numDestinationTasks, Map<Integer, List<Integer>> inputIndicesToTaskIndices) {
}
@Override
- public void routeEventToDestinationTasks(InputFailedEvent event, int sourceTaskIndex,
- int numDestinationTasks, List<Integer> taskIndices) {
+ public int getNumDestinationConsumerTasks(int sourceTaskIndex, int numDestinationTasks) {
+ return 0;
}
@Override
- public int getDestinationConsumerTaskNumber(int sourceTaskIndex, int numDestinationTasks) {
+ public int routeInputErrorEventToSource(InputReadErrorEvent event, int destinationTaskIndex) {
return 0;
}
@Override
- public int routeEventToSourceTasks(int destinationTaskIndex, InputReadErrorEvent event) {
- return 0;
+ public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex,
+ int numDestinationTasks,
+ Map<Integer, List<Integer>> inputIndicesToTaskIndices) {
}
// End of overridden methods
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/06bc6eab/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index 0776cf0..70e9fae 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -20,6 +20,7 @@ package org.apache.tez.dag.library.vertexmanager;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -41,11 +42,11 @@ import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.events.DataMovementEvent;
-import org.apache.tez.runtime.api.events.InputFailedEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.VertexManagerEventPayloadProto;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.protobuf.InvalidProtocolBufferException;
@@ -144,7 +145,7 @@ public class ShuffleVertexManager implements VertexManagerPlugin {
}
@Override
- public int getNumDestinationTaskInputs(int numSourceTasks,
+ public int getNumDestinationTaskPhysicalInputs(int numSourceTasks,
int destinationTaskIndex) {
int partitionRange = 1;
if(destinationTaskIndex < numDestinationTasks-1) {
@@ -156,14 +157,14 @@ public class ShuffleVertexManager implements VertexManagerPlugin {
}
@Override
- public int getNumSourceTaskOutputs(int numDestinationTasks,
+ public int getNumSourceTaskPhysicalOutputs(int numDestinationTasks,
int sourceTaskIndex) {
return numSourceTaskOutputs;
}
@Override
- public void routeEventToDestinationTasks(DataMovementEvent event,
- int sourceTaskIndex, int numDestinationTasks, List<Integer> taskIndices) {
+ public void routeDataMovementEventToDestination(DataMovementEvent event,
+ int sourceTaskIndex, int numDestinationTasks, Map<Integer, List<Integer>> inputIndicesToTaskIndices) {
int sourceIndex = event.getSourceIndex();
int destinationTaskIndex = sourceIndex/basePartitionRange;
int partitionRange = 1;
@@ -178,32 +179,46 @@ public class ShuffleVertexManager implements VertexManagerPlugin {
sourceTaskIndex * partitionRange
+ sourceIndex % partitionRange;
- event.setTargetIndex(targetIndex);
- taskIndices.add(new Integer(destinationTaskIndex));
+ inputIndicesToTaskIndices.put(new Integer(targetIndex),
+ Collections.singletonList(new Integer(destinationTaskIndex)));
}
-
+
@Override
- public void routeEventToDestinationTasks(InputFailedEvent event,
- int sourceTaskIndex, int numDestinationTasks, List<Integer> taskIndices) {
- int sourceIndex = event.getSourceIndex();
- int destinationTaskIndex = sourceIndex/basePartitionRange;
- int partitionRange = 1;
- if(destinationTaskIndex < numDestinationTasks-1) {
- partitionRange = basePartitionRange;
+ public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex,
+ int numDestinationTasks,
+ Map<Integer, List<Integer>> inputIndicesToTaskIndices) {
+ if (remainderRangeForLastShuffler < basePartitionRange) {
+ List<Integer> lastTask = Collections.singletonList(
+ new Integer(numDestinationTasks-1));
+ List<Integer> otherTasks = Lists.newArrayListWithCapacity(numDestinationTasks-1);
+ for (int i=0; i<numDestinationTasks-1; ++i) {
+ otherTasks.add(new Integer(i));
+ }
+
+ int startOffset = sourceTaskIndex * basePartitionRange;
+ for (int i=0; i<basePartitionRange; ++i) {
+ inputIndicesToTaskIndices.put(new Integer(startOffset+i), otherTasks);
+ }
+ startOffset = sourceTaskIndex * remainderRangeForLastShuffler;
+ for (int i=0; i<remainderRangeForLastShuffler; ++i) {
+ inputIndicesToTaskIndices.put(new Integer(startOffset+i), lastTask);
+ }
} else {
- partitionRange = remainderRangeForLastShuffler;
+ // all tasks have same pattern
+ List<Integer> allTasks = Lists.newArrayListWithCapacity(numDestinationTasks);
+ for (int i=0; i<numDestinationTasks; ++i) {
+ allTasks.add(new Integer(i));
+ }
+ int startOffset = sourceTaskIndex * basePartitionRange;
+ for (int i=0; i<basePartitionRange; ++i) {
+ inputIndicesToTaskIndices.put(new Integer(startOffset+i), allTasks);
+ }
}
- int targetIndex =
- sourceTaskIndex * partitionRange
- + sourceIndex % partitionRange;
-
- event.setTargetIndex(targetIndex);
- taskIndices.add(new Integer(destinationTaskIndex));
}
@Override
- public int routeEventToSourceTasks(int destinationTaskIndex,
- InputReadErrorEvent event) {
+ public int routeInputErrorEventToSource(InputReadErrorEvent event,
+ int destinationTaskIndex) {
int partitionRange = 1;
if(destinationTaskIndex < numDestinationTasks-1) {
partitionRange = basePartitionRange;
@@ -214,7 +229,7 @@ public class ShuffleVertexManager implements VertexManagerPlugin {
}
@Override
- public int getDestinationConsumerTaskNumber(int sourceTaskIndex,
+ public int getNumDestinationConsumerTasks(int sourceTaskIndex,
int numDestTasks) {
return numDestTasks;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/06bc6eab/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
index 1a26487..fd11378 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
@@ -42,7 +42,7 @@ import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
-import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import static org.mockito.Mockito.*;
@@ -198,16 +198,31 @@ public class TestShuffleVertexManager {
Assert.assertEquals(2, newEdgeManagers.size());
EdgeManager edgeManager = newEdgeManagers.values().iterator().next();
- List<Integer> targets = Lists.newArrayList();
+ Map<Integer, List<Integer>> targets = Maps.newHashMap();
DataMovementEvent dmEvent = new DataMovementEvent(1, new byte[0]);
- edgeManager.routeEventToDestinationTasks(dmEvent, 1, 2, targets);
- Assert.assertEquals(3, dmEvent.getTargetIndex());
- Assert.assertEquals(0, targets.get(0).intValue());
+ edgeManager.routeDataMovementEventToDestination(dmEvent, 1, 2, targets);
+ Assert.assertEquals(1, targets.size());
+ Map.Entry<Integer, List<Integer>> e = targets.entrySet().iterator().next();
+ Assert.assertEquals(3, e.getKey().intValue());
+ Assert.assertEquals(1, e.getValue().size());
+ Assert.assertEquals(0, e.getValue().get(0).intValue());
targets.clear();
dmEvent = new DataMovementEvent(2, new byte[0]);
- edgeManager.routeEventToDestinationTasks(dmEvent, 0, 2, targets);
- Assert.assertEquals(0, dmEvent.getTargetIndex());
- Assert.assertEquals(1, targets.get(0).intValue());
+ edgeManager.routeDataMovementEventToDestination(dmEvent, 0, 2, targets);
+ Assert.assertEquals(1, targets.size());
+ e = targets.entrySet().iterator().next();
+ Assert.assertEquals(0, e.getKey().intValue());
+ Assert.assertEquals(1, e.getValue().size());
+ Assert.assertEquals(1, e.getValue().get(0).intValue());
+ targets.clear();
+ edgeManager.routeInputSourceTaskFailedEventToDestination(2, 2, targets);
+ Assert.assertEquals(2, targets.size());
+ for (Map.Entry<Integer, List<Integer>> entry : targets.entrySet()) {
+ Assert.assertTrue(entry.getKey().intValue() == 4 || entry.getKey().intValue() == 5);
+ Assert.assertEquals(2, entry.getValue().size());
+ Assert.assertEquals(0, entry.getValue().get(0).intValue());
+ Assert.assertEquals(1, entry.getValue().get(1).intValue());
+ }
}
@SuppressWarnings({ "unchecked", "rawtypes" })