You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2014/11/17 20:22:14 UTC
[05/50] [abbrv] tez git commit: TEZ-1700. Replace containerId from
TaskLocationHint with [TaskIndex+Vertex] based affinity (bikas)
TEZ-1700. Replace containerId from TaskLocationHint with [TaskIndex+Vertex] based affinity (bikas)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/74c7f874
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/74c7f874
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/74c7f874
Branch: refs/heads/TEZ-8
Commit: 74c7f874d558f933fbf9697c537d204eb56806c9
Parents: 06e9f88
Author: Bikas Saha <bi...@apache.org>
Authored: Wed Oct 29 18:46:50 2014 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Wed Oct 29 18:46:50 2014 -0700
----------------------------------------------------------------------
CHANGES.txt | 4 ++
.../main/java/org/apache/tez/dag/api/DAG.java | 7 +-
.../apache/tez/dag/api/TaskLocationHint.java | 71 +++++++++++++++-----
.../tez/dag/api/TestTaskLocationHint.java | 62 +++++++++++++++++
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 1 +
.../dag/app/rm/TaskSchedulerEventHandler.java | 45 ++++++++++---
.../app/rm/TestTaskSchedulerEventHandler.java | 62 ++++++++++++++++-
.../vertexmanager/InputReadyVertexManager.java | 11 ++-
.../TestInputReadyVertexManager.java | 45 +++++++------
.../examples/BroadcastAndOneToOneExample.java | 1 -
10 files changed, 250 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/74c7f874/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7bc96ce..ca1da2e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -16,6 +16,8 @@ INCOMPATIBLE CHANGES
context.getUserPayload can now return null, apps may need to add defensive code.
TEZ-1699. Vertex.setParallelism should throw an exception for invalid
invocations
+ TEZ-1700. Replace containerId from TaskLocationHint with [TaskIndex+Vertex]
+ based affinity
ALL CHANGES:
TEZ-1620. Wait for application finish before stopping MiniTezCluster
@@ -85,6 +87,8 @@ ALL CHANGES:
context.getUserPayload can now return null, apps may need to add defensive code.
TEZ-1699. Vertex.setParallelism should throw an exception for invalid
invocations
+ TEZ-1700. Replace containerId from TaskLocationHint with [TaskIndex+Vertex]
+ based affinity
Release 0.5.1: 2014-10-02
http://git-wip-us.apache.org/repos/asf/tez/blob/74c7f874/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
index 9b428f0..34c27bf 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -719,11 +719,12 @@ public class DAG {
if (vertexLocationHint.getTaskLocationHints() != null) {
for (TaskLocationHint hint : vertexLocationHint.getTaskLocationHints()) {
PlanTaskLocationHint.Builder taskLocationHintBuilder = PlanTaskLocationHint.newBuilder();
-
- if (hint.getAffinitizedContainer() != null) {
+ // we can allow this later on if needed
+ if (hint.getAffinitizedTask() != null) {
throw new TezUncheckedException(
- "Container affinity may not be specified via the DAG API");
+ "Task based affinity may not be specified via the DAG API");
}
+
if (hint.getHosts() != null) {
taskLocationHintBuilder.addAllHost(hint.getHosts());
}
http://git-wip-us.apache.org/repos/asf/tez/blob/74c7f874/tez-api/src/main/java/org/apache/tez/dag/api/TaskLocationHint.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TaskLocationHint.java b/tez-api/src/main/java/org/apache/tez/dag/api/TaskLocationHint.java
index 02f7bf9..926b8fd 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TaskLocationHint.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TaskLocationHint.java
@@ -23,7 +23,7 @@ import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
-import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
import com.google.common.base.Preconditions;
@@ -36,16 +36,40 @@ import com.google.common.base.Preconditions;
@Evolving
public class TaskLocationHint {
- // Container to which to affinitize
- ContainerId containerId;
+ @Unstable
+ /**
+ * Specifies location affinity to the given vertex and given task in that vertex
+ */
+ public class TaskBasedLocationAffinity {
+ private String vertexName;
+ private int taskIndex;
+ public TaskBasedLocationAffinity(String vertexName, int taskIndex) {
+ this.vertexName = vertexName;
+ this.taskIndex = taskIndex;
+ }
+ public String getVertexName() {
+ return vertexName;
+ }
+ public int getTaskIndex() {
+ return taskIndex;
+ }
+ @Override
+ public String toString() {
+ return "[Vertex: " + vertexName + ", TaskIndex: " + taskIndex + "]";
+ }
+ }
+
// Host names if any to be used
private Set<String> hosts;
// Rack names if any to be used
private Set<String> racks;
+
+ private TaskBasedLocationAffinity affinitizedTask;
- private TaskLocationHint(ContainerId containerId) {
- Preconditions.checkNotNull(containerId);
- this.containerId = containerId;
+ private TaskLocationHint(String vertexName, int taskIndex) {
+ Preconditions.checkNotNull(vertexName);
+ Preconditions.checkArgument(taskIndex >= 0);
+ this.affinitizedTask = new TaskBasedLocationAffinity(vertexName, taskIndex);
}
private TaskLocationHint(Set<String> hosts, Set<String> racks) {
@@ -62,11 +86,17 @@ public class TaskLocationHint {
}
/**
- * Provide a location hint using a container to which the task may be affinitized
- * Tez will try to run the task on that container or node local to it
+ * Provide a location hint that affinitizes to the given task in the given vertex. Tez will try
+ * to run in the same container as the given task or node local to it. Locality may degrade to
+ * rack local or further depending on cluster resource allocations.<br>
+ * This is expected to be used only during dynamic optimizations via {@link VertexManagerPlugin}s
+ * and not in while creating the dag using the DAG API.
+ * @param vertexName
+ * @param taskIndex
+ * @return
*/
- public static TaskLocationHint createTaskLocationHint(ContainerId containerId) {
- return new TaskLocationHint(containerId);
+ public static TaskLocationHint createTaskLocationHint(String vertexName, int taskIndex) {
+ return new TaskLocationHint(vertexName, taskIndex);
}
/**
@@ -77,8 +107,8 @@ public class TaskLocationHint {
return new TaskLocationHint(hosts, racks);
}
- public ContainerId getAffinitizedContainer() {
- return containerId;
+ public TaskBasedLocationAffinity getAffinitizedTask() {
+ return affinitizedTask;
}
public Set<String> getHosts() {
@@ -99,9 +129,9 @@ public class TaskLocationHint {
result = ( racks != null) ?
prime * result + racks.hashCode() :
result + prime;
- result = ( containerId != null) ?
- prime * result + containerId.hashCode() :
- result + prime;
+ if (affinitizedTask != null) {
+ result = prime * result + affinitizedTask.getVertexName().hashCode() + affinitizedTask.getTaskIndex();
+ }
return result;
}
@@ -131,11 +161,16 @@ public class TaskLocationHint {
} else if (other.racks != null) {
return false;
}
- if (containerId != null) {
- if (!containerId.equals(other.containerId)) {
+ if (affinitizedTask != null) {
+ if (other.affinitizedTask == null) {
+ return false;
+ }
+ if (affinitizedTask.getTaskIndex() != other.affinitizedTask.getTaskIndex()) {
+ return false;
+ } else if (!affinitizedTask.getVertexName().equals(other.affinitizedTask.getVertexName())) {
return false;
}
- } else if (other.containerId != null) {
+ } else if (other.affinitizedTask != null){
return false;
}
return true;
http://git-wip-us.apache.org/repos/asf/tez/blob/74c7f874/tez-api/src/test/java/org/apache/tez/dag/api/TestTaskLocationHint.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestTaskLocationHint.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestTaskLocationHint.java
new file mode 100644
index 0000000..d52e194
--- /dev/null
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestTaskLocationHint.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+
+public class TestTaskLocationHint {
+
+ @Test (timeout = 5000)
+ public void testEquality() {
+ TaskLocationHint t1 = TaskLocationHint.createTaskLocationHint("v1", 0);
+ TaskLocationHint t2 = TaskLocationHint.createTaskLocationHint("v1", 0);
+ TaskLocationHint t3 = TaskLocationHint.createTaskLocationHint("v2", 0);
+ TaskLocationHint t4 = TaskLocationHint.createTaskLocationHint("v1", 1);
+ TaskLocationHint t5 = TaskLocationHint.createTaskLocationHint(null, null);
+ TaskLocationHint t6 = TaskLocationHint.createTaskLocationHint(null, null);
+ TaskLocationHint t7 = TaskLocationHint.createTaskLocationHint(
+ Sets.newHashSet(new String[] {"n1", "n2"}), Sets.newHashSet(new String[] {"r1", "r2"}));
+ TaskLocationHint t8 = TaskLocationHint.createTaskLocationHint(
+ Sets.newHashSet(new String[] {"n1", "n2"}), Sets.newHashSet(new String[] {"r1", "r2"}));
+ TaskLocationHint t9 = TaskLocationHint.createTaskLocationHint(
+ Sets.newHashSet(new String[] {"n1", "n2"}), Sets.newHashSet(new String[] {"r1"}));
+ TaskLocationHint t10 = TaskLocationHint.createTaskLocationHint(
+ Sets.newHashSet(new String[] {"n1"}), Sets.newHashSet(new String[] {"r1", "r2"}));
+
+ Assert.assertEquals(t1, t2);
+ Assert.assertEquals(t5, t6);
+ Assert.assertEquals(t7, t8);
+ Assert.assertEquals(t2, t1);
+ Assert.assertEquals(t6, t5);
+ Assert.assertEquals(t8, t7);
+ Assert.assertNotEquals(t1, t3);
+ Assert.assertNotEquals(t3, t1);
+ Assert.assertNotEquals(t1, t4);
+ Assert.assertNotEquals(t4, t1);
+ Assert.assertNotEquals(t1, t5);
+ Assert.assertNotEquals(t5, t1);
+ Assert.assertNotEquals(t8, t9);
+ Assert.assertNotEquals(t9, t8);
+ Assert.assertNotEquals(t9, t10);
+ Assert.assertNotEquals(t10, t9);
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/74c7f874/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 43be5aa..45ca543 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -788,6 +788,7 @@ public class TaskAttemptImpl implements TaskAttempt,
this.launchTime = tEvent.getStartTime();
recoveryStartEventSeen = true;
recoveredState = TaskAttemptState.RUNNING;
+ this.containerId = tEvent.getContainerId();
sendEvent(createDAGCounterUpdateEventTALaunched(this));
return recoveredState;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/74c7f874/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index df66fa0..ec8e73f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -43,12 +43,14 @@ import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.TaskLocationHint;
+import org.apache.tez.dag.api.TaskLocationHint.TaskBasedLocationAffinity;
import org.apache.tez.dag.api.client.DAGClientServer;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.DAGAppMaster;
import org.apache.tez.dag.app.DAGAppMasterState;
import org.apache.tez.dag.app.dag.TaskAttempt;
+import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEvent;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventSchedulingServiceError;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
@@ -68,6 +70,8 @@ import org.apache.tez.dag.app.rm.node.AMNodeEventStateChanged;
import org.apache.tez.dag.app.rm.node.AMNodeEventTaskAttemptEnded;
import org.apache.tez.dag.app.rm.node.AMNodeEventTaskAttemptSucceeded;
+import com.google.common.base.Preconditions;
+
public class TaskSchedulerEventHandler extends AbstractService
implements TaskSchedulerAppCallback,
@@ -263,14 +267,29 @@ public class TaskSchedulerEventHandler extends AbstractService
String hosts[] = null;
String racks[] = null;
if (locationHint != null) {
- if (locationHint.getAffinitizedContainer() != null) {
- taskScheduler.allocateTask(taskAttempt,
- event.getCapability(),
- locationHint.getAffinitizedContainer(),
- Priority.newInstance(event.getPriority()),
- event.getContainerContext(),
- event);
- return;
+ TaskBasedLocationAffinity taskAffinity = locationHint.getAffinitizedTask();
+ if (taskAffinity != null) {
+ Vertex vertex = appContext.getCurrentDAG().getVertex(taskAffinity.getVertexName());
+ Preconditions.checkNotNull(vertex, "Invalid vertex in task based affinity " + taskAffinity
+ + " for attempt: " + taskAttempt.getID());
+ int taskIndex = taskAffinity.getTaskIndex();
+ Preconditions.checkState(taskIndex >=0 && taskIndex < vertex.getTotalTasks(),
+ "Invalid taskIndex in task based affinity " + taskAffinity
+ + " for attempt: " + taskAttempt.getID());
+ TaskAttempt affinityAttempt = vertex.getTask(taskIndex).getSuccessfulAttempt();
+ if (affinityAttempt != null) {
+ Preconditions.checkNotNull(affinityAttempt.getAssignedContainerID(), affinityAttempt.getID());
+ taskScheduler.allocateTask(taskAttempt,
+ event.getCapability(),
+ affinityAttempt.getAssignedContainerID(),
+ Priority.newInstance(event.getPriority()),
+ event.getContainerContext(),
+ event);
+ return;
+ }
+ LOG.info("Attempt: " + taskAttempt.getID() + " has task based affinity to " + taskAffinity
+ + " but no locality information exists for it. Ignoring hint.");
+ // fall through with null hosts/racks
} else {
hosts = (locationHint.getHosts() != null) ? locationHint
.getHosts().toArray(
@@ -325,6 +344,9 @@ public class TaskSchedulerEventHandler extends AbstractService
while (!stopEventHandling && !Thread.currentThread().isInterrupted()) {
try {
+ if (TaskSchedulerEventHandler.this.eventQueue.peek() == null) {
+ notifyForTest();
+ }
event = TaskSchedulerEventHandler.this.eventQueue.take();
} catch (InterruptedException e) {
if(!stopEventHandling) {
@@ -341,13 +363,18 @@ public class TaskSchedulerEventHandler extends AbstractService
// Kill the AM.
sendEvent(new DAGAppMasterEvent(DAGAppMasterEventType.INTERNAL_ERROR));
return;
+ } finally {
+ notifyForTest();
}
}
}
};
this.eventHandlingThread.start();
}
-
+
+ protected void notifyForTest() {
+ }
+
@Override
public void serviceStop() {
synchronized(this) {
http://git-wip-us.apache.org/repos/asf/tez/blob/74c7f874/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
index edd9ebf..4ec1916 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
@@ -18,27 +18,38 @@
package org.apache.tez.dag.app.rm;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.dag.api.TaskLocationHint;
import org.apache.tez.dag.api.client.DAGClientServer;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl;
+import org.apache.tez.dag.app.dag.impl.TaskImpl;
+import org.apache.tez.dag.app.dag.impl.VertexImpl;
import org.apache.tez.dag.app.rm.container.AMContainer;
import org.apache.tez.dag.app.rm.container.AMContainerEventCompleted;
import org.apache.tez.dag.app.rm.container.AMContainerEventType;
import org.apache.tez.dag.app.rm.container.AMContainerMap;
import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher;
+import org.apache.tez.dag.records.TezTaskAttemptID;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -58,6 +69,8 @@ public class TestTaskSchedulerEventHandler {
class MockTaskSchedulerEventHandler extends TaskSchedulerEventHandler {
+ AtomicBoolean notify = new AtomicBoolean(false);
+
public MockTaskSchedulerEventHandler(AppContext appContext,
DAGClientServer clientService, EventHandler eventHandler,
ContainerSignatureMatcher containerSignatureMatcher) {
@@ -70,6 +83,14 @@ public class TestTaskSchedulerEventHandler {
return mockTaskScheduler;
}
+ @Override
+ protected void notifyForTest() {
+ synchronized (notify) {
+ notify.set(true);
+ notify.notifyAll();
+ }
+ }
+
}
AppContext mockAppContext;
@@ -82,7 +103,7 @@ public class TestTaskSchedulerEventHandler {
@Before
public void setup() {
- mockAppContext = mock(AppContext.class);
+ mockAppContext = mock(AppContext.class, RETURNS_DEEP_STUBS);
mockClientService = mock(DAGClientServer.class);
mockEventHandler = new TestEventHandler();
mockSigMatcher = mock(ContainerSignatureMatcher.class);
@@ -95,6 +116,45 @@ public class TestTaskSchedulerEventHandler {
}
@Test (timeout = 5000)
+ public void testTaskBasedAffinity() throws Exception {
+ Configuration conf = new Configuration(false);
+ schedulerHandler.init(conf);
+ schedulerHandler.start();
+
+ TaskAttemptImpl mockTaskAttempt = mock(TaskAttemptImpl.class);
+ TezTaskAttemptID taId = mock(TezTaskAttemptID.class);
+ String affVertexName = "srcVertex";
+ int affTaskIndex = 1;
+ TaskLocationHint locHint = TaskLocationHint.createTaskLocationHint(affVertexName, affTaskIndex);
+ VertexImpl affVertex = mock(VertexImpl.class);
+ TaskImpl affTask = mock(TaskImpl.class);
+ TaskAttemptImpl affAttempt = mock(TaskAttemptImpl.class);
+ ContainerId affCId = mock(ContainerId.class);
+ when(affVertex.getTotalTasks()).thenReturn(2);
+ when(affVertex.getTask(affTaskIndex)).thenReturn(affTask);
+ when(affTask.getSuccessfulAttempt()).thenReturn(affAttempt);
+ when(affAttempt.getAssignedContainerID()).thenReturn(affCId);
+ when(mockAppContext.getCurrentDAG().getVertex(affVertexName)).thenReturn(affVertex);
+ Resource resource = Resource.newInstance(100, 1);
+ AMSchedulerEventTALaunchRequest event = new AMSchedulerEventTALaunchRequest
+ (taId, resource, null, mockTaskAttempt, locHint, 3, null);
+ schedulerHandler.notify.set(false);
+ schedulerHandler.handle(event);
+ synchronized (schedulerHandler.notify) {
+ while (!schedulerHandler.notify.get()) {
+ schedulerHandler.notify.wait();
+ }
+ }
+
+ // verify mockTaskAttempt affinitized to expected affCId
+ verify(mockTaskScheduler, times(1)).allocateTask(mockTaskAttempt, resource, affCId,
+ Priority.newInstance(3), null, event);
+
+ schedulerHandler.stop();
+ schedulerHandler.close();
+ }
+
+ @Test (timeout = 5000)
public void testContainerPreempted() throws IOException {
Configuration conf = new Configuration(false);
schedulerHandler.init(conf);
http://git-wip-us.apache.org/repos/asf/tez/blob/74c7f874/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
index 6633933..8f30276 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
@@ -24,7 +24,6 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.yarn.api.records.Container;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
import org.apache.tez.dag.api.InputDescriptor;
@@ -47,7 +46,7 @@ public class InputReadyVertexManager extends VertexManagerPlugin {
Map<String, SourceVertexInfo> srcVertexInfo = Maps.newHashMap();
boolean taskIsStarted[];
int oneToOneSrcTasksDoneCount[];
- Container oneToOneLocationHints[];
+ TaskLocationHint oneToOneLocationHints[];
int numOneToOneEdges;
public InputReadyVertexManager(VertexManagerPluginContext context) {
@@ -115,7 +114,7 @@ public class InputReadyVertexManager extends VertexManagerPlugin {
"Managed task number must equal 1-1 source task number");
}
oneToOneSrcTasksDoneCount = new int[oneToOneSrcTaskCount];
- oneToOneLocationHints = new Container[oneToOneSrcTaskCount];
+ oneToOneLocationHints = new TaskLocationHint[oneToOneSrcTaskCount];
}
for (Map.Entry<String, List<Integer>> entry : completions.entrySet()) {
@@ -149,7 +148,7 @@ public class InputReadyVertexManager extends VertexManagerPlugin {
oneToOneSrcTasksDoneCount[taskId.intValue()]++;
// keep the latest container that completed as the location hint
// After there is standard data size info available then use it
- oneToOneLocationHints[taskId.intValue()] = getContext().getTaskContainer(vertex, taskId);
+ oneToOneLocationHints[taskId.intValue()] = TaskLocationHint.createTaskLocationHint(vertex, taskId);
}
}
@@ -192,11 +191,11 @@ public class InputReadyVertexManager extends VertexManagerPlugin {
taskIsStarted[i] = true;
TaskLocationHint locationHint = null;
if (oneToOneLocationHints[i] != null) {
- locationHint = TaskLocationHint.createTaskLocationHint(oneToOneLocationHints[i].getId());
+ locationHint = oneToOneLocationHints[i];
}
LOG.info("Starting task " + i + " for vertex: "
+ getContext().getVertexName() + " with location: "
- + ((locationHint != null) ? locationHint.getAffinitizedContainer() : "null"));
+ + ((locationHint != null) ? locationHint.getAffinitizedTask() : "null"));
tasksToStart.add(new TaskWithLocationHint(new Integer(i), locationHint));
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/74c7f874/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
index f57bdb0..c6981ed 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
@@ -101,21 +101,12 @@ public class TestInputReadyVertexManager {
InputDescriptor.create("in"));
String mockManagedVertexId = "Vertex";
- Container mockContainer1 = mock(Container.class);
- ContainerId mockCId1 = mock(ContainerId.class);
- when(mockContainer1.getId()).thenReturn(mockCId1);
- Container mockContainer2 = mock(Container.class);
- ContainerId mockCId2 = mock(ContainerId.class);
- when(mockContainer2.getId()).thenReturn(mockCId2);
VertexManagerPluginContext mockContext = mock(VertexManagerPluginContext.class);
when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
when(mockContext.getVertexName()).thenReturn(mockManagedVertexId);
when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3);
when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(3);
- when(mockContext.getTaskContainer(mockSrcVertexId1, 0)).thenReturn(mockContainer1);
- when(mockContext.getTaskContainer(mockSrcVertexId1, 1)).thenReturn(mockContainer2);
- when(mockContext.getTaskContainer(mockSrcVertexId1, 2)).thenReturn(mockContainer1);
mockInputVertices.put(mockSrcVertexId1, eProp1);
Map<String, List<Integer>> initialCompletions = Maps.newHashMap();
@@ -127,20 +118,26 @@ public class TestInputReadyVertexManager {
verify(mockContext, times(1)).scheduleVertexTasks(requestCaptor.capture());
Assert.assertEquals(1, requestCaptor.getValue().size());
Assert.assertEquals(0, requestCaptor.getValue().get(0).getTaskIndex().intValue());
- Assert.assertEquals(mockCId1, requestCaptor.getValue().get(0)
- .getTaskLocationHint().getAffinitizedContainer());
+ Assert.assertEquals(mockSrcVertexId1, requestCaptor.getValue().get(0)
+ .getTaskLocationHint().getAffinitizedTask().getVertexName());
+ Assert.assertEquals(0, requestCaptor.getValue().get(0)
+ .getTaskLocationHint().getAffinitizedTask().getTaskIndex());
manager.onSourceTaskCompleted(mockSrcVertexId1, 1);
verify(mockContext, times(2)).scheduleVertexTasks(requestCaptor.capture());
Assert.assertEquals(1, requestCaptor.getValue().size());
Assert.assertEquals(1, requestCaptor.getValue().get(0).getTaskIndex().intValue());
- Assert.assertEquals(mockCId2, requestCaptor.getValue().get(0)
- .getTaskLocationHint().getAffinitizedContainer());
+ Assert.assertEquals(mockSrcVertexId1, requestCaptor.getValue().get(0)
+ .getTaskLocationHint().getAffinitizedTask().getVertexName());
+ Assert.assertEquals(1, requestCaptor.getValue().get(0)
+ .getTaskLocationHint().getAffinitizedTask().getTaskIndex());
manager.onSourceTaskCompleted(mockSrcVertexId1, 2);
verify(mockContext, times(3)).scheduleVertexTasks(requestCaptor.capture());
Assert.assertEquals(1, requestCaptor.getValue().size());
Assert.assertEquals(2, requestCaptor.getValue().get(0).getTaskIndex().intValue());
- Assert.assertEquals(mockCId1, requestCaptor.getValue().get(0)
- .getTaskLocationHint().getAffinitizedContainer());
+ Assert.assertEquals(mockSrcVertexId1, requestCaptor.getValue().get(0)
+ .getTaskLocationHint().getAffinitizedTask().getVertexName());
+ Assert.assertEquals(2, requestCaptor.getValue().get(0)
+ .getTaskLocationHint().getAffinitizedTask().getTaskIndex());
}
@Test (timeout=5000)
@@ -234,15 +231,19 @@ public class TestInputReadyVertexManager {
verify(mockContext, times(1)).scheduleVertexTasks(requestCaptor.capture());
Assert.assertEquals(1, requestCaptor.getValue().size());
Assert.assertEquals(0, requestCaptor.getValue().get(0).getTaskIndex().intValue());
- Assert.assertEquals(mockCId3, requestCaptor.getValue().get(0)
- .getTaskLocationHint().getAffinitizedContainer()); // affinity to last completion
+ Assert.assertEquals(mockSrcVertexId3, requestCaptor.getValue().get(0)
+ .getTaskLocationHint().getAffinitizedTask().getVertexName());
+ Assert.assertEquals(0, requestCaptor.getValue().get(0)
+ .getTaskLocationHint().getAffinitizedTask().getTaskIndex()); // affinity to last completion
// 1-1 completion triggers since other 1-1 is done
manager.onSourceTaskCompleted(mockSrcVertexId3, 1);
verify(mockContext, times(2)).scheduleVertexTasks(requestCaptor.capture());
Assert.assertEquals(1, requestCaptor.getValue().size());
Assert.assertEquals(1, requestCaptor.getValue().get(0).getTaskIndex().intValue());
- Assert.assertEquals(mockCId3, requestCaptor.getValue().get(0)
- .getTaskLocationHint().getAffinitizedContainer()); // affinity to last completion
+ Assert.assertEquals(mockSrcVertexId3, requestCaptor.getValue().get(0)
+ .getTaskLocationHint().getAffinitizedTask().getVertexName());
+ Assert.assertEquals(1, requestCaptor.getValue().get(0)
+ .getTaskLocationHint().getAffinitizedTask().getTaskIndex()); // affinity to last completion
// 1-1 completion does not trigger since other 1-1 is not done
manager.onSourceTaskCompleted(mockSrcVertexId3, 2);
verify(mockContext, times(2)).scheduleVertexTasks(anyList());
@@ -251,8 +252,10 @@ public class TestInputReadyVertexManager {
verify(mockContext, times(3)).scheduleVertexTasks(requestCaptor.capture());
Assert.assertEquals(1, requestCaptor.getValue().size());
Assert.assertEquals(2, requestCaptor.getValue().get(0).getTaskIndex().intValue());
- Assert.assertEquals(mockCId2, requestCaptor.getValue().get(0)
- .getTaskLocationHint().getAffinitizedContainer()); // affinity to last completion
+ Assert.assertEquals(mockSrcVertexId2, requestCaptor.getValue().get(0)
+ .getTaskLocationHint().getAffinitizedTask().getVertexName());
+ Assert.assertEquals(2, requestCaptor.getValue().get(0)
+ .getTaskLocationHint().getAffinitizedTask().getTaskIndex()); // affinity to last completion
// no more starts
manager.onSourceTaskCompleted(mockSrcVertexId3, 2);
http://git-wip-us.apache.org/repos/asf/tez/blob/74c7f874/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
index 42b31f1..69711d0 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
@@ -50,7 +50,6 @@ import org.apache.tez.runtime.api.ObjectRegistry;
import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.library.api.KeyValueReader;
import org.apache.tez.runtime.library.api.KeyValueWriter;
-import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.conf.UnorderedKVEdgeConfig;
import org.apache.tez.runtime.library.output.UnorderedKVOutput;
import org.apache.tez.runtime.library.processor.SimpleProcessor;