You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2014/11/17 20:22:14 UTC

[05/50] [abbrv] tez git commit: TEZ-1700. Replace containerId from TaskLocationHint with [TaskIndex+Vertex] based affinity (bikas)

TEZ-1700. Replace containerId from TaskLocationHint with [TaskIndex+Vertex] based affinity (bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/74c7f874
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/74c7f874
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/74c7f874

Branch: refs/heads/TEZ-8
Commit: 74c7f874d558f933fbf9697c537d204eb56806c9
Parents: 06e9f88
Author: Bikas Saha <bi...@apache.org>
Authored: Wed Oct 29 18:46:50 2014 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Wed Oct 29 18:46:50 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  4 ++
 .../main/java/org/apache/tez/dag/api/DAG.java   |  7 +-
 .../apache/tez/dag/api/TaskLocationHint.java    | 71 +++++++++++++++-----
 .../tez/dag/api/TestTaskLocationHint.java       | 62 +++++++++++++++++
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   |  1 +
 .../dag/app/rm/TaskSchedulerEventHandler.java   | 45 ++++++++++---
 .../app/rm/TestTaskSchedulerEventHandler.java   | 62 ++++++++++++++++-
 .../vertexmanager/InputReadyVertexManager.java  | 11 ++-
 .../TestInputReadyVertexManager.java            | 45 +++++++------
 .../examples/BroadcastAndOneToOneExample.java   |  1 -
 10 files changed, 250 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/74c7f874/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7bc96ce..ca1da2e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -16,6 +16,8 @@ INCOMPATIBLE CHANGES
     context.getUserPayload can now return null, apps may need to add defensive code.
   TEZ-1699. Vertex.setParallelism should throw an exception for invalid
   invocations
+  TEZ-1700. Replace containerId from TaskLocationHint with [TaskIndex+Vertex]
+  based affinity
 
 ALL CHANGES:
   TEZ-1620. Wait for application finish before stopping MiniTezCluster
@@ -85,6 +87,8 @@ ALL CHANGES:
     context.getUserPayload can now return null, apps may need to add defensive code.
   TEZ-1699. Vertex.setParallelism should throw an exception for invalid
   invocations
+  TEZ-1700. Replace containerId from TaskLocationHint with [TaskIndex+Vertex]
+  based affinity
 
 Release 0.5.1: 2014-10-02
 

http://git-wip-us.apache.org/repos/asf/tez/blob/74c7f874/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
index 9b428f0..34c27bf 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -719,11 +719,12 @@ public class DAG {
         if (vertexLocationHint.getTaskLocationHints() != null) {
           for (TaskLocationHint hint : vertexLocationHint.getTaskLocationHints()) {
             PlanTaskLocationHint.Builder taskLocationHintBuilder = PlanTaskLocationHint.newBuilder();
-
-            if (hint.getAffinitizedContainer() != null) {
+            // we can allow this later on if needed
+            if (hint.getAffinitizedTask() != null) {
               throw new TezUncheckedException(
-                  "Container affinity may not be specified via the DAG API");
+                  "Task based affinity may not be specified via the DAG API");
             }
+
             if (hint.getHosts() != null) {
               taskLocationHintBuilder.addAllHost(hint.getHosts());
             }

http://git-wip-us.apache.org/repos/asf/tez/blob/74c7f874/tez-api/src/main/java/org/apache/tez/dag/api/TaskLocationHint.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TaskLocationHint.java b/tez-api/src/main/java/org/apache/tez/dag/api/TaskLocationHint.java
index 02f7bf9..926b8fd 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TaskLocationHint.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TaskLocationHint.java
@@ -23,7 +23,7 @@ import java.util.Set;
 
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
-import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 
 import com.google.common.base.Preconditions;
 
@@ -36,16 +36,40 @@ import com.google.common.base.Preconditions;
 @Evolving
 public class TaskLocationHint {
 
-  // Container to which to affinitize
-  ContainerId containerId;
+  @Unstable
+  /**
+   * Specifies location affinity to the given vertex and given task in that vertex
+   */
+  public class TaskBasedLocationAffinity {
+    private String vertexName;
+    private int taskIndex;
+    public TaskBasedLocationAffinity(String vertexName, int taskIndex) {
+      this.vertexName = vertexName;
+      this.taskIndex = taskIndex;
+    }
+    public String getVertexName() {
+      return vertexName;
+    }
+    public int getTaskIndex() {
+      return taskIndex;
+    }
+    @Override
+    public String toString() {
+      return "[Vertex: " + vertexName + ", TaskIndex: " + taskIndex + "]";
+    }
+  }
+
   // Host names if any to be used
   private Set<String> hosts;
   // Rack names if any to be used
   private Set<String> racks;
+  
+  private TaskBasedLocationAffinity affinitizedTask;
 
-  private TaskLocationHint(ContainerId containerId) {
-    Preconditions.checkNotNull(containerId);
-    this.containerId = containerId;
+  private TaskLocationHint(String vertexName, int taskIndex) {
+    Preconditions.checkNotNull(vertexName);
+    Preconditions.checkArgument(taskIndex >= 0);
+    this.affinitizedTask = new TaskBasedLocationAffinity(vertexName, taskIndex);
   }
 
   private TaskLocationHint(Set<String> hosts, Set<String> racks) {
@@ -62,11 +86,17 @@ public class TaskLocationHint {
   }
 
   /**
-   * Provide a location hint using a container to which the task may be affinitized
-   * Tez will try to run the task on that container or node local to it
+   * Provide a location hint that affinitizes to the given task in the given vertex. Tez will try 
+   * to run in the same container as the given task or node local to it. Locality may degrade to
+   * rack local or further depending on cluster resource allocations.<br>
+   * This is expected to be used only during dynamic optimizations via {@link VertexManagerPlugin}s
+   * and not while creating the DAG using the DAG API.
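+   * @param vertexName name of the vertex that contains the task to affinitize to; must not be null
+   * @param taskIndex index of the task within that vertex; must be non-negative
+   * @return a location hint carrying the task based affinity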
    */
-  public static TaskLocationHint createTaskLocationHint(ContainerId containerId) {
-    return new TaskLocationHint(containerId);
+  public static TaskLocationHint createTaskLocationHint(String vertexName, int taskIndex) {
+    return new TaskLocationHint(vertexName, taskIndex);
   }
 
   /**
@@ -77,8 +107,8 @@ public class TaskLocationHint {
     return new TaskLocationHint(hosts, racks);
   }
 
-  public ContainerId getAffinitizedContainer() {
-    return containerId;
+  public TaskBasedLocationAffinity getAffinitizedTask() {
+    return affinitizedTask;
   }
 
   public Set<String> getHosts() {
@@ -99,9 +129,9 @@ public class TaskLocationHint {
     result = ( racks != null) ?
         prime * result + racks.hashCode() :
         result + prime;
-    result = ( containerId != null) ?
-        prime * result + containerId.hashCode() :
-        result + prime;
+    if (affinitizedTask != null) {
+      result = prime * result + affinitizedTask.getVertexName().hashCode() + affinitizedTask.getTaskIndex();
+    }
     return result;
   }
 
@@ -131,11 +161,16 @@ public class TaskLocationHint {
     } else if (other.racks != null) {
       return false;
     }
-    if (containerId != null) {
-      if (!containerId.equals(other.containerId)) {
+    if (affinitizedTask != null) {
+      if (other.affinitizedTask == null) {
+        return false;
+      }
+      if (affinitizedTask.getTaskIndex() != other.affinitizedTask.getTaskIndex()) {
+        return false;
+      } else if (!affinitizedTask.getVertexName().equals(other.affinitizedTask.getVertexName())) {
         return false;
       }
-    } else if (other.containerId != null) {
+    } else if (other.affinitizedTask != null){
       return false;
     }
     return true;

http://git-wip-us.apache.org/repos/asf/tez/blob/74c7f874/tez-api/src/test/java/org/apache/tez/dag/api/TestTaskLocationHint.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestTaskLocationHint.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestTaskLocationHint.java
new file mode 100644
index 0000000..d52e194
--- /dev/null
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestTaskLocationHint.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+
+public class TestTaskLocationHint {
+  
+  @Test (timeout = 5000)
+  public void testEquality() {
+    TaskLocationHint t1 = TaskLocationHint.createTaskLocationHint("v1", 0);
+    TaskLocationHint t2 = TaskLocationHint.createTaskLocationHint("v1", 0);
+    TaskLocationHint t3 = TaskLocationHint.createTaskLocationHint("v2", 0);
+    TaskLocationHint t4 = TaskLocationHint.createTaskLocationHint("v1", 1);
+    TaskLocationHint t5 = TaskLocationHint.createTaskLocationHint(null, null);
+    TaskLocationHint t6 = TaskLocationHint.createTaskLocationHint(null, null);
+    TaskLocationHint t7 = TaskLocationHint.createTaskLocationHint(
+        Sets.newHashSet(new String[] {"n1", "n2"}), Sets.newHashSet(new String[] {"r1", "r2"}));
+    TaskLocationHint t8 = TaskLocationHint.createTaskLocationHint(
+        Sets.newHashSet(new String[] {"n1", "n2"}), Sets.newHashSet(new String[] {"r1", "r2"}));
+    TaskLocationHint t9 = TaskLocationHint.createTaskLocationHint(
+        Sets.newHashSet(new String[] {"n1", "n2"}), Sets.newHashSet(new String[] {"r1"}));
+    TaskLocationHint t10 = TaskLocationHint.createTaskLocationHint(
+        Sets.newHashSet(new String[] {"n1"}), Sets.newHashSet(new String[] {"r1", "r2"}));
+
+    Assert.assertEquals(t1, t2);
+    Assert.assertEquals(t5, t6);
+    Assert.assertEquals(t7, t8);
+    Assert.assertEquals(t2, t1);
+    Assert.assertEquals(t6, t5);
+    Assert.assertEquals(t8, t7);
+    Assert.assertNotEquals(t1, t3);
+    Assert.assertNotEquals(t3, t1);
+    Assert.assertNotEquals(t1, t4);
+    Assert.assertNotEquals(t4, t1);
+    Assert.assertNotEquals(t1, t5);
+    Assert.assertNotEquals(t5, t1);
+    Assert.assertNotEquals(t8, t9);
+    Assert.assertNotEquals(t9, t8);
+    Assert.assertNotEquals(t9, t10);
+    Assert.assertNotEquals(t10, t9);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/74c7f874/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 43be5aa..45ca543 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -788,6 +788,7 @@ public class TaskAttemptImpl implements TaskAttempt,
         this.launchTime = tEvent.getStartTime();
         recoveryStartEventSeen = true;
         recoveredState = TaskAttemptState.RUNNING;
+        this.containerId = tEvent.getContainerId();
         sendEvent(createDAGCounterUpdateEventTALaunched(this));
         return recoveredState;
       }

http://git-wip-us.apache.org/repos/asf/tez/blob/74c7f874/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index df66fa0..ec8e73f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -43,12 +43,14 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.TaskLocationHint;
+import org.apache.tez.dag.api.TaskLocationHint.TaskBasedLocationAffinity;
 import org.apache.tez.dag.api.client.DAGClientServer;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.DAGAppMaster;
 import org.apache.tez.dag.app.DAGAppMasterState;
 import org.apache.tez.dag.app.dag.TaskAttempt;
+import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.event.DAGAppMasterEvent;
 import org.apache.tez.dag.app.dag.event.DAGAppMasterEventSchedulingServiceError;
 import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
@@ -68,6 +70,8 @@ import org.apache.tez.dag.app.rm.node.AMNodeEventStateChanged;
 import org.apache.tez.dag.app.rm.node.AMNodeEventTaskAttemptEnded;
 import org.apache.tez.dag.app.rm.node.AMNodeEventTaskAttemptSucceeded;
 
+import com.google.common.base.Preconditions;
+
 
 public class TaskSchedulerEventHandler extends AbstractService
                                          implements TaskSchedulerAppCallback,
@@ -263,14 +267,29 @@ public class TaskSchedulerEventHandler extends AbstractService
     String hosts[] = null;
     String racks[] = null;
     if (locationHint != null) {
-      if (locationHint.getAffinitizedContainer() != null) {
-        taskScheduler.allocateTask(taskAttempt,
-            event.getCapability(),
-            locationHint.getAffinitizedContainer(),
-            Priority.newInstance(event.getPriority()),
-            event.getContainerContext(),
-            event);
-        return;
+      TaskBasedLocationAffinity taskAffinity = locationHint.getAffinitizedTask();
+      if (taskAffinity != null) {
+        Vertex vertex = appContext.getCurrentDAG().getVertex(taskAffinity.getVertexName());
+        Preconditions.checkNotNull(vertex, "Invalid vertex in task based affinity " + taskAffinity 
+            + " for attempt: " + taskAttempt.getID());
+        int taskIndex = taskAffinity.getTaskIndex(); 
+        Preconditions.checkState(taskIndex >=0 && taskIndex < vertex.getTotalTasks(), 
+            "Invalid taskIndex in task based affinity " + taskAffinity 
+            + " for attempt: " + taskAttempt.getID());
+        TaskAttempt affinityAttempt = vertex.getTask(taskIndex).getSuccessfulAttempt();
+        if (affinityAttempt != null) {
+          Preconditions.checkNotNull(affinityAttempt.getAssignedContainerID(), affinityAttempt.getID());
+          taskScheduler.allocateTask(taskAttempt,
+              event.getCapability(),
+              affinityAttempt.getAssignedContainerID(),
+              Priority.newInstance(event.getPriority()),
+              event.getContainerContext(),
+              event);
+          return;
+        }
+        LOG.info("Attempt: " + taskAttempt.getID() + " has task based affinity to " + taskAffinity 
+            + " but no locality information exists for it. Ignoring hint.");
+        // fall through with null hosts/racks
       } else {
         hosts = (locationHint.getHosts() != null) ? locationHint
             .getHosts().toArray(
@@ -325,6 +344,9 @@ public class TaskSchedulerEventHandler extends AbstractService
 
         while (!stopEventHandling && !Thread.currentThread().isInterrupted()) {
           try {
+            if (TaskSchedulerEventHandler.this.eventQueue.peek() == null) {
+              notifyForTest();
+            }
             event = TaskSchedulerEventHandler.this.eventQueue.take();
           } catch (InterruptedException e) {
             if(!stopEventHandling) {
@@ -341,13 +363,18 @@ public class TaskSchedulerEventHandler extends AbstractService
             // Kill the AM.
             sendEvent(new DAGAppMasterEvent(DAGAppMasterEventType.INTERNAL_ERROR));
             return;
+          } finally {
+            notifyForTest();
           }
         }
       }
     };
     this.eventHandlingThread.start();
   }
-
+  
+  protected void notifyForTest() {
+  }
+  
   @Override
   public void serviceStop() {
     synchronized(this) {

http://git-wip-us.apache.org/repos/asf/tez/blob/74c7f874/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
index edd9ebf..4ec1916 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
@@ -18,27 +18,38 @@
 
 package org.apache.tez.dag.app.rm;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.dag.api.TaskLocationHint;
 import org.apache.tez.dag.api.client.DAGClientServer;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl;
+import org.apache.tez.dag.app.dag.impl.TaskImpl;
+import org.apache.tez.dag.app.dag.impl.VertexImpl;
 import org.apache.tez.dag.app.rm.container.AMContainer;
 import org.apache.tez.dag.app.rm.container.AMContainerEventCompleted;
 import org.apache.tez.dag.app.rm.container.AMContainerEventType;
 import org.apache.tez.dag.app.rm.container.AMContainerMap;
 import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher;
+import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -58,6 +69,8 @@ public class TestTaskSchedulerEventHandler {
   
   class MockTaskSchedulerEventHandler extends TaskSchedulerEventHandler {
 
+    AtomicBoolean notify = new AtomicBoolean(false);
+    
     public MockTaskSchedulerEventHandler(AppContext appContext,
         DAGClientServer clientService, EventHandler eventHandler,
         ContainerSignatureMatcher containerSignatureMatcher) {
@@ -70,6 +83,14 @@ public class TestTaskSchedulerEventHandler {
       return mockTaskScheduler;
     }
     
+    @Override
+    protected void notifyForTest() {
+      synchronized (notify) {
+        notify.set(true);
+        notify.notifyAll();
+      }
+    }
+    
   }
 
   AppContext mockAppContext;
@@ -82,7 +103,7 @@ public class TestTaskSchedulerEventHandler {
 
   @Before
   public void setup() {
-    mockAppContext = mock(AppContext.class);
+    mockAppContext = mock(AppContext.class, RETURNS_DEEP_STUBS);
     mockClientService = mock(DAGClientServer.class);
     mockEventHandler = new TestEventHandler();
     mockSigMatcher = mock(ContainerSignatureMatcher.class);
@@ -95,6 +116,45 @@ public class TestTaskSchedulerEventHandler {
   }
   
   @Test (timeout = 5000)
+  public void testTaskBasedAffinity() throws Exception {
+    Configuration conf = new Configuration(false);
+    schedulerHandler.init(conf);
+    schedulerHandler.start();
+
+    TaskAttemptImpl mockTaskAttempt = mock(TaskAttemptImpl.class);
+    TezTaskAttemptID taId = mock(TezTaskAttemptID.class);
+    String affVertexName = "srcVertex";
+    int affTaskIndex = 1;
+    TaskLocationHint locHint = TaskLocationHint.createTaskLocationHint(affVertexName, affTaskIndex);
+    VertexImpl affVertex = mock(VertexImpl.class);
+    TaskImpl affTask = mock(TaskImpl.class);
+    TaskAttemptImpl affAttempt = mock(TaskAttemptImpl.class);
+    ContainerId affCId = mock(ContainerId.class);
+    when(affVertex.getTotalTasks()).thenReturn(2);
+    when(affVertex.getTask(affTaskIndex)).thenReturn(affTask);
+    when(affTask.getSuccessfulAttempt()).thenReturn(affAttempt);
+    when(affAttempt.getAssignedContainerID()).thenReturn(affCId);
+    when(mockAppContext.getCurrentDAG().getVertex(affVertexName)).thenReturn(affVertex);
+    Resource resource = Resource.newInstance(100, 1);
+    AMSchedulerEventTALaunchRequest event = new AMSchedulerEventTALaunchRequest
+        (taId, resource, null, mockTaskAttempt, locHint, 3, null);
+    schedulerHandler.notify.set(false);
+    schedulerHandler.handle(event);
+    synchronized (schedulerHandler.notify) {
+      while (!schedulerHandler.notify.get()) {
+        schedulerHandler.notify.wait();
+      }
+    }
+    
+    // verify mockTaskAttempt affinitized to expected affCId
+    verify(mockTaskScheduler, times(1)).allocateTask(mockTaskAttempt, resource, affCId,
+        Priority.newInstance(3), null, event);
+    
+    schedulerHandler.stop();
+    schedulerHandler.close();
+  }
+  
+  @Test (timeout = 5000)
   public void testContainerPreempted() throws IOException {
     Configuration conf = new Configuration(false);
     schedulerHandler.init(conf);

http://git-wip-us.apache.org/repos/asf/tez/blob/74c7f874/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
index 6633933..8f30276 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
@@ -24,7 +24,6 @@ import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.InputDescriptor;
@@ -47,7 +46,7 @@ public class InputReadyVertexManager extends VertexManagerPlugin {
   Map<String, SourceVertexInfo> srcVertexInfo = Maps.newHashMap();
   boolean taskIsStarted[];
   int oneToOneSrcTasksDoneCount[];
-  Container oneToOneLocationHints[];
+  TaskLocationHint oneToOneLocationHints[];
   int numOneToOneEdges;
 
   public InputReadyVertexManager(VertexManagerPluginContext context) {
@@ -115,7 +114,7 @@ public class InputReadyVertexManager extends VertexManagerPlugin {
             "Managed task number must equal 1-1 source task number");
       }
       oneToOneSrcTasksDoneCount = new int[oneToOneSrcTaskCount];
-      oneToOneLocationHints = new Container[oneToOneSrcTaskCount];
+      oneToOneLocationHints = new TaskLocationHint[oneToOneSrcTaskCount];
     }
 
     for (Map.Entry<String, List<Integer>> entry : completions.entrySet()) {
@@ -149,7 +148,7 @@ public class InputReadyVertexManager extends VertexManagerPlugin {
         oneToOneSrcTasksDoneCount[taskId.intValue()]++;
         // keep the latest container that completed as the location hint
         // After there is standard data size info available then use it
-        oneToOneLocationHints[taskId.intValue()] = getContext().getTaskContainer(vertex, taskId);
+        oneToOneLocationHints[taskId.intValue()] = TaskLocationHint.createTaskLocationHint(vertex, taskId);
       }
     }
     
@@ -192,11 +191,11 @@ public class InputReadyVertexManager extends VertexManagerPlugin {
           taskIsStarted[i] = true;
           TaskLocationHint locationHint = null;
           if (oneToOneLocationHints[i] != null) {
-            locationHint = TaskLocationHint.createTaskLocationHint(oneToOneLocationHints[i].getId());
+            locationHint = oneToOneLocationHints[i];
           }
           LOG.info("Starting task " + i + " for vertex: "
               + getContext().getVertexName() + " with location: "
-              + ((locationHint != null) ? locationHint.getAffinitizedContainer() : "null"));
+              + ((locationHint != null) ? locationHint.getAffinitizedTask() : "null"));
           tasksToStart.add(new TaskWithLocationHint(new Integer(i), locationHint));
         }
       }

http://git-wip-us.apache.org/repos/asf/tez/blob/74c7f874/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
index f57bdb0..c6981ed 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
@@ -101,21 +101,12 @@ public class TestInputReadyVertexManager {
         InputDescriptor.create("in"));
     
     String mockManagedVertexId = "Vertex";
-    Container mockContainer1 = mock(Container.class);
-    ContainerId mockCId1 = mock(ContainerId.class);
-    when(mockContainer1.getId()).thenReturn(mockCId1);
-    Container mockContainer2 = mock(Container.class);
-    ContainerId mockCId2 = mock(ContainerId.class);
-    when(mockContainer2.getId()).thenReturn(mockCId2);
     
     VertexManagerPluginContext mockContext = mock(VertexManagerPluginContext.class);
     when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
     when(mockContext.getVertexName()).thenReturn(mockManagedVertexId);
     when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3);
     when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(3);
-    when(mockContext.getTaskContainer(mockSrcVertexId1, 0)).thenReturn(mockContainer1);
-    when(mockContext.getTaskContainer(mockSrcVertexId1, 1)).thenReturn(mockContainer2);
-    when(mockContext.getTaskContainer(mockSrcVertexId1, 2)).thenReturn(mockContainer1);
     mockInputVertices.put(mockSrcVertexId1, eProp1);
     
     Map<String, List<Integer>> initialCompletions = Maps.newHashMap();
@@ -127,20 +118,26 @@ public class TestInputReadyVertexManager {
     verify(mockContext, times(1)).scheduleVertexTasks(requestCaptor.capture());
     Assert.assertEquals(1, requestCaptor.getValue().size());
     Assert.assertEquals(0, requestCaptor.getValue().get(0).getTaskIndex().intValue());
-    Assert.assertEquals(mockCId1, requestCaptor.getValue().get(0)
-        .getTaskLocationHint().getAffinitizedContainer());
+    Assert.assertEquals(mockSrcVertexId1, requestCaptor.getValue().get(0)
+        .getTaskLocationHint().getAffinitizedTask().getVertexName());
+    Assert.assertEquals(0, requestCaptor.getValue().get(0)
+        .getTaskLocationHint().getAffinitizedTask().getTaskIndex());
     manager.onSourceTaskCompleted(mockSrcVertexId1, 1);
     verify(mockContext, times(2)).scheduleVertexTasks(requestCaptor.capture());
     Assert.assertEquals(1, requestCaptor.getValue().size());
     Assert.assertEquals(1, requestCaptor.getValue().get(0).getTaskIndex().intValue());
-    Assert.assertEquals(mockCId2, requestCaptor.getValue().get(0)
-        .getTaskLocationHint().getAffinitizedContainer());
+    Assert.assertEquals(mockSrcVertexId1, requestCaptor.getValue().get(0)
+        .getTaskLocationHint().getAffinitizedTask().getVertexName());
+    Assert.assertEquals(1, requestCaptor.getValue().get(0)
+        .getTaskLocationHint().getAffinitizedTask().getTaskIndex());
     manager.onSourceTaskCompleted(mockSrcVertexId1, 2);
     verify(mockContext, times(3)).scheduleVertexTasks(requestCaptor.capture());
     Assert.assertEquals(1, requestCaptor.getValue().size());
     Assert.assertEquals(2, requestCaptor.getValue().get(0).getTaskIndex().intValue());
-    Assert.assertEquals(mockCId1, requestCaptor.getValue().get(0)
-        .getTaskLocationHint().getAffinitizedContainer());
+    Assert.assertEquals(mockSrcVertexId1, requestCaptor.getValue().get(0)
+        .getTaskLocationHint().getAffinitizedTask().getVertexName());
+    Assert.assertEquals(2, requestCaptor.getValue().get(0)
+        .getTaskLocationHint().getAffinitizedTask().getTaskIndex());
   }
 
   @Test (timeout=5000)
@@ -234,15 +231,19 @@ public class TestInputReadyVertexManager {
     verify(mockContext, times(1)).scheduleVertexTasks(requestCaptor.capture());
     Assert.assertEquals(1, requestCaptor.getValue().size());
     Assert.assertEquals(0, requestCaptor.getValue().get(0).getTaskIndex().intValue());
-    Assert.assertEquals(mockCId3, requestCaptor.getValue().get(0)
-        .getTaskLocationHint().getAffinitizedContainer()); // affinity to last completion
+    Assert.assertEquals(mockSrcVertexId3, requestCaptor.getValue().get(0)
+        .getTaskLocationHint().getAffinitizedTask().getVertexName());
+    Assert.assertEquals(0, requestCaptor.getValue().get(0)
+        .getTaskLocationHint().getAffinitizedTask().getTaskIndex()); // affinity to last completion
     // 1-1 completion triggers since other 1-1 is done
     manager.onSourceTaskCompleted(mockSrcVertexId3, 1);
     verify(mockContext, times(2)).scheduleVertexTasks(requestCaptor.capture());
     Assert.assertEquals(1, requestCaptor.getValue().size());
     Assert.assertEquals(1, requestCaptor.getValue().get(0).getTaskIndex().intValue());
-    Assert.assertEquals(mockCId3, requestCaptor.getValue().get(0)
-        .getTaskLocationHint().getAffinitizedContainer()); // affinity to last completion
+    Assert.assertEquals(mockSrcVertexId3, requestCaptor.getValue().get(0)
+        .getTaskLocationHint().getAffinitizedTask().getVertexName());
+    Assert.assertEquals(1, requestCaptor.getValue().get(0)
+        .getTaskLocationHint().getAffinitizedTask().getTaskIndex()); // affinity to last completion
     // 1-1 completion does not trigger since other 1-1 is not done
     manager.onSourceTaskCompleted(mockSrcVertexId3, 2);
     verify(mockContext, times(2)).scheduleVertexTasks(anyList());
@@ -251,8 +252,10 @@ public class TestInputReadyVertexManager {
     verify(mockContext, times(3)).scheduleVertexTasks(requestCaptor.capture());
     Assert.assertEquals(1, requestCaptor.getValue().size());
     Assert.assertEquals(2, requestCaptor.getValue().get(0).getTaskIndex().intValue());
-    Assert.assertEquals(mockCId2, requestCaptor.getValue().get(0)
-        .getTaskLocationHint().getAffinitizedContainer()); // affinity to last completion
+    Assert.assertEquals(mockSrcVertexId2, requestCaptor.getValue().get(0)
+        .getTaskLocationHint().getAffinitizedTask().getVertexName());
+    Assert.assertEquals(2, requestCaptor.getValue().get(0)
+        .getTaskLocationHint().getAffinitizedTask().getTaskIndex()); // affinity to last completion
     
     // no more starts
     manager.onSourceTaskCompleted(mockSrcVertexId3, 2);

http://git-wip-us.apache.org/repos/asf/tez/blob/74c7f874/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
index 42b31f1..69711d0 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
@@ -50,7 +50,6 @@ import org.apache.tez.runtime.api.ObjectRegistry;
 import org.apache.tez.runtime.api.ProcessorContext;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 import org.apache.tez.runtime.library.api.KeyValueWriter;
-import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.conf.UnorderedKVEdgeConfig;
 import org.apache.tez.runtime.library.output.UnorderedKVOutput;
 import org.apache.tez.runtime.library.processor.SimpleProcessor;