You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2014/11/17 20:22:30 UTC
[21/50] [abbrv] tez git commit: TEZ-1547. Make use of state change
notifier in VertexManagerPlugins and fix TEZ-1494 without latency penalty
(bikas)
TEZ-1547. Make use of state change notifier in VertexManagerPlugins and fix TEZ-1494 without latency penalty (bikas)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/cd0ed751
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/cd0ed751
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/cd0ed751
Branch: refs/heads/TEZ-8
Commit: cd0ed751ae502492733738f2d7d7d0ae8e4224e9
Parents: dfef97f
Author: Bikas Saha <bi...@apache.org>
Authored: Tue Nov 4 13:07:06 2014 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Tue Nov 4 13:07:06 2014 -0800
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../apache/tez/dag/api/VertexManagerPlugin.java | 21 ++
.../tez/dag/api/VertexManagerPluginContext.java | 50 ++++
.../apache/tez/dag/api/event/VertexState.java | 17 +-
.../tez/runtime/api/InputInitializer.java | 4 +-
.../java/org/apache/tez/dag/app/dag/Vertex.java | 5 +-
.../event/VertexEventManagerUserCodeError.java | 36 +++
.../tez/dag/app/dag/event/VertexEventType.java | 1 +
.../dag/impl/ImmediateStartVertexManager.java | 93 ++++---
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 180 ++++++++++--
.../tez/dag/app/dag/impl/VertexManager.java | 150 ++++++++--
.../tez/dag/app/dag/impl/TestDAGImpl.java | 7 -
.../impl/TestImmediateStartVertexManager.java | 131 +++++++++
.../tez/dag/app/dag/impl/TestVertexImpl.java | 279 ++++++++++++++-----
.../tez/dag/app/dag/impl/TestVertexManager.java | 5 +-
.../dag/app/dag/impl/TestVertexRecovery.java | 48 ++++
.../tez/examples/SortMergeJoinExample.java | 16 +-
.../vertexmanager/ShuffleVertexManager.java | 60 +++-
.../vertexmanager/TestShuffleVertexManager.java | 122 +++++---
.../org/apache/tez/test/TestAMRecovery.java | 5 +-
20 files changed, 1005 insertions(+), 227 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/cd0ed751/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a321acf..adb4352 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -100,6 +100,8 @@ ALL CHANGES:
TEZ-1731. OnDiskMerger can end up clobbering files across tasks with LocalDiskFetch enabled.
TEZ-1735. Allow setting basic info per DAG for Tez UI.
TEZ-1728. Remove local host name from Fetcher thread name.
+ TEZ-1547. Make use of state change notifier in VertexManagerPlugins and fix
+ TEZ-1494 without latency penalty
Release 0.5.1: 2014-10-02
http://git-wip-us.apache.org/repos/asf/tez/blob/cd0ed751/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPlugin.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPlugin.java b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPlugin.java
index b494b12..6aa18d6 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPlugin.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPlugin.java
@@ -23,6 +23,7 @@ import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
@@ -99,4 +100,24 @@ public abstract class VertexManagerPlugin {
public final VertexManagerPluginContext getContext() {
return this.context;
}
+
+ /**
+ * Receive notifications on vertex state changes.
+ * <p/>
+ * State changes will be received based on the registration via
+ * {@link VertexManagerPluginContext#registerForVertexStateUpdates(String, java.util.Set)}
+ * . Notifications will be received for all registered state changes, and not
+ * just for the latest state update. They will be in order in which the state
+ * change occurred.
+ * </p><br>This method may be invoked concurrently with {@link #onVertexStarted(Map)} etc. and
+ * multi-threading/concurrency implications must be considered.
+ *
+ * @param stateUpdate
+ * an event indicating the name of the vertex, and its updated
+ * state. Additional information may be available for specific
+ * events. Look at the type hierarchy for
+ * {@link org.apache.tez.dag.api.event.VertexStateUpdate}
+ */
+ public void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws Exception {
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/cd0ed751/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
index c1f4bcd..dfa9287 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.tez.dag.api.TaskLocationHint;
+import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.runtime.api.InputSpecUpdate;
import org.apache.tez.runtime.api.events.InputDataInformationEvent;
@@ -169,4 +170,53 @@ public interface VertexManagerPluginContext {
* @return DAG Attempt number
*/
public int getDAGAttemptNumber();
+
+ /**
+ * Register to get notifications on updates to the specified vertex. Notifications will be sent
+ * via {@link VertexManagerPlugin#onVertexStateUpdated(org.apache.tez.dag.api.event.VertexStateUpdate)}
+ *
+ * This method can only be invoked once. Duplicate invocations will result in an error.
+ *
+ * @param vertexName the vertex name for which notifications are required.
+ * @param stateSet the set of states for which notifications are required. null implies all
+ */
+ void registerForVertexStateUpdates(String vertexName, @Nullable Set<VertexState> stateSet);
+
+ /**
+ * Optional API. No need to call this when the vertex is not fully defined to
+ * start with. E.g. vertex parallelism is not defined, or edges are not
+ * configured. In that case, Tez will assume that the vertex needs
+ * reconfiguration. If the vertex is already fully defined, but the
+ * {@link VertexManagerPlugin} wants to reconfigure the vertex, then it must
+ * use this API to inform Tez about its intention. Without invoking this
+ * method, it is invalid to re-configure the vertex, e.g. via the
+ * {@link #setVertexParallelism(int, VertexLocationHint, Map, Map)} method if
+ * the vertex is already fully defined. This can be invoked at any time until
+ * {@link VertexManagerPlugin#initialize()} has completed. It is invalid to
+ * invoke this method after {@link VertexManagerPlugin#initialize()} has
+ * completed.<br>
+ * If this API is invoked, then {@link #doneReconfiguringVertex()} must be
+ * invoked after the {@link VertexManagerPlugin} is done reconfiguring the
+ * vertex. Actions like scheduling tasks or sending events do not count as
+ * reconfiguration.
+ */
+ public void vertexReconfigurationPlanned();
+
+ /**
+ * Optional API. This needs to be called only if {@link #vertexReconfigurationPlanned()} has been
+ * invoked. This must be called after {@link #vertexReconfigurationPlanned()} is called.
+ */
+ public void doneReconfiguringVertex();
+
+ /**
+ * Optional API. This API can be invoked to declare that the
+ * {@link VertexManagerPlugin} is done with its work. After this the system
+ * will not invoke the plugin methods any more. It is invalid for the plugin to
+ * make further invocations of the context APIs after this. This can be used
+ * to stop receiving further {@link VertexState} notifications after the
+ * plugin has made all changes.
+ */
+ // TODO must be done later after TEZ-1714
+ //public void vertexManagerDone();
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tez/blob/cd0ed751/tez-api/src/main/java/org/apache/tez/dag/api/event/VertexState.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/event/VertexState.java b/tez-api/src/main/java/org/apache/tez/dag/api/event/VertexState.java
index ab296a5..c9c2d58 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/event/VertexState.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/event/VertexState.java
@@ -28,12 +28,14 @@ import org.apache.hadoop.classification.InterfaceStability;
@InterfaceStability.Unstable
public enum VertexState {
/**
- * Indicates that the Vertex had entered the SUCCEEDED state. A vertex could go back into RUNNING state after SUCCEEDING
+ * Indicates that the Vertex had entered the SUCCEEDED state. A vertex could
+ * go back into RUNNING state after SUCCEEDING
*/
SUCCEEDED,
/**
- * Indicates that the Vertex had entered the RUNNING state. This state can be reached after SUCCEEDED, if some
- * tasks belonging to the vertex are restarted due to errors
+ * Indicates that the Vertex had entered the RUNNING state. This state can be
+ * reached after SUCCEEDED, if some tasks belonging to the vertex are
+ * restarted due to errors
*/
RUNNING,
/**
@@ -47,5 +49,12 @@ public enum VertexState {
/**
* Indicates that the parallelism for the vertex had changed.
*/
- PARALLELISM_UPDATED
+ PARALLELISM_UPDATED,
+ /**
+ * Indicates that the vertex has been completely configured. Parallelism, edges, edge
+ * properties, inputs/outputs have been set and will not be changed any
+ * further. Listeners can depend on the vertex's configured state after
+ * receiving this notification.
+ */
+ CONFIGURED
}
http://git-wip-us.apache.org/repos/asf/tez/blob/cd0ed751/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializer.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializer.java b/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializer.java
index d9d6517..cc33205 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializer.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializer.java
@@ -19,6 +19,7 @@
package org.apache.tez.runtime.api;
import java.util.List;
+import java.util.Map;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.classification.InterfaceAudience.Public;
@@ -97,7 +98,8 @@ public abstract class InputInitializer {
*
* Extensive processing should not be performed via this method call. Instead this should just be
* used as a notification mechanism to the main initialization, which is via the initialize method.
- *
+ * <br>This method may be invoked concurrently with {@link #initialize()} etc. and
+ * multi-threading/concurrency implications must be considered.
* @param stateUpdate an event indicating the name of the vertex, and it's updated state.
* Additional information may be available for specific events, Look at the
* type hierarchy for {@link org.apache.tez.dag.api.event.VertexStateUpdate}
http://git-wip-us.apache.org/repos/asf/tez/blob/cd0ed751/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index fa1f2c4..cfedc41 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -89,8 +89,11 @@ public interface Vertex extends Comparable<Vertex> {
void setParallelism(int parallelism, VertexLocationHint vertexLocationHint,
Map<String, EdgeManagerPluginDescriptor> sourceEdgeManagers,
- Map<String, InputSpecUpdate> rootInputSpecUpdate) throws AMUserCodeException;
+ Map<String, InputSpecUpdate> rootInputSpecUpdate, boolean fromVertexManager)
+ throws AMUserCodeException;
void setVertexLocationHint(VertexLocationHint vertexLocationHint);
+ void vertexReconfigurationPlanned();
+ void doneReconfiguringVertex();
// CHANGE THESE TO LISTS AND MAINTAIN ORDER?
void setInputVertices(Map<Vertex, Edge> inVertices);
http://git-wip-us.apache.org/repos/asf/tez/blob/cd0ed751/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventManagerUserCodeError.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventManagerUserCodeError.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventManagerUserCodeError.java
new file mode 100644
index 0000000..022620a
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventManagerUserCodeError.java
@@ -0,0 +1,36 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.dag.app.dag.impl.AMUserCodeException;
+import org.apache.tez.dag.records.TezVertexID;
+
+public class VertexEventManagerUserCodeError extends VertexEvent {
+ final AMUserCodeException e;
+
+ public VertexEventManagerUserCodeError(TezVertexID vertexId, AMUserCodeException e) {
+ super(vertexId, VertexEventType.V_MANAGER_USER_CODE_ERROR);
+ this.e = e;
+ }
+
+ public AMUserCodeException getError() {
+ return e;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/cd0ed751/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
index e649095..b4f7e29 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
@@ -42,6 +42,7 @@ public enum VertexEventType {
//Producer:Any component
V_INTERNAL_ERROR,
+ V_MANAGER_USER_CODE_ERROR,
V_ROUTE_EVENT,
V_ONE_TO_ONE_SOURCE_SPLIT,
http://git-wip-us.apache.org/repos/asf/tez/blob/cd0ed751/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java
index 773426b..00b5306 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java
@@ -18,8 +18,10 @@
package org.apache.tez.dag.app.dag.impl;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tez.dag.api.EdgeProperty;
@@ -27,11 +29,15 @@ import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.VertexManagerPlugin;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import java.util.EnumSet;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* Starts all tasks immediately on vertex start
@@ -40,18 +46,10 @@ public class ImmediateStartVertexManager extends VertexManagerPlugin {
private static final Log LOG = LogFactory.getLog(ImmediateStartVertexManager.class);
- private final Map<String, SourceVertexInfo> srcVertexInfo = Maps.newHashMap();
+ private final Map<String, Boolean> srcVertexConfigured = Maps.newConcurrentMap();
private int managedTasks;
private boolean tasksScheduled = false;
-
- class SourceVertexInfo {
- EdgeProperty edgeProperty;
- int numFinishedTasks;
-
- SourceVertexInfo(EdgeProperty edgeProperty) {
- this.edgeProperty = edgeProperty;
- }
- }
+ private AtomicBoolean onVertexStartedDone = new AtomicBoolean(false);
public ImmediateStartVertexManager(VertexManagerPluginContext context) {
super(context);
@@ -63,37 +61,35 @@ public class ImmediateStartVertexManager extends VertexManagerPlugin {
Map<String, EdgeProperty> edges = getContext().getInputVertexEdgeProperties();
for (Map.Entry<String, EdgeProperty> entry : edges.entrySet()) {
String srcVertex = entry.getKey();
- EdgeProperty edgeProp = entry.getValue();
- LOG.info("Task count in " + srcVertex + ": " + getContext().getVertexNumTasks(srcVertex));
//track vertices with task count > 0
if (getContext().getVertexNumTasks(srcVertex) > 0) {
- srcVertexInfo.put(srcVertex, new SourceVertexInfo(edgeProp));
+ LOG.info("Task count in " + srcVertex + ": " + getContext().getVertexNumTasks(srcVertex));
+ srcVertexConfigured.put(srcVertex, false);
+ getContext().registerForVertexStateUpdates(srcVertex, EnumSet.of(VertexState.CONFIGURED));
} else {
LOG.info("Vertex: " + getContext().getVertexName() + "; Ignoring " + srcVertex
+ " as it has got 0 tasks");
}
}
-
- //handle completions
- for (Map.Entry<String, List<Integer>> entry : completions.entrySet()) {
- for (Integer task : entry.getValue()) {
- handleSourceTaskFinished(entry.getKey(), task);
- }
- }
+ onVertexStartedDone.set(true);
scheduleTasks();
}
- private void handleSourceTaskFinished(String vertex, Integer taskId) {
- SourceVertexInfo srcInfo = srcVertexInfo.get(vertex);
- //Not mandatory to check for duplicate completions here
- srcInfo.numFinishedTasks++;
- }
-
private void scheduleTasks() {
- if (!canScheduleTasks()) {
+ if (!onVertexStartedDone.get()) {
+ // vertex not started yet
+ return;
+ }
+ if (tasksScheduled) {
+ // already scheduled
return;
}
+ if (!canScheduleTasks()) {
+ return;
+ }
+
+ tasksScheduled = true;
List<TaskWithLocationHint> tasksToStart = Lists.newArrayListWithCapacity(managedTasks);
for (int i = 0; i < managedTasks; ++i) {
tasksToStart.add(new TaskWithLocationHint(new Integer(i), null));
@@ -103,35 +99,42 @@ public class ImmediateStartVertexManager extends VertexManagerPlugin {
LOG.info("Starting " + tasksToStart.size() + " in " + getContext().getVertexName());
getContext().scheduleVertexTasks(tasksToStart);
}
- tasksScheduled = true;
+ // all tasks scheduled. Can call vertexManagerDone().
+ // TODO TEZ-1714 for locking issues getContext().vertexManagerDone();
}
private boolean canScheduleTasks() {
- //Check if at least 1 task is finished from each source vertex (in case of broadcast &
- // one-to-one or custom)
- for (Map.Entry<String, SourceVertexInfo> entry : srcVertexInfo.entrySet()) {
- SourceVertexInfo srcVertexInfo = entry.getValue();
- switch(srcVertexInfo.edgeProperty.getDataMovementType()) {
- case ONE_TO_ONE:
- case BROADCAST:
- case CUSTOM:
- if (srcVertexInfo.numFinishedTasks == 0) {
- //do not schedule tasks until a task from source task is complete
- return false;
+ // check for source vertices completely configured
+ for (Map.Entry<String, Boolean> entry : srcVertexConfigured.entrySet()) {
+ if (!entry.getValue().booleanValue()) {
+ // vertex not configured
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Waiting for vertex: " + entry.getKey() + " in vertex: " + getContext().getVertexName());
}
- default:
- break;
+ return false;
}
}
+
return true;
}
+
+ @Override
+ public void onVertexStateUpdated(VertexStateUpdate stateUpdate) {
+ Preconditions.checkArgument(stateUpdate.getVertexState() == VertexState.CONFIGURED,
+ "Received incorrect state notification : " + stateUpdate.getVertexState() + " for vertex: "
+ + stateUpdate.getVertexName() + " in vertex: " + getContext().getVertexName());
+ Preconditions.checkArgument(srcVertexConfigured.containsKey(stateUpdate.getVertexName()),
+ "Received incorrect vertex notification : " + stateUpdate.getVertexState() + " for vertex: "
+ + stateUpdate.getVertexName() + " in vertex: " + getContext().getVertexName());
+ Preconditions.checkState(srcVertexConfigured.put(stateUpdate.getVertexName(), true)
+ .booleanValue() == false);
+ LOG.info("Received configured notification: " + stateUpdate.getVertexState() + " for vertex: "
+ + stateUpdate.getVertexName() + " in vertex: " + getContext().getVertexName());
+ scheduleTasks();
+ }
@Override
public void onSourceTaskCompleted(String srcVertexName, Integer attemptId) {
- handleSourceTaskFinished(srcVertexName, attemptId);
- if (!tasksScheduled) {
- scheduleTasks();
- }
}
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/cd0ed751/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 4a88949..593ecca 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -110,6 +110,7 @@ import org.apache.tez.dag.app.dag.event.TaskEventRecoverTask;
import org.apache.tez.dag.app.dag.event.TaskEventTermination;
import org.apache.tez.dag.app.dag.event.TaskEventType;
import org.apache.tez.dag.app.dag.event.VertexEvent;
+import org.apache.tez.dag.app.dag.event.VertexEventManagerUserCodeError;
import org.apache.tez.dag.app.dag.event.VertexEventNullEdgeInitialized;
import org.apache.tez.dag.app.dag.event.VertexEventOneToOneSourceSplit;
import org.apache.tez.dag.app.dag.event.VertexEventRecoverVertex;
@@ -320,7 +321,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
(VertexState.RECOVERING, VertexState.RECOVERING,
VertexEventType.V_TERMINATE,
new TerminateDuringRecoverTransition())
-
+ .addTransition
+ (VertexState.RECOVERING, EnumSet.of(VertexState.RECOVERING),
+ VertexEventType.V_MANAGER_USER_CODE_ERROR,
+ new VertexManagerUserCodeErrorTransition())
+
// Transitions from INITIALIZING state
.addTransition(VertexState.INITIALIZING,
EnumSet.of(VertexState.INITIALIZING, VertexState.INITED,
@@ -353,6 +358,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
EnumSet.of(VertexState.INITIALIZING, VertexState.FAILED),
VertexEventType.V_ROUTE_EVENT,
ROUTE_EVENT_TRANSITION)
+ .addTransition(VertexState.INITIALIZING, EnumSet.of(VertexState.FAILED),
+ VertexEventType.V_MANAGER_USER_CODE_ERROR,
+ new VertexManagerUserCodeErrorTransition())
.addTransition(VertexState.INITIALIZING, VertexState.KILLED,
VertexEventType.V_TERMINATE,
new TerminateInitingVertexTransition())
@@ -399,6 +407,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
.addTransition(VertexState.INITED, VertexState.KILLED,
VertexEventType.V_TERMINATE,
new TerminateInitedVertexTransition())
+ .addTransition(VertexState.INITED, EnumSet.of(VertexState.FAILED),
+ VertexEventType.V_MANAGER_USER_CODE_ERROR,
+ new VertexManagerUserCodeErrorTransition())
.addTransition(VertexState.INITED, VertexState.ERROR,
VertexEventType.V_INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
@@ -429,6 +440,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
.addTransition(VertexState.RUNNING, VertexState.TERMINATING,
VertexEventType.V_TERMINATE,
new VertexKilledTransition())
+ .addTransition(VertexState.RUNNING, EnumSet.of(VertexState.TERMINATING),
+ VertexEventType.V_MANAGER_USER_CODE_ERROR,
+ new VertexManagerUserCodeErrorTransition())
.addTransition(VertexState.RUNNING, VertexState.RUNNING,
VertexEventType.V_TASK_RESCHEDULED,
new TaskRescheduledTransition())
@@ -460,6 +474,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
// Ignore-able events
.addTransition(VertexState.TERMINATING, VertexState.TERMINATING,
EnumSet.of(VertexEventType.V_TERMINATE,
+ VertexEventType.V_MANAGER_USER_CODE_ERROR,
VertexEventType.V_ROOT_INPUT_FAILED,
VertexEventType.V_SOURCE_VERTEX_STARTED,
VertexEventType.V_ROOT_INPUT_INITIALIZED,
@@ -512,6 +527,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
// Ignore-able events
.addTransition(VertexState.FAILED, VertexState.FAILED,
EnumSet.of(VertexEventType.V_TERMINATE,
+ VertexEventType.V_MANAGER_USER_CODE_ERROR,
VertexEventType.V_ROOT_INPUT_FAILED,
VertexEventType.V_SOURCE_VERTEX_STARTED,
VertexEventType.V_TASK_RESCHEDULED,
@@ -534,6 +550,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
// Ignore-able events
.addTransition(VertexState.KILLED, VertexState.KILLED,
EnumSet.of(VertexEventType.V_TERMINATE,
+ VertexEventType.V_MANAGER_USER_CODE_ERROR,
VertexEventType.V_ROOT_INPUT_FAILED,
VertexEventType.V_INIT,
VertexEventType.V_SOURCE_VERTEX_STARTED,
@@ -559,6 +576,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
VertexEventType.V_START,
VertexEventType.V_ROUTE_EVENT,
VertexEventType.V_TERMINATE,
+ VertexEventType.V_MANAGER_USER_CODE_ERROR,
VertexEventType.V_TASK_COMPLETED,
VertexEventType.V_TASK_ATTEMPT_COMPLETED,
VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT,
@@ -627,6 +645,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
private final String vertexName;
private final ProcessorDescriptor processorDescriptor;
+
+ private boolean vertexToBeReconfiguredByManager = false;
+ AtomicBoolean vmIsInitialized = new AtomicBoolean(false);
+ AtomicBoolean completelyConfiguredSent = new AtomicBoolean(false);
@VisibleForTesting
Map<Vertex, Edge> sourceVertices;
@@ -1212,14 +1234,16 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
@Override
public void setParallelism(int parallelism, VertexLocationHint vertexLocationHint,
Map<String, EdgeManagerPluginDescriptor> sourceEdgeManagers,
- Map<String, InputSpecUpdate> rootInputSpecUpdates) throws AMUserCodeException {
- setParallelism(parallelism, vertexLocationHint, sourceEdgeManagers, rootInputSpecUpdates, false);
+ Map<String, InputSpecUpdate> rootInputSpecUpdates, boolean fromVertexManager)
+ throws AMUserCodeException {
+ setParallelism(parallelism, vertexLocationHint, sourceEdgeManagers, rootInputSpecUpdates,
+ false, fromVertexManager);
}
private void setParallelism(int parallelism, VertexLocationHint vertexLocationHint,
Map<String, EdgeManagerPluginDescriptor> sourceEdgeManagers,
Map<String, InputSpecUpdate> rootInputSpecUpdates,
- boolean recovering) throws AMUserCodeException {
+ boolean recovering, boolean fromVertexManager) throws AMUserCodeException {
if (recovering) {
writeLock.lock();
try {
@@ -1255,6 +1279,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
+ parallelism + " for vertex: " + logIdentifier);
setVertexLocationHint(vertexLocationHint);
writeLock.lock();
+
try {
if (parallelismSet == true) {
String msg = "Parallelism can only be set dynamically once per vertex: " + logIdentifier;
@@ -1262,6 +1287,15 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
throw new TezUncheckedException(msg);
}
+ if (fromVertexManager && canInitVertex()) {
+ // vertex is fully defined. setParallelism has been called. VertexManager should have
+ // informed us about this. Otherwise we would have notified listeners that we are fully
+ // defined before we are actually fully defined
+ Preconditions.checkState(vertexToBeReconfiguredByManager, "Vertex is fully configured but still"
+ + " the reconfiguration API has been called. VertexManager must notify the framework using "
+ + " context.vertexReconfigurationPlanned() before re-configuring the vertex.");
+ }
+
parallelismSet = true;
// Input initializer/Vertex Manager/1-1 split expected to set parallelism.
@@ -1421,6 +1455,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
}
+ @Override
public void setVertexLocationHint(VertexLocationHint vertexLocationHint) {
writeLock.lock();
try {
@@ -1432,6 +1467,46 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
writeLock.unlock();
}
}
+
+ @Override
+ public void vertexReconfigurationPlanned() {
+ vertexReconfigurationPlanned(false);
+ }
+
+ public void vertexReconfigurationPlanned(boolean testOverride) {
+ writeLock.lock();
+ try {
+ if (testOverride) {
+ Preconditions.checkState(vmIsInitialized.get() && completelyConfiguredSent.get(),
+ "test should override only failed cases");
+ } else {
+ Preconditions.checkState(!vmIsInitialized.get(),
+ "context.vertexReconfigurationPlanned() cannot be called after initialize()");
+ Preconditions.checkState(!completelyConfiguredSent.get(), "vertexReconfigurationPlanned() "
+ + " cannot be invoked after the vertex has been configured.");
+ }
+ this.vertexToBeReconfiguredByManager = true;
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ @Override
+ public void doneReconfiguringVertex() {
+ writeLock.lock();
+ try {
+ Preconditions.checkState(vertexToBeReconfiguredByManager, "doneReconfiguringVertex() can be "
+ + "invoked only after vertexReconfigurationPlanned() is invoked");
+ this.vertexToBeReconfiguredByManager = false;
+ if (completelyConfiguredSent.compareAndSet(false, true)) {
+ // vertex already started and at that time this event was not sent. Send now.
+ stateChangeNotifier.stateChanged(vertexId, new VertexStateUpdate(vertexName,
+ org.apache.tez.dag.api.event.VertexState.CONFIGURED));
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ }
@Override
/**
@@ -1970,6 +2045,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
try {
vertexManager.initialize();
+ vmIsInitialized.set(true);
} catch (AMUserCodeException e) {
String msg = "Exception in " + e.getSource()+ ", vertex:" + logIdentifier;
LOG.error(msg, e);
@@ -2042,7 +2118,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
.getVertexManagerPlugin());
LOG.info("Setting user vertex manager plugin: "
+ pluginDesc.getClassName() + " on vertex: " + getName());
- vertexManager = new VertexManager(pluginDesc, this, appContext);
+ vertexManager = new VertexManager(pluginDesc, this, appContext, stateChangeNotifier);
} else {
// Intended order of picking a vertex manager
// If there is an InputInitializer then we use the RootInputVertexManager. May be fixed by TEZ-703
@@ -2055,26 +2131,26 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
+ logIdentifier);
vertexManager = new VertexManager(
VertexManagerPluginDescriptor.create(RootInputVertexManager.class.getName()),
- this, appContext);
+ this, appContext, stateChangeNotifier);
} else if (hasOneToOne && !hasCustom) {
LOG.info("Setting vertexManager to InputReadyVertexManager for "
+ logIdentifier);
vertexManager = new VertexManager(
VertexManagerPluginDescriptor.create(InputReadyVertexManager.class.getName()),
- this, appContext);
+ this, appContext, stateChangeNotifier);
} else if (hasBipartite && !hasCustom) {
LOG.info("Setting vertexManager to ShuffleVertexManager for "
+ logIdentifier);
// shuffle vertex manager needs a conf payload
vertexManager = new VertexManager(ShuffleVertexManager.createConfigBuilder(conf).build(),
- this, appContext);
+ this, appContext, stateChangeNotifier);
} else {
// schedule all tasks upon vertex start. Default behavior.
LOG.info("Setting vertexManager to ImmediateStartVertexManager for "
+ logIdentifier);
vertexManager = new VertexManager(
VertexManagerPluginDescriptor.create(ImmediateStartVertexManager.class.getName()),
- this, appContext);
+ this, appContext, stateChangeNotifier);
}
}
}
@@ -2222,7 +2298,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
new TaskEventRecoverTask(task.getTaskId()));
}
try {
- vertex.vertexManager.onVertexStarted(vertex.pendingReportedSrcCompletions);
+ vertex.recoveryCodeSimulatingStart();
endState = VertexState.RUNNING;
} catch (AMUserCodeException e) {
String msg = "Exception in " + e.getSource() + ", vertex:" + vertex.getLogIdentifier();
@@ -2274,7 +2350,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
taskState));
}
try {
- vertex.vertexManager.onVertexStarted(vertex.pendingReportedSrcCompletions);
+ vertex.recoveryCodeSimulatingStart();
endState = VertexState.RUNNING;
} catch (AMUserCodeException e) {
String msg = "Exception in " + e.getSource() +", vertex:" + vertex.getLogIdentifier();
@@ -2335,6 +2411,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
}
+
+ private void recoveryCodeSimulatingStart() throws AMUserCodeException { // start-up steps used by the recovery transitions
+ vertexManager.onVertexStarted(pendingReportedSrcCompletions); // hands the pending source completions to the vertex manager
+ // The call below is duplicated from startVertex() because recovery does not follow normal
+ // transitions. To be removed after recovery code is fixed.
+ maybeSendConfiguredEvent();
+ }
private void routeRecoveredEvents(VertexState vertexState,
List<TezEvent> tezEvents) {
@@ -2556,7 +2639,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
boolean successSetParallelism ;
try {
vertex.setParallelism(0,
- null, vertex.recoveredSourceEdgeManagers, vertex.recoveredRootInputSpecUpdates, true);
+ null, vertex.recoveredSourceEdgeManagers, vertex.recoveredRootInputSpecUpdates, true, false);
successSetParallelism = true;
} catch (Exception e) {
successSetParallelism = false;
@@ -2614,7 +2697,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
try {
vertex.setParallelism(0, null, vertex.recoveredSourceEdgeManagers,
- vertex.recoveredRootInputSpecUpdates, true);
+ vertex.recoveredRootInputSpecUpdates, true, false);
successSetParallelism = true;
} catch (Exception e) {
successSetParallelism = false;
@@ -2634,7 +2717,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
new TaskEventRecoverTask(task.getTaskId()));
}
try {
- vertex.vertexManager.onVertexStarted(vertex.pendingReportedSrcCompletions);
+ vertex.recoveryCodeSimulatingStart();
endState = VertexState.RUNNING;
} catch (AMUserCodeException e) {
String msg = "Exception in " + e.getSource() + ", vertex=" + vertex.getLogIdentifier();
@@ -2674,7 +2757,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
// Wait for all tasks to recover and report back
try {
- vertex.vertexManager.onVertexStarted(vertex.pendingReportedSrcCompletions);
+ vertex.recoveryCodeSimulatingStart();
endState = VertexState.RUNNING;
} catch (AMUserCodeException e) {
String msg = "Exception in " + e.getSource() +", vertex:" + vertex.getLogIdentifier();
@@ -2825,10 +2908,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
// Create tasks based on initial configuration, but don't start them yet.
if (vertex.numTasks == -1) {
+ // this block must always return VertexState.INITIALIZING
LOG.info("Num tasks is -1. Expecting VertexManager/InputInitializers/1-1 split"
+ " to set #tasks for the vertex " + vertex.getVertexId());
if (vertex.inputsWithInitializers != null) {
+ LOG.info("Vertex will initialize from input initializer. " + vertex.logIdentifier);
vertex.setupInputInitializerManager();
return VertexState.INITIALIZING;
} else {
@@ -2857,8 +2942,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
} else {
LOG.info("Creating " + vertex.numTasks + " for vertex: " + vertex.logIdentifier);
vertex.createTasks();
-
+ // this block may return VertexState.INITIALIZING
if (vertex.inputsWithInitializers != null) {
+ LOG.info("Vertex will initialize from input initializer. " + vertex.logIdentifier);
vertex.setupInputInitializerManager();
return VertexState.INITIALIZING;
}
@@ -2867,6 +2953,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
return VertexState.INITIALIZING;
}
LOG.info("Directly initializing vertex: " + vertex.logIdentifier);
+ // vertex is completely configured. Send out notification now.
+ vertex.maybeSendConfiguredEvent();
boolean isInitialized = vertex.initializeVertex();
if (isInitialized) {
return VertexState.INITED;
@@ -3020,7 +3108,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
" numTasks " + splitEvent.getNumTasks());
vertex.originalOneToOneSplitSource = originalSplitSource;
try {
- vertex.setParallelism(splitEvent.getNumTasks(), null, null, null);
+ vertex.setParallelism(splitEvent.getNumTasks(), null, null, null, false);
} catch (Exception e) {
// ignore this exception, should not happen
LOG.error("Unexpected exception, Just set Parallelims to a specified value, not involve EdgeManager,"
@@ -3130,8 +3218,25 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
return vertex.startVertex();
}
}
+
+ /**
+ * Publishes the CONFIGURED state update for this vertex at most once. Nothing is published
+ * while the vertex manager has announced that it is going to reconfigure the vertex further.
+ */
+ private void maybeSendConfiguredEvent() {
+ Preconditions.checkState(canInitVertex());
+ if (vertexToBeReconfiguredByManager) {
+ // the manager still plans to reconfigure this vertex, so it is not completely configured yet
+ return;
+ }
+ // compareAndSet lets only the first caller through, later calls are no-ops
+ boolean firstNotification = completelyConfiguredSent.compareAndSet(false, true);
+ if (firstNotification) {
+ VertexStateUpdate configuredUpdate = new VertexStateUpdate(vertexName,
+ org.apache.tez.dag.api.event.VertexState.CONFIGURED);
+ stateChangeNotifier.stateChanged(vertexId, configuredUpdate);
+ }
+ }
private VertexState startVertex() {
+ // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+ // IMPORTANT - Until Recovery is fixed to use normal state transitions, if any code is added
+ // here then please check if it needs to be duplicated in recoveryCodeSimulatingStart().
+ // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
Preconditions.checkState(getState() == VertexState.INITED,
"Vertex must be inited " + logIdentifier);
@@ -3147,7 +3252,16 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
pendingReportedSrcCompletions.clear();
logJobHistoryVertexStartedEvent();
-
+
+ // the vertex is fully configured by the time it starts. Always notify completely configured
+ // unless the vertex manager has told us that it is going to reconfigure it further.
+ // If the vertex was pre-configured then the event would have been sent out earlier. Calling again
+ // would be a no-op. If the vertex was not fully configured and waiting for that to complete then
+ // we would start immediately after that. Either parallelism updated (now) or IPO changed (future)
+ // or vertex added (future). Simplify these cases by sending the event now automatically for the
+ // user as if they had invoked the planned()/done() APIs.
+ maybeSendConfiguredEvent();
+
// TODO: Metrics
//job.metrics.runningJob(job);
@@ -3309,6 +3423,36 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
}
+ /**
+ * Handles a user code error reported for the vertex manager. The error is logged and the
+ * resulting state depends on the state the vertex is in when the event is processed.
+ */
+ private static class VertexManagerUserCodeErrorTransition implements
+ MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
+ @Override
+ public VertexState transition(VertexImpl vertex, VertexEvent event) {
+ AMUserCodeException userCodeError = ((VertexEventManagerUserCodeError) event).getError();
+ String msg = "Exception in " + userCodeError.getSource() + ", vertex:"
+ + vertex.getLogIdentifier();
+ LOG.error(msg, userCodeError);
+
+ VertexState currentState = vertex.getState();
+ if (currentState == VertexState.RECOVERING) {
+ // record the failure as the recovered state and stay in RECOVERING
+ LOG.info("Received a user code error during recovering, setting recovered"
+ + " state to FAILED");
+ vertex.addDiagnostic(msg + "," + ExceptionUtils.getStackTrace(userCodeError.getCause()));
+ vertex.terminationCause = VertexTerminationCause.AM_USERCODE_FAILURE;
+ vertex.recoveredState = VertexState.FAILED;
+ return VertexState.RECOVERING;
+ }
+ if (currentState == VertexState.RUNNING) {
+ // a running vertex goes through TERMINATING after the kill has been requested
+ vertex.addDiagnostic(msg + "," + ExceptionUtils.getStackTrace(userCodeError.getCause()));
+ vertex.tryEnactKill(VertexTerminationCause.AM_USERCODE_FAILURE,
+ TaskTerminationCause.AM_USERCODE_FAILURE);
+ return VertexState.TERMINATING;
+ }
+ // every other state fails the vertex directly
+ String diagnostics = msg + ", " + ExceptionUtils.getStackTrace(userCodeError.getCause());
+ vertex.finished(VertexState.FAILED, VertexTerminationCause.AM_USERCODE_FAILURE, diagnostics);
+ return VertexState.FAILED;
+ }
+ }
+
/**
* Here, the Vertex is being told that one of its source task-attempts
* completed.
http://git-wip-us.apache.org/repos/asf/tez/blob/cd0ed751/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
index 1bfb0f9..dd38c2a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
@@ -27,6 +27,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
@@ -46,11 +47,16 @@ import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.VertexManagerPlugin;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.dag.StateChangeNotifier;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.event.VertexEventManagerUserCodeError;
import org.apache.tez.dag.app.dag.impl.AMUserCodeException.Source;
+import org.apache.tez.dag.app.dag.VertexStateUpdateListener;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.runtime.api.Event;
@@ -75,18 +81,27 @@ public class VertexManager {
UserPayload payload = null;
AppContext appContext;
BlockingQueue<TezEvent> rootInputInitEventQueue;
+ StateChangeNotifier stateChangeNotifier;
private static final Log LOG = LogFactory.getLog(VertexManager.class);
- class VertexManagerPluginContextImpl implements VertexManagerPluginContext {
- // TODO Add functionality to allow VertexManagers to send VertexManagerEvents
+ class VertexManagerPluginContextImpl implements VertexManagerPluginContext, VertexStateUpdateListener {
private EventMetaData rootEventSourceMetadata = new EventMetaData(EventProducerConsumerType.INPUT,
managedVertex.getName(), "NULL_VERTEX", null);
private Map<String, EventMetaData> destinationEventMetadataMap = Maps.newHashMap();
+ private final List<String> notificationRegisteredVertices = Lists.newArrayList();
+ AtomicBoolean isComplete = new AtomicBoolean(false);
+ private void checkAndThrowIfDone() { // guard called first by the public context methods
+ if (isComplete()) {
+ throw new TezUncheckedException("Cannot invoke context methods after reporting done");
+ }
+ }
+
@Override
- public Map<String, EdgeProperty> getInputVertexEdgeProperties() {
+ public synchronized Map<String, EdgeProperty> getInputVertexEdgeProperties() {
+ checkAndThrowIfDone();
// TODO Something similar for Initial Inputs - payload etc visible
Map<Vertex, Edge> inputs = managedVertex.getInputVertices();
Map<String, EdgeProperty> vertexEdgeMap =
@@ -98,22 +113,25 @@ public class VertexManager {
}
@Override
- public String getVertexName() {
+ public synchronized String getVertexName() {
+ checkAndThrowIfDone();
return managedVertex.getName();
}
@Override
- public int getVertexNumTasks(String vertexName) {
+ public synchronized int getVertexNumTasks(String vertexName) {
+ checkAndThrowIfDone();
return appContext.getCurrentDAG().getVertex(vertexName).getTotalTasks();
}
@Override
- public void setVertexParallelism(int parallelism, VertexLocationHint vertexLocationHint,
+ public synchronized void setVertexParallelism(int parallelism, VertexLocationHint vertexLocationHint,
Map<String, EdgeManagerPluginDescriptor> sourceEdgeManagers,
Map<String, InputSpecUpdate> rootInputSpecUpdate) {
+ checkAndThrowIfDone();
try {
managedVertex.setParallelism(parallelism, vertexLocationHint, sourceEdgeManagers,
- rootInputSpecUpdate);
+ rootInputSpecUpdate, true);
} catch (AMUserCodeException e) {
// workaround: convert it to TezUncheckedException which would be caught in VM
throw new TezUncheckedException(e);
@@ -121,13 +139,15 @@ public class VertexManager {
}
@Override
- public void scheduleVertexTasks(List<TaskWithLocationHint> tasks) {
+ public synchronized void scheduleVertexTasks(List<TaskWithLocationHint> tasks) {
+ checkAndThrowIfDone();
managedVertex.scheduleTasks(tasks);
}
@Nullable
@Override
- public Set<String> getVertexInputNames() {
+ public synchronized Set<String> getVertexInputNames() {
+ checkAndThrowIfDone();
Set<String> inputNames = null;
Map<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>
inputs = managedVertex.getAdditionalInputs();
@@ -138,13 +158,15 @@ public class VertexManager {
}
@Override
- public UserPayload getUserPayload() {
+ public synchronized UserPayload getUserPayload() {
+ checkAndThrowIfDone();
return payload;
}
@Override
- public void addRootInputEvents(final String inputName,
+ public synchronized void addRootInputEvents(final String inputName,
Collection<InputDataInformationEvent> events) {
+ checkAndThrowIfDone();
verifyIsRootInput(inputName);
Collection<TezEvent> tezEvents = Collections2.transform(events,
new Function<InputDataInformationEvent, TezEvent>() {
@@ -166,13 +188,15 @@ public class VertexManager {
@Override
- public void setVertexLocationHint(VertexLocationHint locationHint) {
+ public synchronized void setVertexLocationHint(VertexLocationHint locationHint) {
+ checkAndThrowIfDone();
Preconditions.checkNotNull(locationHint, "locationHint is null");
managedVertex.setVertexLocationHint(locationHint);
}
@Override
- public int getDAGAttemptNumber() {
+ public synchronized int getDAGAttemptNumber() {
+ checkAndThrowIfDone();
return appContext.getApplicationAttemptId().getAttemptId();
}
@@ -192,22 +216,26 @@ public class VertexManager {
}
@Override
- public Resource getVertexTaskResource() {
+ public synchronized Resource getVertexTaskResource() {
+ checkAndThrowIfDone();
return managedVertex.getTaskResource();
}
@Override
- public Resource getTotalAvailableResource() {
+ public synchronized Resource getTotalAvailableResource() {
+ checkAndThrowIfDone();
return appContext.getTaskScheduler().getTotalResources();
}
@Override
- public int getNumClusterNodes() {
+ public synchronized int getNumClusterNodes() {
+ checkAndThrowIfDone();
return appContext.getTaskScheduler().getNumClusterNodes();
}
@Override
- public Container getTaskContainer(String vertexName, Integer taskIndex) {
+ public synchronized Container getTaskContainer(String vertexName, Integer taskIndex) {
+ checkAndThrowIfDone();
Vertex vertex = appContext.getCurrentDAG().getVertex(vertexName);
Task task = vertex.getTask(taskIndex.intValue());
TaskAttempt attempt = task.getSuccessfulAttempt();
@@ -216,16 +244,82 @@ public class VertexManager {
}
return null;
}
+
+ @Override
+ public synchronized void registerForVertexStateUpdates(String vertexName, Set<VertexState> stateSet) {
+ checkAndThrowIfDone();
+ synchronized(notificationRegisteredVertices) {
+ notificationRegisteredVertices.add(vertexName); // remembered so that it can be unregistered later
+ }
+ stateChangeNotifier.registerForVertexUpdates(vertexName, stateSet, this); // this context object is the listener
+ }
+
+ private void unregisterForVertexStateUpdates() { // invoked from vertexManagerDone()
+ synchronized (notificationRegisteredVertices) {
+ for (String vertexName : notificationRegisteredVertices) { // one unregister call per recorded registration
+ stateChangeNotifier.unregisterForVertexUpdates(vertexName, this);
+ }
+
+ }
+ }
+
+ /** Returns true once the vertex manager has reported done through vertexManagerDone(). */
+ boolean isComplete() {
+ return isComplete.get();
+ }
+
+ // TODO add the @Override annotation later, after TEZ-1714
+ public synchronized void vertexManagerDone() {
+ checkAndThrowIfDone(); // reporting done a second time is rejected
+ LOG.info("Vertex Manager reported done for : " + managedVertex.getLogIdentifier());
+ this.isComplete.set(true); // from here on the guarded context methods throw and state updates are dropped
+ unregisterForVertexStateUpdates();
+ }
+
+ @Override
+ public synchronized void vertexReconfigurationPlanned() {
+ checkAndThrowIfDone();
+ managedVertex.vertexReconfigurationPlanned(); // forwarded as is to the managed vertex
+ }
+
+ @Override
+ public synchronized void doneReconfiguringVertex() {
+ checkAndThrowIfDone();
+ managedVertex.doneReconfiguringVertex(); // forwarded as is to the managed vertex
+ }
+
+ /**
+ * Listener callback for the vertex state updates this context registered for. The update is
+ * passed on to the plugin unless the vertex manager has already reported done, in which case
+ * it is dropped. A plugin failure is reported through an event instead of being thrown.
+ */
+ @SuppressWarnings("unchecked")
+ @Override
+ public synchronized void onStateUpdated(VertexStateUpdate event) {
+ if (isComplete()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Dropping state update for vertex=" + event.getVertexName() + ", state=" +
+ event.getVertexState() +
+ " since vertexmanager for " + managedVertex.getLogIdentifier() + " is complete.");
+ }
+ return;
+ }
+ try {
+ plugin.onVertexStateUpdated(event);
+ } catch (Exception pluginError) {
+ // state change must be triggered via an event transition
+ AMUserCodeException userCodeError = new AMUserCodeException(Source.VertexManager, pluginError);
+ appContext.getEventHandler().handle(
+ new VertexEventManagerUserCodeError(managedVertex.getVertexId(), userCodeError));
+ }
+ }
+
}
public VertexManager(VertexManagerPluginDescriptor pluginDesc,
- Vertex managedVertex, AppContext appContext) {
+ Vertex managedVertex, AppContext appContext, StateChangeNotifier stateChangeNotifier) {
checkNotNull(pluginDesc, "pluginDesc is null");
checkNotNull(managedVertex, "managedVertex is null");
checkNotNull(appContext, "appContext is null");
+ checkNotNull(stateChangeNotifier, "notifier is null");
this.pluginDesc = pluginDesc;
this.managedVertex = managedVertex;
this.appContext = appContext;
+ this.stateChangeNotifier = stateChangeNotifier;
// don't specify the size of rootInputInitEventQueue, otherwise it will fail when addAll
this.rootInputInitEventQueue = new LinkedBlockingQueue<TezEvent>();
}
@@ -242,7 +336,9 @@ public class VertexManager {
payload = pluginDesc.getUserPayload();
}
try {
- plugin.initialize();
+ if (!pluginContext.isComplete()) {
+ plugin.initialize();
+ }
} catch (Exception e) {
throw new AMUserCodeException(Source.VertexManager, e);
}
@@ -265,7 +361,9 @@ public class VertexManager {
}
}
try {
- plugin.onVertexStarted(pluginCompletionsMap);
+ if (!pluginContext.isComplete()) {
+ plugin.onVertexStarted(pluginCompletionsMap);
+ }
} catch (Exception e) {
throw new AMUserCodeException(Source.VertexManager, e);
}
@@ -276,7 +374,9 @@ public class VertexManager {
String vertexName =
appContext.getCurrentDAG().getVertex(tezTaskId.getVertexID()).getName();
try {
- plugin.onSourceTaskCompleted(vertexName, taskId);
+ if (!pluginContext.isComplete()) {
+ plugin.onSourceTaskCompleted(vertexName, taskId);
+ }
} catch (Exception e) {
throw new AMUserCodeException(Source.VertexManager, e);
}
@@ -284,7 +384,9 @@ public class VertexManager {
public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) throws AMUserCodeException {
try {
- plugin.onVertexManagerEventReceived(vmEvent);
+ if (!pluginContext.isComplete()) {
+ plugin.onVertexManagerEventReceived(vmEvent);
+ }
} catch (Exception e) {
throw new AMUserCodeException(Source.VertexManager, e);
}
@@ -293,7 +395,9 @@ public class VertexManager {
public List<TezEvent> onRootVertexInitialized(String inputName,
InputDescriptor inputDescriptor, List<Event> events) throws AMUserCodeException {
try {
- plugin.onRootVertexInitialized(inputName, inputDescriptor, events);
+ if (!pluginContext.isComplete()) {
+ plugin.onRootVertexInitialized(inputName, inputDescriptor, events);
+ }
} catch (Exception e) {
throw new AMUserCodeException(Source.VertexManager, e);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/cd0ed751/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index f1961aa..d859ae0 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -898,13 +898,10 @@ public class TestDAGImpl {
dispatcher.getEventHandler().handle(new DAGEventStartDag(dagWithCustomEdge.getID(),
null));
dispatcher.await();
- Assert.assertEquals(DAGState.RUNNING, dagWithCustomEdge.getState());
VertexImpl v2 = (VertexImpl)dagWithCustomEdge.getVertex("vertex2");
LOG.info(v2.getTasks().size());
Task t1= v2.getTask(0);
- dispatcher.getEventHandler().handle(new TaskEvent(t1.getTaskId(), TaskEventType.T_SCHEDULE));
- dispatcher.await();
TaskAttemptImpl ta1= (TaskAttemptImpl)t1.getAttempt(TezTaskAttemptID.getInstance(t1.getTaskId(), 0));
Assert.assertEquals(TaskAttemptStateInternal.FAILED, ta1.getInternalState());
@@ -947,7 +944,6 @@ public class TestDAGImpl {
VertexImpl v1 = (VertexImpl)dagWithCustomEdge.getVertex("vertex1");
VertexImpl v2 = (VertexImpl)dagWithCustomEdge.getVertex("vertex2");
- v2.scheduleTasks(Collections.singletonList(new TaskWithLocationHint(new Integer(0), null)));
dispatcher.await();
Task t1= v2.getTask(0);
TaskAttemptImpl ta1= (TaskAttemptImpl)t1.getAttempt(TezTaskAttemptID.getInstance(t1.getTaskId(), 0));
@@ -977,7 +973,6 @@ public class TestDAGImpl {
VertexImpl v1 = (VertexImpl)dagWithCustomEdge.getVertex("vertex1");
VertexImpl v2 = (VertexImpl)dagWithCustomEdge.getVertex("vertex2");
- v2.scheduleTasks(Collections.singletonList(new TaskWithLocationHint(new Integer(0), null)));
dispatcher.await();
Task t1= v2.getTask(0);
@@ -1007,7 +1002,6 @@ public class TestDAGImpl {
VertexImpl v1 = (VertexImpl)dagWithCustomEdge.getVertex("vertex1");
VertexImpl v2 = (VertexImpl)dagWithCustomEdge.getVertex("vertex2");
- v2.scheduleTasks(Collections.singletonList(new TaskWithLocationHint(new Integer(0), null)));
dispatcher.await();
Task t1= v2.getTask(0);
@@ -1038,7 +1032,6 @@ public class TestDAGImpl {
VertexImpl v1 = (VertexImpl)dagWithCustomEdge.getVertex("vertex1");
VertexImpl v2 = (VertexImpl)dagWithCustomEdge.getVertex("vertex2");
- v2.scheduleTasks(Collections.singletonList(new TaskWithLocationHint(new Integer(0), null)));
dispatcher.await();
Task t1= v2.getTask(0);
http://git-wip-us.apache.org/repos/asf/tez/blob/cd0ed751/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestImmediateStartVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestImmediateStartVertexManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestImmediateStartVertexManager.java
new file mode 100644
index 0000000..6d071a7
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestImmediateStartVertexManager.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.impl;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import static org.mockito.Matchers.anyList;
+import static org.mockito.Mockito.anySet;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit test for ImmediateStartVertexManager, driven through a mocked VertexManagerPluginContext.
+ */
+public class TestImmediateStartVertexManager {
+
+ /**
+ * Uses a managed vertex with 4 tasks and three source vertices (scatter-gather, custom and
+ * broadcast edges). Verifies that no tasks are scheduled right after onVertexStarted(), that
+ * all 4 tasks are scheduled once a CONFIGURED update has been delivered for every source
+ * vertex, and that the same holds when the updates are delivered from inside
+ * registerForVertexStateUpdates().
+ */
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ @Test (timeout=5000)
+ public void testBasic() {
+ HashMap<String, EdgeProperty> mockInputVertices =
+ new HashMap<String, EdgeProperty>();
+ final String mockSrcVertexId1 = "Vertex1";
+ EdgeProperty eProp1 = EdgeProperty.create(
+ EdgeProperty.DataMovementType.SCATTER_GATHER,
+ EdgeProperty.DataSourceType.PERSISTED,
+ SchedulingType.SEQUENTIAL,
+ OutputDescriptor.create("out"),
+ InputDescriptor.create("in"));
+ final String mockSrcVertexId2 = "Vertex2";
+ EdgeProperty eProp2 = EdgeProperty.create(mock(EdgeManagerPluginDescriptor.class),
+ EdgeProperty.DataSourceType.PERSISTED,
+ SchedulingType.SEQUENTIAL,
+ OutputDescriptor.create("out"),
+ InputDescriptor.create("in"));
+ final String mockSrcVertexId3 = "Vertex3";
+ EdgeProperty eProp3 = EdgeProperty.create(
+ EdgeProperty.DataMovementType.BROADCAST,
+ EdgeProperty.DataSourceType.PERSISTED,
+ SchedulingType.SEQUENTIAL,
+ OutputDescriptor.create("out"),
+ InputDescriptor.create("in"));
+
+ final String mockManagedVertexId = "Vertex4";
+
+ mockInputVertices.put(mockSrcVertexId1, eProp1);
+ mockInputVertices.put(mockSrcVertexId2, eProp2);
+ mockInputVertices.put(mockSrcVertexId3, eProp3);
+
+ final VertexManagerPluginContext mockContext = mock(VertexManagerPluginContext.class);
+ when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
+ when(mockContext.getVertexName()).thenReturn(mockManagedVertexId);
+ when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4);
+ when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(2);
+ when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(2);
+ when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(2);
+
+ // records the task indices passed to the most recent scheduleVertexTasks() call
+ final HashSet<Integer> scheduledTasks = new HashSet<Integer>();
+ doAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocation) {
+ Object[] args = invocation.getArguments();
+ scheduledTasks.clear();
+ List<TaskWithLocationHint> tasks = (List<TaskWithLocationHint>)args[0];
+ for (TaskWithLocationHint task : tasks) {
+ scheduledTasks.add(task.getTaskIndex());
+ }
+ return null;
+ }}).when(mockContext).scheduleVertexTasks(anyList());
+
+ ImmediateStartVertexManager manager = new ImmediateStartVertexManager(mockContext);
+ manager.initialize();
+ manager.onVertexStarted(null);
+ // nothing may be scheduled before the source vertices have reported CONFIGURED
+ verify(mockContext, times(0)).scheduleVertexTasks(anyList());
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1,
+ VertexState.CONFIGURED));
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2,
+ VertexState.CONFIGURED));
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3,
+ VertexState.CONFIGURED));
+ verify(mockContext, times(1)).scheduleVertexTasks(anyList());
+ Assert.assertEquals(4, scheduledTasks.size());
+
+ // simulate race between onVertexStarted and notifications
+ scheduledTasks.clear();
+ final ImmediateStartVertexManager raceManager = new ImmediateStartVertexManager(mockContext);
+ // every registration immediately delivers the CONFIGURED update for the registered vertex
+ doAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Exception {
+ raceManager.onVertexStateUpdated(new VertexStateUpdate((String)invocation.getArguments()[0],
+ VertexState.CONFIGURED));
+ scheduledTasks.clear();
+ return null;
+ }}).when(mockContext).registerForVertexStateUpdates(anyString(), anySet());
+ raceManager.initialize();
+ raceManager.onVertexStarted(null);
+ // both managers share mockContext, so the count includes the call made by the first manager
+ verify(mockContext, times(2)).scheduleVertexTasks(anyList());
+ Assert.assertEquals(4, scheduledTasks.size());
+ }
+
+}