You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2015/05/03 00:21:32 UTC

tez git commit: TEZ-1897. Create a concurrent version of AsyncDispatcher (bikas)

Repository: tez
Updated Branches:
  refs/heads/master 9f090279d -> f6ea0fb33


TEZ-1897. Create a concurrent version of AsyncDispatcher (bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f6ea0fb3
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f6ea0fb3
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f6ea0fb3

Branch: refs/heads/master
Commit: f6ea0fb3306faa709c445e4d76081de60545d760
Parents: 9f09027
Author: Bikas Saha <bi...@apache.org>
Authored: Sat May 2 15:21:17 2015 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Sat May 2 15:21:17 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/tez/dag/api/TezConfiguration.java    |  14 +
 tez-common/findbugs-exclude.xml                 |   5 +
 .../org/apache/tez/common/AsyncDispatcher.java  |  85 ++++-
 .../tez/common/AsyncDispatcherConcurrent.java   | 368 +++++++++++++++++++
 .../org/apache/tez/common/TezAbstractEvent.java |  45 +++
 .../org/apache/tez/dag/records/TezTaskID.java   |  20 +-
 .../apache/tez/common/TestAsyncDispatcher.java  |   2 +-
 .../common/TestAsyncDispatcherConcurrent.java   | 194 ++++++++++
 .../org/apache/tez/dag/app/DAGAppMaster.java    |  19 +-
 .../org/apache/tez/dag/app/dag/TaskAttempt.java |   2 -
 .../tez/dag/app/dag/event/CallableEvent.java    |   4 +-
 .../dag/app/dag/event/DAGAppMasterEvent.java    |   5 +-
 .../apache/tez/dag/app/dag/event/DAGEvent.java  |   4 +-
 .../tez/dag/app/dag/event/SpeculatorEvent.java  |   4 +-
 .../tez/dag/app/dag/event/TaskAttemptEvent.java |   9 +-
 .../apache/tez/dag/app/dag/event/TaskEvent.java |   9 +-
 .../tez/dag/app/dag/event/VertexEvent.java      |   4 +-
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    |   1 -
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   |  29 +-
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   |  10 +-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  13 +-
 .../tez/dag/app/TestMockDAGAppMaster.java       |  50 ++-
 .../tez/dag/app/dag/impl/TestTaskAttempt.java   |  42 ++-
 .../app/dag/impl/TestTaskAttemptRecovery.java   |   2 +-
 .../tez/dag/app/dag/impl/TestTaskImpl.java      |   4 +-
 .../tez/dag/app/dag/impl/TestTaskRecovery.java  |   2 +-
 27 files changed, 857 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 609db3c..8108ac8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES
   TEZ-1993. Implement a pluggable InputSizeEstimator for grouping fairly
 
 ALL CHANGES:
+  TEZ-1897. Create a concurrent version of AsyncDispatcher
   TEZ-2394. Issues when there is an error in VertexManager callbacks
   TEZ-2386. Tez UI: Inconsistent usage of icon colors
   TEZ-2395. Tez UI: Minimum/Maximum Duration show a empty bracket next to 0 secs when you purposefully failed a job.

http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 14e773d..a301957 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -141,6 +141,20 @@ public class TezConfiguration extends Configuration {
   @ConfigurationScope(Scope.AM)
   public static final String TEZ_CREDENTIALS_PATH = TEZ_PREFIX + "credentials.path";
 
+  @Private
+  @ConfigurationScope(Scope.AM)
+  public static final String TEZ_AM_USE_CONCURRENT_DISPATCHER = TEZ_AM_PREFIX
+      + "use.concurrent-dispatcher";
+  @Private
+  public static boolean TEZ_AM_USE_CONCURRENT_DISPATCHER_DEFAULT = true;
+  
+  @Private
+  @ConfigurationScope(Scope.AM)
+  public static final String TEZ_AM_CONCURRENT_DISPATCHER_CONCURRENCY = TEZ_AM_PREFIX
+      + "concurrent-dispatcher.concurrency";
+  @Private
+  public static final int TEZ_AM_CONCURRENT_DISPATCHER_CONCURRENCY_DEFAULT = 10;
+
   /**
    * Boolean value. Execution mode for the Tez application. True implies session mode. If the client
    * code is written according to best practices then the same code can execute in either mode based

http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-common/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-common/findbugs-exclude.xml b/tez-common/findbugs-exclude.xml
index 7814585..6f6253d 100644
--- a/tez-common/findbugs-exclude.xml
+++ b/tez-common/findbugs-exclude.xml
@@ -20,4 +20,9 @@
     <Bug pattern="DM_EXIT"/>
   </Match>
 
+  <Match>
+    <Class name="org.apache.tez.common.AsyncDispatcherConcurrent$1"/>
+    <Method name="run" />
+    <Bug pattern="DM_EXIT"/>
+  </Match>
 </FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
index 5aaa4cf..4319f4f 100644
--- a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
+++ b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
@@ -68,8 +68,11 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
   private EventHandler handlerInstance = new GenericEventHandler();
 
   private Thread eventHandlingThread;
-  protected final Map<Class<? extends Enum>, EventHandler> eventHandlers;
-  protected final Map<Class<? extends Enum>, AsyncDispatcher> eventDispatchers;
+  protected final Map<Class<? extends Enum>, EventHandler> eventHandlers = Maps.newHashMap();
+  protected final Map<Class<? extends Enum>, AsyncDispatcher> eventDispatchers = Maps.newHashMap();
+  protected final Map<Class<? extends Enum>, AsyncDispatcherConcurrent> concurrentEventDispatchers = 
+      Maps.newHashMap();
+  
   private boolean exitOnDispatchException;
 
   public AsyncDispatcher(String name) {
@@ -77,11 +80,9 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
   }
 
   public AsyncDispatcher(String name, BlockingQueue<Event> eventQueue) {
-    super("Dispatcher");
+    super(name);
     this.name = name;
     this.eventQueue = eventQueue;
-    this.eventHandlers = Maps.newHashMap();
-    this.eventDispatchers = Maps.newHashMap();
   }
 
   public Runnable createThread() {
@@ -195,6 +196,32 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
       }
     }
   }
+  
+  private void checkForExistingHandler(Class<? extends Enum> eventType) {
+    EventHandler<Event> registeredHandler = (EventHandler<Event>) eventHandlers.get(eventType);
+    Preconditions.checkState(registeredHandler == null, 
+        "Cannot register same event on multiple dispatchers");
+  }
+
+  private void checkForExistingDispatcher(Class<? extends Enum> eventType) {
+    AsyncDispatcher registeredDispatcher = eventDispatchers.get(eventType);
+    Preconditions.checkState(registeredDispatcher == null, 
+        "Multiple dispatchers cannot be registered for: " + eventType.getName());
+  }
+
+  private void checkForExistingConcurrentDispatcher(Class<? extends Enum> eventType) {
+    AsyncDispatcherConcurrent concurrentDispatcher = concurrentEventDispatchers.get(eventType);
+    Preconditions.checkState(concurrentDispatcher == null, 
+        "Multiple concurrent dispatchers cannot be registered for: " + eventType.getName());
+  }
+  
+  private void checkForExistingDispatchers(boolean checkHandler, Class<? extends Enum> eventType) {
+    if (checkHandler) {
+      checkForExistingHandler(eventType);
+    }
+    checkForExistingDispatcher(eventType);
+    checkForExistingConcurrentDispatcher(eventType);
+  }
 
   /**
    * Add an EventHandler for events handled inline on this dispatcher
@@ -205,9 +232,7 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
     Preconditions.checkState(getServiceState() == STATE.NOTINITED);
     /* check to see if we have a listener registered */
     EventHandler<Event> registeredHandler = (EventHandler<Event>) eventHandlers.get(eventType);
-    AsyncDispatcher registeredDispatcher = eventDispatchers.get(eventType);
-    Preconditions.checkState(registeredDispatcher == null,
-        "Cannot register same event on multiple dispatchers");
+    checkForExistingDispatchers(false, eventType);
     LOG.info("Registering " + eventType + " for " + handler.getClass());
     if (registeredHandler == null) {
       eventHandlers.put(eventType, handler);
@@ -231,20 +256,41 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
   public void registerAndCreateDispatcher(Class<? extends Enum> eventType,
       EventHandler handler, String dispatcherName) {
     Preconditions.checkState(getServiceState() == STATE.NOTINITED);
-    AsyncDispatcher dispatcher = new AsyncDispatcher(dispatcherName);
-    dispatcher.register(eventType, handler);
     
     /* check to see if we have a listener registered */
-    AsyncDispatcher registeredDispatcher = eventDispatchers.get(eventType);
-    EventHandler<Event> registeredHandler = (EventHandler<Event>) eventHandlers.get(eventType);
-    Preconditions.checkState(registeredHandler == null, 
-        "Cannot register same event on multiple dispatchers");
+    checkForExistingDispatchers(true, eventType);
     LOG.info("Registering " + eventType + " for independent dispatch using: " + handler.getClass());
-    Preconditions.checkState(registeredDispatcher == null, 
-        "Multiple dispatchers cannot be registered for: " + eventType.getName());
+    AsyncDispatcher dispatcher = new AsyncDispatcher(dispatcherName);
+    dispatcher.register(eventType, handler);
     eventDispatchers.put(eventType, dispatcher);
     addIfService(dispatcher);
   }
+  
+  public AsyncDispatcherConcurrent registerAndCreateDispatcher(Class<? extends Enum> eventType,
+      EventHandler handler, String dispatcherName, int numThreads) {
+    Preconditions.checkState(getServiceState() == STATE.NOTINITED);
+    
+    /* check to see if we have a listener registered */
+    checkForExistingDispatchers(true, eventType);
+    LOG.info("Registering " + eventType + " for concurrent dispatch using: " + handler.getClass());
+    AsyncDispatcherConcurrent dispatcher = new AsyncDispatcherConcurrent(dispatcherName, numThreads);
+    dispatcher.register(eventType, handler);
+    concurrentEventDispatchers.put(eventType, dispatcher);
+    addIfService(dispatcher);
+    return dispatcher;
+  }
+  
+  public void registerWithExistingDispatcher(Class<? extends Enum> eventType,
+      EventHandler handler, AsyncDispatcherConcurrent dispatcher) {
+    Preconditions.checkState(getServiceState() == STATE.NOTINITED);
+    
+    /* check to see if we have a listener registered */
+    checkForExistingDispatchers(true, eventType);
+    LOG.info("Registering " + eventType + " wit existing concurrent dispatch using: "
+        + handler.getClass());
+    dispatcher.register(eventType, handler);
+    concurrentEventDispatchers.put(eventType, dispatcher);
+  }
 
   @Override
   public EventHandler getEventHandler() {
@@ -261,13 +307,18 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
       }
       drained = false;
 
-      // offload to specific dispatcher is one exists
+      // offload to specific dispatcher if one exists
       Class<? extends Enum> type = event.getType().getDeclaringClass();
       AsyncDispatcher registeredDispatcher = eventDispatchers.get(type);
       if (registeredDispatcher != null) {
         registeredDispatcher.getEventHandler().handle(event);
         return;
       }
+      AsyncDispatcherConcurrent concurrentDispatcher = concurrentEventDispatchers.get(type);
+      if (concurrentDispatcher != null) {
+        concurrentDispatcher.getEventHandler().handle(event);
+        return;
+      }
       
       // no registered dispatcher. use internal dispatcher.
       

http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcherConcurrent.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcherConcurrent.java b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcherConcurrent.java
new file mode 100644
index 0000000..d19bf9e
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcherConcurrent.java
@@ -0,0 +1,368 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * A dispatcher that can schedule events concurrently. Uses a fixed size threadpool 
+ * to schedule events. Events that have the same serializing hash will get scheduled
+ * on the same thread in the threadpool. This can be used to prevent concurrency issues
+ * for events that may not be independently processed.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+@Private
+public class AsyncDispatcherConcurrent extends CompositeService implements Dispatcher {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AsyncDispatcher.class);
+
+  private final String name;
+  private final ArrayList<LinkedBlockingQueue<Event>> eventQueues;
+  private volatile boolean stopped = false;
+
+  // Configuration flag for enabling/disabling draining dispatcher's events on
+  // stop functionality.
+  private volatile boolean drainEventsOnStop = false;
+
+  // Indicates all the remaining dispatcher's events on stop have been drained
+  // and processed.
+  private volatile boolean drained = true;
+  private Object waitForDrained = new Object();
+
+  // Only used when drainEventsOnStop is enabled: while the dispatcher is
+  // stopping, newly arriving events are dropped instead of being queued.
+  private volatile boolean blockNewEvents = false;
+  private EventHandler handlerInstance = new GenericEventHandler();
+
+  private ExecutorService execService;
+  private final int numThreads;
+  
+  protected final Map<Class<? extends Enum>, EventHandler> eventHandlers = Maps.newHashMap();
+  protected final Map<Class<? extends Enum>, AsyncDispatcherConcurrent> eventDispatchers = 
+      Maps.newHashMap();
+  private boolean exitOnDispatchException;
+
+  AsyncDispatcherConcurrent(String name, int numThreads) {
+    super(name);
+    Preconditions.checkArgument(numThreads > 0);
+    this.name = name;
+    this.eventQueues = Lists.newArrayListWithCapacity(numThreads);
+    this.numThreads = numThreads;
+  }
+  
+  class DispatchRunner implements Runnable {
+    final LinkedBlockingQueue<Event> queue;
+    
+    public DispatchRunner(LinkedBlockingQueue<Event> queue) {
+      this.queue = queue;
+    }
+    
+    @Override
+    public void run() {
+      while (!stopped && !Thread.currentThread().isInterrupted()) {
+        drained = queue.isEmpty();
+        // blockNewEvents is only set when the dispatcher is draining to stop;
+        // this check avoids the overhead of acquiring the lock and calling
+        // notify on every iteration of the loop during normal operation.
+        if (blockNewEvents) {
+          synchronized (waitForDrained) {
+            if (drained) {
+              waitForDrained.notify();
+            }
+          }
+        }
+        Event event;
+        try {
+          event = queue.take();
+        } catch(InterruptedException ie) {
+          if (!stopped) {
+            LOG.warn("AsyncDispatcher thread interrupted", ie);
+          }
+          return;
+        }
+        if (event != null) {
+          dispatch(event);
+        }
+      }
+    }
+  };
+  
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    // TODO TEZ-2049 remove YARN reference
+    this.exitOnDispatchException =
+        conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
+          Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR);
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    execService = Executors.newFixedThreadPool(numThreads, new ThreadFactoryBuilder().setDaemon(true)
+        .setNameFormat("Dispatcher [" + this.name + "] #%d").build());
+    for (int i=0; i<numThreads; ++i) {
+      eventQueues.add(new LinkedBlockingQueue<Event>());
+    }
+    for (int i=0; i<numThreads; ++i) {
+      execService.execute(new DispatchRunner(eventQueues.get(i)));
+    }
+    //start all the components
+    super.serviceStart();
+  }
+
+  public void setDrainEventsOnStop() {
+    drainEventsOnStop = true;
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    if (execService != null) {
+      if (drainEventsOnStop) {
+        blockNewEvents = true;
+        LOG.info("AsyncDispatcher is draining to stop, ignoring any new events.");
+        synchronized (waitForDrained) {
+          while (!drained && !execService.isShutdown()) {
+            LOG.info("Waiting for AsyncDispatcher to drain.");
+            waitForDrained.wait(1000);
+          }
+        }
+      }
+
+      stopped = true;
+
+      for (int i=0; i<numThreads; ++i) {
+        LOG.info("AsyncDispatcher stopping with events: " + eventQueues.get(i).size()
+            + " in queue: " + i);
+      }
+      execService.shutdownNow();
+    }
+
+    // stop all the components
+    super.serviceStop();
+  }
+
+  protected void dispatch(Event event) {
+    // all events taken off the queues by the dispatch threads pass through this method
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Dispatching the event " + event.getClass().getName() + "."
+          + event.toString());
+    }
+
+    Class<? extends Enum> type = event.getType().getDeclaringClass();
+
+    try{
+      EventHandler handler = eventHandlers.get(type);
+      if(handler != null) {
+        handler.handle(event);
+      } else {
+        throw new Exception("No handler for registered for " + type);
+      }
+    } catch (Throwable t) {
+      LOG.error("Error in dispatcher thread", t);
+      // If serviceStop is called, we should exit this thread gracefully.
+      if (exitOnDispatchException
+          && (ShutdownHookManager.get().isShutdownInProgress()) == false
+          && stopped == false) {
+        Thread shutDownThread = new Thread(createShutDownThread());
+        shutDownThread.setName("AsyncDispatcher ShutDown handler");
+        shutDownThread.start();
+      }
+    }
+  }
+
+  private void checkForExistingHandler(Class<? extends Enum> eventType) {
+    EventHandler<Event> registeredHandler = (EventHandler<Event>) eventHandlers.get(eventType);
+    Preconditions.checkState(registeredHandler == null, 
+        "Cannot register same event on multiple dispatchers");
+  }
+
+  private void checkForExistingDispatcher(Class<? extends Enum> eventType) {
+    AsyncDispatcherConcurrent registeredDispatcher = eventDispatchers.get(eventType);
+    Preconditions.checkState(registeredDispatcher == null, 
+        "Multiple dispatchers cannot be registered for: " + eventType.getName());
+  }
+
+  private void checkForExistingDispatchers(boolean checkHandler, Class<? extends Enum> eventType) {
+    if (checkHandler) {
+      checkForExistingHandler(eventType);
+    }
+    checkForExistingDispatcher(eventType);
+  }
+
+  /**
+   * Add an EventHandler for events handled inline on this dispatcher
+   */
+  @Override
+  public void register(Class<? extends Enum> eventType,
+      EventHandler handler) {
+    Preconditions.checkState(getServiceState() == STATE.NOTINITED);
+    /* check to see if we have a listener registered */
+    EventHandler<Event> registeredHandler = (EventHandler<Event>) eventHandlers.get(eventType);
+    checkForExistingDispatchers(false, eventType);
+    LOG.info("Registering " + eventType + " for " + handler.getClass());
+    if (registeredHandler == null) {
+      eventHandlers.put(eventType, handler);
+    } else if (!(registeredHandler instanceof MultiListenerHandler)){
+      /* for multiple listeners of an event add the multiple listener handler */
+      MultiListenerHandler multiHandler = new MultiListenerHandler();
+      multiHandler.addHandler(registeredHandler);
+      multiHandler.addHandler(handler);
+      eventHandlers.put(eventType, multiHandler);
+    } else {
+      /* already a multilistener, just add to it */
+      MultiListenerHandler multiHandler
+      = (MultiListenerHandler) registeredHandler;
+      multiHandler.addHandler(handler);
+    }
+  }
+  
+  /**
+   * Add an EventHandler for events handled in their own dispatchers with given name and threads
+   */
+  
+  public AsyncDispatcherConcurrent registerAndCreateDispatcher(Class<? extends Enum> eventType,
+      EventHandler handler, String dispatcherName, int numThreads) {
+    Preconditions.checkState(getServiceState() == STATE.NOTINITED);
+    
+    /* check to see if we have a listener registered */
+    checkForExistingDispatchers(true, eventType);
+    LOG.info("Registering " + eventType + " for independent dispatch using: " + handler.getClass());
+    AsyncDispatcherConcurrent dispatcher = new AsyncDispatcherConcurrent(dispatcherName, numThreads);
+    dispatcher.register(eventType, handler);
+    eventDispatchers.put(eventType, dispatcher);
+    addIfService(dispatcher);
+    return dispatcher;
+  }
+  
+  public void registerWithExistingDispatcher(Class<? extends Enum> eventType,
+      EventHandler handler, AsyncDispatcherConcurrent dispatcher) {
+    Preconditions.checkState(getServiceState() == STATE.NOTINITED);
+    
+    /* check to see if we have a listener registered */
+    checkForExistingDispatchers(true, eventType);
+    LOG.info("Registering " + eventType + " wit existing concurrent dispatch using: "
+        + handler.getClass());
+    dispatcher.register(eventType, handler);
+    eventDispatchers.put(eventType, dispatcher);
+  }
+
+  @Override
+  public EventHandler getEventHandler() {
+    return handlerInstance;
+  }
+
+  class GenericEventHandler implements EventHandler<TezAbstractEvent> {
+    public void handle(TezAbstractEvent event) {
+      if (stopped) {
+        return;
+      }
+      if (blockNewEvents) {
+        return;
+      }
+      drained = false;
+      
+      // offload to specific dispatcher if one exists
+      Class<? extends Enum> type = event.getType().getDeclaringClass();
+      AsyncDispatcherConcurrent registeredDispatcher = eventDispatchers.get(type);
+      if (registeredDispatcher != null) {
+        registeredDispatcher.getEventHandler().handle(event);
+        return;
+      }
+      
+      int index = numThreads > 1 ? event.getSerializingHash() % numThreads : 0;
+
+     // no registered dispatcher. use internal dispatcher.
+      LinkedBlockingQueue<Event> queue = eventQueues.get(index);
+      /* all this method does is enqueue all the events onto the queue */
+      int qSize = queue.size();
+      if (qSize !=0 && qSize %1000 == 0) {
+        LOG.info("Size of event-queue is " + qSize);
+      }
+      int remCapacity = queue.remainingCapacity();
+      if (remCapacity < 1000) {
+        LOG.warn("Very low remaining capacity in the event-queue: "
+            + remCapacity);
+      }
+      try {
+        queue.put(event);
+      } catch (InterruptedException e) {
+        if (!stopped) {
+          LOG.warn("AsyncDispatcher thread interrupted", e);
+        }
+        throw new YarnRuntimeException(e);
+      }
+    };
+  }
+
+  /**
+   * Multiplexes an event by sending it to every handler that has been
+   * added for it, in the order in which the handlers were added.
+   * The class declares no type parameter; each handler receives a plain {@link Event}.
+   */
+  static class MultiListenerHandler implements EventHandler<Event> {
+    List<EventHandler<Event>> listofHandlers;
+
+    public MultiListenerHandler() {
+      listofHandlers = new ArrayList<EventHandler<Event>>();
+    }
+
+    @Override
+    public void handle(Event event) {
+      for (EventHandler<Event> handler: listofHandlers) {
+        handler.handle(event);
+      }
+    }
+
+    void addHandler(EventHandler<Event> handler) {
+      listofHandlers.add(handler);
+    }
+
+  }
+
+  Runnable createShutDownThread() {
+    return new Runnable() {
+      @Override
+      public void run() {
+        LOG.info("Exiting, bbye..");
+        System.exit(-1);
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-common/src/main/java/org/apache/tez/common/TezAbstractEvent.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezAbstractEvent.java b/tez-common/src/main/java/org/apache/tez/common/TezAbstractEvent.java
new file mode 100644
index 0000000..b736112
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/common/TezAbstractEvent.java
@@ -0,0 +1,45 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+
+package org.apache.tez.common;
+
+/**
+ * Event that allows different instances to be processed in parallel
+ * 
+ * @param <TYPE>
+ *          Event type
+ */
+public abstract class TezAbstractEvent<TYPE extends Enum<TYPE>> extends
+    org.apache.hadoop.yarn.event.AbstractEvent<TYPE> {
+
+  public TezAbstractEvent(TYPE type) {
+    super(type);
+  }
+
+  /**
+   * Returns a number that is identical for event instances that must be
+   * processed serially, i.e. one after another rather than in parallel.
+   * 
+   * @return Serializing identifier. This default implementation returns 0 for
+   *         every instance, so unless it is overridden all events share one identifier
+   */
+  public int getSerializingHash() {
+    return 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java b/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java
index b4c7b32..3d28348 100644
--- a/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java
@@ -44,6 +44,7 @@ import com.google.common.cache.LoadingCache;
 @InterfaceStability.Stable
 public class TezTaskID extends TezID {
   public static final String TASK = "task";
+  private final int serializingHash;
   
   static final ThreadLocal<NumberFormat> tezTaskIdFormat = new ThreadLocal<NumberFormat>() {
     @Override
@@ -67,10 +68,6 @@ public class TezTaskID extends TezID {
   
   private TezVertexID vertexId;
 
-  // Public for Writable serialization. Verify if this is actually required.
-  public TezTaskID() {
-  }
-
   /**
    * Constructs a TezTaskID object from given {@link TezVertexID}.
    * @param vertexID the vertexID object for this TezTaskID
@@ -91,6 +88,11 @@ public class TezTaskID extends TezID {
     super(id);
     Preconditions.checkArgument(vertexID != null, "vertexID cannot be null");
     this.vertexId = vertexID;
+    this.serializingHash = getHashCode(true);
+  }
+  
+  public int getSerializingHash() {
+    return serializingHash;
   }
 
   /** Returns the {@link TezVertexID} object that this task belongs to */
@@ -135,7 +137,15 @@ public class TezTaskID extends TezID {
 
   @Override
   public int hashCode() {
-    return vertexId.hashCode() * 535013 + id;
+    return getHashCode(false);
+  }
+
+  public int getHashCode(boolean makePositive) {
+    int code = vertexId.hashCode() * 535013 + id;
+    if (makePositive) {
+      code = (code < 0 ? -code : code);
+    }
+    return code;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-common/src/test/java/org/apache/tez/common/TestAsyncDispatcher.java
----------------------------------------------------------------------
diff --git a/tez-common/src/test/java/org/apache/tez/common/TestAsyncDispatcher.java b/tez-common/src/test/java/org/apache/tez/common/TestAsyncDispatcher.java
index ad7f5df..bcd1c5f 100644
--- a/tez-common/src/test/java/org/apache/tez/common/TestAsyncDispatcher.java
+++ b/tez-common/src/test/java/org/apache/tez/common/TestAsyncDispatcher.java
@@ -116,7 +116,7 @@ public class TestAsyncDispatcher {
       central.register(TestEventType1.class, new TestEventHandler1());
       Assert.fail();
     } catch (IllegalStateException e) {
-      Assert.assertTrue(e.getMessage().contains("Cannot register same event on multiple dispatchers"));
+      Assert.assertTrue(e.getMessage().contains("Multiple dispatchers cannot be registered for"));
     } finally {
       central.close();
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-common/src/test/java/org/apache/tez/common/TestAsyncDispatcherConcurrent.java
----------------------------------------------------------------------
diff --git a/tez-common/src/test/java/org/apache/tez/common/TestAsyncDispatcherConcurrent.java b/tez-common/src/test/java/org/apache/tez/common/TestAsyncDispatcherConcurrent.java
new file mode 100644
index 0000000..1fa8123
--- /dev/null
+++ b/tez-common/src/test/java/org/apache/tez/common/TestAsyncDispatcherConcurrent.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.common;
+
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.junit.Assert;
+import org.junit.Test;
+
+@SuppressWarnings("unchecked")
+public class TestAsyncDispatcherConcurrent {
+  /** Rendezvous helper: event handlers and the test thread count down and wait on one latch. */
+  static class CountDownEventHandler {
+    static CountDownLatch latch;
+    static void init(CountDownLatch latch) {
+      CountDownEventHandler.latch = latch;
+    }
+    /** Test-thread side: counts down once, then waits for the latch to reach zero. */
+    static void checkParallelCountersDoneAndFinish() throws Exception {
+      latch.countDown();
+      latch.await();
+    }
+    /** Handler side: counts down once, then blocks until the latch reaches zero. */
+    public void handle() {
+      latch.countDown();
+      try {
+        latch.await();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt(); // keep the interrupt visible to the calling thread
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  public enum TestEventType1 { TYPE1 }
+  public class TestEvent1 extends TezAbstractEvent<TestEventType1> {
+    final int hash;
+    public TestEvent1(TestEventType1 type, int hash) {
+      super(type);
+      this.hash = hash;
+    }
+    @Override
+    public int getSerializingHash() {
+      return hash;
+    }
+  }
+  class TestEventHandler1 extends CountDownEventHandler implements EventHandler<TestEvent1> {
+    @Override
+    public void handle(TestEvent1 event) {
+      handle();
+    }
+  }
+  public enum TestEventType2 { TYPE2 }
+  public class TestEvent2 extends TezAbstractEvent<TestEventType2> {
+    public TestEvent2(TestEventType2 type) {
+      super(type);
+    }
+  }
+  class TestEventHandler2 extends CountDownEventHandler implements EventHandler<TestEvent2> {
+    @Override
+    public void handle(TestEvent2 event) {
+      handle();
+    }
+  }
+  public enum TestEventType3 { TYPE3 }
+  public class TestEvent3 extends TezAbstractEvent<TestEventType3> {
+    public TestEvent3(TestEventType3 type) {
+      super(type);
+    }
+  }
+  class TestEventHandler3 extends CountDownEventHandler implements EventHandler<TestEvent3> {
+    @Override
+    public void handle(TestEvent3 event) {
+      handle();
+    }
+  }
+
+  @Test (timeout=5000)
+  public void testBasic() throws Exception {
+    CountDownEventHandler.init(new CountDownLatch(4)); // 3 handlers + this test thread
+    AsyncDispatcher central = new AsyncDispatcher("Type1");
+    central.register(TestEventType1.class, new TestEventHandler1());
+    central.registerAndCreateDispatcher(TestEventType2.class, new TestEventHandler2(), "Type2", 1);
+    central.registerAndCreateDispatcher(TestEventType3.class, new TestEventHandler3(), "Type3", 1);
+    try {
+      central.init(new Configuration());
+      central.start();
+      // 3 threads in different dispatchers will handle 3 events
+      central.getEventHandler().handle(new TestEvent1(TestEventType1.TYPE1, 0));
+      central.getEventHandler().handle(new TestEvent2(TestEventType2.TYPE2));
+      central.getEventHandler().handle(new TestEvent3(TestEventType3.TYPE3));
+      // wait for all events to be run in parallel
+      CountDownEventHandler.checkParallelCountersDoneAndFinish();
+    } finally {
+      central.close(); // stop the dispatcher threads even if the rendezvous fails
+    }
+  }
+
+  @Test (timeout=5000)
+  public void testMultiThreads() throws Exception {
+    CountDownEventHandler.init(new CountDownLatch(4)); // 3 handlers + this test thread
+    AsyncDispatcherConcurrent central = new AsyncDispatcherConcurrent("Type1", 1);
+    central.registerAndCreateDispatcher(TestEventType1.class, new TestEventHandler1(), "Type1", 3);
+    try {
+      central.init(new Configuration());
+      central.start();
+      // 3 threads in the same dispatcher will handle 3 events
+      central.getEventHandler().handle(new TestEvent1(TestEventType1.TYPE1, 0));
+      central.getEventHandler().handle(new TestEvent1(TestEventType1.TYPE1, 1));
+      central.getEventHandler().handle(new TestEvent1(TestEventType1.TYPE1, 2));
+      // wait for all events to be run in parallel
+      CountDownEventHandler.checkParallelCountersDoneAndFinish();
+    } finally {
+      central.close(); // stop the dispatcher threads even if the rendezvous fails
+    }
+  }
+
+  @Test (timeout=5000)
+  public void testMultipleRegisterFail() throws Exception {
+    AsyncDispatcher central = new AsyncDispatcher("Type1");
+    try { // case 1: a plain handler, then a concurrent dispatcher for the same event type
+      central.register(TestEventType1.class, new TestEventHandler1());
+      central.registerAndCreateDispatcher(TestEventType1.class, new TestEventHandler2(), "Type2", 1);
+      Assert.fail();
+    } catch (IllegalStateException e) {
+      Assert.assertTrue(e.getMessage().contains("Cannot register same event on multiple dispatchers"));
+    } finally {
+      central.close();
+    }
+    // case 2: a concurrent dispatcher first, then a plain handler for the same event type
+    central = new AsyncDispatcher("Type1");
+    try {
+      central.registerAndCreateDispatcher(TestEventType1.class, new TestEventHandler2(), "Type2", 1);
+      central.register(TestEventType1.class, new TestEventHandler1());
+      Assert.fail();
+    } catch (IllegalStateException e) {
+      Assert.assertTrue(e.getMessage().contains("Multiple concurrent dispatchers cannot be registered"));
+    } finally {
+      central.close();
+    }
+    // case 3: two concurrent dispatchers created for the same event type
+    central = new AsyncDispatcher("Type1");
+    try {
+      central.registerAndCreateDispatcher(TestEventType1.class, new TestEventHandler2(), "Type2", 1);
+      central.registerAndCreateDispatcher(TestEventType1.class, new TestEventHandler2(), "Type2", 1);
+      Assert.fail();
+    } catch (IllegalStateException e) {
+      Assert.assertTrue(e.getMessage().contains("Multiple concurrent dispatchers cannot be registered"));
+    } finally {
+      central.close();
+    }
+    // case 4: two dispatchers created via the overload that takes no concurrency argument
+    central = new AsyncDispatcher("Type1");
+    try {
+      central.registerAndCreateDispatcher(TestEventType1.class, new TestEventHandler2(), "Type2");
+      central.registerAndCreateDispatcher(TestEventType1.class, new TestEventHandler2(), "Type2");
+      Assert.fail();
+    } catch (IllegalStateException e) {
+      Assert.assertTrue(e.getMessage().contains("Multiple dispatchers cannot be registered for"));
+    } finally {
+      central.close();
+    }
+    // case 5: a concurrent dispatcher, then the same event type attached to that dispatcher again
+    central = new AsyncDispatcher("Type1");
+    try {
+      AsyncDispatcherConcurrent concDispatcher = central.registerAndCreateDispatcher(
+          TestEventType1.class, new TestEventHandler2(), "Type2", 1);
+      central.registerWithExistingDispatcher(TestEventType1.class, new TestEventHandler1(),
+          concDispatcher);
+      Assert.fail();
+    } catch (IllegalStateException e) {
+      Assert.assertTrue(e.getMessage().contains("Multiple concurrent dispatchers cannot be registered"));
+    } finally {
+      central.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 27b9c37..3e3d6f0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -91,6 +91,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.tez.common.AsyncDispatcher;
+import org.apache.tez.common.AsyncDispatcherConcurrent;
 import org.apache.tez.common.GcTimeUpdater;
 import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.TezConverterUtils;
@@ -455,12 +456,22 @@ public class DAGAppMaster extends AbstractService {
     dispatcher.register(DAGAppMasterEventType.class, new DAGAppMasterEventHandler());
     dispatcher.register(DAGEventType.class, dagEventDispatcher);
     dispatcher.register(VertexEventType.class, vertexEventDispatcher);
-    dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
-    dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDispatcher());
+    if (!conf.getBoolean(TezConfiguration.TEZ_AM_USE_CONCURRENT_DISPATCHER,
+        TezConfiguration.TEZ_AM_USE_CONCURRENT_DISPATCHER_DEFAULT)) {
+      dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
+      dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDispatcher());
+    } else {
+      int concurrency = conf.getInt(TezConfiguration.TEZ_AM_CONCURRENT_DISPATCHER_CONCURRENCY, 
+          TezConfiguration.TEZ_AM_CONCURRENT_DISPATCHER_CONCURRENCY_DEFAULT);
+      AsyncDispatcherConcurrent sharedDispatcher = dispatcher.registerAndCreateDispatcher(
+          TaskEventType.class, new TaskEventDispatcher(), "TaskAndAttemptEventThread", concurrency);
+      dispatcher.registerWithExistingDispatcher(TaskAttemptEventType.class,
+          new TaskAttemptEventDispatcher(), sharedDispatcher);
+    }
     
     // register other delegating dispatchers
-    dispatcher.registerAndCreateDispatcher(SpeculatorEventType.class, new SpeculatorEventHandler(), "Speculator");
-
+    dispatcher.registerAndCreateDispatcher(SpeculatorEventType.class, new SpeculatorEventHandler(),
+        "Speculator");
 
     if (enableWebUIService()) {
       this.webUIService = new WebUIService(context);

http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
index 3f60a4e..6c85cc2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
@@ -128,8 +128,6 @@ public interface TaskAttempt {
    */
   long getFinishTime();
   
-  public Task getTask();
-  
   TaskAttemptState restoreFromEvent(HistoryEvent event);
 
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/CallableEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/CallableEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/CallableEvent.java
index e148fe8..7e68752 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/CallableEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/CallableEvent.java
@@ -20,11 +20,11 @@ package org.apache.tez.dag.app.dag.event;
 
 import java.util.concurrent.Callable;
 
-import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tez.common.TezAbstractEvent;
 
 import com.google.common.util.concurrent.FutureCallback;
 
-public abstract class CallableEvent extends AbstractEvent<CallableEventType> implements
+public abstract class CallableEvent extends TezAbstractEvent<CallableEventType> implements
     Callable<Void> {
   private final FutureCallback<Void> callback;
 

http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEvent.java
index 0571cab..b7cb3a4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEvent.java
@@ -18,9 +18,10 @@
 
 package org.apache.tez.dag.app.dag.event;
 
-import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tez.common.TezAbstractEvent;
 
-public class DAGAppMasterEvent extends AbstractEvent<DAGAppMasterEventType> {
+
+public class DAGAppMasterEvent extends TezAbstractEvent<DAGAppMasterEventType> {
 
   public DAGAppMasterEvent(DAGAppMasterEventType type) {
     super(type);

http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEvent.java
index 1ec0222..a0a8a1a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEvent.java
@@ -18,14 +18,14 @@
 
 package org.apache.tez.dag.app.dag.event;
 
-import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tez.common.TezAbstractEvent;
 import org.apache.tez.dag.records.TezDAGID;
 
 /**
  * This class encapsulates job related events.
  *
  */
-public class DAGEvent extends AbstractEvent<DAGEventType> {
+public class DAGEvent extends TezAbstractEvent<DAGEventType> {
 
   private TezDAGID dagId;
 

http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/SpeculatorEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/SpeculatorEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/SpeculatorEvent.java
index 16fab8e..3863a2a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/SpeculatorEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/SpeculatorEvent.java
@@ -18,10 +18,10 @@
 
 package org.apache.tez.dag.app.dag.event;
 
-import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tez.common.TezAbstractEvent;
 import org.apache.tez.dag.records.TezVertexID;
 
-public class SpeculatorEvent extends AbstractEvent<SpeculatorEventType> {
+public class SpeculatorEvent extends TezAbstractEvent<SpeculatorEventType> {
   private final TezVertexID vertexId;
   
   public SpeculatorEvent(SpeculatorEventType type, TezVertexID vertexId) {

http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEvent.java
index 56c03e3..63ef70f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEvent.java
@@ -18,14 +18,14 @@
 
 package org.apache.tez.dag.app.dag.event;
 
-import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tez.common.TezAbstractEvent;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 
 /**
  * This class encapsulates task attempt related events.
  *
  */
-public class TaskAttemptEvent extends AbstractEvent<TaskAttemptEventType> {
+public class TaskAttemptEvent extends TezAbstractEvent<TaskAttemptEventType> {
 
   private TezTaskAttemptID attemptID;
   
@@ -42,4 +42,9 @@ public class TaskAttemptEvent extends AbstractEvent<TaskAttemptEventType> {
   public TezTaskAttemptID getTaskAttemptID() {
     return attemptID;
   }
+  
+  @Override
+  public int getSerializingHash() {
+    return attemptID.getTaskID().getSerializingHash(); // keyed by the task, not by the attempt
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEvent.java
index c7e5faa..def9ddf 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEvent.java
@@ -18,14 +18,14 @@
 
 package org.apache.tez.dag.app.dag.event;
 
-import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tez.common.TezAbstractEvent;
 import org.apache.tez.dag.records.TezTaskID;
 
 /**
  * this class encapsulates task related events.
  *
  */
-public class TaskEvent extends AbstractEvent<TaskEventType> {
+public class TaskEvent extends TezAbstractEvent<TaskEventType> {
 
   private TezTaskID taskId;
 
@@ -37,4 +37,9 @@ public class TaskEvent extends AbstractEvent<TaskEventType> {
   public TezTaskID getTaskID() {
     return taskId;
   }
+  
+  @Override
+  public int getSerializingHash() {
+    return taskId.getSerializingHash(); // same key TaskAttemptEvent derives from its task ID
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEvent.java
index 9e94eb5..33128e4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEvent.java
@@ -18,14 +18,14 @@
 
 package org.apache.tez.dag.app.dag.event;
 
-import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tez.common.TezAbstractEvent;
 import org.apache.tez.dag.records.TezVertexID;
 
 /**
  * this class encapsulates vertex related events.
  *
  */
-public class VertexEvent extends AbstractEvent<VertexEventType> {
+public class VertexEvent extends TezAbstractEvent<VertexEventType> {
 
   private TezVertexID vertexId;
 

http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index f562451..f769565 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -1261,7 +1261,6 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     if (finishTime == 0) {
       setFinishTime();
     }
-    
     entityUpdateTracker.stop();
 
     boolean recoveryError = false;

http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 1f3e1cf..b1c0acc 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -144,6 +144,9 @@ public class TaskAttemptImpl implements TaskAttempt,
   private NodeId containerNodeId;
   private String nodeHttpAddress;
   private String nodeRackName;
+  
+  private final Task task;
+  private final Vertex vertex;
 
   @VisibleForTesting
   TaskAttemptStatus reportedStatus;
@@ -406,7 +409,8 @@ public class TaskAttemptImpl implements TaskAttempt,
       TaskAttemptListener taskAttemptListener, Configuration conf, Clock clock,
       TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext,
       boolean isRescheduled,
-      Resource resource, ContainerContext containerContext, boolean leafVertex) {
+      Resource resource, ContainerContext containerContext, boolean leafVertex,
+      Task task) {
     ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
     this.readLock = rwLock.readLock();
     this.writeLock = rwLock.writeLock();
@@ -417,6 +421,9 @@ public class TaskAttemptImpl implements TaskAttempt,
     this.clock = clock;
     this.taskHeartbeatHandler = taskHeartbeatHandler;
     this.appContext = appContext;
+    this.task = task;
+    this.vertex = this.task.getVertex();
+
     this.reportedStatus = new TaskAttemptStatus(this.attemptId);
     initTaskAttemptStatus(reportedStatus);
     RackResolver.init(conf);
@@ -649,17 +656,9 @@ public class TaskAttemptImpl implements TaskAttempt,
       readLock.unlock();
     }
   }
-
-  @Override
-  public Task getTask() {
-    return appContext.getCurrentDAG()
-        .getVertex(attemptId.getTaskID().getVertexID())
-        .getTask(attemptId.getTaskID());
-  }
-
+  
   Vertex getVertex() {
-    return appContext.getCurrentDAG()
-        .getVertex(attemptId.getTaskID().getVertexID());
+    return vertex; // cached in the constructor from task.getVertex()
   }
 
   @SuppressWarnings("unchecked")
@@ -955,7 +954,7 @@ public class TaskAttemptImpl implements TaskAttempt,
     if (conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
         YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)
         && conf.get(YarnConfiguration.YARN_LOG_SERVER_URL) != null) {
-      String contextStr = "v_" + getTask().getVertex().getName()
+      String contextStr = "v_" + getVertex().getName()
           + "_" + this.attemptId.toString();
       completedLogsUrl = conf.get(YarnConfiguration.YARN_LOG_SERVER_URL)
           + "/" + containerNodeId.toString()
@@ -964,7 +963,7 @@ public class TaskAttemptImpl implements TaskAttempt,
           + "/" + this.appContext.getUser();
     }
     TaskAttemptStartedEvent startEvt = new TaskAttemptStartedEvent(
-        attemptId, getTask().getVertex().getName(),
+        attemptId, getVertex().getName(),
         launchTime, containerId, containerNodeId,
         inProgressLogsUrl, completedLogsUrl, nodeHttpAddress);
     this.appContext.getHistoryHandler().handle(
@@ -976,7 +975,7 @@ public class TaskAttemptImpl implements TaskAttempt,
     if (getLaunchTime() == 0) return;
 
     TaskAttemptFinishedEvent finishEvt = new TaskAttemptFinishedEvent(
-        attemptId, getTask().getVertex().getName(), getLaunchTime(),
+        attemptId, getVertex().getName(), getLaunchTime(),
         getFinishTime(), TaskAttemptState.SUCCEEDED, null,
         "", getCounters());
     // FIXME how do we store information regd completion events
@@ -987,7 +986,7 @@ public class TaskAttemptImpl implements TaskAttempt,
   protected void logJobHistoryAttemptUnsuccesfulCompletion(
       TaskAttemptState state) {
     TaskAttemptFinishedEvent finishEvt = new TaskAttemptFinishedEvent(
-        attemptId, getTask().getVertex().getName(), getLaunchTime(),
+        attemptId, getVertex().getName(), getLaunchTime(),
         clock.getTime(), state,
         terminationCause,
         StringUtils.join(

http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 461339b..8b63734 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -119,6 +119,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   private Map<TezTaskAttemptID, TaskAttempt> attempts;
   private final int maxFailedAttempts;
   protected final Clock clock;
+  private final Vertex vertex;
   private final Lock readLock;
   private final Lock writeLock;
   private final List<String> diagnostics = new ArrayList<String>();
@@ -326,7 +327,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
       Clock clock, TaskHeartbeatHandler thh, AppContext appContext,
       boolean leafVertex, Resource resource,
       ContainerContext containerContext,
-      StateChangeNotifier stateChangeNotifier) {
+      StateChangeNotifier stateChangeNotifier,
+      Vertex vertex) {
     this.conf = conf;
     this.clock = clock;
     ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
@@ -342,7 +344,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     this.eventHandler = eventHandler;
     this.appContext = appContext;
     this.stateChangeNotifier = stateChangeNotifier;
-
+    this.vertex = vertex;
     this.leafVertex = leafVertex;
     this.taskResource = resource;
     this.containerContext = containerContext;
@@ -382,7 +384,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
 
   @Override
   public Vertex getVertex() {
-    return appContext.getCurrentDAG().getVertex(taskId.getVertexID());
+    return vertex; // the vertex handed to the constructor, no lookup through the current DAG
   }
 
   @Override
@@ -778,7 +780,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   TaskAttemptImpl createAttempt(int attemptNumber) {
     return new TaskAttemptImpl(getTaskId(), attemptNumber, eventHandler,
         taskAttemptListener, conf, clock, taskHeartbeatHandler, appContext,
-        (failedAttempts > 0), taskResource, containerContext, leafVertex);
+        (failedAttempts > 0), taskResource, containerContext, leafVertex, this);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index c5de19b..9ed7441 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -109,9 +109,7 @@ import org.apache.tez.dag.app.dag.event.DAGEventType;
 import org.apache.tez.dag.app.dag.event.DAGEventVertexCompleted;
 import org.apache.tez.dag.app.dag.event.DAGEventVertexReRunning;
 import org.apache.tez.dag.app.dag.event.SpeculatorEvent;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.app.dag.event.TaskEvent;
 import org.apache.tez.dag.app.dag.event.TaskEventRecoverTask;
@@ -164,7 +162,6 @@ import org.apache.tez.runtime.api.events.InputFailedEvent;
 import org.apache.tez.runtime.api.events.InputDataInformationEvent;
 import org.apache.tez.runtime.api.events.InputInitializerEvent;
 import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
-import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
 import org.apache.tez.runtime.api.impl.EventMetaData;
 import org.apache.tez.runtime.api.impl.EventType;
@@ -194,8 +191,7 @@ import org.slf4j.LoggerFactory;
  * The read and write calls use ReadWriteLock for concurrency.
  */
 @SuppressWarnings({ "rawtypes", "unchecked" })
-public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
-  EventHandler<VertexEvent> {
+public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandler<VertexEvent> {
 
   private static final String LINE_SEPARATOR = System
       .getProperty("line.separator");
@@ -216,6 +212,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   // TODO Metrics
   //private final MRAppMetrics metrics;
   private final AppContext appContext;
+  private final DAG dag;
 
   private boolean lazyTasksCopyNeeded = false;
   // must be a linked map for ordering
@@ -867,6 +864,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     setTaskLocationHints(vertexLocationHint);
 
     this.dagUgi = appContext.getCurrentDAG().getDagUGI();
+    this.dag = appContext.getCurrentDAG();
 
     this.taskResource = DagTypeConverters
         .createResourceRequestFromTaskConfig(vertexPlan.getTaskConfig());
@@ -2154,7 +2152,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           this.targetVertices.isEmpty() : true),
         this.taskResource,
         conContext,
-        this.stateChangeNotifier);
+        this.stateChangeNotifier,
+        this);
   }
   
   private void createTasks() {
@@ -4409,7 +4408,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
   @Override
   public DAG getDAG() {
-    return appContext.getCurrentDAG();
+    return dag; // captured from appContext.getCurrentDAG() when this vertex was constructed
   }
 
   private TezDAGID getDAGId() {

http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
index 2a061bc..87ffead 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
@@ -136,7 +136,7 @@ public class TestMockDAGAppMaster {
     lrVertex.put(lrName2, LocalResource.newInstance(URL.newInstance("file", "localhost", 0, "/test1"),
         LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, 1, 1));
 
-    DAG dag = DAG.create("test").addTaskLocalFiles(lrDAG);
+    DAG dag = DAG.create("testLocalResourceSetup").addTaskLocalFiles(lrDAG);
     Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 5).addTaskLocalFiles(lrVertex);
     dag.addVertex(vA);
 
@@ -166,7 +166,7 @@ public class TestMockDAGAppMaster {
     MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
     mockLauncher.startScheduling(false);
     // there is only 1 task whose first attempt will be preempted
-    DAG dag = DAG.create("test");
+    DAG dag = DAG.create("testInternalPreemption");
     Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 1);
     dag.addVertex(vA);
 
@@ -197,7 +197,7 @@ public class TestMockDAGAppMaster {
     MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
     mockLauncher.startScheduling(false);
     mockApp.sendDMEvents = true;
-    DAG dag = DAG.create("test");
+    DAG dag = DAG.create("testBasicEvents");
     Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 2);
     Vertex vB = Vertex.create("B", ProcessorDescriptor.create("Proc.class"), 2);
     Vertex vC = Vertex.create("C", ProcessorDescriptor.create("Proc.class"), 2);
@@ -230,21 +230,27 @@ public class TestMockDAGAppMaster {
     List<TezEvent> tEvents = tImpl.getTaskEvents();
     Assert.assertEquals(2, tEvents.size()); // 2 from vA
     Assert.assertEquals(vA.getName(), tEvents.get(0).getDestinationInfo().getEdgeVertexName());
-    Assert.assertEquals(0, ((DataMovementEvent)tEvents.get(0).getEvent()).getTargetIndex());
     Assert.assertEquals(0, ((DataMovementEvent)tEvents.get(0).getEvent()).getSourceIndex());
     Assert.assertEquals(vA.getName(), tEvents.get(1).getDestinationInfo().getEdgeVertexName());
-    Assert.assertEquals(1, ((DataMovementEvent)tEvents.get(1).getEvent()).getTargetIndex());
     Assert.assertEquals(0, ((DataMovementEvent)tEvents.get(1).getEvent()).getSourceIndex());
+    int targetIndex1 = ((DataMovementEvent)tEvents.get(0).getEvent()).getTargetIndex();
+    int targetIndex2 = ((DataMovementEvent)tEvents.get(1).getEvent()).getTargetIndex();
+    // order of vA task completion can change order of events
+    Assert.assertTrue("t1: " + targetIndex1 + " t2: " + targetIndex2,
+        (targetIndex1 == 0 && targetIndex2 == 1) || (targetIndex1 == 1 && targetIndex2 == 0));
     vImpl = (VertexImpl) dagImpl.getVertex(vC.getName());
     tImpl = (TaskImpl) vImpl.getTask(1);
     tEvents = tImpl.getTaskEvents();
     Assert.assertEquals(2, tEvents.size()); // 2 from vA
     Assert.assertEquals(vA.getName(), tEvents.get(0).getDestinationInfo().getEdgeVertexName());
-    Assert.assertEquals(0, ((DataMovementEvent)tEvents.get(0).getEvent()).getTargetIndex());
     Assert.assertEquals(1, ((DataMovementEvent)tEvents.get(0).getEvent()).getSourceIndex());
     Assert.assertEquals(vA.getName(), tEvents.get(1).getDestinationInfo().getEdgeVertexName());
-    Assert.assertEquals(1, ((DataMovementEvent)tEvents.get(1).getEvent()).getTargetIndex());
     Assert.assertEquals(1, ((DataMovementEvent)tEvents.get(1).getEvent()).getSourceIndex());
+    targetIndex1 = ((DataMovementEvent)tEvents.get(0).getEvent()).getTargetIndex();
+    targetIndex2 = ((DataMovementEvent)tEvents.get(1).getEvent()).getTargetIndex();
+    // order of vA task completion can change order of events
+    Assert.assertTrue("t1: " + targetIndex1 + " t2: " + targetIndex2,
+        (targetIndex1 == 0 && targetIndex2 == 1) || (targetIndex1 == 1 && targetIndex2 == 0));
     vImpl = (VertexImpl) dagImpl.getVertex(vD.getName());
     tImpl = (TaskImpl) vImpl.getTask(1);
     tEvents = tImpl.getTaskEvents();
@@ -478,7 +484,7 @@ public class TestMockDAGAppMaster {
 
     final String vAName = "A";
     
-    DAG dag = DAG.create("testBasicCounters");
+    DAG dag = DAG.create("testBasicCounterMemory");
     Vertex vA = Vertex.create(vAName, ProcessorDescriptor.create("Proc.class"), 10000);
     dag.addVertex(vA);
 
@@ -511,6 +517,30 @@ public class TestMockDAGAppMaster {
     checkMemory(dag.getName(), mockApp);
     tezClient.stop();
   }
+  
+  @Ignore
+  @Test (timeout = 60000)
+  public void testTaskEventsProcessingSpeed() throws Exception {
+    Logger.getRootLogger().setLevel(Level.WARN);
+    TezConfiguration tezconf = new TezConfiguration(defaultConf);
+    tezconf.setBoolean(TezConfiguration.TEZ_AM_USE_CONCURRENT_DISPATCHER, true);
+    MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null,
+        null, false, false, 30, 1000);
+    tezClient.start();
+
+    final String vAName = "A";
+    
+    DAG dag = DAG.create("testTaskEventsProcessingSpeed");
+    Vertex vA = Vertex.create(vAName, ProcessorDescriptor.create("Proc.class"), 50000);
+    dag.addVertex(vA);
+
+    MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
+    mockApp.doSleep = false;
+    DAGClient dagClient = tezClient.submitDAG(dag);
+    DAGStatus status = dagClient.waitForCompletion();
+    Assert.assertEquals(DAGStatus.State.SUCCEEDED, status.getState());
+    tezClient.stop();
+  }
 
   @Ignore
   @Test (timeout = 60000)
@@ -530,7 +560,7 @@ public class TestMockDAGAppMaster {
     ioStats.setItemsProcessed(1);
     TaskStatistics vAStats = new TaskStatistics();
 
-    DAG dag = DAG.create("testBasisStatistics");
+    DAG dag = DAG.create("testBasicStatisticsMemory");
     Vertex vA = Vertex.create(vAName, ProcessorDescriptor.create("Proc.class"), numTasks);
     for (int i=0; i<numSources; ++i) {
       final String sourceName = i + vAName;
@@ -623,7 +653,7 @@ public class TestMockDAGAppMaster {
     MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
     mockLauncher.startScheduling(false);
 
-    DAG dag = DAG.create("test");
+    DAG dag = DAG.create("testSchedulerErrorHandling");
     Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 5);
     dag.addVertex(vA);
 

http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 2a2df7c..50bb68c 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -81,6 +81,7 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
+import org.apache.tez.dag.app.dag.event.TaskEvent;
 import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
 import org.apache.tez.dag.app.dag.event.TaskEventType;
 import org.apache.tez.dag.app.dag.event.SpeculatorEventTaskAttemptStatusUpdate;
@@ -672,6 +673,35 @@ public class TestTaskAttempt {
   }
   
   @Test(timeout = 5000)
+  public void testEventSerializingHash() throws Exception {
+    ApplicationId appId = ApplicationId.newInstance(1, 2);
+    TezDAGID dagID = TezDAGID.getInstance(appId, 1);
+    TezVertexID vertexID = TezVertexID.getInstance(dagID, 1);
+    TezTaskID taskID1 = TezTaskID.getInstance(vertexID, 1);
+    TezTaskID taskID2 = TezTaskID.getInstance(vertexID, 2);
+    TezTaskAttemptID taID11 = TezTaskAttemptID.getInstance(taskID1, 0);
+    TezTaskAttemptID taID12 = TezTaskAttemptID.getInstance(taskID1, 1);
+    TezTaskAttemptID taID21 = TezTaskAttemptID.getInstance(taskID2, 1);
+    
+    TaskAttemptEvent taEventFail11 = new TaskAttemptEvent(taID11, TaskAttemptEventType.TA_FAILED);
+    TaskAttemptEvent taEventKill11 = new TaskAttemptEvent(taID11, TaskAttemptEventType.TA_KILL_REQUEST);
+    TaskAttemptEvent taEventKill12 = new TaskAttemptEvent(taID12, TaskAttemptEventType.TA_KILL_REQUEST);
+    TaskAttemptEvent taEventKill21 = new TaskAttemptEvent(taID21, TaskAttemptEventType.TA_KILL_REQUEST);
+    TaskEvent tEventKill1 = new TaskEvent(taskID1, TaskEventType.T_ATTEMPT_KILLED);
+    TaskEvent tEventFail1 = new TaskEvent(taskID1, TaskEventType.T_ATTEMPT_FAILED);
+    TaskEvent tEventFail2 = new TaskEvent(taskID2, TaskEventType.T_ATTEMPT_FAILED);
+    
+    // all of them should have the same value
+    assertEquals(taEventFail11.getSerializingHash(), taEventKill11.getSerializingHash());
+    assertEquals(taEventKill11.getSerializingHash(), taEventKill12.getSerializingHash());
+    assertEquals(tEventFail1.getSerializingHash(), tEventKill1.getSerializingHash());
+    assertEquals(taEventFail11.getSerializingHash(), tEventKill1.getSerializingHash());
+    assertEquals(taEventKill21.getSerializingHash(), tEventFail2.getSerializingHash());
+    // events from different tasks may not have the same value
+    assertFalse(tEventFail1.getSerializingHash() == tEventFail2.getSerializingHash());
+  }
+  
+  @Test(timeout = 5000)
   public void testSuccess() throws Exception {
     ApplicationId appId = ApplicationId.newInstance(1, 2);
     ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
@@ -695,7 +725,7 @@ public class TestTaskAttempt {
     Resource resource = Resource.newInstance(1024, 1);
 
     NodeId nid = NodeId.newInstance("127.0.0.1", 0);
-    ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+    ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
     Container container = mock(Container.class);
     when(container.getId()).thenReturn(contId);
     when(container.getNodeId()).thenReturn(nid);
@@ -786,7 +816,7 @@ public class TestTaskAttempt {
     Resource resource = Resource.newInstance(1024, 1);
 
     NodeId nid = NodeId.newInstance("127.0.0.1", 0);
-    ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+    ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
     Container container = mock(Container.class);
     when(container.getId()).thenReturn(contId);
     when(container.getNodeId()).thenReturn(nid);
@@ -881,7 +911,7 @@ public class TestTaskAttempt {
     Resource resource = Resource.newInstance(1024, 1);
 
     NodeId nid = NodeId.newInstance("127.0.0.1", 0);
-    ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+    ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
     Container container = mock(Container.class);
     when(container.getId()).thenReturn(contId);
     when(container.getNodeId()).thenReturn(nid);
@@ -984,7 +1014,7 @@ public class TestTaskAttempt {
     Resource resource = Resource.newInstance(1024, 1);
 
     NodeId nid = NodeId.newInstance("127.0.0.1", 0);
-    ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+    ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
     Container container = mock(Container.class);
     when(container.getId()).thenReturn(contId);
     when(container.getNodeId()).thenReturn(nid);
@@ -1084,7 +1114,7 @@ public class TestTaskAttempt {
     Resource resource = Resource.newInstance(1024, 1);
 
     NodeId nid = NodeId.newInstance("127.0.0.1", 0);
-    ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+    ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
     Container container = mock(Container.class);
     when(container.getId()).thenReturn(contId);
     when(container.getNodeId()).thenReturn(nid);
@@ -1211,7 +1241,7 @@ public class TestTaskAttempt {
         Resource resource, ContainerContext containerContext, boolean leafVertex) {
       super(taskId, attemptNumber, eventHandler, tal, conf,
           clock, taskHeartbeatHandler, appContext,
-          isRescheduled, resource, containerContext, leafVertex);
+          isRescheduled, resource, containerContext, leafVertex, mock(TaskImpl.class));
       this.locationHint = locationHint;
     }
     

http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
index b8b09d0..d6d874d 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
@@ -148,7 +148,7 @@ public class TestTaskAttemptRecovery {
             mock(TaskAttemptListener.class), new Configuration(),
             new SystemClock(), mock(TaskHeartbeatHandler.class),
             mockAppContext, false, Resource.newInstance(1, 1),
-            mock(ContainerContext.class), false);
+            mock(ContainerContext.class), false, mockTask);
     taId = ta.getID();
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index 9509df4..66e6724 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -630,7 +630,7 @@ public class TestTaskImpl {
         ContainerContext containerContext, Vertex vertex) {
       super(vertexId, partition, eventHandler, conf, taskAttemptListener,
           clock, thh, appContext, leafVertex, resource,
-          containerContext, mock(StateChangeNotifier.class));
+          containerContext, mock(StateChangeNotifier.class), vertex);
       this.vertex = vertex;
       this.locationHint = locationHint;
     }
@@ -687,7 +687,7 @@ public class TestTaskImpl {
         TaskLocationHint locationHint, boolean isRescheduled,
         Resource resource, ContainerContext containerContext) {
       super(taskId, attemptNumber, eventHandler, tal, conf, clock, thh,
-          appContext, isRescheduled, resource, containerContext, false);
+          appContext, isRescheduled, resource, containerContext, false, mock(TaskImpl.class));
       this.locationHint = locationHint;
     }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/f6ea0fb3/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
index e182f24..2a49826 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
@@ -190,7 +190,7 @@ public class TestTaskRecovery {
             new Configuration(), mock(TaskAttemptListener.class),
             new SystemClock(), mock(TaskHeartbeatHandler.class),
             mockAppContext, false, Resource.newInstance(1, 1),
-            mock(ContainerContext.class), mock(StateChangeNotifier.class));
+            mock(ContainerContext.class), mock(StateChangeNotifier.class), vertex);
 
     Map<String, OutputCommitter> committers =
         new HashMap<String, OutputCommitter>();