You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/22 03:19:30 UTC

[40/50] [abbrv] tez git commit: TEZ-2707. Fix comments from reviews - part 2. (sseth)

TEZ-2707. Fix comments from reviews - part 2. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/c03a6ff5
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/c03a6ff5
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/c03a6ff5

Branch: refs/heads/TEZ-2003
Commit: c03a6ff5fc9193909d1087817bdda47a9df7ae47
Parents: 2ecffa7
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue Aug 11 16:52:32 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Aug 21 18:15:23 2015 -0700

----------------------------------------------------------------------
 TEZ-2003-CHANGES.txt                            |  1 +
 .../org/apache/tez/dag/app/DAGAppMaster.java    |  2 +-
 .../dag/app/TaskAttemptListenerImpTezDag.java   |  4 ++
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   |  6 +-
 .../apache/tez/dag/app/rm/AMSchedulerEvent.java | 10 ++-
 .../rm/AMSchedulerEventDeallocateContainer.java | 10 +--
 .../rm/AMSchedulerEventNodeBlacklistUpdate.java |  8 +--
 .../tez/dag/app/rm/AMSchedulerEventTAEnded.java |  8 +--
 .../app/rm/AMSchedulerEventTALaunchRequest.java |  8 +--
 .../apache/tez/dag/app/rm/node/AMNodeEvent.java | 10 +--
 .../apache/tez/dag/app/rm/node/AMNodeImpl.java  | 14 ++---
 .../tez/dag/app/rm/node/AMNodeTracker.java      | 32 +++++-----
 .../apache/tez/dag/app/MockDAGAppMaster.java    |  2 -
 .../tez/dag/app/dag/impl/TestTaskAttempt.java   | 65 ++++++--------------
 .../tez/dag/app/dag/impl/TestVertexImpl2.java   | 56 ++++++++++++++---
 15 files changed, 114 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/c03a6ff5/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index fd3374e..adb800b 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -46,5 +46,6 @@ ALL CHANGES:
   TEZ-2698. rebase 08/05
   TEZ-2675. Add javadocs for new pluggable components, fix problems reported by jenkins
   TEZ-2678. Fix comments from reviews - part 1.
+  TEZ-2707. Fix comments from reviews - part 2.
 
 INCOMPATIBLE CHANGES:

http://git-wip-us.apache.org/repos/asf/tez/blob/c03a6ff5/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index f88c1de..84b3095 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -2422,7 +2422,7 @@ public class DAGAppMaster extends AbstractService {
 
 
   @VisibleForTesting
-  static void parsePlugin(List<NamedEntityDescriptor> resultList,
+  public static void parsePlugin(List<NamedEntityDescriptor> resultList,
       BiMap<String, Integer> pluginMap, List<TezNamedEntityDescriptorProto> namedEntityDescriptorProtos,
       boolean tezYarnEnabled, boolean uberEnabled, UserPayload defaultPayload) {
 

http://git-wip-us.apache.org/repos/asf/tez/blob/c03a6ff5/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index 2f6e93c..185193f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -225,6 +225,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
             " events: " + (inEvents != null ? inEvents.size() : -1));
       }
 
+      long currTime = context.getClock().getTime();
       List<TezEvent> otherEvents = new ArrayList<TezEvent>();
       // route TASK_STATUS_UPDATE_EVENT directly to TaskAttempt and route other events
       // (DATA_MOVEMENT_EVENT, TASK_ATTEMPT_COMPLETED_EVENT, TASK_ATTEMPT_FAILED_EVENT)
@@ -232,6 +233,9 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
       //  1. DataMovementEvent is logged as RecoveryEvent before TaskAttemptFinishedEvent
       //  2. TaskStatusEvent is handled before TaskAttemptFinishedEvent
       for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) {
+        // for now, set the event time on the AM when it is received.
+        // this avoids any time disparity between machines.
+        tezEvent.setEventReceivedTime(currTime);
         final EventType eventType = tezEvent.getEventType();
         if (eventType == EventType.TASK_STATUS_UPDATE_EVENT) {
           TaskAttemptEvent taskAttemptEvent = new TaskAttemptEventStatusUpdate(taskAttemptID,

http://git-wip-us.apache.org/repos/asf/tez/blob/c03a6ff5/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index c6d8a7e..1c4102d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -134,7 +134,6 @@ public class TaskAttemptImpl implements TaskAttempt,
   protected final AppContext appContext;
   private final TaskHeartbeatHandler taskHeartbeatHandler;
   private long launchTime = 0;
-  private long scheduleTime = 0;
   private long finishTime = 0;
   private String trackerName;
   private int httpPort;
@@ -440,6 +439,7 @@ public class TaskAttemptImpl implements TaskAttempt,
     this.vertex = this.task.getVertex();
     this.creationCausalTA = schedulingCausalTA;
     this.creationTime = clock.getTime();
+    this.schedulingCausalTA = schedulingCausalTA;
 
     this.reportedStatus = new TaskAttemptStatus(this.attemptId);
     initTaskAttemptStatus(reportedStatus);
@@ -703,7 +703,7 @@ public class TaskAttemptImpl implements TaskAttempt,
   public long getScheduleTime() {
     readLock.lock();
     try {
-      return scheduleTime;
+      return scheduledTime;
     } finally {
       readLock.unlock();
     }
@@ -1071,7 +1071,7 @@ public class TaskAttemptImpl implements TaskAttempt,
     public TaskAttemptStateInternal transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
       TaskAttemptEventSchedule scheduleEvent = (TaskAttemptEventSchedule) event;
 
-      ta.scheduleTime = ta.clock.getTime();
+      ta.scheduledTime = ta.clock.getTime();
       // TODO Creating the remote task here may not be required in case of
       // recovery.
 

http://git-wip-us.apache.org/repos/asf/tez/blob/c03a6ff5/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEvent.java
index af0bed0..dd9d951 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEvent.java
@@ -22,8 +22,14 @@ import org.apache.hadoop.yarn.event.AbstractEvent;
 
 public class AMSchedulerEvent extends AbstractEvent<AMSchedulerEventType> {
 
-  // TODO Not a very useful class...
-  public AMSchedulerEvent(AMSchedulerEventType type) {
+  private final int schedulerId;
+
+  public AMSchedulerEvent(AMSchedulerEventType type, int schedulerId) {
     super(type);
+    this.schedulerId = schedulerId;
+  }
+
+  public int getSchedulerId() {
+    return this.schedulerId;
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/c03a6ff5/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventDeallocateContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventDeallocateContainer.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventDeallocateContainer.java
index 5270aa2..d1ca99e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventDeallocateContainer.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventDeallocateContainer.java
@@ -23,20 +23,14 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 public class AMSchedulerEventDeallocateContainer extends AMSchedulerEvent {
 
   private final ContainerId containerId;
-  private final int schedulerId;
-  
+
   public AMSchedulerEventDeallocateContainer(ContainerId containerId, int schedulerId) {
-    super(AMSchedulerEventType.S_CONTAINER_DEALLOCATE);
+    super(AMSchedulerEventType.S_CONTAINER_DEALLOCATE, schedulerId);
     this.containerId = containerId;
-    this.schedulerId = schedulerId;
   }
   
   public ContainerId getContainerId() {
     return this.containerId;
   }
-
-  public int getSchedulerId() {
-    return schedulerId;
-  }
 }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/c03a6ff5/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventNodeBlacklistUpdate.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventNodeBlacklistUpdate.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventNodeBlacklistUpdate.java
index 679705a..d22c0ec 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventNodeBlacklistUpdate.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventNodeBlacklistUpdate.java
@@ -23,20 +23,14 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 public class AMSchedulerEventNodeBlacklistUpdate extends AMSchedulerEvent {
 
   private final NodeId nodeId;
-  private final int schedulerId;
 
   public AMSchedulerEventNodeBlacklistUpdate(NodeId nodeId, boolean add, int schedulerId) {
     super((add ? AMSchedulerEventType.S_NODE_BLACKLISTED
-        : AMSchedulerEventType.S_NODE_UNBLACKLISTED));
+        : AMSchedulerEventType.S_NODE_UNBLACKLISTED), schedulerId);
     this.nodeId = nodeId;
-    this.schedulerId = schedulerId;
   }
 
   public NodeId getNodeId() {
     return this.nodeId;
   }
-
-  public int getSchedulerId() {
-    return schedulerId;
-  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/c03a6ff5/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java
index ccc5465..f7fee3a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java
@@ -30,17 +30,15 @@ public class AMSchedulerEventTAEnded extends AMSchedulerEvent {
   private final TaskAttemptState state;
   private final TaskAttemptEndReason taskAttemptEndReason;
   private final String diagnostics;
-  private final int schedulerId;
 
   public AMSchedulerEventTAEnded(TaskAttempt attempt, ContainerId containerId,
       TaskAttemptState state, TaskAttemptEndReason taskAttemptEndReason, String diagnostics, int schedulerId) {
-    super(AMSchedulerEventType.S_TA_ENDED);
+    super(AMSchedulerEventType.S_TA_ENDED, schedulerId);
     this.attempt = attempt;
     this.containerId = containerId;
     this.state = state;
     this.taskAttemptEndReason = taskAttemptEndReason;
     this.diagnostics = diagnostics;
-    this.schedulerId = schedulerId;
   }
 
   public TezTaskAttemptID getAttemptID() {
@@ -59,10 +57,6 @@ public class AMSchedulerEventTAEnded extends AMSchedulerEvent {
     return this.containerId;
   }
 
-  public int getSchedulerId() {
-    return schedulerId;
-  }
-
   public TaskAttemptEndReason getTaskAttemptEndReason() {
     return taskAttemptEndReason;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/c03a6ff5/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java
index c59193c..0424c97 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java
@@ -38,7 +38,6 @@ public class AMSchedulerEventTALaunchRequest extends AMSchedulerEvent {
   private final TaskSpec remoteTaskSpec;
   private final TaskAttempt taskAttempt;
 
-  private final int schedulerId;
   private final int launcherId;
   private final int taskCommId;
 
@@ -48,7 +47,7 @@ public class AMSchedulerEventTALaunchRequest extends AMSchedulerEvent {
       TaskLocationHint locationHint, int priority,
       ContainerContext containerContext,
       int schedulerId, int launcherId, int taskCommId) {
-    super(AMSchedulerEventType.S_TA_LAUNCH_REQUEST);
+    super(AMSchedulerEventType.S_TA_LAUNCH_REQUEST, schedulerId);
     this.attemptId = attemptId;
     this.capability = capability;
     this.remoteTaskSpec = remoteTaskSpec;
@@ -56,7 +55,6 @@ public class AMSchedulerEventTALaunchRequest extends AMSchedulerEvent {
     this.locationHint = locationHint;
     this.priority = priority;
     this.containerContext = containerContext;
-    this.schedulerId = schedulerId;
     this.launcherId = launcherId;
     this.taskCommId = taskCommId;
   }
@@ -89,10 +87,6 @@ public class AMSchedulerEventTALaunchRequest extends AMSchedulerEvent {
     return this.containerContext;
   }
 
-  public int getSchedulerId() {
-    return schedulerId;
-  }
-
   public int getLauncherId() {
     return launcherId;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/c03a6ff5/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEvent.java
index 85bc513..1a975b0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEvent.java
@@ -24,19 +24,19 @@ import org.apache.hadoop.yarn.event.AbstractEvent;
 public class AMNodeEvent extends AbstractEvent<AMNodeEventType> {
 
   private final NodeId nodeId;
-  private final int sourceId; // Effectively the schedulerId
+  private final int schedulerId;
 
-  public AMNodeEvent(NodeId nodeId, int sourceId, AMNodeEventType type) {
+  public AMNodeEvent(NodeId nodeId, int schedulerId, AMNodeEventType type) {
     super(type);
     this.nodeId = nodeId;
-    this.sourceId = sourceId;
+    this.schedulerId = schedulerId;
   }
 
   public NodeId getNodeId() {
     return this.nodeId;
   }
 
-  public int getSourceId() {
-    return sourceId;
+  public int getSchedulerId() {
+    return schedulerId;
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/c03a6ff5/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
index 88b36cb1f..18d5978 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
@@ -54,7 +54,7 @@ public class AMNodeImpl implements AMNode {
   private final ReadLock readLock;
   private final WriteLock writeLock;
   private final NodeId nodeId;
-  private final int sourceId;
+  private final int schedulerId;
   private final AppContext appContext;
   private final int maxTaskFailuresPerNode;
   private boolean blacklistingEnabled;
@@ -173,14 +173,14 @@ public class AMNodeImpl implements AMNode {
 
 
   @SuppressWarnings("rawtypes")
-  public AMNodeImpl(NodeId nodeId, int sourceId, int maxTaskFailuresPerNode,
+  public AMNodeImpl(NodeId nodeId, int schedulerId, int maxTaskFailuresPerNode,
       EventHandler eventHandler, boolean blacklistingEnabled,
       AppContext appContext) {
     ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
     this.readLock = rwLock.readLock();
     this.writeLock = rwLock.writeLock();
     this.nodeId = nodeId;
-    this.sourceId = sourceId;
+    this.schedulerId = schedulerId;
     this.appContext = appContext;
     this.eventHandler = eventHandler;
     this.blacklistingEnabled = blacklistingEnabled;
@@ -249,7 +249,7 @@ public class AMNodeImpl implements AMNode {
 
   /* Blacklist the node with the AMNodeTracker and check if the node should be blacklisted */
   protected boolean registerBadNodeAndShouldBlacklist() {
-    return appContext.getNodeTracker().registerBadNodeAndShouldBlacklist(this, sourceId);
+    return appContext.getNodeTracker().registerBadNodeAndShouldBlacklist(this, schedulerId);
   }
 
   protected void blacklistSelf() {
@@ -259,8 +259,7 @@ public class AMNodeImpl implements AMNode {
     // these containers are not useful anymore
     pastContainers.addAll(containers);
     containers.clear();
-    // TODO TEZ-2124 node tracking per ext source
-    sendEvent(new AMSchedulerEventNodeBlacklistUpdate(getNodeId(), true, 0));
+    sendEvent(new AMSchedulerEventNodeBlacklistUpdate(getNodeId(), true, schedulerId));
   }
 
   @SuppressWarnings("unchecked")
@@ -366,8 +365,7 @@ public class AMNodeImpl implements AMNode {
     public void transition(AMNodeImpl node, AMNodeEvent nEvent) {
       node.ignoreBlacklisting = ignore;
       if (node.getState() == AMNodeState.BLACKLISTED) {
-        // TODO TEZ-2124 node tracking per ext source
-        node.sendEvent(new AMSchedulerEventNodeBlacklistUpdate(node.getNodeId(), false, 0));
+        node.sendEvent(new AMSchedulerEventNodeBlacklistUpdate(node.getNodeId(), false, node.schedulerId));
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/c03a6ff5/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
index 32e515b..751276e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
@@ -82,14 +82,14 @@ public class AMNodeTracker extends AbstractService implements
     }
   }
 
-  public void nodeSeen(NodeId nodeId, int sourceId) {
-    PerSourceNodeTracker nodeTracker = getAndCreateIfNeededPerSourceTracker(sourceId);
+  public void nodeSeen(NodeId nodeId, int schedulerId) {
+    PerSourceNodeTracker nodeTracker = getAndCreateIfNeededPerSourceTracker(schedulerId);
     nodeTracker.nodeSeen(nodeId);
   }
 
 
-  boolean registerBadNodeAndShouldBlacklist(AMNode amNode, int sourceId) {
-    return perSourceNodeTrackers.get(sourceId).registerBadNodeAndShouldBlacklist(amNode);
+  boolean registerBadNodeAndShouldBlacklist(AMNode amNode, int schedulerId) {
+    return perSourceNodeTrackers.get(schedulerId).registerBadNodeAndShouldBlacklist(amNode);
   }
 
   public void handle(AMNodeEvent rEvent) {
@@ -101,42 +101,42 @@ public class AMNodeTracker extends AbstractService implements
       case N_IGNORE_BLACKLISTING_ENABLED:
       case N_IGNORE_BLACKLISTING_DISABLED:
         // All of these will only be seen after a node has been registered.
-        perSourceNodeTrackers.get(rEvent.getSourceId()).handle(rEvent);
+        perSourceNodeTrackers.get(rEvent.getSchedulerId()).handle(rEvent);
         break;
       case N_TURNED_UNHEALTHY:
       case N_TURNED_HEALTHY:
       case N_NODE_COUNT_UPDATED:
         // These events can be seen without a node having been marked as 'seen' before
-        getAndCreateIfNeededPerSourceTracker(rEvent.getSourceId()).handle(rEvent);
+        getAndCreateIfNeededPerSourceTracker(rEvent.getSchedulerId()).handle(rEvent);
         break;
     }
   }
 
-  public AMNode get(NodeId nodeId, int sourceId) {
-    return perSourceNodeTrackers.get(sourceId).get(nodeId);
+  public AMNode get(NodeId nodeId, int schedulerId) {
+    return perSourceNodeTrackers.get(schedulerId).get(nodeId);
   }
 
-  public int getNumNodes(int sourceId) {
-    return perSourceNodeTrackers.get(sourceId).getNumNodes();
+  public int getNumNodes(int schedulerId) {
+    return perSourceNodeTrackers.get(schedulerId).getNumNodes();
   }
 
   @Private
   @VisibleForTesting
-  public boolean isBlacklistingIgnored(int sourceId) {
-    return perSourceNodeTrackers.get(sourceId).isBlacklistingIgnored();
+  public boolean isBlacklistingIgnored(int schedulerId) {
+    return perSourceNodeTrackers.get(schedulerId).isBlacklistingIgnored();
   }
 
   public void dagComplete(DAG dag) {
     // TODO TEZ-2337 Maybe reset failures from previous DAGs
   }
 
-  private PerSourceNodeTracker getAndCreateIfNeededPerSourceTracker(int sourceId) {
-    PerSourceNodeTracker nodeTracker = perSourceNodeTrackers.get(sourceId);
+  private PerSourceNodeTracker getAndCreateIfNeededPerSourceTracker(int schedulerId) {
+    PerSourceNodeTracker nodeTracker = perSourceNodeTrackers.get(schedulerId);
     if (nodeTracker == null) {
       nodeTracker =
-          new PerSourceNodeTracker(sourceId, eventHandler, appContext, maxTaskFailuresPerNode,
+          new PerSourceNodeTracker(schedulerId, eventHandler, appContext, maxTaskFailuresPerNode,
               nodeBlacklistingEnabled, blacklistDisablePercent);
-      PerSourceNodeTracker old = perSourceNodeTrackers.putIfAbsent(sourceId, nodeTracker);
+      PerSourceNodeTracker old = perSourceNodeTrackers.putIfAbsent(schedulerId, nodeTracker);
       nodeTracker = old != null ? old : nodeTracker;
     }
     return nodeTracker;

http://git-wip-us.apache.org/repos/asf/tez/blob/c03a6ff5/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
index b04b461..fe3e4ef 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
@@ -417,8 +417,6 @@ public class MockDAGAppMaster extends DAGAppMaster {
               events.add(new TezEvent(new TaskStatusUpdateEvent(counters, progress, stats), new EventMetaData(
                   EventProducerConsumerType.SYSTEM, cData.vName, "", cData.taId),
                   MockDAGAppMaster.this.getContext().getClock().getTime()));
-//              TezHeartbeatRequest request = new TezHeartbeatRequest(cData.numUpdates, events,
-//                  cData.cIdStr, cData.taId, cData.nextFromEventId, 50000);
               TaskHeartbeatRequest request =
                   new TaskHeartbeatRequest(cData.cIdStr, cData.taId, events, cData.nextFromEventId, cData.nextPreRoutedFromEventId,
                       50000);

http://git-wip-us.apache.org/repos/asf/tez/blob/c03a6ff5/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 947ea93..04bb2df 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -21,7 +21,6 @@ package org.apache.tez.dag.app.dag.impl;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
@@ -286,10 +285,7 @@ public class TestTaskAttempt {
     TezTaskAttemptID taskAttemptID = TezTaskAttemptID.getInstance(taskID, 0);
 
     MockEventHandler eventHandler = new MockEventHandler();
-    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
-    TaskCommunicator taskComm = mock(TaskCommunicator.class);
-    doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
-    doReturn(taskComm).when(taListener).getTaskCommunicator(0);
+    TaskAttemptListener taListener = createMockTaskAttemptListener();
 
     Configuration taskConf = new Configuration();
     taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -338,10 +334,7 @@ public class TestTaskAttempt {
     TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
 
     MockEventHandler eventHandler = spy(new MockEventHandler());
-    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
-    TaskCommunicator taskComm = mock(TaskCommunicator.class);
-    doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
-    doReturn(taskComm).when(taListener).getTaskCommunicator(0);
+    TaskAttemptListener taListener = createMockTaskAttemptListener();
 
     Configuration taskConf = new Configuration();
     taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -441,10 +434,7 @@ public class TestTaskAttempt {
     TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
 
     MockEventHandler eventHandler = new MockEventHandler();
-    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
-    TaskCommunicator taskComm = mock(TaskCommunicator.class);
-    doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
-    doReturn(taskComm).when(taListener).getTaskCommunicator(0);
+    TaskAttemptListener taListener = createMockTaskAttemptListener();
 
     Configuration taskConf = new Configuration();
     taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -508,10 +498,7 @@ public class TestTaskAttempt {
     TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
 
     MockEventHandler eventHandler = spy(new MockEventHandler());
-    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
-    TaskCommunicator taskComm = mock(TaskCommunicator.class);
-    doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
-    doReturn(taskComm).when(taListener).getTaskCommunicator(0);
+    TaskAttemptListener taListener = createMockTaskAttemptListener();
 
     Configuration taskConf = new Configuration();
     taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -602,10 +589,7 @@ public class TestTaskAttempt {
     TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
 
     MockEventHandler eventHandler = spy(new MockEventHandler());
-    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
-    TaskCommunicator taskComm = mock(TaskCommunicator.class);
-    doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
-    doReturn(taskComm).when(taListener).getTaskCommunicator(0);
+    TaskAttemptListener taListener = createMockTaskAttemptListener();
 
     Configuration taskConf = new Configuration();
     taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -735,10 +719,7 @@ public class TestTaskAttempt {
     TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
 
     MockEventHandler eventHandler = spy(new MockEventHandler());
-    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
-    TaskCommunicator taskComm = mock(TaskCommunicator.class);
-    doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
-    doReturn(taskComm).when(taListener).getTaskCommunicator(0);
+    TaskAttemptListener taListener = createMockTaskAttemptListener();
 
     Configuration taskConf = new Configuration();
     taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -829,10 +810,7 @@ public class TestTaskAttempt {
     TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
 
     MockEventHandler eventHandler = spy(new MockEventHandler());
-    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
-    TaskCommunicator taskComm = mock(TaskCommunicator.class);
-    doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
-    doReturn(taskComm).when(taListener).getTaskCommunicator(0);
+    TaskAttemptListener taListener = createMockTaskAttemptListener();
 
     Configuration taskConf = new Configuration();
     taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -926,10 +904,7 @@ public class TestTaskAttempt {
     TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
 
     MockEventHandler eventHandler = spy(new MockEventHandler());
-    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
-    TaskCommunicator taskComm = mock(TaskCommunicator.class);
-    doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
-    doReturn(taskComm).when(taListener).getTaskCommunicator(0);
+    TaskAttemptListener taListener = createMockTaskAttemptListener();
 
     Configuration taskConf = new Configuration();
     taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -1031,10 +1006,7 @@ public class TestTaskAttempt {
     TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
 
     MockEventHandler eventHandler = spy(new MockEventHandler());
-    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
-    TaskCommunicator taskComm = mock(TaskCommunicator.class);
-    doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
-    doReturn(taskComm).when(taListener).getTaskCommunicator(0);
+    TaskAttemptListener taListener = createMockTaskAttemptListener();
 
     Configuration taskConf = new Configuration();
     taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -1133,10 +1105,7 @@ public class TestTaskAttempt {
 
     MockEventHandler mockEh = new MockEventHandler();
     MockEventHandler eventHandler = spy(mockEh);
-    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
-    TaskCommunicator taskComm = mock(TaskCommunicator.class);
-    doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
-    doReturn(taskComm).when(taListener).getTaskCommunicator(0);
+    TaskAttemptListener taListener = createMockTaskAttemptListener();
 
     Configuration taskConf = new Configuration();
     taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -1280,11 +1249,7 @@ public class TestTaskAttempt {
     TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
 
     MockEventHandler eventHandler = spy(new MockEventHandler());
-    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
-    TaskCommunicator mockTaskComm = mock(TaskCommunicator.class);
-    when(mockTaskComm.getAddress()).thenReturn(
-        new InetSocketAddress("localhost", 0));
-    when(taListener.getTaskCommunicator(any(int.class))).thenReturn(mockTaskComm);
+    TaskAttemptListener taListener = createMockTaskAttemptListener();
 
     Configuration taskConf = new Configuration();
     taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -1412,4 +1377,12 @@ public class TestTaskAttempt {
     return new ContainerContext(new HashMap<String, LocalResource>(),
         new Credentials(), new HashMap<String, String>(), "");
   }
+
+  private TaskAttemptListener createMockTaskAttemptListener() {
+    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+    TaskCommunicator taskComm = mock(TaskCommunicator.class);
+    doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
+    doReturn(taskComm).when(taListener).getTaskCommunicator(0);
+    return taListener;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/c03a6ff5/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java
index 352ad87..0e34f68 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java
@@ -24,36 +24,42 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 
+import java.io.IOException;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
 import com.google.common.collect.BiMap;
 import com.google.common.collect.HashBiMap;
+import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.Clock;
+import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.DagTypeConverters;
+import org.apache.tez.dag.api.NamedEntityDescriptor;
 import org.apache.tez.dag.api.TaskLocationHint;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezConstants;
-import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.Vertex.VertexExecutionContext;
 import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.records.DAGProtos;
+import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto;
 import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ContainerContext;
+import org.apache.tez.dag.app.DAGAppMaster;
 import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
 import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.dag.StateChangeNotifier;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.dag.utils.TaskSpecificLaunchCmdOption;
-import org.apache.tez.runtime.api.ExecutionContext;
 import org.junit.Test;
 
 /**
@@ -363,16 +369,46 @@ public class TestVertexImpl2 {
       this.vertexName = "testvertex";
       this.vertexExecutionContext = vertexExecutionContext;
       this.defaultExecutionContext = defaultDagExecitionContext;
-      if (numPlugins == 0) {
-        this.taskSchedulers.put(TezConstants.getTezYarnServicePluginName(), 0);
-        this.containerLaunchers.put(TezConstants.getTezYarnServicePluginName(), 0);
-        this.taskSchedulers.put(TezConstants.getTezYarnServicePluginName(), 0);
-      } else {
+      if (numPlugins == 0) { // Add default container plugins only
+        UserPayload defaultPayload;
+        try {
+          defaultPayload = TezUtils.createUserPayloadFromConf(new Configuration(false));
+        } catch (IOException e) {
+          throw new TezUncheckedException(e);
+        }
+        DAGAppMaster.parsePlugin(Lists.<NamedEntityDescriptor>newLinkedList(), taskSchedulers, null,
+            true, false, defaultPayload);
+        DAGAppMaster
+            .parsePlugin(Lists.<NamedEntityDescriptor>newLinkedList(), containerLaunchers, null,
+                true, false, defaultPayload);
+        DAGAppMaster.parsePlugin(Lists.<NamedEntityDescriptor>newLinkedList(), taskComms, null,
+            true, false, defaultPayload);
+      } else { // Add N plugins, no YARN defaults
+        List<TezNamedEntityDescriptorProto> schedulerList = new LinkedList<>();
+        List<TezNamedEntityDescriptorProto> launcherList = new LinkedList<>();
+        List<TezNamedEntityDescriptorProto> taskCommList = new LinkedList<>();
         for (int i = 0; i < numPlugins; i++) {
-          this.taskSchedulers.put(append(TASK_SCHEDULER_NAME_BASE, i), i);
-          this.containerLaunchers.put(append(CONTAINER_LAUNCHER_NAME_BASE, i), i);
-          this.taskComms.put(append(TASK_COMM_NAME_BASE, i), i);
+          schedulerList.add(TezNamedEntityDescriptorProto.newBuilder()
+              .setName(append(TASK_SCHEDULER_NAME_BASE, i)).setEntityDescriptor(
+                  DAGProtos.TezEntityDescriptorProto.newBuilder()
+                      .setClassName(append(TASK_SCHEDULER_NAME_BASE, i))).build());
+          launcherList.add(TezNamedEntityDescriptorProto.newBuilder()
+              .setName(append(CONTAINER_LAUNCHER_NAME_BASE, i)).setEntityDescriptor(
+                  DAGProtos.TezEntityDescriptorProto.newBuilder()
+                      .setClassName(append(CONTAINER_LAUNCHER_NAME_BASE, i))).build());
+          taskCommList.add(
+              TezNamedEntityDescriptorProto.newBuilder().setName(append(TASK_COMM_NAME_BASE, i))
+                  .setEntityDescriptor(
+                      DAGProtos.TezEntityDescriptorProto.newBuilder()
+                          .setClassName(append(TASK_COMM_NAME_BASE, i))).build());
         }
+        DAGAppMaster.parsePlugin(Lists.<NamedEntityDescriptor>newLinkedList(), taskSchedulers,
+            schedulerList, false, false, null);
+        DAGAppMaster.parsePlugin(Lists.<NamedEntityDescriptor>newLinkedList(), containerLaunchers,
+            launcherList, false, false, null);
+        DAGAppMaster
+            .parsePlugin(Lists.<NamedEntityDescriptor>newLinkedList(), taskComms, taskCommList,
+                false, false, null);
       }
 
       this.appContext = createDefaultMockAppContext();