You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/05/09 03:42:52 UTC
[14/43] tez git commit: TEZ-776. Reduce AM mem usage caused by
storing TezEvents (bikas)
TEZ-776. Reduce AM mem usage caused by storing TezEvents (bikas)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/05f77fe2
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/05f77fe2
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/05f77fe2
Branch: refs/heads/TEZ-2003
Commit: 05f77fe2b210341a16ead9fc51e53093c836d860
Parents: a382324
Author: Bikas Saha <bi...@apache.org>
Authored: Thu May 7 15:44:38 2015 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Thu May 7 15:44:38 2015 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
tez-api/findbugs-exclude.xml | 12 +
.../apache/tez/dag/api/EdgeManagerPlugin.java | 4 +-
.../tez/dag/api/EdgeManagerPluginOnDemand.java | 332 +++++++++++++++++++
.../api/events/CompositeDataMovementEvent.java | 21 ++
.../runtime/api/events/DataMovementEvent.java | 33 +-
.../runtime/api/events/InputFailedEvent.java | 16 +
tez-dag/findbugs-exclude.xml | 5 +-
.../tez/dag/app/TaskAttemptEventInfo.java | 40 +++
.../dag/app/TaskAttemptListenerImpTezDag.java | 6 +-
.../java/org/apache/tez/dag/app/dag/Vertex.java | 5 +
.../dag/app/dag/impl/BroadcastEdgeManager.java | 41 ++-
.../org/apache/tez/dag/app/dag/impl/Edge.java | 244 ++++++++++++--
.../dag/app/dag/impl/OneToOneEdgeManager.java | 67 +++-
.../app/dag/impl/ScatterGatherEdgeManager.java | 83 ++++-
.../apache/tez/dag/app/dag/impl/TaskImpl.java | 6 -
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 323 ++++++++++++++----
.../apache/tez/dag/app/MockDAGAppMaster.java | 25 +-
.../tez/dag/app/TestMemoryWithEvents.java | 219 ++++++++++++
.../tez/dag/app/TestMockDAGAppMaster.java | 160 ++++++++-
.../app/TestTaskAttemptListenerImplTezDag.java | 33 +-
.../tez/dag/app/dag/impl/TestDAGImpl.java | 90 ++++-
.../tez/dag/app/dag/impl/TestVertexImpl.java | 208 +++++++++++-
.../org/apache/tez/test/EdgeManagerForTest.java | 33 +-
.../org/apache/tez/runtime/RuntimeTask.java | 10 +
.../runtime/api/impl/TezHeartbeatResponse.java | 12 +
.../apache/tez/runtime/task/TaskReporter.java | 12 +-
.../vertexmanager/ShuffleVertexManager.java | 117 ++++++-
.../tez/test/TestExceptionPropagation.java | 39 ++-
29 files changed, 2000 insertions(+), 197 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/05f77fe2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8de61b0..ba8e9d8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -20,6 +20,7 @@ INCOMPATIBLE CHANGES
Default max limit increased. Should not affect existing users.
ALL CHANGES:
+ TEZ-776. Reduce AM mem usage caused by storing TezEvents
TEZ-2423. Tez UI: Remove Attempt Index column from task->attempts page
TEZ-2416. Tez UI: Make tooltips display faster.
TEZ-2404. Handle DataMovementEvent before its TaskAttemptCompletedEvent
http://git-wip-us.apache.org/repos/asf/tez/blob/05f77fe2/tez-api/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-api/findbugs-exclude.xml b/tez-api/findbugs-exclude.xml
index b928a44..07792e6 100644
--- a/tez-api/findbugs-exclude.xml
+++ b/tez-api/findbugs-exclude.xml
@@ -85,4 +85,16 @@
<Bug pattern="SWL_SLEEP_WITH_LOCK_HELD"/>
</Match>
+ <Match>
+ <Class name="org.apache.tez.dag.api.EdgeManagerPluginOnDemand$EventRouteMetadata"/>
+ <Method name="getSourceIndices"/>
+ <Bug pattern="EI_EXPOSE_REP"/>
+ </Match>
+
+ <Match>
+ <Class name="org.apache.tez.dag.api.EdgeManagerPluginOnDemand$EventRouteMetadata"/>
+ <Method name="getTargetIndices"/>
+ <Bug pattern="EI_EXPOSE_REP"/>
+ </Match>
+
</FindBugsFilter>
http://git-wip-us.apache.org/repos/asf/tez/blob/05f77fe2/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPlugin.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPlugin.java b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPlugin.java
index 8768e7d..4e22f63 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPlugin.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPlugin.java
@@ -101,7 +101,7 @@ public abstract class EdgeManagerPlugin {
public abstract void routeDataMovementEventToDestination(DataMovementEvent event,
int sourceTaskIndex, int sourceOutputIndex,
Map<Integer, List<Integer>> destinationTaskAndInputIndices) throws Exception;
-
+
/**
* Return the routing information to inform consumers about the failure of a
* source task whose outputs have been potentially lost. The return map has
@@ -144,7 +144,7 @@ public abstract class EdgeManagerPlugin {
int destinationTaskIndex, int destinationFailedInputIndex) throws Exception;
/**
- * Return ahe {@link org.apache.tez.dag.api.EdgeManagerPluginContext} for this specific instance of
+ * Return the {@link org.apache.tez.dag.api.EdgeManagerPluginContext} for this specific instance of
* the vertex manager.
*
* @return the {@link org.apache.tez.dag.api.EdgeManagerPluginContext} for the input
http://git-wip-us.apache.org/repos/asf/tez/blob/05f77fe2/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPluginOnDemand.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPluginOnDemand.java b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPluginOnDemand.java
new file mode 100644
index 0000000..05c0c62
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPluginOnDemand.java
@@ -0,0 +1,332 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.api;
+
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.events.InputFailedEvent;
+import org.apache.tez.runtime.api.events.InputReadErrorEvent;
+
+/**
+ * This class defines the routing of events between tasks of producer and
+ * consumer vertices. The routing is bi-directional. Users can customize the
+ * routing by providing an implementation that extends this class.
+ */
+@Public
+@Unstable
+public abstract class EdgeManagerPluginOnDemand extends EdgeManagerPlugin {
+
+ /**
+ * Class to provide routing metadata for {@link Event}s to be routed between
+ * producer and consumer tasks. The routing data enables the system to send
+ * the event from the producer task output to the consumer task input.
+ */
+ public static class EventRouteMetadata {
+ private final int numEvents;
+ private final int[] targetIndices;
+ private final int[] sourceIndices;
+
+ /**
+ * Create an {@link EventRouteMetadata} that will create numEvents copies of
+ * the {@link Event} to be routed. Use this to create
+ * {@link EventRouteMetadata} for {@link DataMovementEvent}s or
+ * {@link InputFailedEvent}s where the target input indices must be
+ * specified to route those events. Typically numEvents would be 1 for these
+ * events.
+ *
+ * @param numEvents
+ * Number of copies of the event to be routed
+ * @param targetIndices
+ * Target input indices. The array length must match the number of
+ * events specified when creating the {@link EventRouteMetadata}
+ * object
+ * @return {@link EventRouteMetadata}
+ */
+ public static EventRouteMetadata create(int numEvents, int[] targetIndices) {
+ return new EventRouteMetadata(numEvents, targetIndices, null);
+ }
+
+ /**
+ * Create an {@link EventRouteMetadata} that will create numEvents copies of
+ * the {@link Event} to be routed. Use this to create
+ * {@link EventRouteMetadata} for {@link CompositeDataMovementEvent} where
+ * the target input indices and source output indices must be specified to
+ * route those events. Typically numEvents would be 1 for these events.
+ *
+ * @param numEvents
+ * Number of copies of the event to be routed
+ * @param targetIndices
+ * Target input indices. The array length must match the number of
+ * events specified when creating the {@link EventRouteMetadata}
+ * object
+ * @param sourceIndices
+ * Source output indices. The array length must match the number of
+ * events specified when creating the {@link EventRouteMetadata}
+ * object
+ * @return {@link EventRouteMetadata}
+ */
+ public static EventRouteMetadata create(int numEvents, int[] targetIndices, int[] sourceIndices) {
+ return new EventRouteMetadata(numEvents, targetIndices, sourceIndices);
+ }
+
+ private EventRouteMetadata(int numEvents, int[] targetIndices, int[] sourceIndices) {
+ this.numEvents = numEvents;
+ this.targetIndices = targetIndices;
+ this.sourceIndices = sourceIndices;
+ }
+
+ /**
+ * Get the number of copies of the event to be routed
+ * @return Number of copies
+ */
+ public int getNumEvents() {
+ return numEvents;
+ }
+
+ /**
+ * Get the target input indices
+ * @return Target input indices
+ */
+ public @Nullable int[] getTargetIndices() {
+ return targetIndices;
+ }
+
+ /**
+ * Get the source output indices
+ * @return Source output indices
+ */
+ public @Nullable int[] getSourceIndices() {
+ return sourceIndices;
+ }
+ }
+
+ /**
+ * Create an instance of the {@link EdgeManagerPluginOnDemand}. Classes
+ * extending this class to create an {@link EdgeManagerPluginOnDemand} must provide
+ * the same constructor so that Tez can create an instance of the class at
+ * runtime.
+ *
+ * @param context
+ * the context within which this {@link EdgeManagerPluginOnDemand}
+ * will run. Includes information like configuration which the user
+ * may have specified while setting up the edge.
+ */
+ public EdgeManagerPluginOnDemand(EdgeManagerPluginContext context) {
+ super(context);
+ }
+
+ /**
+ * Initializes the EdgeManagerPlugin. This method is called in the following
+ * circumstances: <p> 1. When initializing an EdgeManagerPlugin for the first time.
+ * <p> 2. When an EdgeManagerPlugin is replaced at runtime. At this point, an
+ * EdgeManagerPlugin instance is created and set up by the user. The initialize
+ * method will be called with the original {@link EdgeManagerPluginContext} when the
+ * EdgeManagerPlugin is replaced.
+ * @throws Exception
+ */
+ public abstract void initialize() throws Exception;
+
+ /**
+ * This method will be invoked just before event routing begins. The
+ * plugin can use this opportunity to perform any runtime initialization that
+ * depends on the actual state of the DAG or vertices.
+ */
+ public abstract void prepareForRouting() throws Exception;
+
+ /**
+ * Get the number of physical inputs on the destination task
+ * @param destinationTaskIndex Index of destination task for which number of
+ * inputs is needed
+ * @return Number of physical inputs on the destination task
+ * @throws Exception
+ */
+ public abstract int getNumDestinationTaskPhysicalInputs(int destinationTaskIndex) throws Exception;
+
+ /**
+ * Get the number of physical outputs on the source task
+ * @param sourceTaskIndex Index of the source task for which number of outputs
+ * is needed
+ * @return Number of physical outputs on the source task
+ * @throws Exception
+ */
+ public abstract int getNumSourceTaskPhysicalOutputs(int sourceTaskIndex) throws Exception;
+
+ /**
+ * Get the number of destination tasks that consume data from the source task
+ * @param sourceTaskIndex Source task index
+ * @throws Exception
+ */
+ public abstract int getNumDestinationConsumerTasks(int sourceTaskIndex) throws Exception;
+
+ /**
+ * Return the source task index to which to send the input error event
+ *
+ * @param destinationTaskIndex
+ * Destination task that reported the error
+ * @param destinationFailedInputIndex
+ * Index of the physical input on the destination task that reported
+ * the error
+ * @return Index of the source task that created the unavailable input
+ * @throws Exception
+ */
+ public abstract int routeInputErrorEventToSource(int destinationTaskIndex,
+ int destinationFailedInputIndex) throws Exception;
+
+ /**
+ * The method provides the {@link EventRouteMetadata} to route a
+ * {@link DataMovementEvent} produced by the given source task to the given
+ * destination task. The returned {@link EventRouteMetadata} should have the
+ * target input indices set to enable the routing. If the routing metadata is
+ * common across different events then the plugin can cache and reuse the same
+ * object.
+ *
+ * @param sourceTaskIndex
+ * The index of the task in the source vertex of this edge that
+ * produced a {@link DataMovementEvent}
+ * @param sourceOutputIndex
+ * Index of the physical output on the source task that produced the
+ * event
+ * @param destinationTaskIndex The index of the task in the destination vertex of this edge
+ * @return {@link EventRouteMetadata} with target indices set. May be null if
+ * the given destination task does not read input from the given
+ * source task.
+ * @throws Exception
+ */
+ public abstract @Nullable EventRouteMetadata routeDataMovementEventToDestination(int sourceTaskIndex,
+ int sourceOutputIndex, int destinationTaskIndex) throws Exception;
+
+ /**
+ * The method provides the {@link EventRouteMetadata} to route a
+ * {@link CompositeDataMovementEvent} produced by the given source task to the
+ * given destination task. The returned {@link EventRouteMetadata} should have
+ * the target input indices and source output indices set to enable the
+ * routing. If the routing metadata is common across different events then the
+ * plugin can cache and reuse the same object.
+ *
+ * @param sourceTaskIndex
+ * The index of the task in the source vertex of this edge that
+ * produced a {@link CompositeDataMovementEvent}
+ * @param destinationTaskIndex
+ * The index of the task in the destination vertex of this edge
+ * @return {@link EventRouteMetadata} with source and target indices set. This
+ * may be null if the destination task does not read data from the
+ * source task.
+ * @throws Exception
+ */
+ public abstract @Nullable EventRouteMetadata routeCompositeDataMovementEventToDestination(
+ int sourceTaskIndex, int destinationTaskIndex) throws Exception;
+
+ /**
+ * The method provides the {@link EventRouteMetadata} to route an
+ * {@link InputFailedEvent} produced by the given source task to the given
+ * destination task. The returned {@link EventRouteMetadata} should have the
+ * target input indices set to enable the routing. If the routing metadata is
+ * common across different events then the plugin can cache and reuse the same
+ * object.
+ *
+ * @param sourceTaskIndex
+ * The index of the failed task in the source vertex of this edge.
+ * @param destinationTaskIndex
+ * The index of a task in the destination vertex of this edge.
+ * @return {@link EventRouteMetadata} with target indices set. May be null if
+ * the given destination task does not read input from the given
+ * source task.
+ * @throws Exception
+ */
+ public abstract @Nullable EventRouteMetadata routeInputSourceTaskFailedEventToDestination(
+ int sourceTaskIndex, int destinationTaskIndex) throws Exception;
+
+ /**
+ * Return the {@link org.apache.tez.dag.api.EdgeManagerPluginContext} for this specific instance of
+ * the edge manager.
+ *
+ * @return the {@link org.apache.tez.dag.api.EdgeManagerPluginContext} for this edge manager
+ */
+ public EdgeManagerPluginContext getContext() {
+ return super.getContext();
+ }
+
+ // Implementations of EdgeManagerPlugin methods not needed for on-demand routing (no-ops, or delegation to the on-demand overloads)
+ /**
+ * Return the routing information to inform consumers about the source task
+ * output that is now available. The return map has the routing information.
+ * The event will be routed to every destination task index in the key of the
+ * map. Every physical input in the value for that task key will receive the
+ * input.
+ *
+ * @param event
+ * Data movement event that contains the output information
+ * @param sourceTaskIndex
+ * Source task that produced the event
+ * @param sourceOutputIndex
+ * Index of the physical output on the source task that produced the
+ * event
+ * @param destinationTaskAndInputIndices
+ * Map via which the routing information is returned
+ * @throws Exception
+ */
+ public void routeDataMovementEventToDestination(DataMovementEvent event,
+ int sourceTaskIndex, int sourceOutputIndex,
+ Map<Integer, List<Integer>> destinationTaskAndInputIndices) throws Exception {}
+
+ /**
+ * Return the routing information to inform consumers about the failure of a
+ * source task whose outputs have been potentially lost. The return map has
+ * the routing information. The failure notification event will be sent to
+ * every task index in the key of the map. Every physical input in the value
+ * for that task key will receive the failure notification. This method will
+ * be called once for every source task failure and information for all
+ * affected destinations must be provided in that invocation.
+ *
+ * @param sourceTaskIndex
+ * Source task
+ * @param destinationTaskAndInputIndices
+ * Map via which the routing information is returned
+ * @throws Exception
+ */
+ public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex,
+ Map<Integer, List<Integer>> destinationTaskAndInputIndices) throws Exception {}
+
+ /**
+ * Return the source task index to which to send the input error event
+ *
+ * @param event
+ * Input read error event. Has more information about the error
+ * @param destinationTaskIndex
+ * Destination task that reported the error
+ * @param destinationFailedInputIndex
+ * Index of the physical input on the destination task that reported
+ * the error
+ * @return Index of the source task that created the unavailable input
+ * @throws Exception
+ */
+ public int routeInputErrorEventToSource(InputReadErrorEvent event,
+ int destinationTaskIndex, int destinationFailedInputIndex) throws Exception {
+ return routeInputErrorEventToSource(destinationTaskIndex, destinationFailedInputIndex);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/05f77fe2/tez-api/src/main/java/org/apache/tez/runtime/api/events/CompositeDataMovementEvent.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/CompositeDataMovementEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/CompositeDataMovementEvent.java
index b38fda3..c45d272 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/events/CompositeDataMovementEvent.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/CompositeDataMovementEvent.java
@@ -21,6 +21,7 @@ package org.apache.tez.runtime.api.events;
import java.nio.ByteBuffer;
import java.util.Iterator;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.tez.runtime.api.Event;
@@ -65,6 +66,26 @@ public class CompositeDataMovementEvent extends Event {
ByteBuffer userPayload) {
return new CompositeDataMovementEvent(srcIndexStart, count, userPayload);
}
+
+ /**
+ * Expand the {@link CompositeDataMovementEvent} into a routable
+ * {@link DataMovementEvent} by providing the source output index and the
+ * target input index.
+ *
+ * @param sourceIndex
+ * The index of the physical output represented by the
+ * {@link DataMovementEvent}
+ * @param targetIndex
+ * The index of the physical input to which the given
+ * {@link DataMovementEvent} should be routed.
+ * @return {@link DataMovementEvent} created from the
+ * {@link CompositeDataMovementEvent} with indices specified by the
+ * method parameters
+ */
+ @Private
+ public DataMovementEvent expand(int sourceIndex, int targetIndex) {
+ return new DataMovementEvent(sourceIndex, targetIndex, version, userPayload);
+ }
public int getSourceIndexStart() {
return sourceIndexStart;
http://git-wip-us.apache.org/repos/asf/tez/blob/05f77fe2/tez-api/src/main/java/org/apache/tez/runtime/api/events/DataMovementEvent.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/DataMovementEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/DataMovementEvent.java
index b9c1cc4..05c3d3f 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/events/DataMovementEvent.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/DataMovementEvent.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.Output;
/**
* Event used by user code to send information between tasks. An output can
@@ -56,14 +57,8 @@ public final class DataMovementEvent extends Event {
private int version;
- private DataMovementEvent(int sourceIndex,
- ByteBuffer userPayload) {
- this.userPayload = userPayload;
- this.sourceIndex = sourceIndex;
- }
-
@Private
- private DataMovementEvent(int sourceIndex,
+ DataMovementEvent(int sourceIndex,
int targetIndex,
int version,
ByteBuffer userPayload) {
@@ -74,20 +69,21 @@ public final class DataMovementEvent extends Event {
}
private DataMovementEvent(ByteBuffer userPayload) {
- this(-1, userPayload);
+ this(-1, -1, -1, userPayload);
}
/**
- * User Event constructor
+ * User Event constructor for {@link Output}s
* @param sourceIndex Index to identify the physical edge of the input/output
* that generated the event
* @param userPayload User Payload of the User Event
*/
public static DataMovementEvent create(int sourceIndex,
ByteBuffer userPayload) {
- return new DataMovementEvent(sourceIndex, userPayload);
+ return new DataMovementEvent(sourceIndex, -1, -1, userPayload);
}
-
+
+ @Private
/**
* Constructor for Processor-generated User Events
* @param userPayload
@@ -103,6 +99,21 @@ public final class DataMovementEvent extends Event {
ByteBuffer userPayload) {
return new DataMovementEvent(sourceIndex, targetIndex, version, userPayload);
}
+
+ /**
+ * Make a routable copy of the {@link DataMovementEvent} by adding a target
+ * input index
+ *
+ * @param targetIndex
+ * The index of the physical input to which this
+ * {@link DataMovementEvent} should be routed
+ * @return Copy of this {@link DataMovementEvent} with the target input index
+ * added to it
+ */
+ @Private
+ public DataMovementEvent makeCopy(int targetIndex) {
+ return new DataMovementEvent(sourceIndex, targetIndex, version, userPayload);
+ }
public ByteBuffer getUserPayload() {
return userPayload == null ? null : userPayload.asReadOnlyBuffer();
http://git-wip-us.apache.org/repos/asf/tez/blob/05f77fe2/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputFailedEvent.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputFailedEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputFailedEvent.java
index 639d0b9..9d8363a 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputFailedEvent.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputFailedEvent.java
@@ -55,6 +55,22 @@ public class InputFailedEvent extends Event{
public static InputFailedEvent create(int targetIndex, int version) {
return new InputFailedEvent(targetIndex, version);
}
+
+ /**
+ * Create a copy of the {@link InputFailedEvent} with the given target input
+ * index, i.e. the index of the physical input to which this event should be routed.
+ *
+ * @param targetIndex
+ * The index of the physical input to which this
+ * {@link InputFailedEvent} should be routed
+ *
+ * @return copy of the {@link InputFailedEvent} with the target input index
+ * added
+ */
+ @Private
+ public InputFailedEvent makeCopy(int targetIndex) {
+ return create(targetIndex, version);
+ }
public int getTargetIndex() {
return targetIndex;
http://git-wip-us.apache.org/repos/asf/tez/blob/05f77fe2/tez-dag/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-dag/findbugs-exclude.xml b/tez-dag/findbugs-exclude.xml
index 233f76c..57c0aca 100644
--- a/tez-dag/findbugs-exclude.xml
+++ b/tez-dag/findbugs-exclude.xml
@@ -118,7 +118,10 @@
<!-- TEZ-1952 -->
<Match>
<Class name="org.apache.tez.dag.app.dag.impl.Edge"/>
- <Field name="edgeProperty"/>
+ <Or>
+ <Field name="edgeProperty"/>
+ <Field name="onDemandRouting"/>
+ </Or>
<Bug pattern="IS2_INCONSISTENT_SYNC"/>
</Match>
http://git-wip-us.apache.org/repos/asf/tez/blob/05f77fe2/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptEventInfo.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptEventInfo.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptEventInfo.java
new file mode 100644
index 0000000..49ff044
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptEventInfo.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.dag.app;
+
+import java.util.List;
+
+import org.apache.tez.runtime.api.impl.TezEvent;
+
+public class TaskAttemptEventInfo {
+ private final int nextFromEventId;
+ private final List<TezEvent> events;
+
+ public TaskAttemptEventInfo(int nextFromEventId, List<TezEvent> events) {
+ this.nextFromEventId = nextFromEventId;
+ this.events = events;
+ }
+
+ public int getNextFromEventId() {
+ return nextFromEventId;
+ }
+
+ public List<TezEvent> getEvents() {
+ return events;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/05f77fe2/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index b38081b..970489d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -444,13 +444,13 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
new VertexEventRouteEvent(vertexId, Collections.unmodifiableList(otherEvents)));
}
taskHeartbeatHandler.pinged(taskAttemptID);
- List<TezEvent> outEvents = context
+ TaskAttemptEventInfo eventInfo = context
.getCurrentDAG()
.getVertex(taskAttemptID.getTaskID().getVertexID())
- .getTask(taskAttemptID.getTaskID())
.getTaskAttemptTezEvents(taskAttemptID, request.getStartIndex(),
request.getMaxEvents());
- response.setEvents(outEvents);
+ response.setEvents(eventInfo.getEvents());
+ response.setNextFromEventId(eventInfo.getNextFromEventId());
}
containerInfo.lastRequestId = requestId;
containerInfo.lastReponse = response;
http://git-wip-us.apache.org/repos/asf/tez/blob/05f77fe2/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index 77ef6e0..bb42392 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -45,10 +45,12 @@ import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
import org.apache.tez.dag.api.client.ProgressBuilder;
import org.apache.tez.dag.api.client.VertexStatusBuilder;
import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.TaskAttemptEventInfo;
import org.apache.tez.dag.app.dag.event.SpeculatorEvent;
import org.apache.tez.dag.app.dag.impl.AMUserCodeException;
import org.apache.tez.dag.app.dag.impl.Edge;
import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.runtime.api.OutputCommitter;
@@ -144,6 +146,9 @@ public interface Vertex extends Comparable<Vertex> {
void scheduleSpeculativeTask(TezTaskID taskId);
Resource getTaskResource();
+ public TaskAttemptEventInfo getTaskAttemptTezEvents(TezTaskAttemptID attemptID,
+ int fromEventId, int maxEvents);
+
void handleSpeculatorEvent(SpeculatorEvent event);
ProcessorDescriptor getProcessorDescriptor();
http://git-wip-us.apache.org/repos/asf/tez/blob/05f77fe2/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java
index db57227..d14527d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java
@@ -22,12 +22,14 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
-import org.apache.tez.dag.api.EdgeManagerPlugin;
import org.apache.tez.dag.api.EdgeManagerPluginContext;
+import org.apache.tez.dag.api.EdgeManagerPluginOnDemand;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
-public class BroadcastEdgeManager extends EdgeManagerPlugin {
+public class BroadcastEdgeManager extends EdgeManagerPluginOnDemand {
+
+ EventRouteMetadata[] commonRouteMeta;
public BroadcastEdgeManager(EdgeManagerPluginContext context) {
super(context);
@@ -60,6 +62,35 @@ public class BroadcastEdgeManager extends EdgeManagerPlugin {
}
@Override
+ public void prepareForRouting() throws Exception {
+ // Must run before the routing methods below, which only index into commonRouteMeta.
+ int numSourceTasks = getContext().getSourceVertexNumTasks();
+ commonRouteMeta = new EventRouteMetadata[numSourceTasks];
+ for (int i=0; i<numSourceTasks; ++i) {
+ // one event per source task; the per-task index array is the only thing that varies
+ commonRouteMeta[i] = EventRouteMetadata.create(1, new int[]{i}, new int[]{0});
+ }
+ }
+
+ @Override
+ public EventRouteMetadata routeDataMovementEventToDestination(
+ int sourceTaskIndex, int sourceOutputIndex, int destinationTaskIndex)
+ throws Exception {
+ // broadcast: the route depends only on the source task, not on the destination task
+ return commonRouteMeta[sourceTaskIndex];
+ }
+
+ @Override
+ public EventRouteMetadata routeCompositeDataMovementEventToDestination(
+ int sourceTaskIndex, int destinationTaskIndex)
+ throws Exception {
+ // same shared per-source-task route as plain data movement events
+ return commonRouteMeta[sourceTaskIndex];
+ }
+
+ @Override
+ public EventRouteMetadata routeInputSourceTaskFailedEventToDestination(
+ int sourceTaskIndex, int destinationTaskIndex) throws Exception {
+ // the failure is routed to every destination task (route ignores destinationTaskIndex)
+ return commonRouteMeta[sourceTaskIndex];
+ }
+
+ @Override
public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex,
Map<Integer, List<Integer>> destinationTaskAndInputIndices) {
List<Integer> inputIndices =
@@ -71,6 +102,12 @@ public class BroadcastEdgeManager extends EdgeManagerPlugin {
}
@Override
+ public int routeInputErrorEventToSource(int destinationTaskIndex, int destinationFailedInputIndex)
+ throws Exception {
+ // input index i of any destination task is fed by source task i
+ return destinationFailedInputIndex;
+ }
+
+ @Override
public int routeInputErrorEventToSource(InputReadErrorEvent event,
int destinationTaskIndex, int destinationFailedInputIndex) {
return destinationFailedInputIndex;
http://git-wip-us.apache.org/repos/asf/tez/blob/05f77fe2/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
index f5fef67..78bab05 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
@@ -21,6 +21,7 @@ package org.apache.tez.dag.app.dag.impl;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
@@ -30,9 +31,11 @@ import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.dag.api.EdgeManagerPlugin;
import org.apache.tez.dag.api.EdgeManagerPluginContext;
import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
+import org.apache.tez.dag.api.EdgeManagerPluginOnDemand;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.EventRouteMetadata;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed;
@@ -97,7 +100,9 @@ public class Edge {
private EdgeProperty edgeProperty;
private EdgeManagerPluginContext edgeManagerContext;
- private EdgeManagerPlugin edgeManager;
+ // package-private rather than private only so that tests can reach the plugin instance
+ @VisibleForTesting
+ EdgeManagerPlugin edgeManager;
+ // set by routingToBegin() when edgeManager is an EdgeManagerPluginOnDemand; selects the
+ // on-demand plugin API when routing input read errors back to a source task
+ private boolean onDemandRouting = false;
@SuppressWarnings("rawtypes")
private EventHandler eventHandler;
private AtomicBoolean bufferEvents = new AtomicBoolean(false);
@@ -106,6 +111,9 @@ public class Edge {
private Vertex sourceVertex;
private Vertex destinationVertex; // this may end up being a list for shared edge
private EventMetaData destinationMetaInfo;
+ // cleared by routingToBegin() when the destination vertex has 0 tasks; events on this
+ // edge are then not routed at all
+ // NOTE(review): written under the Edge monitor in routingToBegin() but read without it in
+ // the routing methods; confirm callers establish happens-before, otherwise make volatile
+ private boolean routingNeeded = true;
+ // per destination task attempt: the event whose routed copies did not all fit into the
+ // caller's list, recorded by maybeAddTezEventForDestinationTask() and taken back out via
+ // removePendingEvents()
+ private final ConcurrentMap<TezTaskAttemptID, PendingEventRouteMetadata> pendingEvents = Maps
+ .newConcurrentMap();
@SuppressWarnings("rawtypes")
public Edge(EdgeProperty edgeProperty, EventHandler eventHandler) {
@@ -188,6 +196,29 @@ public class Edge {
edgeProperty.getEdgeDestination());
setEdgeProperty(modifiedEdgeProperty);
}
+
+  /**
+   * Prepares this edge for routing events to the destination vertex.
+   * Routing is switched off when the destination vertex has 0 tasks. If the edge manager is an
+   * {@link EdgeManagerPluginOnDemand}, the edge switches to on-demand routing and the plugin's
+   * {@code prepareForRouting()} is invoked.
+   *
+   * @return true if events on this edge are routed on demand
+   * @throws AMUserCodeException if the plugin's {@code prepareForRouting()} throws
+   * @throws TezUncheckedException if the destination parallelism is not yet determined
+   */
+  public synchronized boolean routingToBegin() throws AMUserCodeException {
+    // read once so the 0 / negative checks below see the same value
+    int numDestTasks = edgeManagerContext.getDestinationVertexNumTasks();
+    if (numDestTasks == 0) {
+      routingNeeded = false;
+    } else if (numDestTasks < 0) {
+      // null-safe so a missing edge manager cannot mask this diagnostic with an NPE
+      String edgeManagerName = edgeManager == null ? "null" : edgeManager.getClass().getName();
+      throw new TezUncheckedException(
+          "Internal error. Not expected to route events to a destination until parallelism is determined" +
+          " sourceVertex=" + sourceVertex.getLogIdentifier() +
+          " edgeManager=" + edgeManagerName);
+    }
+    if (edgeManager instanceof EdgeManagerPluginOnDemand) {
+      onDemandRouting = true;
+      try {
+        ((EdgeManagerPluginOnDemand) edgeManager).prepareForRouting();
+      } catch (Exception e) {
+        throw new AMUserCodeException(Source.EdgeManager,
+            "Fail to prepareForRouting " + getEdgeInfo(), e);
+      }
+    }
+
+    LOG.info("Routing to begin for edge: " + getEdgeInfo() + ". EdgeProperty: " + edgeProperty);
+    return onDemandRouting;
+  }
public synchronized EdgeProperty getEdgeProperty() {
return this.edgeProperty;
@@ -280,8 +311,13 @@ public class Edge {
int srcTaskIndex;
int numConsumers;
try {
- srcTaskIndex = edgeManager.routeInputErrorEventToSource(event,
- destTaskIndex, event.getIndex());
+ if (onDemandRouting) {
+ srcTaskIndex = ((EdgeManagerPluginOnDemand) edgeManager).routeInputErrorEventToSource(
+ destTaskIndex, event.getIndex());
+ } else {
+ srcTaskIndex = edgeManager.routeInputErrorEventToSource(event,
+ destTaskIndex, event.getIndex());
+ }
Preconditions.checkArgument(srcTaskIndex >= 0,
"SourceTaskIndex should not be negative,"
+ "srcTaskIndex=" + srcTaskIndex);
@@ -340,7 +376,6 @@ public class Edge {
Preconditions.checkState(edgeManager != null,
"Edge Manager must be initialized by this time");
Event event = tezEvent.getEvent();
- boolean isFirstEvent = true;
// cache of event object per input index
Map<Integer, TezEvent> inputIndicesWithEvents = Maps.newHashMap();
for (Map.Entry<Integer, List<Integer>> entry : taskAndInputIndices.entrySet()) {
@@ -350,28 +385,16 @@ public class Edge {
Integer inputIndex = inputIndices.get(i);
TezEvent tezEventToSend = inputIndicesWithEvents.get(inputIndex);
if (tezEventToSend == null) {
- if (isFirstEvent) {
- isFirstEvent = false;
- // this is the first item - reuse the event object
- if (isDataMovementEvent) {
- ((DataMovementEvent) event).setTargetIndex(inputIndex);
- } else {
- ((InputFailedEvent) event).setTargetIndex(inputIndex);
- }
- tezEventToSend = tezEvent;
+ Event e;
+ if (isDataMovementEvent) {
+ DataMovementEvent dmEvent = (DataMovementEvent) event;
+ e = DataMovementEvent.create(dmEvent.getSourceIndex(),
+ inputIndex, dmEvent.getVersion(), dmEvent.getUserPayload());
} else {
- // create new event object for this input index
- Event e;
- if (isDataMovementEvent) {
- DataMovementEvent dmEvent = (DataMovementEvent) event;
- e = DataMovementEvent.create(dmEvent.getSourceIndex(),
- inputIndex, dmEvent.getVersion(), dmEvent.getUserPayload());
- } else {
- InputFailedEvent ifEvent = ((InputFailedEvent) event);
- e = InputFailedEvent.create(inputIndex, ifEvent.getVersion());
- }
- tezEventToSend = new TezEvent(e, tezEvent.getSourceInfo());
+ InputFailedEvent ifEvent = ((InputFailedEvent) event);
+ e = InputFailedEvent.create(inputIndex, ifEvent.getVersion());
}
+ tezEventToSend = new TezEvent(e, tezEvent.getSourceInfo());
tezEventToSend.setDestinationInfo(destinationMetaInfo);
// cache the event object per input because are unique per input index
inputIndicesWithEvents.put(inputIndex, tezEventToSend);
@@ -392,8 +415,6 @@ public class Edge {
}
public void sendTezEventToDestinationTasks(TezEvent tezEvent) throws AMUserCodeException {
- Preconditions.checkState(edgeManager != null,
- "Edge Manager must be initialized by this time");
if (!bufferEvents.get()) {
boolean isDataMovementEvent = true;
switch (tezEvent.getEventType()) {
@@ -411,16 +432,7 @@ public class Edge {
.getTaskAttemptID();
int srcTaskIndex = srcAttemptId.getTaskID().getId();
- boolean routingRequired = true;
- if (edgeManagerContext.getDestinationVertexNumTasks() == 0) {
- routingRequired = false;
- LOG.info("Not routing events since destination vertex has 0 tasks" +
- generateCommonDebugString(srcTaskIndex, tezEvent));
- } else if (edgeManagerContext.getDestinationVertexNumTasks() < 0) {
- throw new TezUncheckedException(
- "Internal error. Not expected to route events to a destination until parallelism is determined" +
- generateCommonDebugString(srcTaskIndex, tezEvent));
- }
+ boolean routingRequired = routingNeeded;
if (routingRequired) {
try {
@@ -439,6 +451,9 @@ public class Edge {
+ ", sourceInfo:" + tezEvent.getSourceInfo() + ", destinationInfo:"
+ tezEvent.getDestinationInfo() + ", " + getEdgeInfo(), e);
}
+ } else {
+ LOG.info("Not routing events since destination vertex has 0 tasks" +
+ generateCommonDebugString(srcTaskIndex, tezEvent));
}
if (!destTaskAndInputIndices.isEmpty()) {
@@ -459,6 +474,163 @@ public class Edge {
}
}
+  /**
+   * Routing state for an event whose routed copies did not all fit into the caller's list:
+   * the route computed for it, the original event, and how many copies were already emitted.
+   * Instances are immutable; a fresh one is recorded each time routing stops early.
+   */
+  static class PendingEventRouteMetadata {
+    private final EventRouteMetadata routeMeta;
+    private final TezEvent event;
+    private final int numEventsRouted;
+
+    public PendingEventRouteMetadata(EventRouteMetadata routeMeta, TezEvent event,
+        int numEventsRouted) {
+      this.routeMeta = routeMeta;
+      this.event = event;
+      this.numEventsRouted = numEventsRouted;
+    }
+
+    /** @return the route metadata the event is being expanded with */
+    public EventRouteMetadata getRouteMeta() {
+      return routeMeta;
+    }
+
+    /** @return the original, unrouted source event */
+    public TezEvent getTezEvent() {
+      return event;
+    }
+
+    /** @return how many of the route's events have already been emitted */
+    public int getNumEventsRouted() {
+      return numEventsRouted;
+    }
+  }
+
+ /**
+ * Removes and returns the partially routed event recorded for the given attempt by
+ * maybeAddTezEventForDestinationTask(), or null if none is recorded.
+ */
+ public PendingEventRouteMetadata removePendingEvents(TezTaskAttemptID attemptID) {
+ return pendingEvents.remove(attemptID);
+ }
+
+  /**
+   * Appends to {@code listToAdd} the events that {@code tezEvent} (produced by source task
+   * {@code srcTaskIndex}) expands to for the destination task of {@code attemptID}, using the
+   * edge's {@link EdgeManagerPluginOnDemand} to compute the route.
+   * <p>
+   * Events are appended only while {@code listToAdd} stays within {@code listMaxSize}. If the
+   * route yields more events than fit, the remaining work is recorded in {@code pendingEvents}
+   * for {@code attemptID} and false is returned; passing that record back as
+   * {@code pendingRoutes} resumes after the copies already emitted. When routing is disabled
+   * (destination vertex has 0 tasks) nothing is added and true is returned.
+   *
+   * @param tezEvent the source event (composite data movement, data movement or input failed)
+   * @param attemptID destination task attempt; its task index is the destination task index
+   * @param srcTaskIndex index of the source task that produced the event
+   * @param listToAdd list receiving the routed events
+   * @param listMaxSize upper bound on the size of {@code listToAdd}
+   * @param pendingRoutes partially routed state to resume from, or null to route afresh
+   * @return false if the event could be routed but ran out of space in the list, else true
+   * @throws AMUserCodeException if the edge manager, or expansion with its metadata, fails
+   * @throws TezUncheckedException if the event type cannot be routed over an edge
+   */
+  public boolean maybeAddTezEventForDestinationTask(TezEvent tezEvent, TezTaskAttemptID attemptID,
+      int srcTaskIndex, List<TezEvent> listToAdd, int listMaxSize,
+      PendingEventRouteMetadata pendingRoutes)
+      throws AMUserCodeException {
+    if (!routingNeeded) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Not routing events since destination vertex has 0 tasks" +
+            generateCommonDebugString(srcTaskIndex, tezEvent));
+      }
+      return true;
+    }
+    switch (tezEvent.getEventType()) {
+    case COMPOSITE_DATA_MOVEMENT_EVENT:
+    case INPUT_FAILED_EVENT:
+    case DATA_MOVEMENT_EVENT:
+      break;
+    default:
+      // An unroutable event type is an AM-internal error, not an edge manager failure, so it
+      // is raised outside the try below rather than being wrapped as a user code exception.
+      throw new TezUncheckedException("Unhandled tez event type: "
+          + tezEvent.getEventType());
+    }
+    try {
+      EventRouteMetadata routeMeta;
+      int numEventsDone;
+      if (pendingRoutes != null) {
+        // resume after the copies emitted by an earlier call
+        routeMeta = pendingRoutes.getRouteMeta();
+        numEventsDone = pendingRoutes.getNumEventsRouted();
+      } else {
+        routeMeta = routeEventOnDemand(tezEvent, srcTaskIndex, attemptID.getTaskID().getId());
+        numEventsDone = 0;
+      }
+      if (routeMeta == null) {
+        // the edge manager routes nothing from this source task to this destination task
+        return true;
+      }
+      int listSize = listToAdd.size();
+      int numEvents = routeMeta.getNumEvents();
+      while (numEventsDone < numEvents && listSize++ < listMaxSize) {
+        Event routedEvent = createOnDemandRoutedEvent(tezEvent, routeMeta, numEventsDone);
+        numEventsDone++;
+        TezEvent tezEventToSend = new TezEvent(routedEvent, tezEvent.getSourceInfo());
+        tezEventToSend.setDestinationInfo(destinationMetaInfo);
+        listToAdd.add(tezEventToSend);
+      }
+      if (numEventsDone < numEvents) {
+        // list is full: remember where to resume for this attempt
+        pendingEvents.put(attemptID, new PendingEventRouteMetadata(routeMeta, tezEvent,
+            numEventsDone));
+        return false;
+      }
+      return true;
+    } catch (Exception e) {
+      throw new AMUserCodeException(Source.EdgeManager,
+          "Fail to maybeAddTezEventForDestinationTask, event:" + tezEvent.getEvent()
+          + ", sourceInfo:" + tezEvent.getSourceInfo() + ", destinationInfo:"
+          + tezEvent.getDestinationInfo() + ", " + getEdgeInfo(), e);
+    }
+  }
+
+  // Asks the on-demand edge manager how tezEvent from source task srcTaskIndex maps onto
+  // destination task destTaskIndex; a null result means nothing is routed there.
+  private EventRouteMetadata routeEventOnDemand(TezEvent tezEvent, int srcTaskIndex,
+      int destTaskIndex) throws Exception {
+    EdgeManagerPluginOnDemand edgeManagerOnDemand = (EdgeManagerPluginOnDemand) edgeManager;
+    switch (tezEvent.getEventType()) {
+    case COMPOSITE_DATA_MOVEMENT_EVENT:
+      return edgeManagerOnDemand.routeCompositeDataMovementEventToDestination(srcTaskIndex,
+          destTaskIndex);
+    case INPUT_FAILED_EVENT:
+      return edgeManagerOnDemand.routeInputSourceTaskFailedEventToDestination(srcTaskIndex,
+          destTaskIndex);
+    case DATA_MOVEMENT_EVENT:
+      return edgeManagerOnDemand.routeDataMovementEventToDestination(srcTaskIndex,
+          ((DataMovementEvent) tezEvent.getEvent()).getSourceIndex(), destTaskIndex);
+    default:
+      throw new TezUncheckedException("Unhandled tez event type: " + tezEvent.getEventType());
+    }
+  }
+
+  // Builds the idx-th destination event described by routeMeta from tezEvent's payload.
+  private static Event createOnDemandRoutedEvent(TezEvent tezEvent, EventRouteMetadata routeMeta,
+      int idx) {
+    Event event = tezEvent.getEvent();
+    switch (tezEvent.getEventType()) {
+    case COMPOSITE_DATA_MOVEMENT_EVENT:
+      // only composite events use the source (output) indices of the route
+      return ((CompositeDataMovementEvent) event).expand(routeMeta.getSourceIndices()[idx],
+          routeMeta.getTargetIndices()[idx]);
+    case INPUT_FAILED_EVENT:
+      return ((InputFailedEvent) event).makeCopy(routeMeta.getTargetIndices()[idx]);
+    case DATA_MOVEMENT_EVENT:
+      return ((DataMovementEvent) event).makeCopy(routeMeta.getTargetIndices()[idx]);
+    default:
+      throw new TezUncheckedException("Unhandled tez event type: " + tezEvent.getEventType());
+    }
+  }
+
private void sendEventToTask(Task task, TezEvent tezEvent) {
task.registerTezEvent(tezEvent);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/05f77fe2/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
index 11a6483..6053806 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
@@ -21,18 +21,25 @@ package org.apache.tez.dag.app.dag.impl;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.annotation.Nullable;
-import org.apache.tez.dag.api.EdgeManagerPlugin;
import org.apache.tez.dag.api.EdgeManagerPluginContext;
+import org.apache.tez.dag.api.EdgeManagerPluginOnDemand;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import com.google.common.base.Preconditions;
-public class OneToOneEdgeManager extends EdgeManagerPlugin {
+public class OneToOneEdgeManager extends EdgeManagerPluginOnDemand {
List<Integer> destinationInputIndices =
Collections.unmodifiableList(Collections.singletonList(0));
+ // set after checkState() has verified the task counts once; later calls skip the check
+ AtomicBoolean stateChecked = new AtomicBoolean(false);
+
+ // route shared by every 1-1 pairing: one event on index 0 (cf. destinationInputIndices);
+ // returned by the on-demand routing methods instead of allocating per call
+ final EventRouteMetadata commonRouteMeta =
+ EventRouteMetadata.create(1, new int[]{0}, new int[]{0});
public OneToOneEdgeManager(EdgeManagerPluginContext context) {
super(context);
@@ -57,16 +64,42 @@ public class OneToOneEdgeManager extends EdgeManagerPlugin {
public void routeDataMovementEventToDestination(DataMovementEvent event,
int sourceTaskIndex, int sourceOutputIndex,
Map<Integer, List<Integer>> destinationTaskAndInputIndices) {
- // by the time routing is initiated all task counts must be determined and stable
- Preconditions.checkState(getContext().getSourceVertexNumTasks() == getContext()
- .getDestinationVertexNumTasks(), "1-1 source and destination task counts must match."
- + " Destination: " + getContext().getDestinationVertexName() + " tasks: "
- + getContext().getDestinationVertexNumTasks() + " Source: "
- + getContext().getSourceVertexName() + " tasks: " + getContext().getSourceVertexNumTasks());
+ // task-count validation (performed only on the first call)
+ checkState();
+ // 1-1: source task i feeds input 0 of destination task i
destinationTaskAndInputIndices.put(sourceTaskIndex, destinationInputIndices);
}
@Override
+  public void prepareForRouting() throws Exception {
+    // fail fast if the 1-1 invariant (equal task counts) does not hold
+    checkState();
+  }
+
+  /**
+   * Routes a data movement event from source task i only to destination task i.
+   */
+  @Override
+  public @Nullable EventRouteMetadata routeDataMovementEventToDestination(
+      int sourceTaskIndex, int sourceOutputIndex, int destinationTaskIndex)
+      throws Exception {
+    return routeToPairedTask(sourceTaskIndex, destinationTaskIndex);
+  }
+
+  /**
+   * Routes a composite data movement event from source task i only to destination task i.
+   */
+  @Override
+  public @Nullable EventRouteMetadata routeCompositeDataMovementEventToDestination(
+      int sourceTaskIndex, int destinationTaskIndex)
+      throws Exception {
+    return routeToPairedTask(sourceTaskIndex, destinationTaskIndex);
+  }
+
+  /**
+   * Routes the failure of source task i only to destination task i, the same single target
+   * that the Map-based routeInputSourceTaskFailedEventToDestination overload uses.
+   */
+  @Override
+  public @Nullable EventRouteMetadata routeInputSourceTaskFailedEventToDestination(
+      int sourceTaskIndex, int destinationTaskIndex) throws Exception {
+    // Without the index check every destination task would receive an InputFailedEvent
+    // for its input 0 whenever any source task attempt failed.
+    return routeToPairedTask(sourceTaskIndex, destinationTaskIndex);
+  }
+
+  // Returns the shared single-event route when the two tasks form a 1-1 pair, else null.
+  private @Nullable EventRouteMetadata routeToPairedTask(int sourceTaskIndex,
+      int destinationTaskIndex) {
+    return sourceTaskIndex == destinationTaskIndex ? commonRouteMeta : null;
+  }
+
+ @Override
public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex,
Map<Integer, List<Integer>> destinationTaskAndInputIndices) {
destinationTaskAndInputIndices.put(sourceTaskIndex, destinationInputIndices);
@@ -79,8 +112,26 @@ public class OneToOneEdgeManager extends EdgeManagerPlugin {
}
@Override
+ public int routeInputErrorEventToSource(int destinationTaskIndex, int destinationFailedInputIndex) {
+ // 1-1: destination task i reads only from source task i
+ return destinationTaskIndex;
+ }
+
+ @Override
public int getNumDestinationConsumerTasks(int sourceTaskIndex) {
return 1;
}
+
+  // Verifies, on first use only, that both vertices have the same number of tasks.
+  // By the time routing is initiated all task counts must be determined and stable.
+  private void checkState() {
+    if (!stateChecked.get()) {
+      Preconditions.checkState(
+          getContext().getSourceVertexNumTasks() == getContext().getDestinationVertexNumTasks(),
+          "1-1 source and destination task counts must match."
+          + " Destination: " + getContext().getDestinationVertexName() + " tasks: "
+          + getContext().getDestinationVertexNumTasks() + " Source: "
+          + getContext().getSourceVertexName() + " tasks: "
+          + getContext().getSourceVertexNumTasks());
+      stateChecked.set(true);
+    }
+  }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/05f77fe2/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java
index e2608cd..3b66b8f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java
@@ -18,18 +18,29 @@
package org.apache.tez.dag.app.dag.impl;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.annotation.Nullable;
-import org.apache.tez.dag.api.EdgeManagerPlugin;
import org.apache.tez.dag.api.EdgeManagerPluginContext;
+import org.apache.tez.dag.api.EdgeManagerPluginOnDemand;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
-public class ScatterGatherEdgeManager extends EdgeManagerPlugin {
+public class ScatterGatherEdgeManager extends EdgeManagerPluginOnDemand {
+
+  // Lazily built by getOrCreateCommonRouteMeta(): entry i is the route for source task i.
+  private final AtomicReference<ArrayList<EventRouteMetadata>> commonRouteMeta =
+      new AtomicReference<ArrayList<EventRouteMetadata>>();
+  // Guards the one-time construction of commonRouteMeta.
+  private final Object commonRouteMetaLock = new Object();
+  // Built by prepareForRouting(): sourceIndices[d] == {d} (the partition read by destination
+  // task d) and targetIndices[s] == {s} (the destination input fed by source task s).
+  private int[][] sourceIndices;
+  private int[][] targetIndices;
public ScatterGatherEdgeManager(EdgeManagerPluginContext context) {
super(context);
@@ -53,6 +64,69 @@ public class ScatterGatherEdgeManager extends EdgeManagerPlugin {
return physicalOutputs;
}
+  // Returns the per-source-task route list, building it once under commonRouteMetaLock on
+  // first use. Entry i describes a single event targeted at destination input index i.
+  private ArrayList<EventRouteMetadata> getOrCreateCommonRouteMeta() {
+    ArrayList<EventRouteMetadata> existing = commonRouteMeta.get();
+    if (existing != null) {
+      return existing;
+    }
+    synchronized (commonRouteMetaLock) {
+      existing = commonRouteMeta.get();
+      if (existing != null) {
+        return existing;
+      }
+      int sourceTaskCount = getContext().getSourceVertexNumTasks();
+      ArrayList<EventRouteMetadata> built = Lists.newArrayListWithCapacity(sourceTaskCount);
+      for (int srcIdx = 0; srcIdx < sourceTaskCount; srcIdx++) {
+        built.add(EventRouteMetadata.create(1, new int[]{srcIdx}, new int[]{0}));
+      }
+      Preconditions.checkState(commonRouteMeta.compareAndSet(null, built));
+      return commonRouteMeta.get();
+    }
+  }
+
+  // Fills the index tables used by composite routing: source indices derive from the number
+  // of destination tasks (== partitions), target indices from the number of source tasks.
+  private void createIndices() {
+    sourceIndices = singletonIndexArrays(getContext().getDestinationVertexNumTasks());
+    targetIndices = singletonIndexArrays(getContext().getSourceVertexNumTasks());
+  }
+
+  // Returns an array of length count whose k-th element is the one-element array {k}.
+  private static int[][] singletonIndexArrays(int count) {
+    int[][] result = new int[count][];
+    for (int k = 0; k < count; k++) {
+      result[k] = new int[]{k};
+    }
+    return result;
+  }
+
+  @Override
+  public void prepareForRouting() throws Exception {
+    createIndices();
+  }
+
+ @Override
+ public EventRouteMetadata routeDataMovementEventToDestination(
+ int sourceTaskIndex, int sourceOutputIndex, int destinationTaskIndex) throws Exception {
+ // output (partition) p of every source task goes only to destination task p
+ if (sourceOutputIndex == destinationTaskIndex) {
+ return getOrCreateCommonRouteMeta().get(sourceTaskIndex);
+ }
+ return null;
+ }
+
+ @Override
+ public @Nullable EventRouteMetadata routeCompositeDataMovementEventToDestination(
+ int sourceTaskIndex, int destinationTaskIndex)
+ throws Exception {
+ // new metadata per call, sharing the index arrays built by prepareForRouting(), which
+ // must therefore have run first
+ return EventRouteMetadata.create(1, targetIndices[sourceTaskIndex],
+ sourceIndices[destinationTaskIndex]);
+ }
+
+ @Override
+ public EventRouteMetadata routeInputSourceTaskFailedEventToDestination(
+ int sourceTaskIndex, int destinationTaskIndex) throws Exception {
+ // every destination task reads a partition from each source task, so all are notified
+ return getOrCreateCommonRouteMeta().get(sourceTaskIndex);
+ }
+
@Override
public void routeDataMovementEventToDestination(DataMovementEvent event,
int sourceTaskIndex, int sourceOutputIndex, Map<Integer, List<Integer>> destinationTaskAndInputIndices) {
@@ -76,6 +150,11 @@ public class ScatterGatherEdgeManager extends EdgeManagerPlugin {
}
@Override
+ public int routeInputErrorEventToSource(int destinationTaskIndex, int destinationFailedInputIndex) {
+ // input index i of a destination task holds the partition produced by source task i
+ return destinationFailedInputIndex;
+ }
+
+ @Override
public int getNumDestinationConsumerTasks(int sourceTaskIndex) {
return getContext().getDestinationVertexNumTasks();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/05f77fe2/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 15382a8..2e884e7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -1493,12 +1493,6 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
}
}
- @Private
- @VisibleForTesting
- public List<TezEvent> getTaskEvents() {
- return tezEventsForTaskAttempts;
- }
-
private static class KillTransition
implements SingleArcTransition<TaskImpl, TaskEvent> {
@Override