You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/14 22:58:35 UTC

[17/50] [abbrv] tez git commit: TEZ-2124. Change Node tracking to work per external container source. (sseth)

TEZ-2124. Change Node tracking to work per external container source. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a5dfca2b
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a5dfca2b
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a5dfca2b

Branch: refs/heads/TEZ-2003
Commit: a5dfca2bb28b6921a95836630e2a2d02cb9318c2
Parents: 2baf606
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu Jul 16 14:18:35 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Aug 14 13:46:44 2015 -0700

----------------------------------------------------------------------
 TEZ-2003-CHANGES.txt                            |   1 +
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    |   2 +
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   |   3 +-
 .../dag/app/launcher/ContainerLauncherImpl.java |   5 +-
 .../app/rm/TaskSchedulerAppCallbackImpl.java    |  89 +++++++++
 .../dag/app/rm/TaskSchedulerEventHandler.java   |  71 +++----
 .../apache/tez/dag/app/rm/node/AMNodeEvent.java |   8 +-
 .../rm/node/AMNodeEventContainerAllocated.java  |   4 +-
 .../rm/node/AMNodeEventNodeCountUpdated.java    |   4 +-
 .../app/rm/node/AMNodeEventStateChanged.java    |   4 +-
 .../rm/node/AMNodeEventTaskAttemptEnded.java    |   4 +-
 .../node/AMNodeEventTaskAttemptSucceeded.java   |   4 +-
 .../apache/tez/dag/app/rm/node/AMNodeImpl.java  |   6 +-
 .../tez/dag/app/rm/node/AMNodeTracker.java      | 162 +++++-----------
 .../dag/app/rm/node/PerSourceNodeTracker.java   | 187 +++++++++++++++++++
 .../apache/tez/dag/app/MockDAGAppMaster.java    |   2 +-
 .../tez/dag/app/TestMockDAGAppMaster.java       |   2 +-
 .../tez/dag/app/rm/TestContainerReuse.java      |  62 +++---
 .../app/rm/TestTaskSchedulerEventHandler.java   |  11 +-
 .../dag/app/rm/TestTaskSchedulerHelpers.java    |   2 +-
 .../tez/dag/app/rm/node/TestAMNodeTracker.java  |  64 ++++---
 21 files changed, 462 insertions(+), 235 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/a5dfca2b/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index 590fe7f..604947c 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -33,5 +33,6 @@ ALL CHANGES:
   TEZ-2508. rebase 06/01
   TEZ-2526. Fix version for tez-history-parser.
   TEZ-2621. rebase 07/14
+  TEZ-2124. Change Node tracking to work per external container source.
 
 INCOMPATIBLE CHANGES:

http://git-wip-us.apache.org/repos/asf/tez/blob/a5dfca2b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index e37fc2f..ec2ef66 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -1440,9 +1440,11 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       VertexImpl v = createVertex(this, vertexName, i);
       addVertex(v);
     }
+
     // check task resources, only check it in non-local mode
     if (!appContext.isLocal()) {
       for (Vertex v : vertexMap.values()) {
+        // TODO TEZ-2003 (post) Ideally, this should be per source.
         if (v.getTaskResource().compareTo(appContext.getClusterInfo().getMaxContainerCapability()) > 0) {
           String msg = "Vertex's TaskResource is beyond the cluster container capability," +
               "Vertex=" + v.getLogIdentifier() +", Requested TaskResource=" + v.getTaskResource()

http://git-wip-us.apache.org/repos/asf/tez/blob/a5dfca2b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 93b4c3f..1b55295 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -1396,7 +1396,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
         if (amContainer != null) {
           // inform the node about failure
           task.eventHandler.handle(
-              new AMNodeEventTaskAttemptEnded(amContainer.getContainer().getNodeId(), 
+              new AMNodeEventTaskAttemptEnded(amContainer.getContainer().getNodeId(),
+                  task.getVertex().getTaskSchedulerIdentifier(),
                   containerId, failedAttemptId, true));
         }
       }

http://git-wip-us.apache.org/repos/asf/tez/blob/a5dfca2b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
index a1eb2a7..a12fb04 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
@@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.tez.dag.api.TezConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -292,7 +293,9 @@ public class ContainerLauncherImpl extends AbstractService implements
 
             // nodes where containers will run at *this* point of time. This is
             // *not* the cluster size and doesn't need to be.
-            int numNodes = context.getNodeTracker().getNumNodes();
+            int yarnSourceIndex =
+                context.getTaskScheduerIdentifier(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT);
+            int numNodes = context.getNodeTracker().getNumNodes(yarnSourceIndex);
             int idealPoolSize = Math.min(limitOnPoolSize, numNodes);
 
             if (poolSize < idealPoolSize) {

http://git-wip-us.apache.org/repos/asf/tez/blob/a5dfca2b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackImpl.java
new file mode 100644
index 0000000..ea37e94
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackImpl.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+public class TaskSchedulerAppCallbackImpl implements TaskSchedulerService.TaskSchedulerAppCallback{
+
+  private final TaskSchedulerEventHandler tseh;
+  private final int schedulerId;
+
+  public TaskSchedulerAppCallbackImpl(TaskSchedulerEventHandler tseh, int schedulerId) {
+    this.tseh = tseh;
+    this.schedulerId = schedulerId;
+  }
+
+  @Override
+  public void taskAllocated(Object task, Object appCookie, Container container) {
+    tseh.taskAllocated(schedulerId, task, appCookie, container);
+  }
+
+  @Override
+  public void containerCompleted(Object taskLastAllocated, ContainerStatus containerStatus) {
+    tseh.containerCompleted(schedulerId, taskLastAllocated, containerStatus);
+  }
+
+  @Override
+  public void containerBeingReleased(ContainerId containerId) {
+    tseh.containerBeingReleased(schedulerId, containerId);
+  }
+
+  @Override
+  public void nodesUpdated(List<NodeReport> updatedNodes) {
+    tseh.nodesUpdated(schedulerId, updatedNodes);
+  }
+
+  @Override
+  public void appShutdownRequested() {
+    tseh.appShutdownRequested(schedulerId);
+  }
+
+  @Override
+  public void setApplicationRegistrationData(Resource maxContainerCapability,
+                                             Map<ApplicationAccessType, String> appAcls,
+                                             ByteBuffer clientAMSecretKey) {
+    tseh.setApplicationRegistrationData(schedulerId, maxContainerCapability, appAcls, clientAMSecretKey);
+  }
+
+  @Override
+  public void onError(Throwable t) {
+    tseh.onError(schedulerId, t);
+  }
+
+  @Override
+  public float getProgress() {
+    return tseh.getProgress(schedulerId);
+  }
+
+  @Override
+  public void preemptContainer(ContainerId containerId) {
+    tseh.preemptContainer(schedulerId, containerId);
+  }
+
+  @Override
+  public AppFinalStatus getFinalAppStatus() {
+    return tseh.getFinalAppStatus();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/a5dfca2b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index 69763b3..1ad0059 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -29,6 +29,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback.AppFinalStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -81,8 +82,7 @@ import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import com.google.common.base.Preconditions;
 
 
-public class TaskSchedulerEventHandler extends AbstractService
-                                         implements TaskSchedulerAppCallback,
+public class TaskSchedulerEventHandler extends AbstractService implements
                                                EventHandler<AMSchedulerEvent> {
   static final Logger LOG = LoggerFactory.getLogger(TaskSchedulerEventHandler.class);
 
@@ -315,7 +315,7 @@ public class TaskSchedulerEventHandler extends AbstractService
       // stopped.
       // AMNodeImpl blacklisting logic does not account for KILLED attempts.
       sendEvent(new AMNodeEventTaskAttemptEnded(appContext.getAllContainers().
-          get(attemptContainerId).getContainer().getNodeId(), attemptContainerId,
+          get(attemptContainerId).getContainer().getNodeId(), event.getSchedulerId(), attemptContainerId,
           attempt.getID(), event.getState() == TaskAttemptState.FAILED));
     }
   }
@@ -330,7 +330,7 @@ public class TaskSchedulerEventHandler extends AbstractService
       sendEvent(new AMContainerEventTASucceeded(usedContainerId,
           event.getAttemptID()));
       sendEvent(new AMNodeEventTaskAttemptSucceeded(appContext.getAllContainers().
-          get(usedContainerId).getContainer().getNodeId(), usedContainerId,
+          get(usedContainerId).getContainer().getNodeId(), event.getSchedulerId(), usedContainerId,
           event.getAttemptID()));
     }
 
@@ -392,14 +392,16 @@ public class TaskSchedulerEventHandler extends AbstractService
   private TaskSchedulerService createTaskScheduler(String host, int port, String trackingUrl,
                                                    AppContext appContext,
                                                    String schedulerClassName,
-                                                   long customAppIdIdentifier) {
+                                                   long customAppIdIdentifier,
+                                                   int schedulerId) {
+    TaskSchedulerAppCallback appCallback = new TaskSchedulerAppCallbackImpl(this, schedulerId);
     if (schedulerClassName.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT)) {
       LOG.info("Creating TaskScheduler: YarnTaskSchedulerService");
-      return new YarnTaskSchedulerService(this, this.containerSignatureMatcher,
+      return new YarnTaskSchedulerService(appCallback, this.containerSignatureMatcher,
           host, port, trackingUrl, appContext);
     } else if (schedulerClassName.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT)) {
       LOG.info("Creating TaskScheduler: Local TaskScheduler");
-      return new LocalTaskSchedulerService(this, this.containerSignatureMatcher,
+      return new LocalTaskSchedulerService(appCallback, this.containerSignatureMatcher,
           host, port, trackingUrl, customAppIdIdentifier, appContext);
     } else {
       LOG.info("Creating custom TaskScheduler: " + schedulerClassName);
@@ -411,7 +413,7 @@ public class TaskSchedulerEventHandler extends AbstractService
             .getConstructor(TaskSchedulerAppCallback.class, AppContext.class, String.class,
                 int.class, String.class, long.class, Configuration.class);
         ctor.setAccessible(true);
-        return ctor.newInstance(this, appContext, host, port, trackingUrl, customAppIdIdentifier,
+        return ctor.newInstance(appCallback, appContext, host, port, trackingUrl, customAppIdIdentifier,
             getConfig());
       } catch (NoSuchMethodException e) {
         throw new TezUncheckedException(e);
@@ -441,7 +443,7 @@ public class TaskSchedulerEventHandler extends AbstractService
       LOG.info("ClusterIdentifier for TaskScheduler [" + i + ":" + taskSchedulerClasses[i] + "]=" +
           customAppIdIdentifier);
       taskSchedulers[i] = createTaskScheduler(host, port,
-          trackingUrl, appContext, taskSchedulerClasses[i], customAppIdIdentifier);
+          trackingUrl, appContext, taskSchedulerClasses[i], customAppIdIdentifier, i);
     }
   }
 
@@ -521,20 +523,21 @@ public class TaskSchedulerEventHandler extends AbstractService
     }
   }
 
-  // TaskSchedulerAppCallback methods
-  @Override
-  public synchronized void taskAllocated(Object task,
+  // TODO TEZ-2003 Consolidate TaskSchedulerAppCallback methods once these methods are moved into context
+
+  // TaskSchedulerAppCallback methods with schedulerId, where relevant
+  public synchronized void taskAllocated(int schedulerId, Object task,
                                            Object appCookie,
                                            Container container) {
     AMSchedulerEventTALaunchRequest event =
         (AMSchedulerEventTALaunchRequest) appCookie;
     ContainerId containerId = container.getId();
     if (appContext.getAllContainers()
-        .addContainerIfNew(container, event.getSchedulerId(), event.getLauncherId(),
+        .addContainerIfNew(container, schedulerId, event.getLauncherId(),
             event.getTaskCommId())) {
-      appContext.getNodeTracker().nodeSeen(container.getNodeId());
+      appContext.getNodeTracker().nodeSeen(container.getNodeId(), schedulerId);
       sendEvent(new AMNodeEventContainerAllocated(container
-          .getNodeId(), container.getId()));
+          .getNodeId(), schedulerId, container.getId()));
     }
 
 
@@ -554,8 +557,8 @@ public class TaskSchedulerEventHandler extends AbstractService
             .getContainerContext().getCredentials(), event.getPriority()));
   }
 
-  @Override
-  public synchronized void containerCompleted(Object task, ContainerStatus containerStatus) {
+  public synchronized void containerCompleted(int schedulerId, Object task, ContainerStatus containerStatus) {
+    // SchedulerId isn't used here since no node updates are sent out
     // Inform the Containers about completion.
     AMContainer amContainer = appContext.getAllContainers().get(containerStatus.getContainerId());
     if (amContainer != null) {
@@ -578,8 +581,8 @@ public class TaskSchedulerEventHandler extends AbstractService
     }
   }
 
-  @Override
-  public synchronized void containerBeingReleased(ContainerId containerId) {
+  public synchronized void containerBeingReleased(int schedulerId, ContainerId containerId) {
+    // SchedulerId isn't used here since no node updates are sent out
     AMContainer amContainer = appContext.getAllContainers().get(containerId);
     if (amContainer != null) {
       sendEvent(new AMContainerEventStopRequest(containerId));
@@ -587,28 +590,27 @@ public class TaskSchedulerEventHandler extends AbstractService
   }
 
   @SuppressWarnings("unchecked")
-  @Override
-  public synchronized void nodesUpdated(List<NodeReport> updatedNodes) {
+  public synchronized void nodesUpdated(int schedulerId, List<NodeReport> updatedNodes) {
     for (NodeReport nr : updatedNodes) {
       // Scheduler will find out from the node, if at all.
       // Relying on the RM to not allocate containers on an unhealthy node.
-      eventHandler.handle(new AMNodeEventStateChanged(nr));
+      eventHandler.handle(new AMNodeEventStateChanged(nr, schedulerId));
     }
   }
 
-  @Override
-  public synchronized void appShutdownRequested() {
+  public synchronized void appShutdownRequested(int schedulerId) {
     // This can happen if the RM has been restarted. If it is in that state,
     // this application must clean itself up.
-    LOG.info("App shutdown requested by scheduler");
+    LOG.info("App shutdown requested by scheduler {}", schedulerId);
     sendEvent(new DAGAppMasterEvent(DAGAppMasterEventType.AM_REBOOT));
   }
 
-  @Override
   public synchronized void setApplicationRegistrationData(
+      int schedulerId,
       Resource maxContainerCapability,
       Map<ApplicationAccessType, String> appAcls, 
       ByteBuffer clientAMSecretKey) {
+    // TODO TEZ-2003 (post) Ideally clusterInfo should be available per source rather than a global view.
     this.appContext.getClusterInfo().setMaxContainerCapability(
         maxContainerCapability);
     this.appAcls = appAcls;
@@ -619,7 +621,6 @@ public class TaskSchedulerEventHandler extends AbstractService
   // TaskScheduler uses a separate thread for it's callbacks. Since this method
   // returns a value which is required, the TaskScheduler wait for the call to
   // complete and can hence lead to a deadlock if called from within a TSEH lock.
-  @Override
   public AppFinalStatus getFinalAppStatus() {
     FinalApplicationStatus finishState = FinalApplicationStatus.UNDEFINED;
     StringBuffer sb = new StringBuffer();
@@ -661,24 +662,25 @@ public class TaskSchedulerEventHandler extends AbstractService
   // TaskScheduler uses a separate thread for it's callbacks. Since this method
   // returns a value which is required, the TaskScheduler wait for the call to
   // complete and can hence lead to a deadlock if called from within a TSEH lock.
-  @Override
-  public float getProgress() {
+  public float getProgress(int schedulerId) {
     // at this point allocate has been called and so node count must be available
     // may change after YARN-1722
     // This is a heartbeat in from the scheduler into the APP, and is being used to piggy-back and
     // node updates from the cluster.
+
+    // Doubles as a mechanism to update node counts periodically. Hence the schedulerId is required.
+
     // TODO Handle this in TEZ-2124. Need a way to know which scheduler is calling in.
     int nodeCount = taskSchedulers[0].getClusterNodeCount();
     if (nodeCount != cachedNodeCount) {
       cachedNodeCount = nodeCount;
-      sendEvent(new AMNodeEventNodeCountUpdated(cachedNodeCount));
+      sendEvent(new AMNodeEventNodeCountUpdated(cachedNodeCount, schedulerId));
     }
     return dagAppMaster.getProgress();
   }
 
-  @Override
-  public void onError(Throwable t) {
-    LOG.info("Error reported by scheduler", t);
+  public void onError(int schedulerId, Throwable t) {
+    LOG.info("Error reported by scheduler {} - {}", schedulerId, t);
     sendEvent(new DAGAppMasterEventSchedulingServiceError(t));
   }
 
@@ -693,8 +695,7 @@ public class TaskSchedulerEventHandler extends AbstractService
     // the context has updated information.
   }
 
-  @Override
-  public void preemptContainer(ContainerId containerId) {
+  public void preemptContainer(int schedulerId, ContainerId containerId) {
     // TODO Why is this making a call back into the scheduler, when the call is originating from there.
     // An AMContainer instance should already exist if an attempt is being made to preempt it
     AMContainer amContainer = appContext.getAllContainers().get(containerId);

http://git-wip-us.apache.org/repos/asf/tez/blob/a5dfca2b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEvent.java
index a623cda..85bc513 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEvent.java
@@ -24,13 +24,19 @@ import org.apache.hadoop.yarn.event.AbstractEvent;
 public class AMNodeEvent extends AbstractEvent<AMNodeEventType> {
 
   private final NodeId nodeId;
+  private final int sourceId; // Effectively the schedulerId
 
-  public AMNodeEvent(NodeId nodeId, AMNodeEventType type) {
+  public AMNodeEvent(NodeId nodeId, int sourceId, AMNodeEventType type) {
     super(type);
     this.nodeId = nodeId;
+    this.sourceId = sourceId;
   }
 
   public NodeId getNodeId() {
     return this.nodeId;
   }
+
+  public int getSourceId() {
+    return sourceId;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/a5dfca2b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventContainerAllocated.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventContainerAllocated.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventContainerAllocated.java
index 0770969..e250f42 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventContainerAllocated.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventContainerAllocated.java
@@ -24,8 +24,8 @@ public class AMNodeEventContainerAllocated extends AMNodeEvent {
 
   private final ContainerId containerId;
 
-  public AMNodeEventContainerAllocated(NodeId nodeId, ContainerId containerId) {
-    super(nodeId, AMNodeEventType.N_CONTAINER_ALLOCATED);
+  public AMNodeEventContainerAllocated(NodeId nodeId, int sourceId, ContainerId containerId) {
+    super(nodeId, sourceId, AMNodeEventType.N_CONTAINER_ALLOCATED);
     this.containerId = containerId;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/a5dfca2b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventNodeCountUpdated.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventNodeCountUpdated.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventNodeCountUpdated.java
index 86ca1fc..3b35daf 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventNodeCountUpdated.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventNodeCountUpdated.java
@@ -22,8 +22,8 @@ public class AMNodeEventNodeCountUpdated extends AMNodeEvent {
 
   private final int count;
   
-  public AMNodeEventNodeCountUpdated(int nodeCount) {
-    super(null, AMNodeEventType.N_NODE_COUNT_UPDATED);
+  public AMNodeEventNodeCountUpdated(int nodeCount, int sourceId) {
+    super(null, sourceId, AMNodeEventType.N_NODE_COUNT_UPDATED);
     this.count = nodeCount;
   }
   

http://git-wip-us.apache.org/repos/asf/tez/blob/a5dfca2b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventStateChanged.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventStateChanged.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventStateChanged.java
index ca4e5bd..b371ddd 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventStateChanged.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventStateChanged.java
@@ -23,8 +23,8 @@ public class AMNodeEventStateChanged extends AMNodeEvent {
 
   private NodeReport nodeReport;
 
-  public AMNodeEventStateChanged(NodeReport nodeReport) {
-    super(nodeReport.getNodeId(), 
+  public AMNodeEventStateChanged(NodeReport nodeReport, int sourceId) {
+    super(nodeReport.getNodeId(), sourceId,
           (nodeReport.getNodeState().isUnusable() ? 
               AMNodeEventType.N_TURNED_UNHEALTHY :
               AMNodeEventType.N_TURNED_HEALTHY));

http://git-wip-us.apache.org/repos/asf/tez/blob/a5dfca2b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventTaskAttemptEnded.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventTaskAttemptEnded.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventTaskAttemptEnded.java
index c823236..4a4cb61 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventTaskAttemptEnded.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventTaskAttemptEnded.java
@@ -27,9 +27,9 @@ public class AMNodeEventTaskAttemptEnded extends AMNodeEvent {
   private final ContainerId containerId;
   private final TezTaskAttemptID taskAttemptId;
   
-  public AMNodeEventTaskAttemptEnded(NodeId nodeId, ContainerId containerId,
+  public AMNodeEventTaskAttemptEnded(NodeId nodeId, int sourceId, ContainerId containerId,
       TezTaskAttemptID taskAttemptId, boolean failed) {
-    super(nodeId, AMNodeEventType.N_TA_ENDED);
+    super(nodeId, sourceId, AMNodeEventType.N_TA_ENDED);
     this.failed = failed;
     this.containerId = containerId;
     this.taskAttemptId = taskAttemptId;

http://git-wip-us.apache.org/repos/asf/tez/blob/a5dfca2b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventTaskAttemptSucceeded.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventTaskAttemptSucceeded.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventTaskAttemptSucceeded.java
index b07d594..2b8cb7d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventTaskAttemptSucceeded.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventTaskAttemptSucceeded.java
@@ -27,9 +27,9 @@ public class AMNodeEventTaskAttemptSucceeded extends AMNodeEvent {
   private final ContainerId containerId;
   private final TezTaskAttemptID taskAttemptId;
 
-  public AMNodeEventTaskAttemptSucceeded(NodeId nodeId,
+  public AMNodeEventTaskAttemptSucceeded(NodeId nodeId, int sourceId,
       ContainerId containerId, TezTaskAttemptID taskAttemptId) {
-    super(nodeId, AMNodeEventType.N_TA_SUCCEEDED);
+    super(nodeId, sourceId, AMNodeEventType.N_TA_SUCCEEDED);
     this.containerId = containerId;
     this.taskAttemptId = taskAttemptId;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/a5dfca2b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
index 0d8e4cd..88b36cb1f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
@@ -54,6 +54,7 @@ public class AMNodeImpl implements AMNode {
   private final ReadLock readLock;
   private final WriteLock writeLock;
   private final NodeId nodeId;
+  private final int sourceId;
   private final AppContext appContext;
   private final int maxTaskFailuresPerNode;
   private boolean blacklistingEnabled;
@@ -172,13 +173,14 @@ public class AMNodeImpl implements AMNode {
 
 
   @SuppressWarnings("rawtypes")
-  public AMNodeImpl(NodeId nodeId, int maxTaskFailuresPerNode,
+  public AMNodeImpl(NodeId nodeId, int sourceId, int maxTaskFailuresPerNode,
       EventHandler eventHandler, boolean blacklistingEnabled,
       AppContext appContext) {
     ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
     this.readLock = rwLock.readLock();
     this.writeLock = rwLock.writeLock();
     this.nodeId = nodeId;
+    this.sourceId = sourceId;
     this.appContext = appContext;
     this.eventHandler = eventHandler;
     this.blacklistingEnabled = blacklistingEnabled;
@@ -247,7 +249,7 @@ public class AMNodeImpl implements AMNode {
 
   /* Blacklist the node with the AMNodeTracker and check if the node should be blacklisted */
   protected boolean registerBadNodeAndShouldBlacklist() {
-    return appContext.getNodeTracker().registerBadNodeAndShouldBlacklist(this);
+    return appContext.getNodeTracker().registerBadNodeAndShouldBlacklist(this, sourceId);
   }
 
   protected void blacklistSelf() {

http://git-wip-us.apache.org/repos/asf/tez/blob/a5dfca2b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
index 102cbe9..0668ff2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
@@ -18,9 +18,8 @@
 
 package org.apache.tez.dag.app.rm.node;
 
-import java.util.HashSet;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.tez.dag.app.dag.DAG;
 import org.slf4j.Logger;
@@ -29,7 +28,6 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
@@ -42,23 +40,21 @@ public class AMNodeTracker extends AbstractService implements
   
   static final Logger LOG = LoggerFactory.getLogger(AMNodeTracker.class);
   
-  private final ConcurrentHashMap<NodeId, AMNode> nodeMap;
-  private final ConcurrentHashMap<String, Set<NodeId>> blacklistMap;
+  private final ConcurrentMap<Integer, PerSourceNodeTracker> perSourceNodeTrackers;
+
   @SuppressWarnings("rawtypes")
   private final EventHandler eventHandler;
   private final AppContext appContext;
-  private int numClusterNodes;
-  private boolean ignoreBlacklisting = false;
+
+  // Not final since they are set up in serviceInit
   private int maxTaskFailuresPerNode;
   private boolean nodeBlacklistingEnabled;
   private int blacklistDisablePercent;
-  float currentIgnoreBlacklistingCountThreshold = 0;
-  
+
   @SuppressWarnings("rawtypes")
   public AMNodeTracker(EventHandler eventHandler, AppContext appContext) {
     super("AMNodeMap");
-    this.nodeMap = new ConcurrentHashMap<NodeId, AMNode>();
-    this.blacklistMap = new ConcurrentHashMap<String, Set<NodeId>>();
+    this.perSourceNodeTrackers = new ConcurrentHashMap<>();
     this.eventHandler = eventHandler;
     this.appContext = appContext;
   }
@@ -76,7 +72,7 @@ public class AMNodeTracker extends AbstractService implements
           TezConfiguration.TEZ_AM_NODE_BLACKLISTING_IGNORE_THRESHOLD_DEFAULT);
 
     LOG.info("blacklistDisablePercent is " + blacklistDisablePercent +
-        ", blacklistingEnabled: " + nodeBlacklistingEnabled + 
+        ", blacklistingEnabled: " + nodeBlacklistingEnabled +
         ", maxTaskFailuresPerNode: " + maxTaskFailuresPerNode);
 
     if (blacklistDisablePercent < -1 || blacklistDisablePercent > 100) {
@@ -85,130 +81,66 @@ public class AMNodeTracker extends AbstractService implements
           + ". Should be an integer between 0 and 100 or -1 to disabled");
     }
   }
-  
-  public void nodeSeen(NodeId nodeId) {
-    if (nodeMap.putIfAbsent(nodeId, new AMNodeImpl(nodeId, maxTaskFailuresPerNode,
-        eventHandler, nodeBlacklistingEnabled, appContext)) == null) {
-      LOG.info("Adding new node: " + nodeId);
-    }
-  }
 
-  private void addToBlackList(NodeId nodeId) {
-    String host = nodeId.getHost();
-
-    if (!blacklistMap.containsKey(host)) {
-      blacklistMap.putIfAbsent(host, new HashSet<NodeId>());
-    }
-    Set<NodeId> nodes = blacklistMap.get(host);
-
-    if (!nodes.contains(nodeId)) {
-      nodes.add(nodeId);
-    }
+  public void nodeSeen(NodeId nodeId, int sourceId) {
+    PerSourceNodeTracker nodeTracker = getAndCreateIfNeededPerSourceTracker(sourceId);
+    nodeTracker.nodeSeen(nodeId);
   }
 
-  boolean registerBadNodeAndShouldBlacklist(AMNode amNode) {
-    if (nodeBlacklistingEnabled) {
-      addToBlackList(amNode.getNodeId());
-      computeIgnoreBlacklisting();
-      return !ignoreBlacklisting;
-    } else {
-      return false;
-    }
+
+  boolean registerBadNodeAndShouldBlacklist(AMNode amNode, int sourceId) {
+    return perSourceNodeTrackers.get(sourceId).registerBadNodeAndShouldBlacklist(amNode);
   }
 
   public void handle(AMNodeEvent rEvent) {
     // No synchronization required until there's multiple dispatchers.
-    NodeId nodeId = rEvent.getNodeId();
     switch (rEvent.getType()) {
-    case N_NODE_COUNT_UPDATED:
-      AMNodeEventNodeCountUpdated event = (AMNodeEventNodeCountUpdated) rEvent;
-      numClusterNodes = event.getNodeCount();
-      LOG.info("Num cluster nodes = " + numClusterNodes);
-      recomputeCurrentIgnoreBlacklistingThreshold();
-      computeIgnoreBlacklisting();
-      break;
-    case N_TURNED_UNHEALTHY:
-    case N_TURNED_HEALTHY:
-      AMNode amNode = nodeMap.get(nodeId);
-      if (amNode == null) {
-        LOG.info("Ignoring RM Health Update for unknown node: " + nodeId);
-      } else {
-        amNode.handle(rEvent);
-      }
-      break;
-    default:
-      nodeMap.get(nodeId).handle(rEvent);
+      case N_CONTAINER_ALLOCATED:
+      case N_TA_SUCCEEDED:
+      case N_TA_ENDED:
+      case N_IGNORE_BLACKLISTING_ENABLED:
+      case N_IGNORE_BLACKLISTING_DISABLED:
+        // All of these will only be seen after a node has been registered.
+        perSourceNodeTrackers.get(rEvent.getSourceId()).handle(rEvent);
+        break;
+      case N_TURNED_UNHEALTHY:
+      case N_TURNED_HEALTHY:
+      case N_NODE_COUNT_UPDATED:
+        // These events can be seen without a node having been marked as 'seen' before
+        getAndCreateIfNeededPerSourceTracker(rEvent.getSourceId()).handle(rEvent);
+        break;
     }
   }
 
-  private void recomputeCurrentIgnoreBlacklistingThreshold() {
-    if (nodeBlacklistingEnabled && blacklistDisablePercent != -1) {
-      currentIgnoreBlacklistingCountThreshold =
-          (float) numClusterNodes * blacklistDisablePercent / 100;
-    }
+  public AMNode get(NodeId nodeId, int sourceId) {
+    return perSourceNodeTrackers.get(sourceId).get(nodeId);
   }
 
-  // May be incorrect if there's multiple NodeManagers running on a single host.
-  // knownNodeCount is based on node managers, not hosts. blacklisting is
-  // currently based on hosts.
-  protected void computeIgnoreBlacklisting() {
-
-    boolean stateChanged = false;
-
-    if (!nodeBlacklistingEnabled || blacklistDisablePercent == -1 || blacklistMap.size() == 0) {
-      return;
-    }
-    if (blacklistMap.size() >= currentIgnoreBlacklistingCountThreshold) {
-      if (ignoreBlacklisting == false) {
-        ignoreBlacklisting = true;
-        LOG.info("Ignore Blacklisting set to true. Known: " + numClusterNodes
-            + ", Blacklisted: " + blacklistMap.size());
-        stateChanged = true;
-      }
-    } else {
-      if (ignoreBlacklisting == true) {
-        ignoreBlacklisting = false;
-        LOG.info("Ignore blacklisting set to false. Known: "
-            + numClusterNodes + ", Blacklisted: " + blacklistMap.size());
-        stateChanged = true;
-      }
-    }
-
-    if (stateChanged) {
-      sendIngoreBlacklistingStateToNodes();
-    }
-  }
-
-  private void sendIngoreBlacklistingStateToNodes() {
-    AMNodeEventType eventType =
-        ignoreBlacklisting ? AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED
-        : AMNodeEventType.N_IGNORE_BLACKLISTING_DISABLED;
-    for (NodeId nodeId : nodeMap.keySet()) {
-      sendEvent(new AMNodeEvent(nodeId, eventType));
-    }
-  }
-
-  public AMNode get(NodeId nodeId) {
-    return nodeMap.get(nodeId);
-  }
-
-  @SuppressWarnings("unchecked")
-  private void sendEvent(Event<?> event) {
-    this.eventHandler.handle(event);
-  }
-
-  public int getNumNodes() {
-    return nodeMap.size();
+  public int getNumNodes(int sourceId) {
+    return perSourceNodeTrackers.get(sourceId).getNumNodes();
   }
 
   @Private
   @VisibleForTesting
-  public boolean isBlacklistingIgnored() {
-    return this.ignoreBlacklisting;
+  public boolean isBlacklistingIgnored(int sourceId) {
+    return perSourceNodeTrackers.get(sourceId).isBlacklistingIgnored();
   }
 
   public void dagComplete(DAG dag) {
     // TODO TEZ-2337 Maybe reset failures from previous DAGs
   }
 
+  private PerSourceNodeTracker getAndCreateIfNeededPerSourceTracker(int sourceId) {
+    PerSourceNodeTracker existing = perSourceNodeTrackers.get(sourceId);
+    if (existing != null) {
+      return existing;
+    }
+    PerSourceNodeTracker created = new PerSourceNodeTracker(sourceId, eventHandler, appContext,
+        maxTaskFailuresPerNode, nodeBlacklistingEnabled, blacklistDisablePercent);
+    // putIfAbsent hands back the previously mapped tracker if one was registered in between.
+    PerSourceNodeTracker raced = perSourceNodeTrackers.putIfAbsent(sourceId, created);
+    return raced != null ? raced : created;
+  }
+
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/a5dfca2b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/PerSourceNodeTracker.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/PerSourceNodeTracker.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/PerSourceNodeTracker.java
new file mode 100644
index 0000000..3264708
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/PerSourceNodeTracker.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm.node;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.dag.app.AppContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PerSourceNodeTracker {
+
+  static final Logger LOG = LoggerFactory.getLogger(PerSourceNodeTracker.class);
+
+  private final int sourceId;
+  private final ConcurrentHashMap<NodeId, AMNode> nodeMap;
+  private final ConcurrentHashMap<String, Set<NodeId>> blacklistMap;
+
+  @SuppressWarnings("rawtypes")
+  private final EventHandler eventHandler;
+  private final AppContext appContext;
+
+  private final int maxTaskFailuresPerNode;
+  private final boolean nodeBlacklistingEnabled;
+  private final int blacklistDisablePercent;
+
+  private int numClusterNodes;
+  float currentIgnoreBlacklistingCountThreshold = 0;
+  private boolean ignoreBlacklisting = false;
+
+  @SuppressWarnings("rawtypes")
+  public PerSourceNodeTracker(int sourceId, EventHandler eventHandler, AppContext appContext,
+                              int maxTaskFailuresPerNode, boolean nodeBlacklistingEnabled,
+                              int blacklistDisablePercent) {
+    this.sourceId = sourceId;
+    this.eventHandler = eventHandler;
+    this.appContext = appContext;
+    // Blacklisting settings are captured once; the fields are final.
+    this.maxTaskFailuresPerNode = maxTaskFailuresPerNode;
+    this.nodeBlacklistingEnabled = nodeBlacklistingEnabled;
+    this.blacklistDisablePercent = blacklistDisablePercent;
+    this.nodeMap = new ConcurrentHashMap<>();
+    this.blacklistMap = new ConcurrentHashMap<>();
+  }
+
+
+  public void nodeSeen(NodeId nodeId) {
+    AMNode candidate = new AMNodeImpl(nodeId, sourceId, maxTaskFailuresPerNode, eventHandler,
+        nodeBlacklistingEnabled, appContext);
+    if (nodeMap.putIfAbsent(nodeId, candidate) == null) {
+      LOG.info("Adding new node {} to nodeTracker {}", nodeId, sourceId);
+    }
+  }
+
+  public AMNode get(NodeId nodeId) {
+    return nodeMap.get(nodeId); // null if nodeSeen() was never called for nodeId on this source
+  }
+
+  public int getNumNodes() {
+    return nodeMap.size(); // number of distinct NodeIds registered through nodeSeen()
+  }
+
+  public void handle(AMNodeEvent rEvent) {
+    // No synchronization required until there are multiple dispatchers.
+    NodeId nodeId = rEvent.getNodeId();
+    AMNodeEventType type = rEvent.getType();
+    if (type == AMNodeEventType.N_NODE_COUNT_UPDATED) {
+      // Cluster size changed: refresh the threshold, then re-evaluate the ignore flag.
+      AMNodeEventNodeCountUpdated countUpdate = (AMNodeEventNodeCountUpdated) rEvent;
+      numClusterNodes = countUpdate.getNodeCount();
+      LOG.info("Num cluster nodes = " + numClusterNodes);
+      recomputeCurrentIgnoreBlacklistingThreshold();
+      computeIgnoreBlacklisting();
+      return;
+    }
+    AMNode target = nodeMap.get(nodeId);
+    boolean healthUpdate = type == AMNodeEventType.N_TURNED_UNHEALTHY
+        || type == AMNodeEventType.N_TURNED_HEALTHY;
+    if (healthUpdate && target == null) {
+      // Health updates can arrive for nodes this tracker never saw; those are dropped.
+      LOG.info("Ignoring RM Health Update for unknown node: " + nodeId);
+      return;
+    }
+    // Every other event type expects a registered node; an unknown one still fails here.
+    target.handle(rEvent);
+  }
+
+  boolean registerBadNodeAndShouldBlacklist(AMNode amNode) {
+    if (!nodeBlacklistingEnabled) {
+      // Blacklisting is switched off: nothing is recorded and the caller must not blacklist.
+      return false;
+    }
+    addToBlackList(amNode.getNodeId());
+    computeIgnoreBlacklisting();
+    return !ignoreBlacklisting;
+  }
+
+  @InterfaceAudience.Private
+  @VisibleForTesting
+  public boolean isBlacklistingIgnored() {
+    return this.ignoreBlacklisting; // starts false; only computeIgnoreBlacklisting() changes it
+  }
+
+  private void recomputeCurrentIgnoreBlacklistingThreshold() {
+    if (nodeBlacklistingEnabled && blacklistDisablePercent != -1) {
+      currentIgnoreBlacklistingCountThreshold = // blacklistDisablePercent percent of numClusterNodes
+          (float) numClusterNodes * blacklistDisablePercent / 100;
+    }
+  }
+
+  // May be incorrect if there are multiple NodeManagers running on a single host:
+  // numClusterNodes counts node managers, while blacklistMap is keyed by host.
+  // Logs and notifies nodes only when the ignore-blacklisting flag actually flips.
+  protected void computeIgnoreBlacklisting() {
+    int blacklistedHosts = blacklistMap.size();
+    // Nothing to evaluate when the feature is off or no host has been blacklisted yet.
+    if (!nodeBlacklistingEnabled || blacklistDisablePercent == -1 || blacklistedHosts == 0) {
+      return;
+    }
+
+    // Blacklisting is ignored once the blacklisted host count reaches the threshold
+    // maintained by recomputeCurrentIgnoreBlacklistingThreshold().
+    boolean shouldIgnore = blacklistedHosts >= currentIgnoreBlacklistingCountThreshold;
+    if (shouldIgnore == ignoreBlacklisting) {
+      // No transition: nothing to log, nothing to broadcast.
+      return;
+    }
+
+    ignoreBlacklisting = shouldIgnore;
+    if (shouldIgnore) {
+      LOG.info("Ignore Blacklisting set to true. Known: " + numClusterNodes
+          + ", Blacklisted: " + blacklistMap.size());
+    } else {
+      LOG.info("Ignore blacklisting set to false. Known: "
+          + numClusterNodes + ", Blacklisted: " + blacklistMap.size());
+    }
+
+    // Send the new state to every node currently held in nodeMap.
+    sendIngoreBlacklistingStateToNodes();
+  }
+
+  private void addToBlackList(NodeId nodeId) {
+    String host = nodeId.getHost();
+    Set<NodeId> nodesOnHost = blacklistMap.get(host);
+    if (nodesOnHost == null) {
+      // First bad node on this host: install a fresh set, or adopt one installed meanwhile.
+      Set<NodeId> fresh = new HashSet<NodeId>();
+      Set<NodeId> prior = blacklistMap.putIfAbsent(host, fresh);
+      nodesOnHost = prior != null ? prior : fresh;
+    }
+    // Set.add leaves the set unchanged when the node is already recorded.
+    nodesOnHost.add(nodeId);
+  }
+
+  private void sendIngoreBlacklistingStateToNodes() {
+    AMNodeEventType stateEvent = ignoreBlacklisting
+        ? AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED
+        : AMNodeEventType.N_IGNORE_BLACKLISTING_DISABLED;
+    for (NodeId knownNode : nodeMap.keySet()) { // one event per node tracked for this source
+      sendEvent(new AMNodeEvent(knownNode, sourceId, stateEvent));
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private void sendEvent(Event<?> event) {
+    this.eventHandler.handle(event); // eventHandler is a raw EventHandler, hence the suppression
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/a5dfca2b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
index 9882954..0f35bba 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
@@ -257,7 +257,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
     }
     
     public void preemptContainer(ContainerData cData) {
-      getTaskSchedulerEventHandler().containerCompleted(null, 
+      getTaskSchedulerEventHandler().containerCompleted(0, null,
           ContainerStatus.newInstance(cData.cId, null, "Preempted", ContainerExitStatus.PREEMPTED));
       cData.clear();
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/a5dfca2b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
index 42d4b0b..7584b4c 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
@@ -200,7 +200,7 @@ public class TestMockDAGAppMaster {
     mockLauncher.waitTillContainersLaunched();
     ContainerData cData = mockLauncher.getContainers().values().iterator().next();
     DAGImpl dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG();
-    mockApp.getTaskSchedulerEventHandler().preemptContainer(cData.cId);
+    mockApp.getTaskSchedulerEventHandler().preemptContainer(0, cData.cId);
     
     mockLauncher.startScheduling(true);
     dagClient.waitForCompletion();

http://git-wip-us.apache.org/repos/asf/tez/blob/a5dfca2b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
index 080c20f..62edac9 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
@@ -216,9 +216,9 @@ public class TestContainerReuse {
     TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
     drainableAppCallback.drain();
     verify(taskSchedulerEventHandler).taskAllocated(
-      eq(ta11), any(Object.class), eq(containerHost1));
+      eq(0), eq(ta11), any(Object.class), eq(containerHost1));
     verify(taskSchedulerEventHandler).taskAllocated(
-      eq(ta21), any(Object.class), eq(containerHost2));
+      eq(0), eq(ta21), any(Object.class), eq(containerHost2));
 
     // Adding the event later so that task1 assigned to containerHost1
     // is deterministic.
@@ -230,7 +230,7 @@ public class TestContainerReuse {
     drainableAppCallback.drain();
     verify(taskScheduler).deallocateTask(eq(ta11), eq(true), eq((TaskAttemptEndReason)null));
     verify(taskSchedulerEventHandler, times(1)).taskAllocated(
-      eq(ta31), any(Object.class), eq(containerHost1));
+      eq(0), eq(ta31), any(Object.class), eq(containerHost1));
     verify(rmClient, times(0)).releaseAssignedContainer(
       eq(containerHost1.getId()));
     eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
@@ -245,7 +245,7 @@ public class TestContainerReuse {
     while (System.currentTimeMillis() < currentTs + 5000l) {
       try {
         verify(taskSchedulerEventHandler,
-          times(1)).containerBeingReleased(eq(containerHost2.getId()));
+          times(1)).containerBeingReleased(eq(0), eq(containerHost2.getId()));
         exception = null;
         break;
       } catch (Throwable e) {
@@ -351,8 +351,8 @@ public class TestContainerReuse {
     taskScheduler.onContainersAllocated(Lists.newArrayList(containerHost1, containerHost2));
     TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
     drainableAppCallback.drain();
-    verify(taskSchedulerEventHandler).taskAllocated(eq(ta11), any(Object.class), eq(containerHost1));
-    verify(taskSchedulerEventHandler).taskAllocated(eq(ta21), any(Object.class), eq(containerHost2));
+    verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta11), any(Object.class), eq(containerHost1));
+    verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta21), any(Object.class), eq(containerHost2));
 
     // Adding the event later so that task1 assigned to containerHost1 is deterministic.
     taskSchedulerEventHandler.handleEvent(lrTa31);
@@ -363,7 +363,7 @@ public class TestContainerReuse {
     drainableAppCallback.drain();
     verify(taskScheduler).deallocateTask(eq(ta21), eq(true), eq((TaskAttemptEndReason)null));
     verify(taskSchedulerEventHandler, times(0)).taskAllocated(
-      eq(ta31), any(Object.class), eq(containerHost2));
+        eq(0), eq(ta31), any(Object.class), eq(containerHost2));
     verify(rmClient, times(1)).releaseAssignedContainer(
       eq(containerHost2.getId()));
     eventHandler.verifyInvocation(AMContainerEventStopRequest.class);
@@ -459,13 +459,13 @@ public class TestContainerReuse {
     taskScheduler.onContainersAllocated(Collections.singletonList(container1));
     TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
     drainableAppCallback.drain();
-    verify(taskSchedulerEventHandler).taskAllocated(eq(ta11), any(Object.class), eq(container1));
+    verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta11), any(Object.class), eq(container1));
 
     // Task assigned to container completed successfully. Container should be re-used.
     taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED, null, 0));
     drainableAppCallback.drain();
     verify(taskScheduler).deallocateTask(eq(ta11), eq(true), eq((TaskAttemptEndReason)null));
-    verify(taskSchedulerEventHandler).taskAllocated(eq(ta12), any(Object.class), eq(container1));
+    verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta12), any(Object.class), eq(container1));
     verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
     eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
     eventHandler.reset();
@@ -475,7 +475,7 @@ public class TestContainerReuse {
     taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta12, container1.getId(), TaskAttemptState.SUCCEEDED, null, 0));
     drainableAppCallback.drain();
     verify(taskScheduler).deallocateTask(eq(ta12), eq(true), eq((TaskAttemptEndReason)null));
-    verify(taskSchedulerEventHandler).taskAllocated(eq(ta13), any(Object.class), eq(container1));
+    verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta13), any(Object.class), eq(container1));
     verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
     eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
     eventHandler.reset();
@@ -483,7 +483,7 @@ public class TestContainerReuse {
     // Verify no re-use if a previous task fails.
     taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta13, container1.getId(), TaskAttemptState.FAILED, null, 0));
     drainableAppCallback.drain();
-    verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(ta14), any(Object.class), eq(container1));
+    verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(0), eq(ta14), any(Object.class), eq(container1));
     verify(taskScheduler).deallocateTask(eq(ta13), eq(false), eq((TaskAttemptEndReason)null));
     verify(rmClient).releaseAssignedContainer(eq(container1.getId()));
     eventHandler.verifyInvocation(AMContainerEventStopRequest.class);
@@ -496,7 +496,7 @@ public class TestContainerReuse {
     taskScheduler.onContainersAllocated(Collections.singletonList(container2));
     TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
     drainableAppCallback.drain();
-    verify(taskSchedulerEventHandler).taskAllocated(eq(ta14), any(Object.class), eq(container2));
+    verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta14), any(Object.class), eq(container2));
 
     // Task assigned to container completed successfully. No pending requests. Container should be released.
     taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta14, container2.getId(), TaskAttemptState.SUCCEEDED, null, 0));
@@ -606,14 +606,14 @@ public class TestContainerReuse {
     taskScheduler.onContainersAllocated(Collections.singletonList(container1));
     TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
     drainableAppCallback.drain();
-    verify(taskSchedulerEventHandler).taskAllocated(eq(ta11), any(Object.class), eq(container1));
+    verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta11), any(Object.class), eq(container1));
 
     // First task had profiling on. This container can not be reused further.
     taskSchedulerEventHandler.handleEvent(
         new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED, null, 0));
     drainableAppCallback.drain();
     verify(taskScheduler).deallocateTask(eq(ta11), eq(true), eq((TaskAttemptEndReason)null));
-    verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(ta12), any(Object.class),
+    verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(0), eq(ta12), any(Object.class),
         eq(container1));
     verify(rmClient, times(1)).releaseAssignedContainer(eq(container1.getId()));
     eventHandler.verifyInvocation(AMContainerEventStopRequest.class);
@@ -652,14 +652,14 @@ public class TestContainerReuse {
     taskScheduler.onContainersAllocated(Collections.singletonList(container2));
     TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
     drainableAppCallback.drain();
-    verify(taskSchedulerEventHandler).taskAllocated(eq(ta13), any(Object.class), eq(container2));
+    verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta13), any(Object.class), eq(container2));
 
     // Verify that the container can not be reused when profiling option is turned on
     // Even for 2 tasks having same profiling option can have container reusability.
     taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta13, container2.getId(), TaskAttemptState.SUCCEEDED, null, 0));
     drainableAppCallback.drain();
     verify(taskScheduler).deallocateTask(eq(ta13), eq(true), eq((TaskAttemptEndReason)null));
-    verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(ta14), any(Object.class),
+    verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(0), eq(ta14), any(Object.class),
         eq(container2));
     verify(rmClient, times(1)).releaseAssignedContainer(eq(container2.getId()));
     eventHandler.verifyInvocation(AMContainerEventStopRequest.class);
@@ -698,13 +698,13 @@ public class TestContainerReuse {
     taskScheduler.onContainersAllocated(Collections.singletonList(container3));
     TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
     drainableAppCallback.drain();
-    verify(taskSchedulerEventHandler).taskAllocated(eq(ta15), any(Object.class), eq(container3));
+    verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta15), any(Object.class), eq(container3));
 
     //Ensure task 6 (of vertex 1) is allocated to same container
     taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta15, container3.getId(), TaskAttemptState.SUCCEEDED, null, 0));
     drainableAppCallback.drain();
     verify(taskScheduler).deallocateTask(eq(ta15), eq(true), eq((TaskAttemptEndReason)null));
-    verify(taskSchedulerEventHandler).taskAllocated(eq(ta16), any(Object.class), eq(container3));
+    verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta16), any(Object.class), eq(container3));
     eventHandler.reset();
 
     taskScheduler.close();
@@ -804,7 +804,7 @@ public class TestContainerReuse {
     TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
     drainableAppCallback.drain();
     verify(taskSchedulerEventHandler).taskAllocated(
-      eq(ta11), any(Object.class), eq(container1));
+        eq(0), eq(ta11), any(Object.class), eq(container1));
 
     // Send launch request for task2 (vertex2)
     taskSchedulerEventHandler.handleEvent(lrEvent12);
@@ -818,7 +818,7 @@ public class TestContainerReuse {
     drainableAppCallback.drain();
     verify(taskScheduler).deallocateTask(eq(ta11), eq(true), eq((TaskAttemptEndReason)null));
     verify(taskSchedulerEventHandler, times(0)).taskAllocated(
-      eq(ta12), any(Object.class), eq(container1));
+        eq(0), eq(ta12), any(Object.class), eq(container1));
     verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
     eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
     eventHandler.reset();
@@ -826,7 +826,7 @@ public class TestContainerReuse {
     LOG.info("Sleeping to ensure that the scheduling loop runs");
     Thread.sleep(3000l);
     verify(taskSchedulerEventHandler).taskAllocated(
-      eq(ta12), any(Object.class), eq(container1));
+        eq(0), eq(ta12), any(Object.class), eq(container1));
 
     // TA12 completed.
     taskSchedulerEventHandler.handleEvent(
@@ -940,7 +940,7 @@ public class TestContainerReuse {
     TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
     drainableAppCallback.drain();
     verify(taskSchedulerEventHandler).taskAllocated(
-      eq(ta11), any(Object.class), eq(container1));
+        eq(0), eq(ta11), any(Object.class), eq(container1));
 
     // Send launch request for task2 (vertex2)
     taskSchedulerEventHandler.handleEvent(lrEvent21);
@@ -953,7 +953,7 @@ public class TestContainerReuse {
     drainableAppCallback.drain();
     verify(taskScheduler).deallocateTask(eq(ta11), eq(true), eq((TaskAttemptEndReason)null));
     verify(taskSchedulerEventHandler).taskAllocated(
-      eq(ta21), any(Object.class), eq(container1));
+        eq(0), eq(ta21), any(Object.class), eq(container1));
     verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
 
     // Task 2 completes.
@@ -1063,7 +1063,7 @@ public class TestContainerReuse {
     taskScheduler.onContainersAllocated(Collections.singletonList(container1));
     TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
     drainableAppCallback.drain();
-    verify(taskSchedulerEventHandler).taskAllocated(eq(ta111), any(Object.class), eq(container1));
+    verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta111), any(Object.class), eq(container1));
     assignEvent = (AMContainerEventAssignTA) eventHandler.verifyInvocation(AMContainerEventAssignTA.class);
     assertEquals(1, assignEvent.getRemoteTaskLocalResources().size());
     
@@ -1071,7 +1071,7 @@ public class TestContainerReuse {
     taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta111, container1.getId(), TaskAttemptState.SUCCEEDED, null, 0));
     drainableAppCallback.drain();
     verify(taskScheduler).deallocateTask(eq(ta111), eq(true), eq((TaskAttemptEndReason)null));
-    verify(taskSchedulerEventHandler).taskAllocated(eq(ta112), any(Object.class), eq(container1));
+    verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta112), any(Object.class), eq(container1));
     verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
     eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
     assignEvent = (AMContainerEventAssignTA) eventHandler.verifyInvocation(AMContainerEventAssignTA.class);
@@ -1114,7 +1114,7 @@ public class TestContainerReuse {
     // TODO This is terrible, need a better way to ensure the scheduling loop has run
     LOG.info("Sleeping to ensure that the scheduling loop runs");
     Thread.sleep(6000l);
-    verify(taskSchedulerEventHandler).taskAllocated(eq(ta211), any(Object.class), eq(container1));
+    verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta211), any(Object.class), eq(container1));
     verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
     eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
     assignEvent = (AMContainerEventAssignTA) eventHandler.verifyInvocation(AMContainerEventAssignTA.class);
@@ -1124,7 +1124,7 @@ public class TestContainerReuse {
     taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta211, container1.getId(), TaskAttemptState.SUCCEEDED, null, 0));
     drainableAppCallback.drain();
     verify(taskScheduler).deallocateTask(eq(ta211), eq(true), eq((TaskAttemptEndReason)null));
-    verify(taskSchedulerEventHandler).taskAllocated(eq(ta212), any(Object.class), eq(container1));
+    verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta212), any(Object.class), eq(container1));
     verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
     eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
     assignEvent = (AMContainerEventAssignTA) eventHandler.verifyInvocation(AMContainerEventAssignTA.class);
@@ -1237,7 +1237,7 @@ public class TestContainerReuse {
     taskScheduler.onContainersAllocated(Collections.singletonList(container1));
     TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
     drainableAppCallback.drain();
-    verify(taskSchedulerEventHandler).taskAllocated(eq(ta111), any(Object.class), eq(container1));
+    verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta111), any(Object.class), eq(container1));
     assignEvent = (AMContainerEventAssignTA) eventHandler.verifyInvocation(AMContainerEventAssignTA.class);
     assertEquals(1, assignEvent.getRemoteTaskLocalResources().size());
 
@@ -1245,7 +1245,7 @@ public class TestContainerReuse {
     taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta111, container1.getId(), TaskAttemptState.SUCCEEDED, null, 0));
     drainableAppCallback.drain();
     verify(taskScheduler).deallocateTask(eq(ta111), eq(true), eq((TaskAttemptEndReason)null));
-    verify(taskSchedulerEventHandler).taskAllocated(eq(ta112), any(Object.class), eq(container1));
+    verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta112), any(Object.class), eq(container1));
     verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
     eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
     assignEvent = (AMContainerEventAssignTA) eventHandler.verifyInvocation(AMContainerEventAssignTA.class);
@@ -1290,7 +1290,7 @@ public class TestContainerReuse {
 
     Thread.sleep(6000l);
     verify(rmClient, times(1)).releaseAssignedContainer(eq(container1.getId()));
-    verify(taskSchedulerEventHandler).taskAllocated(eq(ta211), any(Object.class), eq(container2));
+    verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta211), any(Object.class), eq(container2));
     eventHandler.reset();
 
     taskScheduler.close();
@@ -1369,7 +1369,7 @@ public class TestContainerReuse {
     drainNotifier.set(false);
     taskScheduler.onContainersAllocated(Collections.singletonList(container1));
     drainableAppCallback.drain();
-    verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(ta11),
+    verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(0), eq(ta11),
         any(Object.class), eq(container1));
     taskScheduler.close();
     taskSchedulerEventHandler.close();

http://git-wip-us.apache.org/repos/asf/tez/blob/a5dfca2b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
index daf1db6..005692e 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
@@ -163,7 +163,7 @@ public class TestTaskSchedulerEventHandler {
     AMSchedulerEventTALaunchRequest lr =
         new AMSchedulerEventTALaunchRequest(mockAttemptId, resource, null, mockTaskAttempt, locHint,
             priority, containerContext, 0, 0, 0);
-    schedulerHandler.taskAllocated(mockTaskAttempt, lr, container);
+    schedulerHandler.taskAllocated(0, mockTaskAttempt, lr, container);
     assertEquals(2, mockEventHandler.events.size());
     assertTrue(mockEventHandler.events.get(1) instanceof AMContainerEventAssignTA);
     AMContainerEventAssignTA assignEvent =
@@ -227,7 +227,7 @@ public class TestTaskSchedulerEventHandler {
     when(mockStatus.getContainerId()).thenReturn(mockCId);
     when(mockStatus.getDiagnostics()).thenReturn(diagnostics);
     when(mockStatus.getExitStatus()).thenReturn(ContainerExitStatus.PREEMPTED);
-    schedulerHandler.containerCompleted(mockTask, mockStatus);
+    schedulerHandler.containerCompleted(0, mockTask, mockStatus);
     assertEquals(1, mockEventHandler.events.size());
     Event event = mockEventHandler.events.get(0);
     assertEquals(AMContainerEventType.C_COMPLETED, event.getType());
@@ -257,7 +257,7 @@ public class TestTaskSchedulerEventHandler {
     ContainerId mockCId = mock(ContainerId.class);
     verify(mockTaskScheduler, times(0)).deallocateContainer((ContainerId)any());
     when(mockAMContainerMap.get(mockCId)).thenReturn(mockAmContainer);
-    schedulerHandler.preemptContainer(mockCId);
+    schedulerHandler.preemptContainer(0, mockCId);
     verify(mockTaskScheduler, times(1)).deallocateContainer(mockCId);
     assertEquals(1, mockEventHandler.events.size());
     Event event = mockEventHandler.events.get(0);
@@ -290,7 +290,7 @@ public class TestTaskSchedulerEventHandler {
     when(mockStatus.getContainerId()).thenReturn(mockCId);
     when(mockStatus.getDiagnostics()).thenReturn(diagnostics);
     when(mockStatus.getExitStatus()).thenReturn(ContainerExitStatus.DISKS_FAILED);
-    schedulerHandler.containerCompleted(mockTask, mockStatus);
+    schedulerHandler.containerCompleted(0, mockTask, mockStatus);
     assertEquals(1, mockEventHandler.events.size());
     Event event = mockEventHandler.events.get(0);
     assertEquals(AMContainerEventType.C_COMPLETED, event.getType());
@@ -325,7 +325,7 @@ public class TestTaskSchedulerEventHandler {
     // use -104 rather than ContainerExitStatus.KILLED_EXCEEDED_PMEM because
     // ContainerExitStatus.KILLED_EXCEEDED_PMEM is only available after hadoop-2.5
     when(mockStatus.getExitStatus()).thenReturn(-104);
-    schedulerHandler.containerCompleted(mockTask, mockStatus);
+    schedulerHandler.containerCompleted(0, mockTask, mockStatus);
     assertEquals(1, mockEventHandler.events.size());
     Event event = mockEventHandler.events.get(0);
     assertEquals(AMContainerEventType.C_COMPLETED, event.getType());
@@ -383,4 +383,5 @@ public class TestTaskSchedulerEventHandler {
 
   }
 
+  // TODO TEZ-2003. Add tests with multiple schedulers, and ensuring that events go out with correct IDs.
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/a5dfca2b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
index ffab769..04610ab 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
@@ -134,7 +134,7 @@ class TestTaskSchedulerHelpers {
 
     @Override
     public void instantiateScheduelrs(String host, int port, String trackingUrl, AppContext appContext) {
-      taskSchedulers[0] = new TaskSchedulerWithDrainableAppCallback(this,
+      taskSchedulers[0] = new TaskSchedulerWithDrainableAppCallback(new TaskSchedulerAppCallbackImpl(this, 0),
           containerSignatureMatcher, host, port, trackingUrl, amrmClientAsync,
           appContext);
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/a5dfca2b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java
index d907ea0..84d2e1f 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java
@@ -93,12 +93,12 @@ public class TestAMNodeTracker {
     amNodeTracker.start();
 
     NodeId nodeId = NodeId.newInstance("host1", 2342);
-    amNodeTracker.nodeSeen(nodeId);
+    amNodeTracker.nodeSeen(nodeId, 0);
 
     NodeReport nodeReport = generateNodeReport(nodeId, NodeState.UNHEALTHY);
-    amNodeTracker.handle(new AMNodeEventStateChanged(nodeReport));
+    amNodeTracker.handle(new AMNodeEventStateChanged(nodeReport, 0));
     dispatcher.await();
-    assertEquals(AMNodeState.UNHEALTHY, amNodeTracker.get(nodeId).getState());
+    assertEquals(AMNodeState.UNHEALTHY, amNodeTracker.get(nodeId, 0).getState());
     amNodeTracker.stop();
   }
 
@@ -114,7 +114,7 @@ public class TestAMNodeTracker {
     NodeId nodeId = NodeId.newInstance("unknownhost", 2342);
 
     NodeReport nodeReport = generateNodeReport(nodeId, NodeState.UNHEALTHY);
-    amNodeTracker.handle(new AMNodeEventStateChanged(nodeReport));
+    amNodeTracker.handle(new AMNodeEventStateChanged(nodeReport, 0));
     dispatcher.await();
 
     amNodeTracker.stop();
@@ -142,27 +142,27 @@ public class TestAMNodeTracker {
     amNodeTracker.init(conf);
     amNodeTracker.start();
 
-    amNodeTracker.handle(new AMNodeEventNodeCountUpdated(1));
+    amNodeTracker.handle(new AMNodeEventNodeCountUpdated(1, 0));
     NodeId nodeId = NodeId.newInstance("host1", 1234);
-    amNodeTracker.nodeSeen(nodeId);
+    amNodeTracker.nodeSeen(nodeId, 0);
 
-    AMNodeImpl node = (AMNodeImpl) amNodeTracker.get(nodeId);
+    AMNodeImpl node = (AMNodeImpl) amNodeTracker.get(nodeId, 0);
 
     ContainerId cId1 = mock(ContainerId.class);
     ContainerId cId2 = mock(ContainerId.class);
 
-    amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, cId1));
-    amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, cId2));
+    amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, 0, cId1));
+    amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, 0, cId2));
 
     TezTaskAttemptID ta1 = mock(TezTaskAttemptID.class);
     TezTaskAttemptID ta2 = mock(TezTaskAttemptID.class);
 
-    amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, cId1, ta1, true));
+    amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, 0, cId1, ta1, true));
     dispatcher.await();
     assertEquals(1, node.numFailedTAs);
     assertEquals(AMNodeState.ACTIVE, node.getState());
 
-    amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, cId2, ta2, true));
+    amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, 0, cId2, ta2, true));
     dispatcher.await();
     assertEquals(2, node.numFailedTAs);
     assertEquals(1, handler.events.size());
@@ -187,44 +187,44 @@ public class TestAMNodeTracker {
     amNodeTracker.init(conf);
     amNodeTracker.start();
 
-    amNodeTracker.handle(new AMNodeEventNodeCountUpdated(4));
+    amNodeTracker.handle(new AMNodeEventNodeCountUpdated(4, 0));
     NodeId nodeId = NodeId.newInstance("host1", 1234);
     NodeId nodeId2 = NodeId.newInstance("host2", 1234);
     NodeId nodeId3 = NodeId.newInstance("host3", 1234);
     NodeId nodeId4 = NodeId.newInstance("host4", 1234);
-    amNodeTracker.nodeSeen(nodeId);
-    amNodeTracker.nodeSeen(nodeId2);
-    amNodeTracker.nodeSeen(nodeId3);
-    amNodeTracker.nodeSeen(nodeId4);
-    AMNodeImpl node = (AMNodeImpl) amNodeTracker.get(nodeId);
+    amNodeTracker.nodeSeen(nodeId, 0);
+    amNodeTracker.nodeSeen(nodeId2, 0);
+    amNodeTracker.nodeSeen(nodeId3, 0);
+    amNodeTracker.nodeSeen(nodeId4, 0);
+    AMNodeImpl node = (AMNodeImpl) amNodeTracker.get(nodeId, 0);
     
     ContainerId cId1 = mock(ContainerId.class);
     ContainerId cId2 = mock(ContainerId.class);
     ContainerId cId3 = mock(ContainerId.class);
     
-    amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, cId1));
-    amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, cId2));
-    amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, cId3));
+    amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, 0, cId1));
+    amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, 0, cId2));
+    amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, 0, cId3));
     assertEquals(3, node.containers.size());
     
     TezTaskAttemptID ta1 = mock(TezTaskAttemptID.class);
     TezTaskAttemptID ta2 = mock(TezTaskAttemptID.class);
     TezTaskAttemptID ta3 = mock(TezTaskAttemptID.class);
     
-    amNodeTracker.handle(new AMNodeEventTaskAttemptSucceeded(nodeId, cId1, ta1));
+    amNodeTracker.handle(new AMNodeEventTaskAttemptSucceeded(nodeId, 0, cId1, ta1));
     assertEquals(1, node.numSuccessfulTAs);
     
-    amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, cId2, ta2, true));
+    amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, 0, cId2, ta2, true));
     assertEquals(1, node.numSuccessfulTAs);
     assertEquals(1, node.numFailedTAs);
     assertEquals(AMNodeState.ACTIVE, node.getState());
     // duplicate should not affect anything
-    amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, cId2, ta2, true));
+    amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, 0, cId2, ta2, true));
     assertEquals(1, node.numSuccessfulTAs);
     assertEquals(1, node.numFailedTAs);
     assertEquals(AMNodeState.ACTIVE, node.getState());
     
-    amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, cId3, ta3, true));
+    amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, 0, cId3, ta3, true));
     dispatcher.await();
     assertEquals(1, node.numSuccessfulTAs);
     assertEquals(2, node.numFailedTAs);
@@ -246,20 +246,20 @@ public class TestAMNodeTracker {
     ContainerId cId5 = mock(ContainerId.class);
     TezTaskAttemptID ta4 = mock(TezTaskAttemptID.class);
     TezTaskAttemptID ta5 = mock(TezTaskAttemptID.class);
-    AMNodeImpl node2 = (AMNodeImpl) amNodeTracker.get(nodeId2);
-    amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId2, cId4));
-    amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId2, cId5));
+    AMNodeImpl node2 = (AMNodeImpl) amNodeTracker.get(nodeId2, 0);
+    amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId2, 0, cId4));
+    amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId2, 0, cId5));
     
-    amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId2, cId4, ta4, true));
+    amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId2, 0, cId4, ta4, true));
     assertEquals(1, node2.numFailedTAs);
     assertEquals(AMNodeState.ACTIVE, node2.getState());
     
     handler.events.clear();
-    amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId2, cId5, ta5, true));
+    amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId2, 0, cId5, ta5, true));
     dispatcher.await();
     assertEquals(2, node2.numFailedTAs);
     assertEquals(AMNodeState.FORCED_ACTIVE, node2.getState());
-    AMNodeImpl node3 = (AMNodeImpl) amNodeTracker.get(nodeId3);
+    AMNodeImpl node3 = (AMNodeImpl) amNodeTracker.get(nodeId3, 0);
     assertEquals(AMNodeState.FORCED_ACTIVE, node3.getState());
     assertEquals(5, handler.events.size());
 
@@ -286,7 +286,7 @@ public class TestAMNodeTracker {
     // Increase the number of nodes. BLACKLISTING should be re-enabled.
     // Node 1 and Node 2 should go into BLACKLISTED state
     handler.events.clear();
-    amNodeTracker.handle(new AMNodeEventNodeCountUpdated(8));
+    amNodeTracker.handle(new AMNodeEventNodeCountUpdated(8, 0));
     dispatcher.await();
     LOG.info(("Completed waiting for dispatcher to process all pending events"));
     assertEquals(AMNodeState.BLACKLISTED, node.getState());
@@ -336,4 +336,6 @@ public class TestAMNodeTracker {
     doReturn(healthReportTime).when(nodeReport).getLastHealthReportTime();
     return nodeReport;
   }
+
+  // TODO TEZ-2003. Add tests for multiple sources.
 }