You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2015/05/03 00:21:32 UTC
tez git commit: TEZ-1897. Create a concurrent version of
AsyncDispatcher (bikas)
Repository: tez
Updated Branches:
refs/heads/master 9f090279d -> f6ea0fb33
TEZ-1897. Create a concurrent version of AsyncDispatcher (bikas)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f6ea0fb3
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f6ea0fb3
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f6ea0fb3
Branch: refs/heads/master
Commit: f6ea0fb3306faa709c445e4d76081de60545d760
Parents: 9f09027
Author: Bikas Saha <bi...@apache.org>
Authored: Sat May 2 15:21:17 2015 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Sat May 2 15:21:17 2015 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/tez/dag/api/TezConfiguration.java | 14 +
tez-common/findbugs-exclude.xml | 5 +
.../org/apache/tez/common/AsyncDispatcher.java | 85 ++++-
.../tez/common/AsyncDispatcherConcurrent.java | 368 +++++++++++++++++++
.../org/apache/tez/common/TezAbstractEvent.java | 45 +++
.../org/apache/tez/dag/records/TezTaskID.java | 20 +-
.../apache/tez/common/TestAsyncDispatcher.java | 2 +-
.../common/TestAsyncDispatcherConcurrent.java | 194 ++++++++++
.../org/apache/tez/dag/app/DAGAppMaster.java | 19 +-
.../org/apache/tez/dag/app/dag/TaskAttempt.java | 2 -
.../tez/dag/app/dag/event/CallableEvent.java | 4 +-
.../dag/app/dag/event/DAGAppMasterEvent.java | 5 +-
.../apache/tez/dag/app/dag/event/DAGEvent.java | 4 +-
.../tez/dag/app/dag/event/SpeculatorEvent.java | 4 +-
.../tez/dag/app/dag/event/TaskAttemptEvent.java | 9 +-
.../apache/tez/dag/app/dag/event/TaskEvent.java | 9 +-
.../tez/dag/app/dag/event/VertexEvent.java | 4 +-
.../apache/tez/dag/app/dag/impl/DAGImpl.java | 1 -
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 29 +-
.../apache/tez/dag/app/dag/impl/TaskImpl.java | 10 +-
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 13 +-
.../tez/dag/app/TestMockDAGAppMaster.java | 50 ++-
.../tez/dag/app/dag/impl/TestTaskAttempt.java | 42 ++-
.../app/dag/impl/TestTaskAttemptRecovery.java | 2 +-
.../tez/dag/app/dag/impl/TestTaskImpl.java | 4 +-
.../tez/dag/app/dag/impl/TestTaskRecovery.java | 2 +-
27 files changed, 857 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 609db3c..8108ac8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES
TEZ-1993. Implement a pluggable InputSizeEstimator for grouping fairly
ALL CHANGES:
+ TEZ-1897. Create a concurrent version of AsyncDispatcher
TEZ-2394. Issues when there is an error in VertexManager callbacks
TEZ-2386. Tez UI: Inconsistent usage of icon colors
TEZ-2395. Tez UI: Minimum/Maximum Duration show a empty bracket next to 0 secs when you purposefully failed a job.
http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 14e773d..a301957 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -141,6 +141,20 @@ public class TezConfiguration extends Configuration {
@ConfigurationScope(Scope.AM)
public static final String TEZ_CREDENTIALS_PATH = TEZ_PREFIX + "credentials.path";
+  /**
+   * Boolean value. Configuration key (under the AM prefix) that selects whether the
+   * concurrent dispatcher is used.
+   * NOTE(review): the code that reads this key is not part of this hunk - confirm the
+   * exact semantics against the caller.
+   */
+  @Private
+  @ConfigurationScope(Scope.AM)
+  public static final String TEZ_AM_USE_CONCURRENT_DISPATCHER = TEZ_AM_PREFIX
+      + "use.concurrent-dispatcher";
+  /** Default for {@link #TEZ_AM_USE_CONCURRENT_DISPATCHER}; final so it cannot be reassigned. */
+  @Private
+  public static final boolean TEZ_AM_USE_CONCURRENT_DISPATCHER_DEFAULT = true;
+
+  /**
+   * Int value. Configuration key (under the AM prefix) for the concurrency of the
+   * concurrent dispatcher.
+   * NOTE(review): presumably the number of dispatcher threads - confirm against the caller.
+   */
+  @Private
+  @ConfigurationScope(Scope.AM)
+  public static final String TEZ_AM_CONCURRENT_DISPATCHER_CONCURRENCY = TEZ_AM_PREFIX
+      + "concurrent-dispatcher.concurrency";
+  /** Default for {@link #TEZ_AM_CONCURRENT_DISPATCHER_CONCURRENCY}. */
+  @Private
+  public static final int TEZ_AM_CONCURRENT_DISPATCHER_CONCURRENCY_DEFAULT = 10;
+
/**
* Boolean value. Execution mode for the Tez application. True implies session mode. If the client
* code is written according to best practices then the same code can execute in either mode based
http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-common/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-common/findbugs-exclude.xml b/tez-common/findbugs-exclude.xml
index 7814585..6f6253d 100644
--- a/tez-common/findbugs-exclude.xml
+++ b/tez-common/findbugs-exclude.xml
@@ -20,4 +20,9 @@
<Bug pattern="DM_EXIT"/>
</Match>
+ <Match>
+ <Class name="org.apache.tez.common.AsyncDispatcherConcurrent$1"/>
+ <Method name="run" />
+ <Bug pattern="DM_EXIT"/>
+ </Match>
</FindBugsFilter>
http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
index 5aaa4cf..4319f4f 100644
--- a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
+++ b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
@@ -68,8 +68,11 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
private EventHandler handlerInstance = new GenericEventHandler();
private Thread eventHandlingThread;
- protected final Map<Class<? extends Enum>, EventHandler> eventHandlers;
- protected final Map<Class<? extends Enum>, AsyncDispatcher> eventDispatchers;
+ protected final Map<Class<? extends Enum>, EventHandler> eventHandlers = Maps.newHashMap();
+ protected final Map<Class<? extends Enum>, AsyncDispatcher> eventDispatchers = Maps.newHashMap();
+ protected final Map<Class<? extends Enum>, AsyncDispatcherConcurrent> concurrentEventDispatchers =
+ Maps.newHashMap();
+
private boolean exitOnDispatchException;
public AsyncDispatcher(String name) {
@@ -77,11 +80,9 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
}
public AsyncDispatcher(String name, BlockingQueue<Event> eventQueue) {
- super("Dispatcher");
+ super(name);
this.name = name;
this.eventQueue = eventQueue;
- this.eventHandlers = Maps.newHashMap();
- this.eventDispatchers = Maps.newHashMap();
}
public Runnable createThread() {
@@ -195,6 +196,32 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
}
}
}
+
+ private void checkForExistingHandler(Class<? extends Enum> eventType) {
+ EventHandler<Event> registeredHandler = (EventHandler<Event>) eventHandlers.get(eventType);
+ Preconditions.checkState(registeredHandler == null,
+ "Cannot register same event on multiple dispatchers");
+ }
+
+ private void checkForExistingDispatcher(Class<? extends Enum> eventType) {
+ AsyncDispatcher registeredDispatcher = eventDispatchers.get(eventType);
+ Preconditions.checkState(registeredDispatcher == null,
+ "Multiple dispatchers cannot be registered for: " + eventType.getName());
+ }
+
+ private void checkForExistingConcurrentDispatcher(Class<? extends Enum> eventType) {
+ AsyncDispatcherConcurrent concurrentDispatcher = concurrentEventDispatchers.get(eventType);
+ Preconditions.checkState(concurrentDispatcher == null,
+ "Multiple concurrent dispatchers cannot be registered for: " + eventType.getName());
+ }
+
+  /**
+   * Runs the registration conflict checks for an event type: optionally the inline
+   * handler check, then always the dedicated and the concurrent dispatcher checks.
+   *
+   * @param checkHandler whether an already registered inline handler is also a conflict
+   * @param eventType enum class identifying the event type
+   */
+  private void checkForExistingDispatchers(boolean checkHandler, Class<? extends Enum> eventType) {
+    if (checkHandler) {
+      // inline handlers only conflict when the type is about to get its own dispatcher
+      checkForExistingHandler(eventType);
+    }
+    checkForExistingDispatcher(eventType);
+    checkForExistingConcurrentDispatcher(eventType);
+  }
/**
* Add an EventHandler for events handled inline on this dispatcher
@@ -205,9 +232,7 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
Preconditions.checkState(getServiceState() == STATE.NOTINITED);
/* check to see if we have a listener registered */
EventHandler<Event> registeredHandler = (EventHandler<Event>) eventHandlers.get(eventType);
- AsyncDispatcher registeredDispatcher = eventDispatchers.get(eventType);
- Preconditions.checkState(registeredDispatcher == null,
- "Cannot register same event on multiple dispatchers");
+ checkForExistingDispatchers(false, eventType);
LOG.info("Registering " + eventType + " for " + handler.getClass());
if (registeredHandler == null) {
eventHandlers.put(eventType, handler);
@@ -231,20 +256,41 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
public void registerAndCreateDispatcher(Class<? extends Enum> eventType,
EventHandler handler, String dispatcherName) {
Preconditions.checkState(getServiceState() == STATE.NOTINITED);
- AsyncDispatcher dispatcher = new AsyncDispatcher(dispatcherName);
- dispatcher.register(eventType, handler);
/* check to see if we have a listener registered */
- AsyncDispatcher registeredDispatcher = eventDispatchers.get(eventType);
- EventHandler<Event> registeredHandler = (EventHandler<Event>) eventHandlers.get(eventType);
- Preconditions.checkState(registeredHandler == null,
- "Cannot register same event on multiple dispatchers");
+ checkForExistingDispatchers(true, eventType);
LOG.info("Registering " + eventType + " for independent dispatch using: " + handler.getClass());
- Preconditions.checkState(registeredDispatcher == null,
- "Multiple dispatchers cannot be registered for: " + eventType.getName());
+ AsyncDispatcher dispatcher = new AsyncDispatcher(dispatcherName);
+ dispatcher.register(eventType, handler);
eventDispatchers.put(eventType, dispatcher);
addIfService(dispatcher);
}
+
+  /**
+   * Add an EventHandler for events handled in a newly created concurrent dispatcher
+   * with the given name and number of threads. The new dispatcher is added to this
+   * composite service via addIfService.
+   *
+   * @param eventType enum class identifying the event type
+   * @param handler handler registered on the new dispatcher
+   * @param dispatcherName name given to the new dispatcher
+   * @param numThreads number of threads passed to the new dispatcher
+   * @return the newly created concurrent dispatcher
+   */
+  public AsyncDispatcherConcurrent registerAndCreateDispatcher(Class<? extends Enum> eventType,
+      EventHandler handler, String dispatcherName, int numThreads) {
+    Preconditions.checkState(getServiceState() == STATE.NOTINITED);
+
+    // the event type must not already be routed to a handler or another dispatcher
+    checkForExistingDispatchers(true, eventType);
+    LOG.info("Registering " + eventType + " for concurrent dispatch using: " + handler.getClass());
+    final AsyncDispatcherConcurrent concurrentDispatcher =
+        new AsyncDispatcherConcurrent(dispatcherName, numThreads);
+    concurrentDispatcher.register(eventType, handler);
+    concurrentEventDispatchers.put(eventType, concurrentDispatcher);
+    addIfService(concurrentDispatcher);
+    return concurrentDispatcher;
+  }
+
+  /**
+   * Add an EventHandler for events handled by an already created concurrent dispatcher.
+   * The handler is registered on the given dispatcher and events of this type are then
+   * routed to it. Unlike registerAndCreateDispatcher, this method does not add the
+   * dispatcher to this service.
+   *
+   * @param eventType enum class identifying the event type
+   * @param handler handler registered on the existing dispatcher
+   * @param dispatcher existing concurrent dispatcher that will process the events
+   */
+  public void registerWithExistingDispatcher(Class<? extends Enum> eventType,
+      EventHandler handler, AsyncDispatcherConcurrent dispatcher) {
+    Preconditions.checkState(getServiceState() == STATE.NOTINITED);
+
+    /* check to see if we have a listener registered */
+    checkForExistingDispatchers(true, eventType);
+    LOG.info("Registering " + eventType + " with existing concurrent dispatch using: "
+        + handler.getClass());
+    dispatcher.register(eventType, handler);
+    concurrentEventDispatchers.put(eventType, dispatcher);
+  }
@Override
public EventHandler getEventHandler() {
@@ -261,13 +307,18 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
}
drained = false;
- // offload to specific dispatcher is one exists
+ // offload to specific dispatcher if one exists
Class<? extends Enum> type = event.getType().getDeclaringClass();
AsyncDispatcher registeredDispatcher = eventDispatchers.get(type);
if (registeredDispatcher != null) {
registeredDispatcher.getEventHandler().handle(event);
return;
}
+ AsyncDispatcherConcurrent concurrentDispatcher = concurrentEventDispatchers.get(type);
+ if (concurrentDispatcher != null) {
+ concurrentDispatcher.getEventHandler().handle(event);
+ return;
+ }
// no registered dispatcher. use internal dispatcher.
http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcherConcurrent.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcherConcurrent.java b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcherConcurrent.java
new file mode 100644
index 0000000..d19bf9e
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcherConcurrent.java
@@ -0,0 +1,368 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * A dispatcher that can schedule events concurrently. Uses a fixed size threadpool
+ * to schedule events. Events that have the same serializing hash will get scheduled
+ * on the same thread in the threadpool. This can be used to prevent concurrency issues
+ * for events that may not be independently processed.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+@Private
+public class AsyncDispatcherConcurrent extends CompositeService implements Dispatcher {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AsyncDispatcher.class);
+
+ private final String name;
+ private final ArrayList<LinkedBlockingQueue<Event>> eventQueues;
+ private volatile boolean stopped = false;
+
+ // Configuration flag for enabling/disabling draining dispatcher's events on
+ // stop functionality.
+ private volatile boolean drainEventsOnStop = false;
+
+ // Indicates all the remaining dispatcher's events on stop have been drained
+ // and processed.
+ private volatile boolean drained = true;
+ private Object waitForDrained = new Object();
+
+ // For drainEventsOnStop enabled only, block newly coming events into the
+ // queue while stopping.
+ private volatile boolean blockNewEvents = false;
+ private EventHandler handlerInstance = new GenericEventHandler();
+
+ private ExecutorService execService;
+ private final int numThreads;
+
+ protected final Map<Class<? extends Enum>, EventHandler> eventHandlers = Maps.newHashMap();
+ protected final Map<Class<? extends Enum>, AsyncDispatcherConcurrent> eventDispatchers =
+ Maps.newHashMap();
+ private boolean exitOnDispatchException;
+
+ AsyncDispatcherConcurrent(String name, int numThreads) {
+ super(name);
+ Preconditions.checkArgument(numThreads > 0);
+ this.name = name;
+ this.eventQueues = Lists.newArrayListWithCapacity(numThreads);
+ this.numThreads = numThreads;
+ }
+
+ class DispatchRunner implements Runnable {
+ final LinkedBlockingQueue<Event> queue;
+
+ public DispatchRunner(LinkedBlockingQueue<Event> queue) {
+ this.queue = queue;
+ }
+
+ @Override
+ public void run() {
+ while (!stopped && !Thread.currentThread().isInterrupted()) {
+ drained = queue.isEmpty();
+ // blockNewEvents is only set when dispatcher is draining to stop,
+ // adding this check is to avoid the overhead of acquiring the lock
+ // and calling notify every time in the normal run of the loop.
+ if (blockNewEvents) {
+ synchronized (waitForDrained) {
+ if (drained) {
+ waitForDrained.notify();
+ }
+ }
+ }
+ Event event;
+ try {
+ event = queue.take();
+ } catch(InterruptedException ie) {
+ if (!stopped) {
+ LOG.warn("AsyncDispatcher thread interrupted", ie);
+ }
+ return;
+ }
+ if (event != null) {
+ dispatch(event);
+ }
+ }
+ }
+ };
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ // TODO TEZ-2049 remove YARN reference
+ this.exitOnDispatchException =
+ conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
+ Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR);
+ super.serviceInit(conf);
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ execService = Executors.newFixedThreadPool(numThreads, new ThreadFactoryBuilder().setDaemon(true)
+ .setNameFormat("Dispatcher [" + this.name + "] #%d").build());
+ for (int i=0; i<numThreads; ++i) {
+ eventQueues.add(new LinkedBlockingQueue<Event>());
+ }
+ for (int i=0; i<numThreads; ++i) {
+ execService.execute(new DispatchRunner(eventQueues.get(i)));
+ }
+ //start all the components
+ super.serviceStart();
+ }
+
+ public void setDrainEventsOnStop() {
+ drainEventsOnStop = true;
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ if (execService != null) {
+ if (drainEventsOnStop) {
+ blockNewEvents = true;
+ LOG.info("AsyncDispatcher is draining to stop, ignoring any new events.");
+ synchronized (waitForDrained) {
+ while (!drained && !execService.isShutdown()) {
+ LOG.info("Waiting for AsyncDispatcher to drain.");
+ waitForDrained.wait(1000);
+ }
+ }
+ }
+
+ stopped = true;
+
+ for (int i=0; i<numThreads; ++i) {
+ LOG.info("AsyncDispatcher stopping with events: " + eventQueues.get(i).size()
+ + " in queue: " + i);
+ }
+ execService.shutdownNow();
+ }
+
+ // stop all the components
+ super.serviceStop();
+ }
+
+ protected void dispatch(Event event) {
+ //all events go thru this loop
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Dispatching the event " + event.getClass().getName() + "."
+ + event.toString());
+ }
+
+ Class<? extends Enum> type = event.getType().getDeclaringClass();
+
+ try{
+ EventHandler handler = eventHandlers.get(type);
+ if(handler != null) {
+ handler.handle(event);
+ } else {
+ throw new Exception("No handler for registered for " + type);
+ }
+ } catch (Throwable t) {
+ LOG.error("Error in dispatcher thread", t);
+ // If serviceStop is called, we should exit this thread gracefully.
+ if (exitOnDispatchException
+ && (ShutdownHookManager.get().isShutdownInProgress()) == false
+ && stopped == false) {
+ Thread shutDownThread = new Thread(createShutDownThread());
+ shutDownThread.setName("AsyncDispatcher ShutDown handler");
+ shutDownThread.start();
+ }
+ }
+ }
+
+ private void checkForExistingHandler(Class<? extends Enum> eventType) {
+ EventHandler<Event> registeredHandler = (EventHandler<Event>) eventHandlers.get(eventType);
+ Preconditions.checkState(registeredHandler == null,
+ "Cannot register same event on multiple dispatchers");
+ }
+
+ private void checkForExistingDispatcher(Class<? extends Enum> eventType) {
+ AsyncDispatcherConcurrent registeredDispatcher = eventDispatchers.get(eventType);
+ Preconditions.checkState(registeredDispatcher == null,
+ "Multiple dispatchers cannot be registered for: " + eventType.getName());
+ }
+
+ private void checkForExistingDispatchers(boolean checkHandler, Class<? extends Enum> eventType) {
+ if (checkHandler) {
+ checkForExistingHandler(eventType);
+ }
+ checkForExistingDispatcher(eventType);
+ }
+
+ /**
+ * Add an EventHandler for events handled inline on this dispatcher
+ */
+ @Override
+ public void register(Class<? extends Enum> eventType,
+ EventHandler handler) {
+ Preconditions.checkState(getServiceState() == STATE.NOTINITED);
+ /* check to see if we have a listener registered */
+ EventHandler<Event> registeredHandler = (EventHandler<Event>) eventHandlers.get(eventType);
+ checkForExistingDispatchers(false, eventType);
+ LOG.info("Registering " + eventType + " for " + handler.getClass());
+ if (registeredHandler == null) {
+ eventHandlers.put(eventType, handler);
+ } else if (!(registeredHandler instanceof MultiListenerHandler)){
+ /* for multiple listeners of an event add the multiple listener handler */
+ MultiListenerHandler multiHandler = new MultiListenerHandler();
+ multiHandler.addHandler(registeredHandler);
+ multiHandler.addHandler(handler);
+ eventHandlers.put(eventType, multiHandler);
+ } else {
+ /* already a multilistener, just add to it */
+ MultiListenerHandler multiHandler
+ = (MultiListenerHandler) registeredHandler;
+ multiHandler.addHandler(handler);
+ }
+ }
+
+ /**
+ * Add an EventHandler for events handled in their own dispatchers with given name and threads
+ */
+
+ public AsyncDispatcherConcurrent registerAndCreateDispatcher(Class<? extends Enum> eventType,
+ EventHandler handler, String dispatcherName, int numThreads) {
+ Preconditions.checkState(getServiceState() == STATE.NOTINITED);
+
+ /* check to see if we have a listener registered */
+ checkForExistingDispatchers(true, eventType);
+ LOG.info("Registering " + eventType + " for independent dispatch using: " + handler.getClass());
+ AsyncDispatcherConcurrent dispatcher = new AsyncDispatcherConcurrent(dispatcherName, numThreads);
+ dispatcher.register(eventType, handler);
+ eventDispatchers.put(eventType, dispatcher);
+ addIfService(dispatcher);
+ return dispatcher;
+ }
+
+ public void registerWithExistingDispatcher(Class<? extends Enum> eventType,
+ EventHandler handler, AsyncDispatcherConcurrent dispatcher) {
+ Preconditions.checkState(getServiceState() == STATE.NOTINITED);
+
+ /* check to see if we have a listener registered */
+ checkForExistingDispatchers(true, eventType);
+ LOG.info("Registering " + eventType + " wit existing concurrent dispatch using: "
+ + handler.getClass());
+ dispatcher.register(eventType, handler);
+ eventDispatchers.put(eventType, dispatcher);
+ }
+
+ @Override
+ public EventHandler getEventHandler() {
+ return handlerInstance;
+ }
+
+ class GenericEventHandler implements EventHandler<TezAbstractEvent> {
+ public void handle(TezAbstractEvent event) {
+ if (stopped) {
+ return;
+ }
+ if (blockNewEvents) {
+ return;
+ }
+ drained = false;
+
+ // offload to specific dispatcher if one exists
+ Class<? extends Enum> type = event.getType().getDeclaringClass();
+ AsyncDispatcherConcurrent registeredDispatcher = eventDispatchers.get(type);
+ if (registeredDispatcher != null) {
+ registeredDispatcher.getEventHandler().handle(event);
+ return;
+ }
+
+ int index = numThreads > 1 ? event.getSerializingHash() % numThreads : 0;
+
+ // no registered dispatcher. use internal dispatcher.
+ LinkedBlockingQueue<Event> queue = eventQueues.get(index);
+ /* all this method does is enqueue all the events onto the queue */
+ int qSize = queue.size();
+ if (qSize !=0 && qSize %1000 == 0) {
+ LOG.info("Size of event-queue is " + qSize);
+ }
+ int remCapacity = queue.remainingCapacity();
+ if (remCapacity < 1000) {
+ LOG.warn("Very low remaining capacity in the event-queue: "
+ + remCapacity);
+ }
+ try {
+ queue.put(event);
+ } catch (InterruptedException e) {
+ if (!stopped) {
+ LOG.warn("AsyncDispatcher thread interrupted", e);
+ }
+ throw new YarnRuntimeException(e);
+ }
+ };
+ }
+
+ /**
+ * Multiplexing an event. Sending it to different handlers that
+ * are interested in the event.
+ * @param <T> the type of event these multiple handlers are interested in.
+ */
+ static class MultiListenerHandler implements EventHandler<Event> {
+ List<EventHandler<Event>> listofHandlers;
+
+ public MultiListenerHandler() {
+ listofHandlers = new ArrayList<EventHandler<Event>>();
+ }
+
+ @Override
+ public void handle(Event event) {
+ for (EventHandler<Event> handler: listofHandlers) {
+ handler.handle(event);
+ }
+ }
+
+ void addHandler(EventHandler<Event> handler) {
+ listofHandlers.add(handler);
+ }
+
+ }
+
+ Runnable createShutDownThread() {
+ return new Runnable() {
+ @Override
+ public void run() {
+ LOG.info("Exiting, bbye..");
+ System.exit(-1);
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-common/src/main/java/org/apache/tez/common/TezAbstractEvent.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezAbstractEvent.java b/tez-common/src/main/java/org/apache/tez/common/TezAbstractEvent.java
new file mode 100644
index 0000000..b736112
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/common/TezAbstractEvent.java
@@ -0,0 +1,45 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+
+package org.apache.tez.common;
+
+/**
+ * Base class for Tez events that carry a serializing hash, so that events of
+ * different instances may be processed in parallel while events sharing a hash
+ * are processed one after another.
+ *
+ * @param <TYPE>
+ *          Event type
+ */
+public abstract class TezAbstractEvent<TYPE extends Enum<TYPE>> extends
+    org.apache.hadoop.yarn.event.AbstractEvent<TYPE> {
+
+  /** Hash shared by every event that does not override {@link #getSerializingHash()}. */
+  private static final int DEFAULT_SERIALIZING_HASH = 0;
+
+  /**
+   * @param eventType type of this event
+   */
+  public TezAbstractEvent(TYPE eventType) {
+    super(eventType);
+  }
+
+  /**
+   * Returns a number that is identical for all event instances that must be
+   * processed serially with respect to each other.
+   *
+   * @return Serializing identifier. The default is the same constant for every
+   *         event, which serializes all events that do not override this method
+   */
+  public int getSerializingHash() {
+    return DEFAULT_SERIALIZING_HASH;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java b/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java
index b4c7b32..3d28348 100644
--- a/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java
@@ -44,6 +44,7 @@ import com.google.common.cache.LoadingCache;
@InterfaceStability.Stable
public class TezTaskID extends TezID {
public static final String TASK = "task";
+ private final int serializingHash;
static final ThreadLocal<NumberFormat> tezTaskIdFormat = new ThreadLocal<NumberFormat>() {
@Override
@@ -67,10 +68,6 @@ public class TezTaskID extends TezID {
private TezVertexID vertexId;
- // Public for Writable serialization. Verify if this is actually required.
- public TezTaskID() {
- }
-
/**
* Constructs a TezTaskID object from given {@link TezVertexID}.
* @param vertexID the vertexID object for this TezTaskID
@@ -91,6 +88,11 @@ public class TezTaskID extends TezID {
super(id);
Preconditions.checkArgument(vertexID != null, "vertexID cannot be null");
this.vertexId = vertexID;
+ this.serializingHash = getHashCode(true);
+ }
+
+ /**
+ * Returns the hash computed once in the constructor via getHashCode(true).
+ *
+ * @return the cached serializing hash of this task id
+ */
+ public int getSerializingHash() {
+ return serializingHash;
}
/** Returns the {@link TezVertexID} object that this task belongs to */
@@ -135,7 +137,15 @@ public class TezTaskID extends TezID {
@Override
public int hashCode() {
- return vertexId.hashCode() * 535013 + id;
+ return getHashCode(false);
+ }
+
+  /**
+   * Computes the hash of this task id from the vertex id hash and the task index.
+   *
+   * @param makePositive when true the result is guaranteed to be non-negative;
+   *          when false the raw (possibly negative) hash is returned
+   * @return the hash code, non-negative if {@code makePositive} is set
+   */
+  public int getHashCode(boolean makePositive) {
+    int code = vertexId.hashCode() * 535013 + id;
+    if (makePositive && code < 0) {
+      // -Integer.MIN_VALUE overflows back to Integer.MIN_VALUE, so negating is
+      // not enough for that single value; map it to 0 explicitly.
+      code = (code == Integer.MIN_VALUE) ? 0 : -code;
+    }
+    return code;
   }
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-common/src/test/java/org/apache/tez/common/TestAsyncDispatcher.java
----------------------------------------------------------------------
diff --git a/tez-common/src/test/java/org/apache/tez/common/TestAsyncDispatcher.java b/tez-common/src/test/java/org/apache/tez/common/TestAsyncDispatcher.java
index ad7f5df..bcd1c5f 100644
--- a/tez-common/src/test/java/org/apache/tez/common/TestAsyncDispatcher.java
+++ b/tez-common/src/test/java/org/apache/tez/common/TestAsyncDispatcher.java
@@ -116,7 +116,7 @@ public class TestAsyncDispatcher {
central.register(TestEventType1.class, new TestEventHandler1());
Assert.fail();
} catch (IllegalStateException e) {
- Assert.assertTrue(e.getMessage().contains("Cannot register same event on multiple dispatchers"));
+ Assert.assertTrue(e.getMessage().contains("Multiple dispatchers cannot be registered for"));
} finally {
central.close();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-common/src/test/java/org/apache/tez/common/TestAsyncDispatcherConcurrent.java
----------------------------------------------------------------------
diff --git a/tez-common/src/test/java/org/apache/tez/common/TestAsyncDispatcherConcurrent.java b/tez-common/src/test/java/org/apache/tez/common/TestAsyncDispatcherConcurrent.java
new file mode 100644
index 0000000..1fa8123
--- /dev/null
+++ b/tez-common/src/test/java/org/apache/tez/common/TestAsyncDispatcherConcurrent.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.common;
+
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.junit.Assert;
+import org.junit.Test;
+
+@SuppressWarnings("unchecked")
+public class TestAsyncDispatcherConcurrent { // tests for event dispatch through AsyncDispatcher / AsyncDispatcherConcurrent
+
+ static class CountDownEventHandler { // latch-based base class shared by the test handlers below
+ static CountDownLatch latch; // static: one latch shared by every handler instance; replaced per test via init()
+ static void init(CountDownLatch latch) {
+ CountDownEventHandler.latch = latch;
+ }
+
+ static void checkParallelCountersDoneAndFinish() throws Exception { // called by the test thread: adds its own count, then blocks until the latch reaches zero
+ latch.countDown();
+ latch.await();
+ }
+
+ public void handle() { // called from a handler: counts down, then blocks until every other party has counted down too
+ latch.countDown();
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e); // handle() declares no checked exceptions, so rethrow unchecked
+ }
+ }
+ }
+
+ public enum TestEventType1 { TYPE1 }
+ public class TestEvent1 extends TezAbstractEvent<TestEventType1> { // event whose serializing hash is supplied by the test
+ final int hash;
+ public TestEvent1(TestEventType1 type, int hash) {
+ super(type);
+ this.hash = hash;
+ }
+
+ @Override
+ public int getSerializingHash() {
+ return hash;
+ }
+ }
+ class TestEventHandler1 extends CountDownEventHandler implements EventHandler<TestEvent1> {
+ @Override
+ public void handle(TestEvent1 event) {
+ handle(); // delegate to the latch logic in CountDownEventHandler
+ }
+ }
+ public enum TestEventType2 { TYPE2 }
+ public class TestEvent2 extends TezAbstractEvent<TestEventType2> { // does not override getSerializingHash(); relies on the inherited behavior
+ public TestEvent2(TestEventType2 type) {
+ super(type);
+ }
+ }
+ class TestEventHandler2 extends CountDownEventHandler implements EventHandler<TestEvent2> {
+ @Override
+ public void handle(TestEvent2 event) {
+ handle(); // delegate to the latch logic in CountDownEventHandler
+ }
+ }
+ public enum TestEventType3 { TYPE3 }
+ public class TestEvent3 extends TezAbstractEvent<TestEventType3> { // does not override getSerializingHash(); relies on the inherited behavior
+ public TestEvent3(TestEventType3 type) {
+ super(type);
+ }
+ }
+ class TestEventHandler3 extends CountDownEventHandler implements EventHandler<TestEvent3> {
+ @Override
+ public void handle(TestEvent3 event) {
+ handle(); // delegate to the latch logic in CountDownEventHandler
+ }
+ }
+
+ @Test (timeout=5000) // the handlers block on the latch without a timeout, so the test-level timeout bounds a failure
+ public void testBasic() throws Exception {
+ CountDownLatch latch = new CountDownLatch(4); // 3 handler invocations + the test thread itself
+ CountDownEventHandler.init(latch);
+
+ AsyncDispatcher central = new AsyncDispatcher("Type1");
+ central.register(TestEventType1.class, new TestEventHandler1());
+ central.registerAndCreateDispatcher(TestEventType2.class, new TestEventHandler2(), "Type2", 1);
+ central.registerAndCreateDispatcher(TestEventType3.class, new TestEventHandler3(), "Type3", 1);
+
+ central.init(new Configuration());
+ central.start();
+ // 3 threads in different dispatchers will handle 3 events
+ central.getEventHandler().handle(new TestEvent1(TestEventType1.TYPE1, 0));
+ central.getEventHandler().handle(new TestEvent2(TestEventType2.TYPE2));
+ central.getEventHandler().handle(new TestEvent3(TestEventType3.TYPE3));
+ // wait for all events to be run in parallel
+ CountDownEventHandler.checkParallelCountersDoneAndFinish(); // returns only once all three handlers are blocked in handle() at the same time
+ central.close();
+ }
+
+ @Test (timeout=5000) // the handlers block on the latch without a timeout, so the test-level timeout bounds a failure
+ public void testMultiThreads() throws Exception {
+ CountDownLatch latch = new CountDownLatch(4); // 3 handler invocations + the test thread itself
+ CountDownEventHandler.init(latch);
+
+ AsyncDispatcherConcurrent central = new AsyncDispatcherConcurrent("Type1", 1);
+ central.registerAndCreateDispatcher(TestEventType1.class, new TestEventHandler1(), "Type1", 3); // last argument is presumably the thread count — confirm against AsyncDispatcherConcurrent
+
+ central.init(new Configuration());
+ central.start();
+ // 3 threads in the same dispatcher will handle 3 events
+ central.getEventHandler().handle(new TestEvent1(TestEventType1.TYPE1, 0)); // serializing hashes 0, 1, 2: presumably routed to different threads — confirm
+ central.getEventHandler().handle(new TestEvent1(TestEventType1.TYPE1, 1));
+ central.getEventHandler().handle(new TestEvent1(TestEventType1.TYPE1, 2));
+ // wait for all events to be run in parallel
+ CountDownEventHandler.checkParallelCountersDoneAndFinish(); // would hang (and hit the timeout) if the three events were handled one after another
+ central.close();
+ }
+
+ @Test (timeout=5000)
+ public void testMultipleRegisterFail() throws Exception {
+ AsyncDispatcher central = new AsyncDispatcher("Type1"); // case 1: plain handler first, then a concurrent dispatcher for the same event type
+ try {
+ central.register(TestEventType1.class, new TestEventHandler1());
+ central.registerAndCreateDispatcher(TestEventType1.class, new TestEventHandler2(), "Type2", 1);
+ Assert.fail(); // reached only if the second registration was wrongly accepted
+ } catch (IllegalStateException e) {
+ Assert.assertTrue(e.getMessage().contains("Cannot register same event on multiple dispatchers"));
+ } finally {
+ central.close();
+ }
+
+ central = new AsyncDispatcher("Type1"); // case 2: concurrent dispatcher first, then a plain handler for the same event type
+ try {
+ central.registerAndCreateDispatcher(TestEventType1.class, new TestEventHandler2(), "Type2", 1);
+ central.register(TestEventType1.class, new TestEventHandler1());
+ Assert.fail();
+ } catch (IllegalStateException e) {
+ Assert.assertTrue(e.getMessage().contains("Multiple concurrent dispatchers cannot be registered"));
+ } finally {
+ central.close();
+ }
+
+ central = new AsyncDispatcher("Type1"); // case 3: two concurrent dispatchers for the same event type
+ try {
+ central.registerAndCreateDispatcher(TestEventType1.class, new TestEventHandler2(), "Type2", 1);
+ central.registerAndCreateDispatcher(TestEventType1.class, new TestEventHandler2(), "Type2", 1);
+ Assert.fail();
+ } catch (IllegalStateException e) {
+ Assert.assertTrue(e.getMessage().contains("Multiple concurrent dispatchers cannot be registered"));
+ } finally {
+ central.close();
+ }
+
+ central = new AsyncDispatcher("Type1"); // case 4: two dispatchers for the same event type via the 3-argument overload
+ try {
+ central.registerAndCreateDispatcher(TestEventType1.class, new TestEventHandler2(), "Type2");
+ central.registerAndCreateDispatcher(TestEventType1.class, new TestEventHandler2(), "Type2");
+ Assert.fail();
+ } catch (IllegalStateException e) {
+ Assert.assertTrue(e.getMessage().contains("Multiple dispatchers cannot be registered for"));
+ } finally {
+ central.close();
+ }
+
+ central = new AsyncDispatcher("Type1"); // case 5: re-registering the same event type on the concurrent dispatcher just created for it
+ try {
+ AsyncDispatcherConcurrent concDispatcher = central.registerAndCreateDispatcher(
+ TestEventType1.class, new TestEventHandler2(), "Type2", 1);
+ central.registerWithExistingDispatcher(TestEventType1.class, new TestEventHandler1(),
+ concDispatcher);
+ Assert.fail();
+ } catch (IllegalStateException e) {
+ Assert.assertTrue(e.getMessage().contains("Multiple concurrent dispatchers cannot be registered"));
+ } finally {
+ central.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 27b9c37..3e3d6f0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -91,6 +91,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tez.common.AsyncDispatcher;
+import org.apache.tez.common.AsyncDispatcherConcurrent;
import org.apache.tez.common.GcTimeUpdater;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezConverterUtils;
@@ -455,12 +456,22 @@ public class DAGAppMaster extends AbstractService {
dispatcher.register(DAGAppMasterEventType.class, new DAGAppMasterEventHandler());
dispatcher.register(DAGEventType.class, dagEventDispatcher);
dispatcher.register(VertexEventType.class, vertexEventDispatcher);
- dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
- dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDispatcher());
+ if (!conf.getBoolean(TezConfiguration.TEZ_AM_USE_CONCURRENT_DISPATCHER,
+ TezConfiguration.TEZ_AM_USE_CONCURRENT_DISPATCHER_DEFAULT)) {
+ dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
+ dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDispatcher());
+ } else {
+ int concurrency = conf.getInt(TezConfiguration.TEZ_AM_CONCURRENT_DISPATCHER_CONCURRENCY,
+ TezConfiguration.TEZ_AM_CONCURRENT_DISPATCHER_CONCURRENCY_DEFAULT);
+ AsyncDispatcherConcurrent sharedDispatcher = dispatcher.registerAndCreateDispatcher(
+ TaskEventType.class, new TaskEventDispatcher(), "TaskAndAttemptEventThread", concurrency);
+ dispatcher.registerWithExistingDispatcher(TaskAttemptEventType.class,
+ new TaskAttemptEventDispatcher(), sharedDispatcher);
+ }
// register other delegating dispatchers
- dispatcher.registerAndCreateDispatcher(SpeculatorEventType.class, new SpeculatorEventHandler(), "Speculator");
-
+ dispatcher.registerAndCreateDispatcher(SpeculatorEventType.class, new SpeculatorEventHandler(),
+ "Speculator");
if (enableWebUIService()) {
this.webUIService = new WebUIService(context);
http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
index 3f60a4e..6c85cc2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
@@ -128,8 +128,6 @@ public interface TaskAttempt {
*/
long getFinishTime();
- public Task getTask();
-
TaskAttemptState restoreFromEvent(HistoryEvent event);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/CallableEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/CallableEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/CallableEvent.java
index e148fe8..7e68752 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/CallableEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/CallableEvent.java
@@ -20,11 +20,11 @@ package org.apache.tez.dag.app.dag.event;
import java.util.concurrent.Callable;
-import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tez.common.TezAbstractEvent;
import com.google.common.util.concurrent.FutureCallback;
-public abstract class CallableEvent extends AbstractEvent<CallableEventType> implements
+public abstract class CallableEvent extends TezAbstractEvent<CallableEventType> implements
Callable<Void> {
private final FutureCallback<Void> callback;
http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEvent.java
index 0571cab..b7cb3a4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEvent.java
@@ -18,9 +18,10 @@
package org.apache.tez.dag.app.dag.event;
-import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tez.common.TezAbstractEvent;
-public class DAGAppMasterEvent extends AbstractEvent<DAGAppMasterEventType> {
+
+public class DAGAppMasterEvent extends TezAbstractEvent<DAGAppMasterEventType> {
public DAGAppMasterEvent(DAGAppMasterEventType type) {
super(type);
http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEvent.java
index 1ec0222..a0a8a1a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEvent.java
@@ -18,14 +18,14 @@
package org.apache.tez.dag.app.dag.event;
-import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tez.common.TezAbstractEvent;
import org.apache.tez.dag.records.TezDAGID;
/**
* This class encapsulates job related events.
*
*/
-public class DAGEvent extends AbstractEvent<DAGEventType> {
+public class DAGEvent extends TezAbstractEvent<DAGEventType> {
private TezDAGID dagId;
http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/SpeculatorEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/SpeculatorEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/SpeculatorEvent.java
index 16fab8e..3863a2a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/SpeculatorEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/SpeculatorEvent.java
@@ -18,10 +18,10 @@
package org.apache.tez.dag.app.dag.event;
-import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tez.common.TezAbstractEvent;
import org.apache.tez.dag.records.TezVertexID;
-public class SpeculatorEvent extends AbstractEvent<SpeculatorEventType> {
+public class SpeculatorEvent extends TezAbstractEvent<SpeculatorEventType> {
private final TezVertexID vertexId;
public SpeculatorEvent(SpeculatorEventType type, TezVertexID vertexId) {
http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEvent.java
index 56c03e3..63ef70f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEvent.java
@@ -18,14 +18,14 @@
package org.apache.tez.dag.app.dag.event;
-import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tez.common.TezAbstractEvent;
import org.apache.tez.dag.records.TezTaskAttemptID;
/**
* This class encapsulates task attempt related events.
*
*/
-public class TaskAttemptEvent extends AbstractEvent<TaskAttemptEventType> {
+public class TaskAttemptEvent extends TezAbstractEvent<TaskAttemptEventType> {
private TezTaskAttemptID attemptID;
@@ -42,4 +42,9 @@ public class TaskAttemptEvent extends AbstractEvent<TaskAttemptEventType> {
public TezTaskAttemptID getTaskAttemptID() {
return attemptID;
}
+
+ @Override
+ public int getSerializingHash() { // hash for an attempt event is taken from the attempt's task ID, not from the attempt ID itself
+ return attemptID.getTaskID().getSerializingHash(); // delegates to the owning task's ID, so all attempts of one task report that task's hash
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEvent.java
index c7e5faa..def9ddf 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEvent.java
@@ -18,14 +18,14 @@
package org.apache.tez.dag.app.dag.event;
-import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tez.common.TezAbstractEvent;
import org.apache.tez.dag.records.TezTaskID;
/**
* this class encapsulates task related events.
*
*/
-public class TaskEvent extends AbstractEvent<TaskEventType> {
+public class TaskEvent extends TezAbstractEvent<TaskEventType> {
private TezTaskID taskId;
@@ -37,4 +37,9 @@ public class TaskEvent extends AbstractEvent<TaskEventType> {
public TezTaskID getTaskID() {
return taskId;
}
+
+ @Override
+ public int getSerializingHash() { // hash for a task event is the one carried by its task ID
+ return taskId.getSerializingHash(); // delegates to TezTaskID.getSerializingHash()
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEvent.java
index 9e94eb5..33128e4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEvent.java
@@ -18,14 +18,14 @@
package org.apache.tez.dag.app.dag.event;
-import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tez.common.TezAbstractEvent;
import org.apache.tez.dag.records.TezVertexID;
/**
* this class encapsulates vertex related events.
*
*/
-public class VertexEvent extends AbstractEvent<VertexEventType> {
+public class VertexEvent extends TezAbstractEvent<VertexEventType> {
private TezVertexID vertexId;
http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index f562451..f769565 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -1261,7 +1261,6 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
if (finishTime == 0) {
setFinishTime();
}
-
entityUpdateTracker.stop();
boolean recoveryError = false;
http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 1f3e1cf..b1c0acc 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -144,6 +144,9 @@ public class TaskAttemptImpl implements TaskAttempt,
private NodeId containerNodeId;
private String nodeHttpAddress;
private String nodeRackName;
+
+ private final Task task;
+ private final Vertex vertex;
@VisibleForTesting
TaskAttemptStatus reportedStatus;
@@ -406,7 +409,8 @@ public class TaskAttemptImpl implements TaskAttempt,
TaskAttemptListener taskAttemptListener, Configuration conf, Clock clock,
TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext,
boolean isRescheduled,
- Resource resource, ContainerContext containerContext, boolean leafVertex) {
+ Resource resource, ContainerContext containerContext, boolean leafVertex,
+ Task task) {
ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
this.readLock = rwLock.readLock();
this.writeLock = rwLock.writeLock();
@@ -417,6 +421,9 @@ public class TaskAttemptImpl implements TaskAttempt,
this.clock = clock;
this.taskHeartbeatHandler = taskHeartbeatHandler;
this.appContext = appContext;
+ this.task = task;
+ this.vertex = this.task.getVertex();
+
this.reportedStatus = new TaskAttemptStatus(this.attemptId);
initTaskAttemptStatus(reportedStatus);
RackResolver.init(conf);
@@ -649,17 +656,9 @@ public class TaskAttemptImpl implements TaskAttempt,
readLock.unlock();
}
}
-
- @Override
- public Task getTask() {
- return appContext.getCurrentDAG()
- .getVertex(attemptId.getTaskID().getVertexID())
- .getTask(attemptId.getTaskID());
- }
-
+
Vertex getVertex() {
- return appContext.getCurrentDAG()
- .getVertex(attemptId.getTaskID().getVertexID());
+ return vertex;
}
@SuppressWarnings("unchecked")
@@ -955,7 +954,7 @@ public class TaskAttemptImpl implements TaskAttempt,
if (conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)
&& conf.get(YarnConfiguration.YARN_LOG_SERVER_URL) != null) {
- String contextStr = "v_" + getTask().getVertex().getName()
+ String contextStr = "v_" + getVertex().getName()
+ "_" + this.attemptId.toString();
completedLogsUrl = conf.get(YarnConfiguration.YARN_LOG_SERVER_URL)
+ "/" + containerNodeId.toString()
@@ -964,7 +963,7 @@ public class TaskAttemptImpl implements TaskAttempt,
+ "/" + this.appContext.getUser();
}
TaskAttemptStartedEvent startEvt = new TaskAttemptStartedEvent(
- attemptId, getTask().getVertex().getName(),
+ attemptId, getVertex().getName(),
launchTime, containerId, containerNodeId,
inProgressLogsUrl, completedLogsUrl, nodeHttpAddress);
this.appContext.getHistoryHandler().handle(
@@ -976,7 +975,7 @@ public class TaskAttemptImpl implements TaskAttempt,
if (getLaunchTime() == 0) return;
TaskAttemptFinishedEvent finishEvt = new TaskAttemptFinishedEvent(
- attemptId, getTask().getVertex().getName(), getLaunchTime(),
+ attemptId, getVertex().getName(), getLaunchTime(),
getFinishTime(), TaskAttemptState.SUCCEEDED, null,
"", getCounters());
// FIXME how do we store information regd completion events
@@ -987,7 +986,7 @@ public class TaskAttemptImpl implements TaskAttempt,
protected void logJobHistoryAttemptUnsuccesfulCompletion(
TaskAttemptState state) {
TaskAttemptFinishedEvent finishEvt = new TaskAttemptFinishedEvent(
- attemptId, getTask().getVertex().getName(), getLaunchTime(),
+ attemptId, getVertex().getName(), getLaunchTime(),
clock.getTime(), state,
terminationCause,
StringUtils.join(
http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 461339b..8b63734 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -119,6 +119,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
private Map<TezTaskAttemptID, TaskAttempt> attempts;
private final int maxFailedAttempts;
protected final Clock clock;
+ private final Vertex vertex;
private final Lock readLock;
private final Lock writeLock;
private final List<String> diagnostics = new ArrayList<String>();
@@ -326,7 +327,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
Clock clock, TaskHeartbeatHandler thh, AppContext appContext,
boolean leafVertex, Resource resource,
ContainerContext containerContext,
- StateChangeNotifier stateChangeNotifier) {
+ StateChangeNotifier stateChangeNotifier,
+ Vertex vertex) {
this.conf = conf;
this.clock = clock;
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
@@ -342,7 +344,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
this.eventHandler = eventHandler;
this.appContext = appContext;
this.stateChangeNotifier = stateChangeNotifier;
-
+ this.vertex = vertex;
this.leafVertex = leafVertex;
this.taskResource = resource;
this.containerContext = containerContext;
@@ -382,7 +384,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
@Override
public Vertex getVertex() {
- return appContext.getCurrentDAG().getVertex(taskId.getVertexID());
+ return vertex;
}
@Override
@@ -778,7 +780,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
TaskAttemptImpl createAttempt(int attemptNumber) {
return new TaskAttemptImpl(getTaskId(), attemptNumber, eventHandler,
taskAttemptListener, conf, clock, taskHeartbeatHandler, appContext,
- (failedAttempts > 0), taskResource, containerContext, leafVertex);
+ (failedAttempts > 0), taskResource, containerContext, leafVertex, this);
}
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index c5de19b..9ed7441 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -109,9 +109,7 @@ import org.apache.tez.dag.app.dag.event.DAGEventType;
import org.apache.tez.dag.app.dag.event.DAGEventVertexCompleted;
import org.apache.tez.dag.app.dag.event.DAGEventVertexReRunning;
import org.apache.tez.dag.app.dag.event.SpeculatorEvent;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.TaskEvent;
import org.apache.tez.dag.app.dag.event.TaskEventRecoverTask;
@@ -164,7 +162,6 @@ import org.apache.tez.runtime.api.events.InputFailedEvent;
import org.apache.tez.runtime.api.events.InputDataInformationEvent;
import org.apache.tez.runtime.api.events.InputInitializerEvent;
import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
-import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.EventType;
@@ -194,8 +191,7 @@ import org.slf4j.LoggerFactory;
* The read and write calls use ReadWriteLock for concurrency.
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
-public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
- EventHandler<VertexEvent> {
+public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandler<VertexEvent> {
private static final String LINE_SEPARATOR = System
.getProperty("line.separator");
@@ -216,6 +212,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
// TODO Metrics
//private final MRAppMetrics metrics;
private final AppContext appContext;
+ private final DAG dag;
private boolean lazyTasksCopyNeeded = false;
// must be a linked map for ordering
@@ -867,6 +864,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
setTaskLocationHints(vertexLocationHint);
this.dagUgi = appContext.getCurrentDAG().getDagUGI();
+ this.dag = appContext.getCurrentDAG();
this.taskResource = DagTypeConverters
.createResourceRequestFromTaskConfig(vertexPlan.getTaskConfig());
@@ -2154,7 +2152,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
this.targetVertices.isEmpty() : true),
this.taskResource,
conContext,
- this.stateChangeNotifier);
+ this.stateChangeNotifier,
+ this);
}
private void createTasks() {
@@ -4409,7 +4408,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
@Override
public DAG getDAG() {
- return appContext.getCurrentDAG();
+ return dag;
}
private TezDAGID getDAGId() {
http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
index 2a061bc..87ffead 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
@@ -136,7 +136,7 @@ public class TestMockDAGAppMaster {
lrVertex.put(lrName2, LocalResource.newInstance(URL.newInstance("file", "localhost", 0, "/test1"),
LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, 1, 1));
- DAG dag = DAG.create("test").addTaskLocalFiles(lrDAG);
+ DAG dag = DAG.create("testLocalResourceSetup").addTaskLocalFiles(lrDAG);
Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 5).addTaskLocalFiles(lrVertex);
dag.addVertex(vA);
@@ -166,7 +166,7 @@ public class TestMockDAGAppMaster {
MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
mockLauncher.startScheduling(false);
// there is only 1 task whose first attempt will be preempted
- DAG dag = DAG.create("test");
+ DAG dag = DAG.create("testInternalPreemption");
Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 1);
dag.addVertex(vA);
@@ -197,7 +197,7 @@ public class TestMockDAGAppMaster {
MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
mockLauncher.startScheduling(false);
mockApp.sendDMEvents = true;
- DAG dag = DAG.create("test");
+ DAG dag = DAG.create("testBasicEvents");
Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 2);
Vertex vB = Vertex.create("B", ProcessorDescriptor.create("Proc.class"), 2);
Vertex vC = Vertex.create("C", ProcessorDescriptor.create("Proc.class"), 2);
@@ -230,21 +230,27 @@ public class TestMockDAGAppMaster {
List<TezEvent> tEvents = tImpl.getTaskEvents();
Assert.assertEquals(2, tEvents.size()); // 2 from vA
Assert.assertEquals(vA.getName(), tEvents.get(0).getDestinationInfo().getEdgeVertexName());
- Assert.assertEquals(0, ((DataMovementEvent)tEvents.get(0).getEvent()).getTargetIndex());
Assert.assertEquals(0, ((DataMovementEvent)tEvents.get(0).getEvent()).getSourceIndex());
Assert.assertEquals(vA.getName(), tEvents.get(1).getDestinationInfo().getEdgeVertexName());
- Assert.assertEquals(1, ((DataMovementEvent)tEvents.get(1).getEvent()).getTargetIndex());
Assert.assertEquals(0, ((DataMovementEvent)tEvents.get(1).getEvent()).getSourceIndex());
+ int targetIndex1 = ((DataMovementEvent)tEvents.get(0).getEvent()).getTargetIndex();
+ int targetIndex2 = ((DataMovementEvent)tEvents.get(1).getEvent()).getTargetIndex();
+ // order of vA task completion can change order of events
+ Assert.assertTrue("t1: " + targetIndex1 + " t2: " + targetIndex2,
+ (targetIndex1 == 0 && targetIndex2 == 1) || (targetIndex1 == 1 && targetIndex2 == 0));
vImpl = (VertexImpl) dagImpl.getVertex(vC.getName());
tImpl = (TaskImpl) vImpl.getTask(1);
tEvents = tImpl.getTaskEvents();
Assert.assertEquals(2, tEvents.size()); // 2 from vA
Assert.assertEquals(vA.getName(), tEvents.get(0).getDestinationInfo().getEdgeVertexName());
- Assert.assertEquals(0, ((DataMovementEvent)tEvents.get(0).getEvent()).getTargetIndex());
Assert.assertEquals(1, ((DataMovementEvent)tEvents.get(0).getEvent()).getSourceIndex());
Assert.assertEquals(vA.getName(), tEvents.get(1).getDestinationInfo().getEdgeVertexName());
- Assert.assertEquals(1, ((DataMovementEvent)tEvents.get(1).getEvent()).getTargetIndex());
Assert.assertEquals(1, ((DataMovementEvent)tEvents.get(1).getEvent()).getSourceIndex());
+ targetIndex1 = ((DataMovementEvent)tEvents.get(0).getEvent()).getTargetIndex();
+ targetIndex2 = ((DataMovementEvent)tEvents.get(1).getEvent()).getTargetIndex();
+ // order of vA task completion can change order of events
+ Assert.assertTrue("t1: " + targetIndex1 + " t2: " + targetIndex2,
+ (targetIndex1 == 0 && targetIndex2 == 1) || (targetIndex1 == 1 && targetIndex2 == 0));
vImpl = (VertexImpl) dagImpl.getVertex(vD.getName());
tImpl = (TaskImpl) vImpl.getTask(1);
tEvents = tImpl.getTaskEvents();
@@ -478,7 +484,7 @@ public class TestMockDAGAppMaster {
final String vAName = "A";
- DAG dag = DAG.create("testBasicCounters");
+ DAG dag = DAG.create("testBasicCounterMemory");
Vertex vA = Vertex.create(vAName, ProcessorDescriptor.create("Proc.class"), 10000);
dag.addVertex(vA);
@@ -511,6 +517,30 @@ public class TestMockDAGAppMaster {
checkMemory(dag.getName(), mockApp);
tezClient.stop();
}
+
+ @Ignore
+ @Test (timeout = 60000)
+ public void testTaskEventsProcessingSpeed() throws Exception {
+ Logger.getRootLogger().setLevel(Level.WARN);
+ TezConfiguration tezconf = new TezConfiguration(defaultConf);
+ tezconf.setBoolean(TezConfiguration.TEZ_AM_USE_CONCURRENT_DISPATCHER, true);
+ MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null,
+ null, false, false, 30, 1000);
+ tezClient.start();
+
+ final String vAName = "A";
+
+ DAG dag = DAG.create("testTaskEventsProcessingSpeed");
+ Vertex vA = Vertex.create(vAName, ProcessorDescriptor.create("Proc.class"), 50000);
+ dag.addVertex(vA);
+
+ MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
+ mockApp.doSleep = false;
+ DAGClient dagClient = tezClient.submitDAG(dag);
+ DAGStatus status = dagClient.waitForCompletion();
+ Assert.assertEquals(DAGStatus.State.SUCCEEDED, status.getState());
+ tezClient.stop();
+ }
@Ignore
@Test (timeout = 60000)
@@ -530,7 +560,7 @@ public class TestMockDAGAppMaster {
ioStats.setItemsProcessed(1);
TaskStatistics vAStats = new TaskStatistics();
- DAG dag = DAG.create("testBasisStatistics");
+ DAG dag = DAG.create("testBasicStatisticsMemory");
Vertex vA = Vertex.create(vAName, ProcessorDescriptor.create("Proc.class"), numTasks);
for (int i=0; i<numSources; ++i) {
final String sourceName = i + vAName;
@@ -623,7 +653,7 @@ public class TestMockDAGAppMaster {
MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
mockLauncher.startScheduling(false);
- DAG dag = DAG.create("test");
+ DAG dag = DAG.create("testSchedulerErrorHandling");
Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 5);
dag.addVertex(vA);
http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 2a2df7c..50bb68c 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -81,6 +81,7 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
+import org.apache.tez.dag.app.dag.event.TaskEvent;
import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
import org.apache.tez.dag.app.dag.event.TaskEventType;
import org.apache.tez.dag.app.dag.event.SpeculatorEventTaskAttemptStatusUpdate;
@@ -672,6 +673,35 @@ public class TestTaskAttempt {
}
@Test(timeout = 5000)
+ public void testEventSerializingHash() throws Exception {
+ ApplicationId appId = ApplicationId.newInstance(1, 2);
+ TezDAGID dagID = TezDAGID.getInstance(appId, 1);
+ TezVertexID vertexID = TezVertexID.getInstance(dagID, 1);
+ TezTaskID taskID1 = TezTaskID.getInstance(vertexID, 1);
+ TezTaskID taskID2 = TezTaskID.getInstance(vertexID, 2);
+ TezTaskAttemptID taID11 = TezTaskAttemptID.getInstance(taskID1, 0);
+ TezTaskAttemptID taID12 = TezTaskAttemptID.getInstance(taskID1, 1);
+ TezTaskAttemptID taID21 = TezTaskAttemptID.getInstance(taskID2, 1);
+
+ TaskAttemptEvent taEventFail11 = new TaskAttemptEvent(taID11, TaskAttemptEventType.TA_FAILED);
+ TaskAttemptEvent taEventKill11 = new TaskAttemptEvent(taID11, TaskAttemptEventType.TA_KILL_REQUEST);
+ TaskAttemptEvent taEventKill12 = new TaskAttemptEvent(taID12, TaskAttemptEventType.TA_KILL_REQUEST);
+ TaskAttemptEvent taEventKill21 = new TaskAttemptEvent(taID21, TaskAttemptEventType.TA_KILL_REQUEST);
+ TaskEvent tEventKill1 = new TaskEvent(taskID1, TaskEventType.T_ATTEMPT_KILLED);
+ TaskEvent tEventFail1 = new TaskEvent(taskID1, TaskEventType.T_ATTEMPT_FAILED);
+ TaskEvent tEventFail2 = new TaskEvent(taskID2, TaskEventType.T_ATTEMPT_FAILED);
+
+ // all of them should have the same value
+ assertEquals(taEventFail11.getSerializingHash(), taEventKill11.getSerializingHash());
+ assertEquals(taEventKill11.getSerializingHash(), taEventKill12.getSerializingHash());
+ assertEquals(tEventFail1.getSerializingHash(), tEventKill1.getSerializingHash());
+ assertEquals(taEventFail11.getSerializingHash(), tEventKill1.getSerializingHash());
+ assertEquals(taEventKill21.getSerializingHash(), tEventFail2.getSerializingHash());
+ // events from different tasks may not have the same value
+ assertFalse(tEventFail1.getSerializingHash() == tEventFail2.getSerializingHash());
+ }
+
+ @Test(timeout = 5000)
public void testSuccess() throws Exception {
ApplicationId appId = ApplicationId.newInstance(1, 2);
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
@@ -695,7 +725,7 @@ public class TestTaskAttempt {
Resource resource = Resource.newInstance(1024, 1);
NodeId nid = NodeId.newInstance("127.0.0.1", 0);
- ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+ ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
Container container = mock(Container.class);
when(container.getId()).thenReturn(contId);
when(container.getNodeId()).thenReturn(nid);
@@ -786,7 +816,7 @@ public class TestTaskAttempt {
Resource resource = Resource.newInstance(1024, 1);
NodeId nid = NodeId.newInstance("127.0.0.1", 0);
- ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+ ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
Container container = mock(Container.class);
when(container.getId()).thenReturn(contId);
when(container.getNodeId()).thenReturn(nid);
@@ -881,7 +911,7 @@ public class TestTaskAttempt {
Resource resource = Resource.newInstance(1024, 1);
NodeId nid = NodeId.newInstance("127.0.0.1", 0);
- ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+ ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
Container container = mock(Container.class);
when(container.getId()).thenReturn(contId);
when(container.getNodeId()).thenReturn(nid);
@@ -984,7 +1014,7 @@ public class TestTaskAttempt {
Resource resource = Resource.newInstance(1024, 1);
NodeId nid = NodeId.newInstance("127.0.0.1", 0);
- ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+ ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
Container container = mock(Container.class);
when(container.getId()).thenReturn(contId);
when(container.getNodeId()).thenReturn(nid);
@@ -1084,7 +1114,7 @@ public class TestTaskAttempt {
Resource resource = Resource.newInstance(1024, 1);
NodeId nid = NodeId.newInstance("127.0.0.1", 0);
- ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+ ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
Container container = mock(Container.class);
when(container.getId()).thenReturn(contId);
when(container.getNodeId()).thenReturn(nid);
@@ -1211,7 +1241,7 @@ public class TestTaskAttempt {
Resource resource, ContainerContext containerContext, boolean leafVertex) {
super(taskId, attemptNumber, eventHandler, tal, conf,
clock, taskHeartbeatHandler, appContext,
- isRescheduled, resource, containerContext, leafVertex);
+ isRescheduled, resource, containerContext, leafVertex, mock(TaskImpl.class));
this.locationHint = locationHint;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
index b8b09d0..d6d874d 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
@@ -148,7 +148,7 @@ public class TestTaskAttemptRecovery {
mock(TaskAttemptListener.class), new Configuration(),
new SystemClock(), mock(TaskHeartbeatHandler.class),
mockAppContext, false, Resource.newInstance(1, 1),
- mock(ContainerContext.class), false);
+ mock(ContainerContext.class), false, mockTask);
taId = ta.getID();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index 9509df4..66e6724 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -630,7 +630,7 @@ public class TestTaskImpl {
ContainerContext containerContext, Vertex vertex) {
super(vertexId, partition, eventHandler, conf, taskAttemptListener,
clock, thh, appContext, leafVertex, resource,
- containerContext, mock(StateChangeNotifier.class));
+ containerContext, mock(StateChangeNotifier.class), vertex);
this.vertex = vertex;
this.locationHint = locationHint;
}
@@ -687,7 +687,7 @@ public class TestTaskImpl {
TaskLocationHint locationHint, boolean isRescheduled,
Resource resource, ContainerContext containerContext) {
super(taskId, attemptNumber, eventHandler, tal, conf, clock, thh,
- appContext, isRescheduled, resource, containerContext, false);
+ appContext, isRescheduled, resource, containerContext, false, mock(TaskImpl.class));
this.locationHint = locationHint;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
index e182f24..2a49826 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
@@ -190,7 +190,7 @@ public class TestTaskRecovery {
new Configuration(), mock(TaskAttemptListener.class),
new SystemClock(), mock(TaskHeartbeatHandler.class),
mockAppContext, false, Resource.newInstance(1, 1),
- mock(ContainerContext.class), mock(StateChangeNotifier.class));
+ mock(ContainerContext.class), mock(StateChangeNotifier.class), vertex);
Map<String, OutputCommitter> committers =
new HashMap<String, OutputCommitter>();