You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2015/02/10 22:37:53 UTC
tez git commit: TEZ-1914. VertexManager logic should not run on the
central dispatcher (bikas)
Repository: tez
Updated Branches:
refs/heads/master 48055300d -> f03546896
TEZ-1914. VertexManager logic should not run on the central dispatcher (bikas)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f0354689
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f0354689
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f0354689
Branch: refs/heads/master
Commit: f03546896ed13bb605e8f39e738d4221de04bd22
Parents: 4805530
Author: Bikas Saha <bi...@apache.org>
Authored: Tue Feb 10 13:37:43 2015 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Tue Feb 10 13:37:43 2015 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/tez/common/AsyncDispatcher.java | 4 +-
.../java/org/apache/tez/dag/app/AppContext.java | 4 +
.../org/apache/tez/dag/app/DAGAppMaster.java | 21 ++
.../tez/dag/app/dag/event/CallableEvent.java | 42 +++
.../dag/app/dag/event/CallableEventType.java | 25 ++
.../event/VertexEventInputDataInformation.java | 40 +++
.../tez/dag/app/dag/event/VertexEventType.java | 2 +
.../apache/tez/dag/app/dag/impl/TaskImpl.java | 4 +-
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 85 +++--
.../tez/dag/app/dag/impl/VertexManager.java | 311 +++++++++++++++----
.../app/dag/impl/CallableEventDispatcher.java | 37 +++
.../tez/dag/app/dag/impl/TestDAGImpl.java | 35 ++-
.../tez/dag/app/dag/impl/TestVertexImpl.java | 99 ++++--
.../tez/dag/app/dag/impl/TestVertexManager.java | 82 +++--
.../vertexmanager/InputReadyVertexManager.java | 2 +-
.../TestInputReadyVertexManager.java | 6 -
17 files changed, 646 insertions(+), 154 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/f0354689/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4da24a7..d617bee 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.7.0: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-1914. VertexManager logic should not run on the central dispatcher.
TEZ-2023. Refactor logIndividualFetchComplete() to be common for both shuffle-schedulers.
TEZ-1999. IndexOutOfBoundsException during merge.
TEZ-2000. Source vertex exists error during DAG submission.
http://git-wip-us.apache.org/repos/asf/tez/blob/f0354689/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
index c23d669..253db23 100644
--- a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
+++ b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
@@ -29,7 +29,6 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.yarn.event.Dispatcher;
@@ -87,7 +86,7 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
this.eventDispatchers = Maps.newHashMap();
}
- Runnable createThread() {
+ public Runnable createThread() {
return new Runnable() {
@Override
public void run() {
@@ -122,6 +121,7 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
@Override
protected void serviceInit(Configuration conf) throws Exception {
+ // TODO TEZ-2049 remove YARN reference
this.exitOnDispatchException =
conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR);
http://git-wip-us.apache.org/repos/asf/tez/blob/f0354689/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
index f8086d0..5564809 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
@@ -37,6 +37,8 @@ import org.apache.tez.common.security.ACLManager;
import org.apache.tez.dag.history.HistoryEventHandler;
import org.apache.tez.dag.records.TezDAGID;
+import com.google.common.util.concurrent.ListeningExecutorService;
+
/**
* Context interface for sharing information across components in Tez DAG
@@ -63,6 +65,8 @@ public interface AppContext {
String getUser();
DAG getCurrentDAG();
+
+ ListeningExecutorService getExecService();
void setDAG(DAG dag);
http://git-wip-us.apache.org/repos/asf/tez/blob/f0354689/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index c7e1e83..5aca3cf 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -47,6 +47,8 @@ import java.util.Random;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
@@ -163,6 +165,9 @@ import org.codehaus.jettison.json.JSONException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* The Tez DAG Application Master.
@@ -254,6 +259,10 @@ public class DAGAppMaster extends AbstractService {
private Path currentRecoveryDataDir;
private Path tezSystemStagingDir;
private FileSystem recoveryFS;
+
+ private ExecutorService rawExecutor;
+ private ListeningExecutorService execService;
+
/**
* set of already executed dag names.
*/
@@ -483,6 +492,10 @@ public class DAGAppMaster extends AbstractService {
}
}
+ rawExecutor = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true)
+ .setNameFormat("App Shared Pool - " + "#%d").build());
+ execService = MoreExecutors.listeningDecorator(rawExecutor);
+
initServices(conf);
super.serviceInit(conf);
@@ -1261,6 +1274,11 @@ public class DAGAppMaster extends AbstractService {
rLock.unlock();
}
}
+
+ @Override
+ public ListeningExecutorService getExecService() {
+ return execService;
+ }
@Override
public Set<String> getAllDAGIDs() {
@@ -1677,6 +1695,7 @@ public class DAGAppMaster extends AbstractService {
if (this.dagSubmissionTimer != null) {
this.dagSubmissionTimer.cancel();
}
+
stopServices();
// Given pre-emption, we should delete tez scratch dir only if unregister is
@@ -1708,6 +1727,8 @@ public class DAGAppMaster extends AbstractService {
}
}
+ execService.shutdownNow();
+
super.serviceStop();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/f0354689/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/CallableEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/CallableEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/CallableEvent.java
new file mode 100644
index 0000000..e148fe8
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/CallableEvent.java
@@ -0,0 +1,42 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.event;
+
+import java.util.concurrent.Callable;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+import com.google.common.util.concurrent.FutureCallback;
+
+public abstract class CallableEvent extends AbstractEvent<CallableEventType> implements
+ Callable<Void> {
+ private final FutureCallback<Void> callback;
+
+ public CallableEvent(FutureCallback<Void> callback) {
+ super(CallableEventType.CALLABLE);
+ this.callback = callback;
+ }
+
+ public FutureCallback<Void> getCallback() {
+ return callback;
+ }
+
+ @Override
+ public abstract Void call() throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/f0354689/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/CallableEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/CallableEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/CallableEventType.java
new file mode 100644
index 0000000..e9e93b9
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/CallableEventType.java
@@ -0,0 +1,25 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.event;
+
+public enum CallableEventType {
+
+ CALLABLE,
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/f0354689/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventInputDataInformation.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventInputDataInformation.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventInputDataInformation.java
new file mode 100644
index 0000000..6b5cad5
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventInputDataInformation.java
@@ -0,0 +1,40 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+
+package org.apache.tez.dag.app.dag.event;
+
+import java.util.List;
+
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.impl.TezEvent;
+
+public class VertexEventInputDataInformation extends VertexEvent {
+
+ private final List<TezEvent> events;
+
+ public VertexEventInputDataInformation(TezVertexID vertexId, List<TezEvent> events) {
+ super(vertexId, VertexEventType.V_INPUT_DATA_INFORMATION);
+ this.events = events;
+ }
+
+ public List<TezEvent> getEvents() {
+ return events;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/f0354689/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
index 5eb4929..aa202a4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
@@ -49,6 +49,8 @@ public enum VertexEventType {
//Producer: VertexInputInitializer
V_ROOT_INPUT_INITIALIZED,
V_ROOT_INPUT_FAILED,
+
+ V_INPUT_DATA_INFORMATION,
// Recover Event, Producer:DAG
V_RECOVER,
http://git-wip-us.apache.org/repos/asf/tez/blob/f0354689/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 149033c..aba20cf 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -252,15 +252,15 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
.addTransition(TaskStateInternal.FAILED, TaskStateInternal.FAILED,
EnumSet.of(
TaskEventType.T_TERMINATE,
+ TaskEventType.T_SCHEDULE,
TaskEventType.T_ADD_SPEC_ATTEMPT))
// Transitions from KILLED state
.addTransition(TaskStateInternal.KILLED, TaskStateInternal.KILLED,
EnumSet.of(
TaskEventType.T_TERMINATE,
+ TaskEventType.T_SCHEDULE,
TaskEventType.T_ADD_SPEC_ATTEMPT))
- .addTransition(TaskStateInternal.FAILED, TaskStateInternal.FAILED,
- TaskEventType.T_SCHEDULE)
// create the topology tables
.installTopology();
http://git-wip-us.apache.org/repos/asf/tez/blob/f0354689/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 865b182..05c3cc1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -40,6 +40,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.annotation.Nullable;
import com.google.common.base.Strings;
+
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.logging.Log;
@@ -114,6 +115,7 @@ import org.apache.tez.dag.app.dag.event.TaskEventRecoverTask;
import org.apache.tez.dag.app.dag.event.TaskEventTermination;
import org.apache.tez.dag.app.dag.event.TaskEventType;
import org.apache.tez.dag.app.dag.event.VertexEvent;
+import org.apache.tez.dag.app.dag.event.VertexEventInputDataInformation;
import org.apache.tez.dag.app.dag.event.VertexEventManagerUserCodeError;
import org.apache.tez.dag.app.dag.event.VertexEventNullEdgeInitialized;
import org.apache.tez.dag.app.dag.event.VertexEventRecoverVertex;
@@ -342,6 +344,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
VertexEventType.V_ROOT_INPUT_INITIALIZED,
new RootInputInitializedTransition())
.addTransition(VertexState.INITIALIZING,
+ EnumSet.of(VertexState.INITIALIZING, VertexState.INITED,
+ VertexState.FAILED),
+ VertexEventType.V_INPUT_DATA_INFORMATION,
+ new InputDataInformationTransition())
+ .addTransition(VertexState.INITIALIZING,
EnumSet.of(VertexState.INITED, VertexState.FAILED),
VertexEventType.V_READY_TO_INIT,
new VertexInitializedTransition())
@@ -861,7 +868,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
@Override
public int getTotalTasks() {
- return numTasks;
+ readLock.lock();
+ try {
+ return numTasks;
+ } finally {
+ readLock.unlock();
+ }
}
@Override
@@ -2175,7 +2187,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
.getVertexManagerPlugin());
LOG.info("Setting user vertex manager plugin: "
+ pluginDesc.getClassName() + " on vertex: " + getLogIdentifier());
- vertexManager = new VertexManager(pluginDesc, this, appContext, stateChangeNotifier);
+ vertexManager = new VertexManager(pluginDesc, dagUgi, this, appContext, stateChangeNotifier);
} else {
// Intended order of picking a vertex manager
// If there is an InputInitializer then we use the RootInputVertexManager. May be fixed by TEZ-703
@@ -2188,26 +2200,26 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
+ logIdentifier);
vertexManager = new VertexManager(
VertexManagerPluginDescriptor.create(RootInputVertexManager.class.getName()),
- this, appContext, stateChangeNotifier);
+ dagUgi, this, appContext, stateChangeNotifier);
} else if (hasOneToOne && !hasCustom) {
LOG.info("Setting vertexManager to InputReadyVertexManager for "
+ logIdentifier);
vertexManager = new VertexManager(
VertexManagerPluginDescriptor.create(InputReadyVertexManager.class.getName()),
- this, appContext, stateChangeNotifier);
+ dagUgi, this, appContext, stateChangeNotifier);
} else if (hasBipartite && !hasCustom) {
LOG.info("Setting vertexManager to ShuffleVertexManager for "
+ logIdentifier);
// shuffle vertex manager needs a conf payload
vertexManager = new VertexManager(ShuffleVertexManager.createConfigBuilder(conf).build(),
- this, appContext, stateChangeNotifier);
+ dagUgi, this, appContext, stateChangeNotifier);
} else {
// schedule all tasks upon vertex start. Default behavior.
LOG.info("Setting vertexManager to ImmediateStartVertexManager for "
+ logIdentifier);
vertexManager = new VertexManager(
VertexManagerPluginDescriptor.create(ImmediateStartVertexManager.class.getName()),
- this, appContext, stateChangeNotifier);
+ dagUgi, this, appContext, stateChangeNotifier);
}
}
}
@@ -3063,14 +3075,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
VertexState state = vertex.getState();
if (state == VertexState.INITIALIZING) {
try {
- List<TezEvent> inputInfoEvents =
- vertex.vertexManager.onRootVertexInitialized(
- liInitEvent.getInputName(),
- vertex.getAdditionalInputs().get(liInitEvent.getInputName())
- .getIODescriptor(), liInitEvent.getEvents());
- if (inputInfoEvents != null && !inputInfoEvents.isEmpty()) {
- VertexImpl.handleRoutedTezEvents(vertex, inputInfoEvents, false);
- }
+ vertex.vertexManager.onRootVertexInitialized(liInitEvent.getInputName(), vertex
+ .getAdditionalInputs().get(liInitEvent.getInputName()).getIODescriptor(),
+ liInitEvent.getEvents());
} catch (AMUserCodeException e) {
String msg = "Exception in " + e.getSource() + ", vertex:" + vertex.getLogIdentifier();
LOG.error(msg, e);
@@ -3087,10 +3094,35 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
vertex.rootInputInitializerManager.shutdown();
vertex.rootInputInitializerManager = null;
}
+
+ // the return of these events from the VM will complete initialization and move into
+ // INITED state if possible via InputDataInformationTransition
+
+ return vertex.getState();
+ }
+ }
+
+ public static class InputDataInformationTransition implements
+ MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
+
+ @Override
+ public VertexState transition(VertexImpl vertex, VertexEvent event) {
+ VertexEventInputDataInformation iEvent = (VertexEventInputDataInformation) event;
+ List<TezEvent> inputInfoEvents = iEvent.getEvents();
+ try {
+ if (inputInfoEvents != null && !inputInfoEvents.isEmpty()) {
+ VertexImpl.handleRoutedTezEvents(vertex, inputInfoEvents, false);
+ }
+ } catch (AMUserCodeException e) {
+ String msg = "Exception in " + e.getSource() + ", vertex:" + vertex.getLogIdentifier();
+ LOG.error(msg, e);
+ vertex.finished(VertexState.FAILED, VertexTerminationCause.AM_USERCODE_FAILURE, msg + ","
+ + ExceptionUtils.getStackTrace(e.getCause()));
+ return VertexState.FAILED;
+ }
// done. check if we need to do the initialization
- if (vertex.getState() == VertexState.INITIALIZING &&
- vertex.initWaitsForRootInitializers) {
+ if (vertex.getState() == VertexState.INITIALIZING && vertex.initWaitsForRootInitializers) {
if (vertex.numInitializedInputs == vertex.inputsWithInitializers.size()) {
// set the wait flag to false if all initializers are done
vertex.initWaitsForRootInitializers = false;
@@ -4021,7 +4053,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
@Override
public Map<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>
getAdditionalInputs() {
- return this.rootInputDescriptors;
+ readLock.lock();
+ try {
+ return this.rootInputDescriptors;
+ } finally {
+ readLock.unlock();
+ }
}
@Nullable
@@ -4059,7 +4096,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
@Override
public Map<Vertex, Edge> getInputVertices() {
- return Collections.unmodifiableMap(this.sourceVertices);
+ readLock.lock();
+ try {
+ return Collections.unmodifiableMap(this.sourceVertices);
+ } finally {
+ readLock.unlock();
+ }
}
@Override
@@ -4092,7 +4134,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
public Resource getTaskResource() {
- return taskResource;
+ readLock.lock();
+ try {
+ return taskResource;
+ } finally {
+ readLock.unlock();
+ }
}
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/tez/blob/f0354689/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
index da86151..af92348 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
@@ -20,6 +20,8 @@ package org.apache.tez.dag.app.dag.impl;
import static com.google.common.base.Preconditions.checkNotNull;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@ -33,6 +35,7 @@ import javax.annotation.Nullable;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.tez.common.ReflectionUtils;
@@ -54,6 +57,8 @@ import org.apache.tez.dag.app.dag.StateChangeNotifier;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.event.CallableEvent;
+import org.apache.tez.dag.app.dag.event.VertexEventInputDataInformation;
import org.apache.tez.dag.app.dag.event.VertexEventManagerUserCodeError;
import org.apache.tez.dag.app.dag.impl.AMUserCodeException.Source;
import org.apache.tez.dag.app.dag.VertexStateUpdateListener;
@@ -62,7 +67,6 @@ import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.InputSpecUpdate;
import org.apache.tez.runtime.api.events.InputDataInformationEvent;
-import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
@@ -72,18 +76,30 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Collections2;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+@SuppressWarnings("unchecked")
public class VertexManager {
- VertexManagerPluginDescriptor pluginDesc;
- VertexManagerPlugin plugin;
- Vertex managedVertex;
- VertexManagerPluginContextImpl pluginContext;
- UserPayload payload = null;
- AppContext appContext;
- BlockingQueue<TezEvent> rootInputInitEventQueue;
- StateChangeNotifier stateChangeNotifier;
+ final VertexManagerPluginDescriptor pluginDesc;
+ final UserGroupInformation dagUgi;
+ final VertexManagerPlugin plugin;
+ final Vertex managedVertex;
+ final VertexManagerPluginContextImpl pluginContext;
+ final UserPayload payload;
+ final AppContext appContext;
+ final BlockingQueue<TezEvent> rootInputInitEventQueue;
+ final StateChangeNotifier stateChangeNotifier;
+
+ private final ListeningExecutorService execService;
+ private final LinkedBlockingQueue<VertexManagerEvent> eventQueue;
+ private final AtomicBoolean eventInFlight;
+ private final AtomicBoolean pluginFailed;
private static final Log LOG = LogFactory.getLog(VertexManager.class);
+ private final VertexManagerCallback VM_CALLBACK = new VertexManagerCallback();
class VertexManagerPluginContextImpl implements VertexManagerPluginContext, VertexStateUpdateListener {
@@ -97,6 +113,9 @@ public class VertexManager {
if (isComplete()) {
throw new TezUncheckedException("Cannot invoke context methods after reporting done");
}
+ if (pluginFailed.get()) {
+ throw new TezUncheckedException("Cannot invoke context methods after throwing an exception");
+ }
}
@Override
@@ -233,6 +252,7 @@ public class VertexManager {
return appContext.getTaskScheduler().getNumClusterNodes();
}
+ // TODO TEZ-2048. Remove this API
@Override
public synchronized Container getTaskContainer(String vertexName, Integer taskIndex) {
checkAndThrowIfDone();
@@ -287,41 +307,36 @@ public class VertexManager {
managedVertex.doneReconfiguringVertex();
}
- @SuppressWarnings("unchecked")
@Override
public synchronized void onStateUpdated(VertexStateUpdate event) {
- if (isComplete()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Dropping state update for vertex=" + event.getVertexName() + ", state=" +
- event.getVertexState() +
- " since vertexmanager for " + managedVertex.getLogIdentifier() + " is complete.");
- }
- } else {
- try {
- plugin.onVertexStateUpdated(event);
- } catch (Exception e) {
- // state change must be triggered via an event transition
- appContext.getEventHandler().handle(
- new VertexEventManagerUserCodeError(managedVertex.getVertexId(),
- new AMUserCodeException(Source.VertexManager, e)));
- }
- }
+ enqueueAndScheduleNextEvent(new VertexManagerEventOnVertexStateUpdate(event));
}
}
- public VertexManager(VertexManagerPluginDescriptor pluginDesc,
+ public VertexManager(VertexManagerPluginDescriptor pluginDesc, UserGroupInformation dagUgi,
Vertex managedVertex, AppContext appContext, StateChangeNotifier stateChangeNotifier) {
checkNotNull(pluginDesc, "pluginDesc is null");
checkNotNull(managedVertex, "managedVertex is null");
checkNotNull(appContext, "appContext is null");
checkNotNull(stateChangeNotifier, "notifier is null");
this.pluginDesc = pluginDesc;
+ this.dagUgi = dagUgi;
this.managedVertex = managedVertex;
this.appContext = appContext;
this.stateChangeNotifier = stateChangeNotifier;
// don't specify the size of rootInputInitEventQueue, otherwise it will fail when addAll
this.rootInputInitEventQueue = new LinkedBlockingQueue<TezEvent>();
+
+ pluginContext = new VertexManagerPluginContextImpl();
+ Preconditions.checkArgument(pluginDesc != null);
+ plugin = ReflectionUtils.createClazzInstance(pluginDesc.getClassName(),
+ new Class[] { VertexManagerPluginContext.class }, new Object[] { pluginContext });
+ payload = pluginDesc.getUserPayload();
+ execService = appContext.getExecService();
+ eventQueue = new LinkedBlockingQueue<VertexManagerEvent>();
+ eventInFlight = new AtomicBoolean(false);
+ pluginFailed = new AtomicBoolean(false);
}
public VertexManagerPlugin getPlugin() {
@@ -329,20 +344,57 @@ public class VertexManager {
}
public void initialize() throws AMUserCodeException {
- pluginContext = new VertexManagerPluginContextImpl();
- if (pluginDesc != null) {
- plugin = ReflectionUtils.createClazzInstance(pluginDesc.getClassName(),
- new Class[]{VertexManagerPluginContext.class}, new Object[]{pluginContext});
- payload = pluginDesc.getUserPayload();
- }
try {
if (!pluginContext.isComplete()) {
- plugin.initialize();
+ // TODO TEZ-2066 tracks moving this async.
+ synchronized (VertexManager.this) {
+ plugin.initialize();
+ }
}
} catch (Exception e) {
throw new AMUserCodeException(Source.VertexManager, e);
}
}
+
+ private boolean pluginInvocationAllowed(String msg) {
+ if (pluginFailed.get()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(msg + " . Manager failed. Vertex=" + managedVertex.getLogIdentifier());
+ }
+ return false;
+ }
+ if (pluginContext.isComplete()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(msg+ " . Manager complete. Not scheduling event. Vertex=" + managedVertex.getLogIdentifier());
+ }
+ return false;
+ }
+ return true;
+ }
+
+ private void enqueueAndScheduleNextEvent(VertexManagerEvent e) {
+ if (!pluginInvocationAllowed("Dropping event")) {
+ return;
+ }
+ eventQueue.add(e);
+ tryScheduleNextEvent();
+ }
+
+ private void tryScheduleNextEvent() {
+ if (!pluginInvocationAllowed("Not scheduling")) {
+ return;
+ }
+ if (eventQueue.isEmpty()) {
+ return;
+ }
+ if (eventInFlight.compareAndSet(false, true)) {
+ // no event was in flight
+ VertexManagerEvent e = eventQueue.poll();
+ Preconditions.checkState(e != null);
+ ListenableFuture<Void> future = execService.submit(e);
+ Futures.addCallback(future, e.getCallback());
+ }
+ }
public void onVertexStarted(List<TezTaskAttemptID> completions) throws AMUserCodeException {
Map<String, List<Integer>> pluginCompletionsMap = Maps.newHashMap();
@@ -360,53 +412,180 @@ public class VertexManager {
taskIdList.add(taskId);
}
}
- try {
- if (!pluginContext.isComplete()) {
- plugin.onVertexStarted(pluginCompletionsMap);
- }
- } catch (Exception e) {
- throw new AMUserCodeException(Source.VertexManager, e);
- }
+ enqueueAndScheduleNextEvent(new VertexManagerEventOnVertexStarted(pluginCompletionsMap));
}
public void onSourceTaskCompleted(TezTaskID tezTaskId) throws AMUserCodeException {
Integer taskId = Integer.valueOf(tezTaskId.getId());
String vertexName =
appContext.getCurrentDAG().getVertex(tezTaskId.getVertexID()).getName();
- try {
- if (!pluginContext.isComplete()) {
- plugin.onSourceTaskCompleted(vertexName, taskId);
- }
- } catch (Exception e) {
- throw new AMUserCodeException(Source.VertexManager, e);
+ enqueueAndScheduleNextEvent(new VertexManagerEventSourceTaskCompleted(taskId, vertexName));
+ }
+
+ public void onVertexManagerEventReceived(
+ org.apache.tez.runtime.api.events.VertexManagerEvent vmEvent) throws AMUserCodeException {
+ enqueueAndScheduleNextEvent(new VertexManagerEventReceived(vmEvent));
+ }
+
+ public void onRootVertexInitialized(String inputName,
+ InputDescriptor inputDescriptor, List<Event> events) throws AMUserCodeException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("vertex:" + managedVertex.getLogIdentifier() + "; enqueueing onRootVertexInitialized"
+ + " on input:" + inputName + ", current task events size is " + rootInputInitEventQueue.size());
}
+ enqueueAndScheduleNextEvent(new VertexManagerEventRootInputInitialized(inputName,
+ inputDescriptor, events));
}
- public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) throws AMUserCodeException {
- try {
- if (!pluginContext.isComplete()) {
- plugin.onVertexManagerEventReceived(vmEvent);
+ private class VertexManagerCallback implements FutureCallback<Void> {
+
+ @Override
+ public void onFailure(Throwable t) {
+ // stop further event processing
+ pluginFailed.set(true);
+ eventQueue.clear();
+ // catch real root cause of failure, it would throw UndeclaredThrowableException
+ // if using UGI.doAs
+ if (t instanceof UndeclaredThrowableException) {
+ t = t.getCause();
}
- } catch (Exception e) {
- throw new AMUserCodeException(Source.VertexManager, e);
+ Preconditions.checkState(appContext != null);
+ Preconditions.checkState(managedVertex != null);
+ // state change must be triggered via an event transition
+ appContext.getEventHandler().handle(
+ new VertexEventManagerUserCodeError(managedVertex.getVertexId(),
+ new AMUserCodeException(Source.VertexManager, t)));
+ // enqueue no further events due to user code error
+ }
+
+ @Override
+ public void onSuccess(Void result) {
+ Preconditions.checkState(eventInFlight.get());
+ eventInFlight.set(false);
+ tryScheduleNextEvent();
}
}
+
+ private class VertexManagerRootInputInitializedCallback extends VertexManagerCallback {
- public List<TezEvent> onRootVertexInitialized(String inputName,
- InputDescriptor inputDescriptor, List<Event> events) throws AMUserCodeException {
- try {
- if (!pluginContext.isComplete()) {
- plugin.onRootVertexInitialized(inputName, inputDescriptor, events);
+ @Override
+ public void onSuccess(Void result) {
+ super.onSuccess(result);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("vertex:" + managedVertex.getLogIdentifier()
+ + "; after call of VertexManagerPlugin.onRootVertexInitialized" + " on input:"
+ + ", current task events size is " + rootInputInitEventQueue.size());
}
- } catch (Exception e) {
- throw new AMUserCodeException(Source.VertexManager, e);
+ List<TezEvent> resultEvents = new ArrayList<TezEvent>();
+ rootInputInitEventQueue.drainTo(resultEvents);
+ appContext.getEventHandler().handle(
+ new VertexEventInputDataInformation(managedVertex.getVertexId(), resultEvents));
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("vertex:" + managedVertex.getLogIdentifier() + "; after call of VertexManagerPlugin.onRootVertexInitialized"
- + " on input:" + inputName + ", current task events size is " + rootInputInitEventQueue.size());
+ }
+
+  /**
+   * Queued event that forwards a vertex state update to
+   * {@code plugin.onVertexStateUpdated} when invoked.
+   */
+  class VertexManagerEventOnVertexStateUpdate extends VertexManagerEvent {
+    private final VertexStateUpdate stateUpdate;
+
+    public VertexManagerEventOnVertexStateUpdate(VertexStateUpdate event) {
+      this.stateUpdate = event;
+    }
+
+    @Override
+    public void invoke() throws Exception {
+      plugin.onVertexStateUpdated(stateUpdate);
+    }
+
+  }
+
+  /**
+   * Queued event that calls {@code plugin.onVertexStarted} with the completions map
+   * captured at construction time.
+   */
+  class VertexManagerEventOnVertexStarted extends VertexManagerEvent {
+    private final Map<String, List<Integer>> completionsByVertex;
+
+    public VertexManagerEventOnVertexStarted(Map<String, List<Integer>> pluginCompletionsMap) {
+      this.completionsByVertex = pluginCompletionsMap;
+    }
+
+    @Override
+    public void invoke() throws Exception {
+      plugin.onVertexStarted(completionsByVertex);
+    }
+
+  }
+
+ class VertexManagerEventSourceTaskCompleted extends VertexManagerEvent {
+ private final Integer taskId;
+ private final String vertexName;
+
+ public VertexManagerEventSourceTaskCompleted(Integer taskId, String vertexName) {
+ this.taskId = taskId;
+ this.vertexName = vertexName;
+ }
+
+ @Override
+ public void invoke() throws Exception {
+ plugin.onSourceTaskCompleted(vertexName, taskId);
}
- List<TezEvent> resultEvents = new ArrayList<TezEvent>();
- rootInputInitEventQueue.drainTo(resultEvents);
- return resultEvents;
+
+ }
+
+  /**
+   * Queued event that hands a runtime VertexManagerEvent to
+   * {@code plugin.onVertexManagerEventReceived} when invoked.
+   */
+  class VertexManagerEventReceived extends VertexManagerEvent {
+    private final org.apache.tez.runtime.api.events.VertexManagerEvent runtimeEvent;
+
+    public VertexManagerEventReceived(org.apache.tez.runtime.api.events.VertexManagerEvent vmEvent) {
+      this.runtimeEvent = vmEvent;
+    }
+
+    @Override
+    public void invoke() throws Exception {
+      plugin.onVertexManagerEventReceived(runtimeEvent);
+    }
+
+  }
+
+  /**
+   * Queued event that calls {@code plugin.onRootVertexInitialized} with the input name,
+   * descriptor and events captured at construction time. Unlike the other event types it
+   * is created with its own {@code VertexManagerRootInputInitializedCallback} instance
+   * rather than the default callback.
+   */
+  class VertexManagerEventRootInputInitialized extends VertexManagerEvent {
+    private final String rootInputName;
+    private final InputDescriptor rootInputDescriptor;
+    private final List<Event> rootInputEvents;
+
+    public VertexManagerEventRootInputInitialized(String inputName,
+        InputDescriptor inputDescriptor, List<Event> events) {
+      super(new VertexManagerRootInputInitializedCallback());
+      this.rootInputName = inputName;
+      this.rootInputDescriptor = inputDescriptor;
+      this.rootInputEvents = events;
+    }
+
+    @Override
+    public void invoke() throws Exception {
+      plugin.onRootVertexInitialized(rootInputName, rootInputDescriptor, rootInputEvents);
+    }
+
+  }
+
+  /**
+   * Base class for the events queued by this VertexManager. Subclasses supply the plugin
+   * call in {@link #invoke()}; {@link #call()} wraps that call so that it runs inside
+   * {@code dagUgi.doAs}, while holding the VertexManager's monitor, and only if plugin
+   * invocation is still allowed.
+   */
+  abstract class VertexManagerEvent extends CallableEvent {
+    /** Creates an event that reports completion to the shared {@code VM_CALLBACK}. */
+    public VertexManagerEvent() {
+      this(VM_CALLBACK);
+    }
+
+    /** Creates an event that reports completion to the given callback. */
+    public VertexManagerEvent(VertexManagerCallback callback) {
+      super(callback);
+    }
+
+    @Override
+    public Void call() throws Exception {
+      final VertexManager owner = VertexManager.this;
+      PrivilegedExceptionAction<Void> guardedInvoke = new PrivilegedExceptionAction<Void>() {
+        @Override
+        public Void run() throws Exception {
+          synchronized (owner) {
+            // Re-checked under the lock: the event is skipped, not failed, when the
+            // plugin may no longer be invoked.
+            if (owner.pluginInvocationAllowed("Not invoking")) {
+              invoke();
+            }
+          }
+          return null;
+        }
+      };
+      owner.dagUgi.doAs(guardedInvoke);
+      return null;
+    }
+
+    /** Performs the actual plugin call for this event. */
+    public abstract void invoke() throws Exception;
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/f0354689/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/CallableEventDispatcher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/CallableEventDispatcher.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/CallableEventDispatcher.java
new file mode 100644
index 0000000..a81bd68
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/CallableEventDispatcher.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.impl;
+
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.dag.app.dag.event.CallableEvent;
+
+/**
+ * Event handler that runs a {@link CallableEvent} inline and then notifies the event's
+ * callback: {@code onSuccess(null)} after {@code call()} returns, {@code onFailure} with
+ * the exception otherwise. Because the success notification sits inside the same
+ * {@code try}, an exception thrown by {@code onSuccess} is also passed to
+ * {@code onFailure}.
+ */
+public class CallableEventDispatcher implements EventHandler<CallableEvent> {
+
+  @Override
+  public void handle(CallableEvent event) {
+    try {
+      event.call();
+      event.getCallback().onSuccess(null);
+    } catch (Exception failure) {
+      event.getCallback().onFailure(failure);
+    }
+  }
+
+}
+
http://git-wip-us.apache.org/repos/asf/tez/blob/f0354689/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index cae9059..599f01e 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -18,6 +18,7 @@
package org.apache.tez.dag.app.dag.impl;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -26,9 +27,10 @@ import static org.mockito.Mockito.verify;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
@@ -37,6 +39,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -58,7 +61,6 @@ import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
-import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.oldrecords.TaskState;
import org.apache.tez.dag.api.records.DAGProtos;
@@ -85,6 +87,8 @@ import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.VertexState;
import org.apache.tez.dag.app.dag.VertexTerminationCause;
+import org.apache.tez.dag.app.dag.event.CallableEvent;
+import org.apache.tez.dag.app.dag.event.CallableEventType;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDAGFinished;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
import org.apache.tez.dag.app.dag.event.DAGEvent;
@@ -125,8 +129,13 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.protobuf.ByteString;
public class TestDAGImpl {
@@ -136,6 +145,7 @@ public class TestDAGImpl {
private TezDAGID dagId;
private static Configuration conf;
private DrainDispatcher dispatcher;
+ private ListeningExecutorService execService;
private Credentials fsTokens;
private AppContext appContext;
private ACLManager aclManager;
@@ -724,6 +734,7 @@ public class TestDAGImpl {
MockDNSToSwitchMapping.initializeMockRackResolver();
}
+ @SuppressWarnings({ "unchecked", "rawtypes" })
@Before
public void setup() {
conf = new Configuration();
@@ -736,6 +747,19 @@ public class TestDAGImpl {
dispatcher = new DrainDispatcher();
fsTokens = new Credentials();
appContext = mock(AppContext.class);
+ execService = mock(ListeningExecutorService.class);
+ final ListenableFuture<Void> mockFuture = mock(ListenableFuture.class);
+
+ Mockito.doAnswer(new Answer() {
+ public ListenableFuture<Void> answer(InvocationOnMock invocation) {
+ Object[] args = invocation.getArguments();
+ CallableEvent e = (CallableEvent) args[0];
+ dispatcher.getEventHandler().handle(e);
+ return mockFuture;
+ }})
+ .when(execService).submit((Callable<Void>) any());
+
+ doReturn(execService).when(appContext).getExecService();
historyEventHandler = mock(HistoryEventHandler.class);
aclManager = new ACLManager("amUser");
doReturn(conf).when(appContext).getAMConf();
@@ -750,6 +774,7 @@ public class TestDAGImpl {
doReturn(dag).when(appContext).getCurrentDAG();
mrrAppContext = mock(AppContext.class);
doReturn(aclManager).when(mrrAppContext).getAMACLManager();
+ doReturn(execService).when(mrrAppContext).getExecService();
mrrDagId = TezDAGID.getInstance(appAttemptId.getApplicationId(), 2);
mrrDagPlan = createTestMRRDAGPlan();
mrrDag = new DAGImpl(mrrDagId, conf, mrrDagPlan,
@@ -763,6 +788,7 @@ public class TestDAGImpl {
doReturn(historyEventHandler).when(mrrAppContext).getHistoryHandler();
groupAppContext = mock(AppContext.class);
doReturn(aclManager).when(groupAppContext).getAMACLManager();
+ doReturn(execService).when(groupAppContext).getExecService();
groupDagId = TezDAGID.getInstance(appAttemptId.getApplicationId(), 3);
groupDagPlan = createGroupDAGPlan();
groupDag = new DAGImpl(groupDagId, conf, groupDagPlan,
@@ -778,6 +804,7 @@ public class TestDAGImpl {
// reset totalCommitCounter to 0
TotalCountingOutputCommitter.totalCommitCounter = 0;
+ dispatcher.register(CallableEventType.class, new CallableEventDispatcher());
taskEventDispatcher = new TaskEventDispatcher();
dispatcher.register(TaskEventType.class, taskEventDispatcher);
taskAttemptEventDispatcher = new TaskAttemptEventDispatcher();
@@ -797,6 +824,7 @@ public class TestDAGImpl {
public void teardown() {
dispatcher.await();
dispatcher.stop();
+ execService.shutdownNow();
dagPlan = null;
dag = null;
}
@@ -817,6 +845,7 @@ public class TestDAGImpl {
dispatcher.getEventHandler(), taskAttemptListener,
fsTokens, clock, "user", thh, dagWithCustomEdgeAppContext);
doReturn(conf).when(dagWithCustomEdgeAppContext).getAMConf();
+ doReturn(execService).when(dagWithCustomEdgeAppContext).getExecService();
doReturn(dagWithCustomEdge).when(dagWithCustomEdgeAppContext).getCurrentDAG();
doReturn(appAttemptId).when(dagWithCustomEdgeAppContext).getApplicationAttemptId();
doReturn(appAttemptId.getApplicationId()).when(dagWithCustomEdgeAppContext).getApplicationID();
@@ -838,7 +867,7 @@ public class TestDAGImpl {
dispatcher.await();
Assert.assertEquals(DAGState.RUNNING, impl.getState());
}
-
+
@Test(timeout = 5000)
public void testDAGInit() {
initDAG(dag);
http://git-wip-us.apache.org/repos/asf/tez/blob/f0354689/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 83a3a8a..2c6f5e0 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -38,6 +38,7 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
@@ -116,6 +117,8 @@ import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.VertexState;
import org.apache.tez.dag.app.dag.VertexStateUpdateListener;
import org.apache.tez.dag.app.dag.VertexTerminationCause;
+import org.apache.tez.dag.app.dag.event.CallableEvent;
+import org.apache.tez.dag.app.dag.event.CallableEventType;
import org.apache.tez.dag.app.dag.event.DAGEvent;
import org.apache.tez.dag.app.dag.event.DAGEventType;
import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
@@ -176,10 +179,13 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
+import org.mockito.Mockito;
import org.mockito.internal.util.collections.Sets;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -187,6 +193,7 @@ import org.mockito.stubbing.Answer;
public class TestVertexImpl {
private static final Log LOG = LogFactory.getLog(TestVertexImpl.class);
+ private ListeningExecutorService execService;
private boolean useCustomInitializer = false;
private InputInitializer customInitializer = null;
@@ -2119,6 +2126,7 @@ public class TestVertexImpl {
anyInt());
}
+ @SuppressWarnings({ "unchecked", "rawtypes" })
public void setupPostDagCreation() throws AMUserCodeException {
String dagName = "dag0";
dispatcher = new DrainDispatcher();
@@ -2138,6 +2146,19 @@ public class TestVertexImpl {
doReturn(appAttemptId).when(appContext).getApplicationAttemptId();
doReturn(appAttemptId.getApplicationId()).when(appContext).getApplicationID();
doReturn(dag).when(appContext).getCurrentDAG();
+ execService = mock(ListeningExecutorService.class);
+ final ListenableFuture<Void> mockFuture = mock(ListenableFuture.class);
+
+ Mockito.doAnswer(new Answer() {
+ public ListenableFuture<Void> answer(InvocationOnMock invocation) {
+ Object[] args = invocation.getArguments();
+ CallableEvent e = (CallableEvent) args[0];
+ dispatcher.getEventHandler().handle(e);
+ return mockFuture;
+ }})
+ .when(execService).submit((Callable<Void>) any());
+
+ doReturn(execService).when(appContext).getExecService();
doReturn(conf).when(appContext).getAMConf();
doReturn(new Credentials()).when(dag).getCredentials();
doReturn(DAGPlan.getDefaultInstance()).when(dag).getJobPlan();
@@ -2191,6 +2212,7 @@ public class TestVertexImpl {
edge.initialize();
}
+ dispatcher.register(CallableEventType.class, new CallableEventDispatcher());
taskAttemptEventDispatcher = new TaskAttemptEventDispatcher();
dispatcher.register(TaskAttemptEventType.class, taskAttemptEventDispatcher);
taskEventDispatcher = new TaskEventDispatcher();
@@ -2224,6 +2246,7 @@ public class TestVertexImpl {
dispatcher.await();
dispatcher.stop();
}
+ execService.shutdownNow();
dispatcher = null;
vertexEventDispatcher = null;
dagEventDispatcher = null;
@@ -2247,7 +2270,6 @@ public class TestVertexImpl {
}
}
-
@SuppressWarnings("unchecked")
private void initVertex(VertexImpl v) {
Assert.assertEquals(VertexState.NEW, v.getState());
@@ -2382,6 +2404,7 @@ public class TestVertexImpl {
Assert.assertEquals(VertexState.INITED, v2.getState());
Assert.assertEquals(0, listener.events.size()); // configured event not sent
startVertex(v1, true);
+ dispatcher.await();
Assert.assertEquals(VertexState.RUNNING, v2.getState());
Assert.assertEquals(1, listener.events.size()); // configured event sent after VM
Assert.assertEquals("vertex2", listener.events.get(0).getVertexName());
@@ -2821,12 +2844,13 @@ public class TestVertexImpl {
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
- public void testVertexTaskAttemptProcessorFailure() {
+ public void testVertexTaskAttemptProcessorFailure() throws Exception {
initAllVertices(VertexState.INITED);
VertexImpl v = vertices.get("vertex1");
startVertex(v);
+ dispatcher.await();
TaskAttemptImpl ta = (TaskAttemptImpl) v.getTask(0).getAttempts().values().iterator().next();
ta.handle(new TaskAttemptEventSchedule(ta.getID(), 2, 2));
@@ -2856,12 +2880,13 @@ public class TestVertexImpl {
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
- public void testVertexTaskAttemptInputFailure() {
+ public void testVertexTaskAttemptInputFailure() throws Exception {
initAllVertices(VertexState.INITED);
VertexImpl v = vertices.get("vertex1");
startVertex(v);
+ dispatcher.await();
TaskAttemptImpl ta = (TaskAttemptImpl) v.getTask(0).getAttempts().values().iterator().next();
ta.handle(new TaskAttemptEventSchedule(ta.getID(), 2, 2));
@@ -2892,12 +2917,13 @@ public class TestVertexImpl {
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
- public void testVertexTaskAttemptOutputFailure() {
+ public void testVertexTaskAttemptOutputFailure() throws Exception {
initAllVertices(VertexState.INITED);
VertexImpl v = vertices.get("vertex1");
startVertex(v);
+ dispatcher.await();
TaskAttemptImpl ta = (TaskAttemptImpl) v.getTask(0).getAttempts().values().iterator().next();
ta.handle(new TaskAttemptEventSchedule(ta.getID(), 2, 2));
@@ -3355,7 +3381,7 @@ public class TestVertexImpl {
@Test(timeout = 5000)
- public void testVertexWithOneToOneSplit() throws AMUserCodeException {
+ public void testVertexWithOneToOneSplit() throws Exception {
// create a diamond shaped dag with 1-1 edges.
// split the source and remaining vertices should split equally
// vertex with 2 incoming splits from the same source should split once
@@ -3386,7 +3412,7 @@ public class TestVertexImpl {
RootInputInitializerManagerControlled initializerManager1 = v1.getRootInputInitializerManager();
List<TaskLocationHint> v1Hints = createTaskLocationHints(numTasks);
initializerManager1.completeInputInitialization(0, numTasks, v1Hints);
-
+ dispatcher.await();
Assert.assertEquals(VertexState.INITED, v1.getState());
Assert.assertEquals(numTasks, v1.getTotalTasks());
Assert.assertEquals(RootInputVertexManager.class.getName(), v1
@@ -3437,10 +3463,10 @@ public class TestVertexImpl {
// fudge vertex manager so that tasks dont start running
v1.vertexManager = new VertexManager(
VertexManagerPluginDescriptor.create(VertexManagerPluginForTest.class.getName()),
- v1, appContext, mock(StateChangeNotifier.class));
+ UserGroupInformation.getCurrentUser(), v1, appContext, mock(StateChangeNotifier.class));
v1.vertexManager.initialize();
startVertex(v1);
-
+ dispatcher.await();
Assert.assertEquals(numTasks, vertices.get("vertex2").getTotalTasks());
Assert.assertEquals(numTasks, vertices.get("vertex3").getTotalTasks());
Assert.assertEquals(numTasks, vertices.get("vertex4").getTotalTasks());
@@ -3476,7 +3502,7 @@ public class TestVertexImpl {
// fudge vertex manager so that tasks dont start running
v1.vertexManager = new VertexManager(
VertexManagerPluginDescriptor.create(VertexManagerPluginForTest.class.getName()),
- v1, appContext, mock(StateChangeNotifier.class));
+ UserGroupInformation.getCurrentUser(), v1, appContext, mock(StateChangeNotifier.class));
v1.vertexManager.initialize();
Assert.assertEquals(numTasks, vertices.get("vertex2").getTotalTasks());
@@ -3519,7 +3545,7 @@ public class TestVertexImpl {
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
- public void testVertexWithInitializerFailure() throws AMUserCodeException {
+ public void testVertexWithInitializerFailure() throws Exception {
useCustomInitializer = true;
setupPreDagCreation();
dagPlan = createDAGPlanWithInputInitializer("TestInputInitializer");
@@ -3548,7 +3574,7 @@ public class TestVertexImpl {
Assert.assertEquals(VertexState.INITIALIZING, v2.getState());
RootInputInitializerManagerControlled initializerManager2 = v2.getRootInputInitializerManager();
initializerManager2.failInputInitialization();
-
+ dispatcher.await();
Assert.assertEquals(VertexState.FAILED, v2.getState());
Assert.assertEquals(RootInputVertexManager.class.getName(), v2
.getVertexManager().getPlugin().getClass().getName());
@@ -4400,7 +4426,7 @@ public class TestVertexImpl {
}
@Test(timeout = 5000)
- public void testVertexWithMultipleInitializers1() throws AMUserCodeException {
+ public void testVertexWithMultipleInitializers1() throws Exception {
useCustomInitializer = true;
setupPreDagCreation();
dagPlan = createDAGPlanWithMultipleInitializers("TestInputInitializer");
@@ -4419,15 +4445,17 @@ public class TestVertexImpl {
// Complete initializer which sets parallelism first
initializerManager1.completeInputInitialization(0, 5, v1Hints);
+ dispatcher.await();
Assert.assertEquals(VertexState.INITIALIZING, v1.getState());
// Complete second initializer
initializerManager1.completeInputInitialization(1);
+ dispatcher.await();
Assert.assertEquals(VertexState.INITED, v1.getState());
}
@Test(timeout = 5000)
- public void testVertexWithMultipleInitializers2() throws AMUserCodeException {
+ public void testVertexWithMultipleInitializers2() throws Exception {
useCustomInitializer = true;
setupPreDagCreation();
dagPlan = createDAGPlanWithMultipleInitializers("TestInputInitializer");
@@ -4446,16 +4474,18 @@ public class TestVertexImpl {
// Complete initializer which does not set parallelism
initializerManager1.completeInputInitialization(1);
+ dispatcher.await();
Assert.assertEquals(VertexState.INITIALIZING, v1.getState());
// Complete second initializer which sets parallelism
initializerManager1.completeInputInitialization(0, 5, v1Hints);
+ dispatcher.await();
Assert.assertEquals(VertexState.INITED, v1.getState());
}
@SuppressWarnings("unchecked")
@Test(timeout = 500000)
- public void testVertexWithInitializerSuccess() throws AMUserCodeException {
+ public void testVertexWithInitializerSuccess() throws Exception {
useCustomInitializer = true;
setupPreDagCreation();
dagPlan = createDAGPlanWithInputInitializer("TestInputInitializer");
@@ -4470,7 +4500,7 @@ public class TestVertexImpl {
RootInputInitializerManagerControlled initializerManager1 = v1.getRootInputInitializerManager();
List<TaskLocationHint> v1Hints = createTaskLocationHints(5);
initializerManager1.completeInputInitialization(0, 5, v1Hints);
-
+ dispatcher.await();
Assert.assertEquals(VertexState.INITED, v1.getState());
Assert.assertEquals(5, v1.getTotalTasks());
// task events get buffered
@@ -4510,7 +4540,7 @@ public class TestVertexImpl {
RootInputInitializerManagerControlled initializerManager2 = v2.getRootInputInitializerManager();
List<TaskLocationHint> v2Hints = createTaskLocationHints(10);
initializerManager2.completeInputInitialization(0, 10, v2Hints);
-
+ dispatcher.await();
Assert.assertEquals(VertexState.INITED, v2.getState());
Assert.assertEquals(10, v2.getTotalTasks());
// task events get buffered
@@ -4530,7 +4560,7 @@ public class TestVertexImpl {
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
- public void testVertexWithInputDistributor() throws AMUserCodeException {
+ public void testVertexWithInputDistributor() throws Exception {
useCustomInitializer = true;
setupPreDagCreation();
dagPlan = createDAGPlanWithInputDistributor("TestInputInitializer");
@@ -4547,6 +4577,7 @@ public class TestVertexImpl {
RootInputInitializerManagerControlled initializerManager1 = v1.getRootInputInitializerManager();
byte[] payload = new byte[0];
initializerManager1.completeInputDistribution(payload);
+ dispatcher.await();
// edge is still null so its initializing
Assert.assertEquals(VertexState.INITIALIZING, v1.getState());
Assert.assertEquals(true, initializerManager1.hasShutDown);
@@ -4565,7 +4596,7 @@ public class TestVertexImpl {
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
- public void testVertexRootInputSpecUpdateAll() throws AMUserCodeException {
+ public void testVertexRootInputSpecUpdateAll() throws Exception {
useCustomInitializer = true;
setupPreDagCreation();
dagPlan = createDAGPlanWithInputInitializer("TestInputInitializer");
@@ -4580,7 +4611,7 @@ public class TestVertexImpl {
Assert.assertEquals(VertexState.INITIALIZING, v3.getState());
RootInputInitializerManagerControlled initializerManager1 = v3.getRootInputInitializerManager();
initializerManager1.completeInputInitialization();
-
+ dispatcher.await();
Assert.assertEquals(VertexState.INITED, v3.getState());
Assert.assertEquals(expectedNumTasks, v3.getTotalTasks());
Assert.assertEquals(RootInputSpecUpdaterVertexManager.class.getName(), v3.getVertexManager()
@@ -4596,7 +4627,7 @@ public class TestVertexImpl {
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
- public void testVertexRootInputSpecUpdatePerTask() throws AMUserCodeException {
+ public void testVertexRootInputSpecUpdatePerTask() throws Exception {
useCustomInitializer = true;
setupPreDagCreation();
dagPlan = createDAGPlanWithInputInitializer("TestInputInitializer");
@@ -4611,7 +4642,7 @@ public class TestVertexImpl {
Assert.assertEquals(VertexState.INITIALIZING, v4.getState());
RootInputInitializerManagerControlled initializerManager1 = v4.getRootInputInitializerManager();
initializerManager1.completeInputInitialization();
-
+ dispatcher.await();
Assert.assertEquals(VertexState.INITED, v4.getState());
Assert.assertEquals(expectedNumTasks, v4.getTotalTasks());
Assert.assertEquals(RootInputSpecUpdaterVertexManager.class.getName(), v4.getVertexManager()
@@ -4969,7 +5000,7 @@ public class TestVertexImpl {
// fudge the vm so we can do custom stuff
vB.vertexManager = new VertexManager(
VertexManagerPluginDescriptor.create(VertexManagerPluginForTest.class.getName()),
- vB, appContext, mock(StateChangeNotifier.class));
+ UserGroupInformation.getCurrentUser(), vB, appContext, mock(StateChangeNotifier.class));
vB.vertexReconfigurationPlanned();
@@ -5093,7 +5124,7 @@ public class TestVertexImpl {
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
- public void testExceptionFromVM_OnRootVertexInitialized() throws AMUserCodeException {
+ public void testExceptionFromVM_OnRootVertexInitialized() throws Exception {
useCustomInitializer = true;
setupPreDagCreation();
dagPlan = createDAGPlanWithVMException("TestInputInitializer", VMExceptionLocation.OnRootVertexInitialized);
@@ -5118,7 +5149,7 @@ public class TestVertexImpl {
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
- public void testExceptionFromVM_OnVertexStarted() throws AMUserCodeException {
+ public void testExceptionFromVM_OnVertexStarted() throws Exception {
useCustomInitializer = true;
setupPreDagCreation();
dagPlan = createDAGPlanWithVMException("TestInputInitializer", VMExceptionLocation.OnVertexStarted);
@@ -5136,7 +5167,7 @@ public class TestVertexImpl {
dispatcher.getEventHandler().handle(new VertexEvent(v1.getVertexId(),
VertexEventType.V_START));
dispatcher.await();
-
+
Assert.assertEquals(VertexManagerWithException.class, v1.vertexManager.getPlugin().getClass());
Assert.assertEquals(VertexState.FAILED, v1.getState());
String diagnostics = StringUtils.join(v1.getDiagnostics(), ",");
@@ -5146,7 +5177,7 @@ public class TestVertexImpl {
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
- public void testExceptionFromVM_OnSourceTaskCompleted() throws AMUserCodeException {
+ public void testExceptionFromVM_OnSourceTaskCompleted() throws Exception {
useCustomInitializer = true;
setupPreDagCreation();
dagPlan = createDAGPlanWithVMException("TestInputInitializer", VMExceptionLocation.OnSourceTaskCompleted);
@@ -5183,7 +5214,7 @@ public class TestVertexImpl {
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
- public void testExceptionFromVM_OnVertexManagerEventReceived() throws AMUserCodeException {
+ public void testExceptionFromVM_OnVertexManagerEventReceived() throws Exception {
useCustomInitializer = true;
setupPreDagCreation();
dagPlan = createDAGPlanWithVMException("TestInputInitializer", VMExceptionLocation.OnVertexManagerEventReceived);
@@ -5210,7 +5241,7 @@ public class TestVertexImpl {
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
- public void testExceptionFromVM_OnVertexManagerVertexStateUpdated() throws AMUserCodeException {
+ public void testExceptionFromVM_OnVertexManagerVertexStateUpdated() throws Exception {
useCustomInitializer = true;
setupPreDagCreation();
dagPlan = createDAGPlanWithVMException("TestVMStateUpdate", VMExceptionLocation.OnVertexManagerVertexStateUpdated);
@@ -5228,7 +5259,7 @@ public class TestVertexImpl {
dispatcher.await();
Assert.assertEquals(VertexState.INITED, v2.getState());
startVertex(v1, false);
-
+ dispatcher.await();
Assert.assertEquals(VertexState.FAILED, v2.getState());
String diagnostics = StringUtils.join(v2.getDiagnostics(), ",");
assertTrue(diagnostics.contains(VMExceptionLocation.OnVertexManagerVertexStateUpdated.name()));
@@ -5261,7 +5292,7 @@ public class TestVertexImpl {
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
- public void testExceptionFromII_InitFailedAfterInitialized() throws AMUserCodeException {
+ public void testExceptionFromII_InitFailedAfterInitialized() throws Exception {
useCustomInitializer = true;
setupPreDagCreation();
dagPlan = createDAGPlanWithIIException();
@@ -5272,6 +5303,7 @@ public class TestVertexImpl {
initVertex(v1);
RootInputInitializerManagerControlled initializerManager1 = v1.getRootInputInitializerManager();
initializerManager1.completeInputInitialization(0);
+ dispatcher.await();
Assert.assertEquals(VertexState.INITED, v1.getState());
String errorMsg = "ErrorWhenInitFailureAtInited";
dispatcher.getEventHandler().handle(new VertexEventRootInputFailed(v1.getVertexId(), "input1",
@@ -5285,7 +5317,7 @@ public class TestVertexImpl {
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
- public void testExceptionFromII_InitFailedAfterRunning() throws AMUserCodeException {
+ public void testExceptionFromII_InitFailedAfterRunning() throws Exception {
useCustomInitializer = true;
setupPreDagCreation();
dagPlan = createDAGPlanWithIIException();
@@ -5296,6 +5328,7 @@ public class TestVertexImpl {
initVertex(v1);
RootInputInitializerManagerControlled initializerManager1 = v1.getRootInputInitializerManager();
initializerManager1.completeInputInitialization(0);
+ dispatcher.await();
startVertex(v1);
Assert.assertEquals(VertexState.RUNNING, v1.getState());
String errorMsg = "ErrorWhenInitFailureAtRunning";
@@ -5310,7 +5343,7 @@ public class TestVertexImpl {
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
- public void testExceptionFromII_HandleInputInitializerEvent() throws AMUserCodeException, InterruptedException {
+ public void testExceptionFromII_HandleInputInitializerEvent() throws Exception {
useCustomInitializer = true;
customInitializer = new EventHandlingRootInputInitializer(null, IIExceptionLocation.HandleInputInitializerEvent);
EventHandlingRootInputInitializer initializer =
@@ -5351,7 +5384,7 @@ public class TestVertexImpl {
dispatcher.getEventHandler()
.handle(new VertexEventRouteEvent(v1.getVertexId(), Collections.singletonList(tezEvent)));
dispatcher.await();
-
+
// it would cause v2 fail as its II throw exception in handleInputInitializerEvent
String diagnostics = StringUtils.join(v2.getDiagnostics(), ",");
assertTrue(diagnostics.contains(IIExceptionLocation.HandleInputInitializerEvent.name()));
http://git-wip-us.apache.org/repos/asf/tez/blob/f0354689/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
index 73dc5eb..81cb42a 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
@@ -23,6 +23,8 @@ import static org.mockito.Matchers.any;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.HashMap;
@@ -31,7 +33,10 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.Callable;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.VertexManagerPlugin;
import org.apache.tez.dag.api.VertexManagerPluginContext;
@@ -39,38 +44,73 @@ import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.dag.StateChangeNotifier;
import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.event.CallableEvent;
+import org.apache.tez.dag.app.dag.event.VertexEventInputDataInformation;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.events.InputDataInformationEvent;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.api.impl.TezEvent;
+import org.junit.Before;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+@SuppressWarnings({ "rawtypes", "unchecked" })
public class TestVertexManager {
-
- @Test(timeout = 5000)
- public void testOnRootVertexInitialized() throws Exception {
- Vertex mockVertex = mock(Vertex.class, RETURNS_DEEP_STUBS);
- AppContext mockAppContext = mock(AppContext.class, RETURNS_DEEP_STUBS);
+ AppContext mockAppContext;
+ ListeningExecutorService execService;
+ Vertex mockVertex;
+ EventHandler mockHandler;
+ ArgumentCaptor<VertexEventInputDataInformation> requestCaptor;
+
+ @Before
+ public void setup() { // builds the mocks shared by the tests in this class
+ mockAppContext = mock(AppContext.class, RETURNS_DEEP_STUBS); // deep stubs so the chained getters on the context can be stubbed below
+ execService = mock(ListeningExecutorService.class);
+ final ListenableFuture<Void> mockFuture = mock(ListenableFuture.class);
+ Mockito.doAnswer(new Answer() { // stub submit(): pass the submitted object straight to a CallableEventDispatcher inside the submit() call
+ public ListenableFuture<Void> answer(InvocationOnMock invocation) {
+ Object[] args = invocation.getArguments();
+ CallableEvent e = (CallableEvent) args[0]; // assumes only CallableEvent instances are ever submitted -- TODO confirm against VertexManager
+ new CallableEventDispatcher().handle(e); // presumably invokes the event's callable synchronously -- confirm in CallableEventDispatcher
+ return mockFuture; // the returned future is a bare mock
+ }})
+ .when(execService).submit((Callable<Void>) any());
+ doReturn(execService).when(mockAppContext).getExecService(); // the app context hands out the stubbed executor
+ mockVertex = mock(Vertex.class, RETURNS_DEEP_STUBS);
doReturn("vertex1").when(mockVertex).getName();
+ mockHandler = mock(EventHandler.class);
+ when(mockAppContext.getEventHandler()).thenReturn(mockHandler); // events sent through the context's handler reach mockHandler, where tests can verify them
when(
mockAppContext.getCurrentDAG().getVertex(any(String.class))
.getTotalTasks()).thenReturn(1); // any vertex looked up by name through the current DAG reports one task
+ requestCaptor = ArgumentCaptor.forClass(VertexEventInputDataInformation.class); // captor for VertexEventInputDataInformation arguments; tests pair it with verify(mockHandler)
+ }
+
+ @Test(timeout = 5000)
+ public void testOnRootVertexInitialized() throws Exception {
VertexManager vm =
new VertexManager(
VertexManagerPluginDescriptor.create(RootInputVertexManager.class
- .getName()), mockVertex, mockAppContext, mock(StateChangeNotifier.class));
+ .getName()), UserGroupInformation.getCurrentUser(),
+ mockVertex, mockAppContext, mock(StateChangeNotifier.class));
vm.initialize();
InputDescriptor id1 = mock(InputDescriptor.class);
List<Event> events1 = new LinkedList<Event>();
InputDataInformationEvent diEvent1 =
InputDataInformationEvent.createWithSerializedPayload(0, null);
events1.add(diEvent1);
- List<TezEvent> tezEvents1 =
- vm.onRootVertexInitialized("input1", id1, events1);
+ vm.onRootVertexInitialized("input1", id1, events1);
+ verify(mockHandler, times(1)).handle(requestCaptor.capture());
+ List<TezEvent> tezEvents1 = requestCaptor.getValue().getEvents();
assertEquals(1, tezEvents1.size());
assertEquals(diEvent1, tezEvents1.get(0).getEvent());
@@ -79,8 +119,9 @@ public class TestVertexManager {
InputDataInformationEvent diEvent2 =
InputDataInformationEvent.createWithSerializedPayload(0, null);
events2.add(diEvent2);
- List<TezEvent> tezEvents2 =
- vm.onRootVertexInitialized("input1", id2, events2);
+ vm.onRootVertexInitialized("input1", id2, events2);
+ verify(mockHandler, times(2)).handle(requestCaptor.capture());
+ List<TezEvent> tezEvents2 = requestCaptor.getValue().getEvents();
assertEquals(tezEvents2.size(), 1);
assertEquals(diEvent2, tezEvents2.get(0).getEvent());
}
@@ -92,17 +133,11 @@ public class TestVertexManager {
*/
@Test(timeout = 5000)
public void testOnRootVertexInitialized2() throws Exception {
- Vertex mockVertex = mock(Vertex.class, RETURNS_DEEP_STUBS);
- AppContext mockAppContext = mock(AppContext.class, RETURNS_DEEP_STUBS);
- doReturn("vertex1").when(mockVertex).getName();
- when(
- mockAppContext.getCurrentDAG().getVertex(any(String.class))
- .getTotalTasks()).thenReturn(1);
-
VertexManager vm =
new VertexManager(
VertexManagerPluginDescriptor.create(CustomVertexManager.class
- .getName()), mockVertex, mockAppContext, mock(StateChangeNotifier.class));
+ .getName()), UserGroupInformation.getCurrentUser(),
+ mockVertex, mockAppContext, mock(StateChangeNotifier.class));
vm.initialize();
InputDescriptor id1 = mock(InputDescriptor.class);
List<Event> events1 = new LinkedList<Event>();
@@ -111,17 +146,20 @@ public class TestVertexManager {
events1.add(diEvent1);
// do not call context.addRootInputEvents, just cache the TezEvent
- List<TezEvent> tezEventsAfterInput1 = vm.onRootVertexInitialized("input1", id1, events1);
+ vm.onRootVertexInitialized("input1", id1, events1);
+ verify(mockHandler, times(1)).handle(requestCaptor.capture());
+ List<TezEvent> tezEventsAfterInput1 = requestCaptor.getValue().getEvents();
assertEquals(0, tezEventsAfterInput1.size());
-
+
InputDescriptor id2 = mock(InputDescriptor.class);
List<Event> events2 = new LinkedList<Event>();
InputDataInformationEvent diEvent2 =
InputDataInformationEvent.createWithSerializedPayload(0, null);
events2.add(diEvent2);
// call context.addRootInputEvents(input1), context.addRootInputEvents(input2)
- List<TezEvent> tezEventsAfterInput2 =
- vm.onRootVertexInitialized("input2", id2, events2);
+ vm.onRootVertexInitialized("input2", id2, events2);
+ verify(mockHandler, times(2)).handle(requestCaptor.capture());
+ List<TezEvent> tezEventsAfterInput2 = requestCaptor.getValue().getEvents();
assertEquals(2, tezEventsAfterInput2.size());
// also verify the EventMetaData
http://git-wip-us.apache.org/repos/asf/tez/blob/f0354689/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
index e2e9dd3..bbe9dcf 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
@@ -170,7 +170,7 @@ public class InputReadyVertexManager extends VertexManagerPlugin {
public synchronized void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws Exception {
numConfiguredSources++;
int target = getContext().getInputVertexEdgeProperties().size();
- LOG.info("For vertex: " + getContext().getVertexName() + "Received configured signal from: "
+ LOG.info("For vertex: " + getContext().getVertexName() + " Received configured signal from: "
+ stateUpdate.getVertexName() + " numConfiguredSources: " + numConfiguredSources
+ " needed: " + target);
Preconditions.checkState(numConfiguredSources <= target, "Vertex: " + getContext().getVertexName());
http://git-wip-us.apache.org/repos/asf/tez/blob/f0354689/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
index 8de747d..411ea71 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
@@ -243,12 +243,6 @@ public class TestInputReadyVertexManager {
when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(3);
when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(3);
when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(3);
- when(mockContext.getTaskContainer(mockSrcVertexId2, 0)).thenReturn(mockContainer2);
- when(mockContext.getTaskContainer(mockSrcVertexId2, 1)).thenReturn(mockContainer2);
- when(mockContext.getTaskContainer(mockSrcVertexId2, 2)).thenReturn(mockContainer2);
- when(mockContext.getTaskContainer(mockSrcVertexId3, 0)).thenReturn(mockContainer3);
- when(mockContext.getTaskContainer(mockSrcVertexId3, 1)).thenReturn(mockContainer3);
- when(mockContext.getTaskContainer(mockSrcVertexId3, 2)).thenReturn(mockContainer3);
mockInputVertices.put(mockSrcVertexId1, eProp1);
mockInputVertices.put(mockSrcVertexId2, eProp2);
mockInputVertices.put(mockSrcVertexId3, eProp3);