You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/05/21 00:24:13 UTC

git commit: TEZ-1039. Add Container locality to TaskScheduler (bikas)

Repository: incubator-tez
Updated Branches:
  refs/heads/master 2e664e76c -> aa3b77209


TEZ-1039. Add Container locality to TaskScheduler (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/aa3b7720
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/aa3b7720
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/aa3b7720

Branch: refs/heads/master
Commit: aa3b77209dce6d84fd34e2c7434fd92b4853da05
Parents: 2e664e7
Author: Bikas Saha <bi...@apache.org>
Authored: Tue May 20 15:24:00 2014 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Tue May 20 15:24:00 2014 -0700

----------------------------------------------------------------------
 .../tez/dag/app/rm/LocalTaskScheduler.java      |   9 +-
 .../apache/tez/dag/app/rm/TaskScheduler.java    | 154 +++++++++++++++++--
 .../dag/app/rm/TaskSchedulerEventHandler.java   |  87 +----------
 .../tez/dag/app/rm/TaskSchedulerInterface.java  |   9 +-
 .../tez/dag/app/rm/TestContainerReuse.java      |  12 +-
 .../tez/dag/app/rm/TestTaskScheduler.java       |  92 ++++++++++-
 .../dag/app/rm/TestTaskSchedulerHelpers.java    |   4 +-
 .../examples/BroadcastAndOneToOneExample.java   | 103 ++++++++++---
 .../tez/mapreduce/examples/ExampleDriver.java   |   2 +
 .../apache/tez/mapreduce/TestMRRJobsDAGApi.java |   2 +-
 10 files changed, 345 insertions(+), 129 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/aa3b7720/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskScheduler.java
index 7b81407..87bfccd 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskScheduler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskScheduler.java
@@ -128,11 +128,18 @@ public class LocalTaskScheduler extends AbstractService implements TaskScheduler
   public void allocateTask(Object task, Resource capability, String[] hosts,
       String[] racks, Priority priority, Object containerSignature,
       Object clientCookie) {
-
     taskRequestHandler.addAllocateTaskRequest(task, capability, priority, clientCookie);
   }
 
   @Override
+  public synchronized void allocateTask(Object task, Resource capability,
+      ContainerId containerId, Priority priority, Object containerSignature,
+      Object clientCookie) {
+    // in local mode every task is already container level local
+    taskRequestHandler.addAllocateTaskRequest(task, capability, priority, clientCookie);
+  }
+  
+  @Override
   public boolean deallocateTask(Object task, boolean taskSucceeded) {
     return taskRequestHandler.addDeallocateTaskRequest(task);
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/aa3b7720/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
index 13a71d8..67369ee 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
@@ -68,6 +68,7 @@ import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /* TODO not yet updating cluster nodes on every allocate response
@@ -140,6 +141,8 @@ public class TaskScheduler extends AbstractService
    */
   Map<ContainerId, Object> containerAssignments =
                   new HashMap<ContainerId, Object>();
+  // Remove inUse depending on resolution of TEZ-1129
+  Set<ContainerId> inUseContainers = Sets.newHashSet(); 
   HashMap<ContainerId, Object> releasedContainers =
                   new HashMap<ContainerId, Object>();
   /**
@@ -148,6 +151,8 @@ public class TaskScheduler extends AbstractService
   Map<ContainerId, HeldContainer> heldContainers =
       new HashMap<ContainerId, HeldContainer>();
   
+  Set<Priority> priorityHasAffinity = Sets.newHashSet();
+  
   Set<NodeId> blacklistedNodes = Collections
       .newSetFromMap(new ConcurrentHashMap<NodeId, Boolean>());
   
@@ -199,6 +204,7 @@ public class TaskScheduler extends AbstractService
 
   class CookieContainerRequest extends ContainerRequest {
     CRCookie cookie;
+    ContainerId affinitizedContainerId;
 
     public CookieContainerRequest(
         Resource capability,
@@ -210,9 +216,24 @@ public class TaskScheduler extends AbstractService
       this.cookie = cookie;
     }
 
+    public CookieContainerRequest(
+        Resource capability,
+        ContainerId containerId,
+        String[] hosts,
+        String[] racks,
+        Priority priority,
+        CRCookie cookie) {
+      this(capability, hosts, racks, priority, cookie);
+      this.affinitizedContainerId = containerId;
+    }
+
     CRCookie getCookie() {
       return cookie;
     }
+    
+    ContainerId getAffinitizedContainer() {
+      return affinitizedContainerId;
+    }
   }
 
   public TaskScheduler(TaskSchedulerAppCallback appClient,
@@ -836,6 +857,46 @@ public class TaskScheduler extends AbstractService
     CookieContainerRequest request = new CookieContainerRequest(
       capability, hosts, racks, priority, cookie);
 
+    addRequestAndTrigger(task, request, hosts, racks);
+  }
+  
+  @Override
+  public synchronized void allocateTask(
+      Object task,
+      Resource capability,
+      ContainerId containerId,
+      Priority priority,
+      Object containerSignature,
+      Object clientCookie) {
+
+    HeldContainer heldContainer = heldContainers.get(containerId);
+    String[] hosts = null;
+    String[] racks = null;
+    if (heldContainer != null) {
+      Container container = heldContainer.getContainer();
+      if (canFit(capability, container.getResource())) {
+        // just specify node and use YARN's soft locality constraint for the rest
+        hosts = new String[1];
+        hosts[0] = container.getNodeId().getHost();
+        priorityHasAffinity.add(priority);
+      } else {
+        LOG.warn("Matching requested to container: " + containerId +
+            " but requested capability: " + capability + 
+            " does not fit in container resource: "  + container.getResource());
+      }
+    } else {
+      LOG.warn("Matching requested to unknown container: " + containerId);
+    }
+    
+    CRCookie cookie = new CRCookie(task, clientCookie, containerSignature);
+    CookieContainerRequest request = new CookieContainerRequest(
+      capability, containerId, hosts, racks, priority, cookie);
+
+    addRequestAndTrigger(task, request, hosts, racks);
+  }
+  
+  private void addRequestAndTrigger(Object task, CookieContainerRequest request,
+      String[] hosts, String[] racks) {
     addTaskRequest(task, request);
     // See if any of the delayedContainers can be used for this task.
     delayedContainerManager.triggerScheduling(true);
@@ -919,6 +980,18 @@ public class TaskScheduler extends AbstractService
     return null;
   }
 
+  boolean canFit(Resource arg0, Resource arg1) {
+    int mem0 = arg0.getMemory();
+    int mem1 = arg1.getMemory();
+    int cpu0 = arg0.getVirtualCores();
+    int cpu1 = arg1.getVirtualCores();
+    
+    if(mem0 <= mem1 && cpu0 <= cpu1) { 
+      return true;
+    }
+    return false; 
+  }
+  
   void preemptIfNeeded() {
     ContainerId preemptedContainer = null;
     synchronized (this) {
@@ -1084,21 +1157,66 @@ public class TaskScheduler extends AbstractService
 
   private CookieContainerRequest getMatchingRequestWithoutPriority(
       Container container,
-      String location) {
+      String location,
+      boolean considerContainerAffinity) {
     Resource capability = container.getResource();
     List<? extends Collection<CookieContainerRequest>> pRequestsList =
       amRmClient.getMatchingRequestsForTopPriority(location, capability);
+    if (considerContainerAffinity && 
+        !priorityHasAffinity.contains(amRmClient.getTopPriority())) {
+      considerContainerAffinity = false;
+    }
     if (pRequestsList == null || pRequestsList.isEmpty()) {
       return null;
     }
+    CookieContainerRequest firstMatch = null;
     for (Collection<CookieContainerRequest> requests : pRequestsList) {
       for (CookieContainerRequest cookieContainerRequest : requests) {
-        if (canAssignTaskToContainer(cookieContainerRequest, container)) {
-          return cookieContainerRequest;
+        if (firstMatch == null || // we don't have a match yet, so look for one
+            // we have a match but are looking for a better container level match.
+            // skip the expensive canAssignTaskToContainer() if the request is 
+            // not affinitized to the container
+            container.getId().equals(cookieContainerRequest.getAffinitizedContainer())
+            ) {
+          if (canAssignTaskToContainer(cookieContainerRequest, container)) {
+            // request matched to container
+            if (!considerContainerAffinity) {
+              return cookieContainerRequest;
+            }
+            ContainerId affCId = cookieContainerRequest.getAffinitizedContainer();
+            boolean canMatchTaskWithAffinity = true;
+            if (affCId == null || 
+                !heldContainers.containsKey(affCId) ||
+                inUseContainers.contains(affCId)) {
+              // affinity not specified
+              // affinitized container is no longer held
+              // affinitized container is in use
+              canMatchTaskWithAffinity = false;
+            }
+            if (canMatchTaskWithAffinity) {
+              if (container.getId().equals(
+                  cookieContainerRequest.getAffinitizedContainer())) {
+                // container level match
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("Matching with affinity for request: "
+                      + cookieContainerRequest + " container: " + affCId);
+                }
+                return cookieContainerRequest;
+              }
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Skipping request for container " + container.getId()
+                    + " due to affinity. Request: " + cookieContainerRequest
+                    + " affContainer: " + affCId);
+              }
+            } else {
+              firstMatch = cookieContainerRequest;
+            }
+          }
         }
       }
     }
-    return null;
+    
+    return firstMatch;
   }
 
   private boolean canAssignTaskToContainer(
@@ -1162,6 +1280,7 @@ public class TaskScheduler extends AbstractService
 
     Container result = taskAllocations.put(task, container);
     assert result == null;
+    inUseContainers.add(container.getId());
     containerAssignments.put(container.getId(), task);
     HeldContainer heldContainer = heldContainers.get(container.getId()); 
     if (!shouldReuseContainers && heldContainer == null) {
@@ -1232,6 +1351,7 @@ public class TaskScheduler extends AbstractService
     if (container == null) {
       return null;
     }
+    inUseContainers.remove(container.getId());
     return container;
   }
 
@@ -1245,6 +1365,7 @@ public class TaskScheduler extends AbstractService
     }
     Container container = taskAllocations.remove(task);
     assert container != null;
+    inUseContainers.remove(containerId);
     if(releaseIfFound) {
       releaseContainer(containerId);
     }
@@ -1438,7 +1559,7 @@ public class TaskScheduler extends AbstractService
         boolean honorLocality) {
       String location = container.getNodeId().getHost();
       CookieContainerRequest assigned = getMatchingRequestWithoutPriority(
-        container, location);
+        container, location, true);
       doBookKeepingForAssignedContainer(assigned, container, location, true);
       return assigned;
 
@@ -1470,7 +1591,7 @@ public class TaskScheduler extends AbstractService
         String location = RackResolver.resolve(container.getNodeId().getHost())
           .getNetworkLocation();
         CookieContainerRequest assigned = getMatchingRequestWithoutPriority(
-            container, location);
+            container, location, false);
         doBookKeepingForAssignedContainer(assigned, container, location,
             honorLocality);
         return assigned;
@@ -1500,7 +1621,7 @@ public class TaskScheduler extends AbstractService
       if (!honorLocality) {
         String location = ResourceRequest.ANY;
         CookieContainerRequest assigned = getMatchingRequestWithoutPriority(
-          container, location);
+          container, location, false);
         doBookKeepingForAssignedContainer(assigned, container, location,
             honorLocality);
         return assigned;
@@ -1533,7 +1654,7 @@ public class TaskScheduler extends AbstractService
     
     // used for testing only
     @VisibleForTesting
-    AtomicBoolean drainedDelayedContainers = null;
+    volatile AtomicBoolean drainedDelayedContainersForTest = null;
 
     DelayedContainerManager() {
       super.setName("DelayedContainerManager");
@@ -1553,10 +1674,11 @@ public class TaskScheduler extends AbstractService
         // locality at this point.
         if (delayedContainers.peek() == null) {
           try {
-            if (drainedDelayedContainers != null) {
-              drainedDelayedContainers.set(true);
-              synchronized (drainedDelayedContainers) {
-                drainedDelayedContainers.notifyAll();
+            // test only signaling to make TestTaskScheduler work
+            if (drainedDelayedContainersForTest != null) {
+              drainedDelayedContainersForTest.set(true);
+              synchronized (drainedDelayedContainersForTest) {
+                drainedDelayedContainersForTest.notifyAll();
               }
             }
             synchronized(this) {
@@ -1568,6 +1690,14 @@ public class TaskScheduler extends AbstractService
             LOG.info("AllocatedContainerManager Thread interrupted");
           }
         } else {
+          // test only sleep to prevent tight loop cycling that makes tests stall
+          if (drainedDelayedContainersForTest != null) {
+            try {
+              Thread.sleep(100);
+            } catch (InterruptedException e) {
+              e.printStackTrace();
+            }
+          }
           HeldContainer delayedContainer = delayedContainers.peek();
           if (delayedContainer == null) {
             continue;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/aa3b7720/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index bc1d1b7..865b192 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -205,35 +205,6 @@ public class TaskSchedulerEventHandler extends AbstractService
   }
 
   private void handleTAUnsuccessfulEnd(AMSchedulerEventTAEnded event) {
-    /*MRxTaskAttemptID aId = event.getAttemptID();
-    attemptToLaunchRequestMap.remove(aId);
-    // TODO XXX: This remove may need to be deferred. Possible for a SUCCESSFUL taskAttempt to fail,
-    // which means the scheduler needs to remember taskAttempt to container assignments for a longer time.
-    boolean removed = pendingReduces.remove(aId);
-    if (!removed) {
-      removed = scheduledRequests.remove(aId);
-      if (!removed) {
-        // Maybe assigned.
-        ContainerId containerId = assignedRequests.remove(aId);
-        if (containerId != null) {
-          // Ask the container to stop.
-          sendEvent(new AMContainerEvent(containerId,
-              AMContainerEventType.C_STOP_REQUEST));
-          // Inform the Node - the task has asked to be STOPPED / has already
-          // stopped.
-          sendEvent(new AMNodeEventTaskAttemptEnded(containerMap
-              .get(containerId).getContainer().getNodeId(), containerId,
-              event.getAttemptID(), event.getState() == TaskAttemptState.FAILED));
-        } else {
-          LOG.warn("Received a STOP request for absent taskAttempt: "
-              + event.getAttemptID());
-          // This could be generated in case of recovery, with unhealthy nodes/
-          // fetch failures. Can be ignored, since Recovered containers don't
-          // need to be stopped.
-        }
-      }
-    }*/
-
     TaskAttempt attempt = event.getAttempt();
     boolean wasContainerAllocated = taskScheduler.deallocateTask(attempt, false);
     // use stored value of container id in case the scheduler has removed this
@@ -264,23 +235,6 @@ public class TaskSchedulerEventHandler extends AbstractService
   }
 
   private void handleTASucceeded(AMSchedulerEventTAEnded event) {
-    /*
-    // TODO XXX Remember the assigned containerId even after task success.
-    // Required for TOO_MANY_FETCH_FAILURES
-    attemptToLaunchRequestMap.remove(event.getAttemptID());
-    ContainerId containerId = assignedRequests.remove(event.getAttemptID());
-    if (containerId != null) { // TODO Should not be null. Confirm.
-      sendEvent(new AMContainerTASucceededEvent(containerId,
-          event.getAttemptID()));
-      sendEvent(new AMNodeEventTaskAttemptSucceeded(containerMap
-          .get(containerId).getContainer().getNodeId(), containerId,
-          event.getAttemptID()));
-      containerAvailable(containerId);
-    } else {
-      LOG.warn("Received TaskAttemptSucceededEvent for unmapped TaskAttempt: "
-          + event.getAttemptID() + ". Full event: " + event);
-    }*/
-
     TaskAttempt attempt = event.getAttempt();
     ContainerId usedContainerId = event.getUsedContainerId();
 
@@ -302,46 +256,19 @@ public class TaskSchedulerEventHandler extends AbstractService
   }
 
   private void handleTaLaunchRequest(AMSchedulerEventTALaunchRequest event) {
-    /**
-         // Add to queue of pending tasks.
-    recalculateReduceSchedule = true;
-    attemptToLaunchRequestMap.put(event.getAttemptID(), event);
-    if (event.getAttemptID().getTaskID().getTaskType() == TaskType.MAP) {
-      mapResourceReqt = maybeComputeNormalizedRequestForType(event,
-          TaskType.MAP, mapResourceReqt);
-      event.getCapability().setMemory(mapResourceReqt);
-      scheduledRequests.addMap(event);
-    } else { // Reduce
-      reduceResourceReqt = maybeComputeNormalizedRequestForType(event,
-          TaskType.REDUCE, reduceResourceReqt);
-      event.getCapability().setMemory(reduceResourceReqt);
-      if (event.isRescheduled()) {
-        pendingReduces.addFirst(new ContainerRequestInfo(new ContainerRequest(
-            event.getCapability(), event.getHosts(), event.getRacks(),
-            PRIORITY_REDUCE), event));
-      } else {
-        pendingReduces.addLast(new ContainerRequestInfo(new ContainerRequest(
-            event.getCapability(), event.getHosts(), event.getRacks(),
-            PRIORITY_REDUCE), event));
-      }
-    }
-     */
-    // TODO resource adjustment needs to move into dag
-    /*Resource mapResourceReqt = maybeComputeNormalizedRequestForType(event,
-        TaskType.MAP, mapResourceReqt);
-    event.getCapability().setMemory(mapResourceReqt);*/
     TaskAttempt taskAttempt = event.getTaskAttempt();
     TaskLocationHint locationHint = event.getLocationHint();
     String hosts[] = null;
     String racks[] = null;
-    // Until TEZ-1039 is done we need to translate container affinity to the node
-    // for that container
     if (locationHint != null) {
       if (locationHint.getAffinitizedContainer() != null) {
-        ContainerId containerId = locationHint.getAffinitizedContainer();
-        AMContainer container = appContext.getAllContainers().get(containerId);
-        hosts = new String[1];
-        hosts[0] = container.getContainer().getNodeId().getHost();
+        taskScheduler.allocateTask(taskAttempt,
+            event.getCapability(),
+            locationHint.getAffinitizedContainer(),
+            event.getPriority(),
+            event.getContainerContext(),
+            event);
+        return;
       } else {
         hosts = (locationHint.getDataLocalHosts() != null) ? locationHint
             .getDataLocalHosts().toArray(

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/aa3b7720/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerInterface.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerInterface.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerInterface.java
index 5b6f179..1c35492 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerInterface.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerInterface.java
@@ -40,7 +40,14 @@ public interface TaskSchedulerInterface {
   public abstract void allocateTask(Object task, Resource capability,
       String[] hosts, String[] racks, Priority priority,
       Object containerSignature, Object clientCookie);
-
+  
+  /**
+   * Allocate a task with affinity to a specific container
+   */
+  public abstract void allocateTask(Object task, Resource capability,
+      ContainerId containerId, Priority priority, Object containerSignature,
+      Object clientCookie);
+  
   public abstract boolean deallocateTask(Object task, boolean taskSucceeded);
 
   public abstract Object deallocateContainer(ContainerId containerId);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/aa3b7720/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
index 580062d..2e89b66 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
@@ -157,7 +157,7 @@ public class TestContainerReuse {
       taskScheduler.getDrainableAppCallback();
 
     AtomicBoolean drainNotifier = new AtomicBoolean(false);
-    taskScheduler.delayedContainerManager.drainedDelayedContainers = drainNotifier;
+    taskScheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier;
 
     Resource resource = Resource.newInstance(1024, 1);
     Priority priority = Priority.newInstance(5);
@@ -293,7 +293,7 @@ public class TestContainerReuse {
       taskScheduler.getDrainableAppCallback();
     
     AtomicBoolean drainNotifier = new AtomicBoolean(false);
-    taskScheduler.delayedContainerManager.drainedDelayedContainers = drainNotifier;
+    taskScheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier;
 
     Resource resource = Resource.newInstance(1024, 1);
     Priority priority = Priority.newInstance(5);
@@ -389,7 +389,7 @@ public class TestContainerReuse {
         .getSpyTaskScheduler();
     TaskSchedulerAppCallbackDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback();
     AtomicBoolean drainNotifier = new AtomicBoolean(false);
-    taskScheduler.delayedContainerManager.drainedDelayedContainers = drainNotifier;
+    taskScheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier;
 
     Resource resource1 = Resource.newInstance(1024, 1);
     String[] host1 = {"host1"};
@@ -537,7 +537,7 @@ public class TestContainerReuse {
     TaskSchedulerAppCallbackDrainable drainableAppCallback =
       taskScheduler.getDrainableAppCallback();
     AtomicBoolean drainNotifier = new AtomicBoolean(false);
-    taskScheduler.delayedContainerManager.drainedDelayedContainers = drainNotifier;
+    taskScheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier;
 
     Resource resource1 = Resource.newInstance(1024, 1);
     String [] emptyHosts = new String[0];
@@ -661,7 +661,7 @@ public class TestContainerReuse {
     TaskSchedulerAppCallbackDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback();
 
     AtomicBoolean drainNotifier = new AtomicBoolean(false);
-    taskScheduler.delayedContainerManager.drainedDelayedContainers = drainNotifier;
+    taskScheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier;
 
     Resource resource1 = Resource.newInstance(1024, 1);
     String[] host1 = {"host1"};
@@ -775,7 +775,7 @@ public class TestContainerReuse {
         .getSpyTaskScheduler();
     TaskSchedulerAppCallbackDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback();
     AtomicBoolean drainNotifier = new AtomicBoolean(false);
-    taskScheduler.delayedContainerManager.drainedDelayedContainers = drainNotifier;
+    taskScheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier;
 
     Resource resource1 = Resource.newInstance(1024, 1);
     String[] host1 = {"host1"};

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/aa3b7720/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
index bf428ff..9652da4 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
@@ -547,6 +547,7 @@ public class TestTaskScheduler {
     final Priority mockPriority2 = Priority.newInstance(2);
     final Priority mockPriority3 = Priority.newInstance(3);
     final Priority mockPriority4 = Priority.newInstance(4);
+    final Priority mockPriority5 = Priority.newInstance(5);
     Object mockTask2 = mock(Object.class);
     when(mockTask2.toString()).thenReturn("task2");
     Object mockCookie2 = mock(Object.class);
@@ -708,7 +709,7 @@ public class TestTaskScheduler {
         });
     
     AtomicBoolean drainNotifier = new AtomicBoolean(false);
-    scheduler.delayedContainerManager.drainedDelayedContainers = drainNotifier;
+    scheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier;
     
     scheduler.onContainersAllocated(containers);
     TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
@@ -865,6 +866,95 @@ public class TestTaskScheduler {
     scheduler.unblacklistNode(badNodeId);
     verify(mockRMClient, times(1)).removeNodeFromBlacklist(badNodeId);
     assertEquals(0, scheduler.blacklistedNodes.size());
+    
+    // verify container level matching
+    // fudge the top level priority to prevent containers from being released.
+    // if the top level priority is higher than that of the newly allocated
+    // containers then they will not be released
+    final AtomicBoolean fudgePriority = new AtomicBoolean(true);
+    when(mockRMClient.getTopPriority()).then(        
+        new Answer<Priority>() {
+          @Override
+          public Priority answer(
+              InvocationOnMock invocation) throws Throwable {
+            if (fudgePriority.get()) {
+              return mockPriority4;
+            }
+            return mockPriority5;
+          }
+        });
+    // add a dummy task to prevent release of allocated containers
+    Object mockTask5 = mock(Object.class);
+    when(mockTask5.toString()).thenReturn("task5");
+    Object mockCookie5 = mock(Object.class);
+    scheduler.allocateTask(mockTask5, mockCapability, hosts,
+        racks, mockPriority5, null, mockCookie5);
+    verify(mockRMClient, times(6)).addContainerRequest(requestCaptor.capture());
+    CookieContainerRequest request6 = requestCaptor.getValue();
+    drainableAppCallback.drain();
+    // add containers so that we can reference one of them for container specific
+    // allocation
+    containers.clear();
+    Container mockContainer7 = mock(Container.class, RETURNS_DEEP_STUBS);
+    when(mockContainer7.getNodeId().getHost()).thenReturn("host5");
+    ContainerId mockCId7 = mock(ContainerId.class);
+    when(mockContainer7.toString()).thenReturn("container7");
+    when(mockCId7.toString()).thenReturn("container7");
+    when(mockContainer7.getId()).thenReturn(mockCId7);
+    when(mockContainer7.getPriority()).thenReturn(mockPriority5);
+    containers.add(mockContainer7);
+    Container mockContainer8 = mock(Container.class, RETURNS_DEEP_STUBS);
+    when(mockContainer8.getNodeId().getHost()).thenReturn("host5");
+    ContainerId mockCId8 = mock(ContainerId.class);
+    when(mockContainer8.toString()).thenReturn("container8");
+    when(mockCId8.toString()).thenReturn("container8");
+    when(mockContainer8.getId()).thenReturn(mockCId8);
+    when(mockContainer8.getPriority()).thenReturn(mockPriority5);
+    containers.add(mockContainer8);
+    drainNotifier.set(false);
+    scheduler.onContainersAllocated(containers);
+    drainableAppCallback.drain();
+    verify(mockRMClient, times(5)).releaseAssignedContainer((ContainerId) any());    
+    Object mockTask6 = mock(Object.class);
+    when(mockTask6.toString()).thenReturn("task6");
+    Object mockCookie6 = mock(Object.class);
+    // allocate request with container affinity
+    scheduler.allocateTask(mockTask6, mockCapability, mockCId7, mockPriority5, null, mockCookie6);
+    drainableAppCallback.drain();
+    verify(mockRMClient, times(7)).addContainerRequest(requestCaptor.capture());
+    CookieContainerRequest request7 = requestCaptor.getValue();
+    hostContainers.clear();
+    hostContainers.add(request6);
+    hostContainers.add(request7);
+    
+    when(
+        mockRMClient.getMatchingRequestsForTopPriority(eq("host5"),
+            (Resource) any())).thenAnswer(
+        new Answer<List<? extends Collection<CookieContainerRequest>>>() {
+          @Override
+          public List<? extends Collection<CookieContainerRequest>> answer(
+              InvocationOnMock invocation) throws Throwable {
+            return hostList;
+          }
+
+        });
+    // stop fudging top priority
+    fudgePriority.set(false);
+    TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
+    drainableAppCallback.drain();
+    verify(mockApp, times(6)).taskAllocated(any(), any(), (Container) any());
+    // container7 allocated to the task with affinity for it
+    verify(mockApp).taskAllocated(mockTask6, mockCookie6, mockContainer7);
+    // deallocate allocated task
+    assertTrue(scheduler.deallocateTask(mockTask5, true));
+    assertTrue(scheduler.deallocateTask(mockTask6, true));
+    drainableAppCallback.drain();
+    verify(mockApp).containerBeingReleased(mockCId7);
+    verify(mockApp).containerBeingReleased(mockCId8);
+    verify(mockRMClient).releaseAssignedContainer(mockCId7);
+    verify(mockRMClient).releaseAssignedContainer(mockCId8);
+    verify(mockRMClient, times(7)).releaseAssignedContainer((ContainerId) any());
+    
 
     float progress = 0.5f;
     when(mockApp.getProgress()).thenReturn(progress);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/aa3b7720/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
index a9fdaab..5a0856b 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
@@ -371,8 +371,8 @@ class TestTaskSchedulerHelpers {
 
   static void waitForDelayedDrainNotify(AtomicBoolean drainNotifier)
       throws InterruptedException {
-    while (!drainNotifier.get()) {
-      synchronized (drainNotifier) {
+    synchronized (drainNotifier) {
+      while (!drainNotifier.get()) {
         drainNotifier.wait();
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/aa3b7720/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
index 5a972e8..0cd6aaf 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
@@ -19,8 +19,6 @@
 package org.apache.tez.mapreduce.examples;
 
 import java.io.IOException;
-import java.util.List;
-import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
@@ -35,6 +33,9 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.tez.client.AMConfiguration;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.client.TezClientUtils;
@@ -47,6 +48,7 @@ import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
@@ -59,6 +61,9 @@ import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
+import org.apache.tez.runtime.common.objectregistry.ObjectLifeCycle;
+import org.apache.tez.runtime.common.objectregistry.ObjectRegistry;
+import org.apache.tez.runtime.common.objectregistry.ObjectRegistryFactory;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 import org.apache.tez.runtime.library.api.KeyValueWriter;
 import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput;
@@ -78,6 +83,15 @@ public class BroadcastAndOneToOneExample extends Configured implements Tool {
           .next();
       KeyValueWriter kvWriter = (KeyValueWriter) output.getWriter();
       kvWriter.write(word, new IntWritable(getContext().getTaskIndex()));
+      byte[] userPayload = getContext().getUserPayload();
+      if (userPayload != null) {
+        boolean doLocalityCheck = userPayload[0] > 0 ? true : false;
+        if (doLocalityCheck) {
+          ObjectRegistry objectRegistry = ObjectRegistryFactory.getObjectRegistry();
+          String entry = String.valueOf(getContext().getTaskIndex());
+          objectRegistry.add(ObjectLifeCycle.DAG, entry, entry);
+        }
+      }
     }
   }
 
@@ -97,27 +111,29 @@ public class BroadcastAndOneToOneExample extends Configured implements Tool {
       while (inputKvReader.next()) {
         sum += ((IntWritable) inputKvReader.getCurrentValue()).get();
       }
-      System.out.println("Index: " + getContext().getTaskIndex() + " sum: " + sum);
-      int taskIndex = getContext().getTaskIndex();
-      switch (taskIndex) {
-      case 0:
-        Preconditions.checkState((sum == 1), "Sum = " + sum);
-        break;
-      case 1:
-        Preconditions.checkState((sum == 2), "Sum = " + sum);
-        break;
-      case 2:
-        Preconditions.checkState((sum == 3), "Sum = " + sum);
-        break;
-      default:
-        throw new TezUncheckedException("Unexpected taskIndex: " + taskIndex);
+      boolean doLocalityCheck = getContext().getUserPayload()[0] > 0 ? true : false;
+      int broadcastSum = getContext().getUserPayload()[1];
+      int expectedSum = broadcastSum + getContext().getTaskIndex();
+      System.out.println("Index: " + getContext().getTaskIndex() + 
+          " sum: " + sum + " expectedSum: " + expectedSum + " broadcastSum: " + broadcastSum);
+      Preconditions.checkState((sum == expectedSum), "Sum = " + sum);      
+      
+      if (doLocalityCheck) {
+        ObjectRegistry objectRegistry = ObjectRegistryFactory.getObjectRegistry();
+        String index = (String) objectRegistry.get(String.valueOf(getContext().getTaskIndex()));
+        if (index == null || Integer.valueOf(index).intValue() != getContext().getTaskIndex()) {
+          String msg = "Did not find expected local producer "
+              + getContext().getTaskIndex() + " in the same JVM";
+          System.out.println(msg);
+          throw new TezUncheckedException(msg);
+        }
       }
     }
 
   }
 
   private DAG createDAG(FileSystem fs, TezConfiguration tezConf,
-      Path stagingDir) throws IOException {
+      Path stagingDir, boolean doLocalityCheck) throws IOException, YarnException {
     Configuration kvInputConf = new JobConf((Configuration)tezConf);
     kvInputConf.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS,
         Text.class.getName());
@@ -134,22 +150,39 @@ public class BroadcastAndOneToOneExample extends Configured implements Tool {
     MRHelpers.doJobClientMagic(kvInputConf);
     MRHelpers.doJobClientMagic(kvOneToOneConf);
 
+    int numBroadcastTasks = 2;
+    int numOneToOneTasks = 3;
+    if (doLocalityCheck) {
+      YarnClient yarnClient = YarnClient.createYarnClient();
+      yarnClient.init(tezConf);
+      yarnClient.start();
+      int numNMs = yarnClient.getNodeReports(NodeState.RUNNING).size();
+      yarnClient.stop();
+      // create enough 1-1 tasks to run in parallel
+      numOneToOneTasks = numNMs - numBroadcastTasks - 1;// 1 AM
+      if (numOneToOneTasks < 1) {
+        numOneToOneTasks = 1;
+      }
+    }
+    byte[] procPayload = {(byte) (doLocalityCheck ? 1 : 0), 1};
+
+    System.out.println("Using " + numOneToOneTasks + " 1-1 tasks");
+
     byte[] kvInputPayload = MRHelpers.createUserPayloadFromConf(kvInputConf);
     Vertex broadcastVertex = new Vertex("Broadcast", new ProcessorDescriptor(
         InputProcessor.class.getName()),
-        2, MRHelpers.getMapResource(kvInputConf));
+        numBroadcastTasks, MRHelpers.getMapResource(kvInputConf));
     broadcastVertex.setJavaOpts(MRHelpers.getMapJavaOpts(kvInputConf));
     
-    int numOneToOneTasks = 3;
     Vertex inputVertex = new Vertex("Input", new ProcessorDescriptor(
-        InputProcessor.class.getName()),
+        InputProcessor.class.getName()).setUserPayload(procPayload),
         numOneToOneTasks, MRHelpers.getMapResource(kvInputConf));
     inputVertex.setJavaOpts(MRHelpers.getMapJavaOpts(kvInputConf));
-
+    
     byte[] kvOneToOnePayload = MRHelpers.createUserPayloadFromConf(kvOneToOneConf);
     Vertex oneToOneVertex = new Vertex("OneToOne",
         new ProcessorDescriptor(
-            OneToOneProcessor.class.getName()),
+            OneToOneProcessor.class.getName()).setUserPayload(procPayload),
             numOneToOneTasks, MRHelpers.getReduceResource(kvOneToOneConf));
     oneToOneVertex.setJavaOpts(
         MRHelpers.getReduceJavaOpts(kvOneToOneConf)).setVertexManagerPlugin(
@@ -180,7 +213,7 @@ public class BroadcastAndOneToOneExample extends Configured implements Tool {
   
   private Credentials credentials = new Credentials();
   
-  public boolean run(Configuration conf) throws Exception {
+  public boolean run(Configuration conf, boolean doLocalityCheck) throws Exception {
     System.out.println("Running BroadcastAndOneToOneExample");
     // conf and UGI
     TezConfiguration tezConf;
@@ -189,6 +222,7 @@ public class BroadcastAndOneToOneExample extends Configured implements Tool {
     } else {
       tezConf = new TezConfiguration();
     }
+    tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, true);
     UserGroupInformation.setConfiguration(tezConf);
     String user = UserGroupInformation.getCurrentUser().getShortUserName();
 
@@ -229,7 +263,7 @@ public class BroadcastAndOneToOneExample extends Configured implements Tool {
     DAGClient dagClient = null;
 
     try {
-        DAG dag = createDAG(fs, tezConf, stagingDir);
+        DAG dag = createDAG(fs, tezConf, stagingDir, doLocalityCheck);
 
         tezSession.waitTillReady();
         dagClient = tezSession.submitDAG(dag);
@@ -249,9 +283,28 @@ public class BroadcastAndOneToOneExample extends Configured implements Tool {
   
   @Override
   public int run(String[] args) throws Exception {
-    boolean status = run(getConf());
+    boolean doLocalityCheck = true;
+    if (args.length == 1) {
+      if (args[0].equals(skipLocalityCheck)) {
+        doLocalityCheck = false;
+      } else {
+        printUsage();
+        throw new TezException("Invalid command line");
+      }
+    } else if (args.length > 1) {
+      printUsage();
+      throw new TezException("Invalid command line");
+    }
+    boolean status = run(getConf(), doLocalityCheck);
     return status ? 0 : 1;
   }
+  
+  private static void printUsage() {
+    System.err.println("broadcastAndOneToOneExample " + skipLocalityCheck);
+    ToolRunner.printGenericCommandUsage(System.err);
+  }
+  
+  static String skipLocalityCheck = "-skipLocalityCheck";
 
   public static void main(String[] args) throws Exception {
     Configuration conf = new Configuration();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/aa3b7720/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
index 2f9cde2..636e83b 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
@@ -81,6 +81,8 @@ public class ExampleDriver {
           "Word Count with words sorted on frequency");
       pgd.addClass("unionexample", UnionExample.class,
           "Union example");
+      pgd.addClass("broadcastAndOneToOneExample", BroadcastAndOneToOneExample.class,
+          "BroadcastAndOneToOneExample example");
       pgd.addClass("filterLinesByWord", FilterLinesByWord.class,
           "Filters lines by the specified word using broadcast edge");
       pgd.addClass("filterLinesByWordOneToOne", FilterLinesByWordOneToOne.class,

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/aa3b7720/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
index b690a11..d5fd04d 100644
--- a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
+++ b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
@@ -698,7 +698,7 @@ public class TestMRRJobsDAGApi {
   public void testBroadcastAndOneToOne() throws Exception {
     LOG.info("Running BroadcastAndOneToOne Test");
     BroadcastAndOneToOneExample job = new BroadcastAndOneToOneExample();
-    if (job.run(mrrTezCluster.getConfig())) {
+    if (job.run(mrrTezCluster.getConfig(), true)) {
       LOG.info("Success BroadcastAndOneToOne Test");
     } else {
       throw new TezUncheckedException("BroadcastAndOneToOne Test Failed");