You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@tez.apache.org by bi...@apache.org on 2014/05/21 00:24:13 UTC
git commit: TEZ-1039. Add Container locality to TaskScheduler (bikas)
Repository: incubator-tez
Updated Branches:
refs/heads/master 2e664e76c -> aa3b77209
TEZ-1039. Add Container locality to TaskScheduler (bikas)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/aa3b7720
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/aa3b7720
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/aa3b7720
Branch: refs/heads/master
Commit: aa3b77209dce6d84fd34e2c7434fd92b4853da05
Parents: 2e664e7
Author: Bikas Saha <bi...@apache.org>
Authored: Tue May 20 15:24:00 2014 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Tue May 20 15:24:00 2014 -0700
----------------------------------------------------------------------
.../tez/dag/app/rm/LocalTaskScheduler.java | 9 +-
.../apache/tez/dag/app/rm/TaskScheduler.java | 154 +++++++++++++++++--
.../dag/app/rm/TaskSchedulerEventHandler.java | 87 +----------
.../tez/dag/app/rm/TaskSchedulerInterface.java | 9 +-
.../tez/dag/app/rm/TestContainerReuse.java | 12 +-
.../tez/dag/app/rm/TestTaskScheduler.java | 92 ++++++++++-
.../dag/app/rm/TestTaskSchedulerHelpers.java | 4 +-
.../examples/BroadcastAndOneToOneExample.java | 103 ++++++++++---
.../tez/mapreduce/examples/ExampleDriver.java | 2 +
.../apache/tez/mapreduce/TestMRRJobsDAGApi.java | 2 +-
10 files changed, 345 insertions(+), 129 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/aa3b7720/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskScheduler.java
index 7b81407..87bfccd 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskScheduler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskScheduler.java
@@ -128,11 +128,18 @@ public class LocalTaskScheduler extends AbstractService implements TaskScheduler
public void allocateTask(Object task, Resource capability, String[] hosts,
String[] racks, Priority priority, Object containerSignature,
Object clientCookie) {
-
taskRequestHandler.addAllocateTaskRequest(task, capability, priority, clientCookie);
}
@Override
+ public synchronized void allocateTask(Object task, Resource capability,
+ ContainerId containerId, Priority priority, Object containerSignature,
+ Object clientCookie) {
+ // in local mode every task is already container level local
+ taskRequestHandler.addAllocateTaskRequest(task, capability, priority, clientCookie);
+ }
+
+ @Override
public boolean deallocateTask(Object task, boolean taskSucceeded) {
return taskRequestHandler.addDeallocateTaskRequest(task);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/aa3b7720/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
index 13a71d8..67369ee 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
@@ -68,6 +68,7 @@ import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/* TODO not yet updating cluster nodes on every allocate response
@@ -140,6 +141,8 @@ public class TaskScheduler extends AbstractService
*/
Map<ContainerId, Object> containerAssignments =
new HashMap<ContainerId, Object>();
+ // Remove inUse depending on resolution of TEZ-1129
+ Set<ContainerId> inUseContainers = Sets.newHashSet();
HashMap<ContainerId, Object> releasedContainers =
new HashMap<ContainerId, Object>();
/**
@@ -148,6 +151,8 @@ public class TaskScheduler extends AbstractService
Map<ContainerId, HeldContainer> heldContainers =
new HashMap<ContainerId, HeldContainer>();
+ Set<Priority> priorityHasAffinity = Sets.newHashSet();
+
Set<NodeId> blacklistedNodes = Collections
.newSetFromMap(new ConcurrentHashMap<NodeId, Boolean>());
@@ -199,6 +204,7 @@ public class TaskScheduler extends AbstractService
class CookieContainerRequest extends ContainerRequest {
CRCookie cookie;
+ ContainerId affinitizedContainerId;
public CookieContainerRequest(
Resource capability,
@@ -210,9 +216,24 @@ public class TaskScheduler extends AbstractService
this.cookie = cookie;
}
+ public CookieContainerRequest(
+ Resource capability,
+ ContainerId containerId,
+ String[] hosts,
+ String[] racks,
+ Priority priority,
+ CRCookie cookie) {
+ this(capability, hosts, racks, priority, cookie);
+ this.affinitizedContainerId = containerId;
+ }
+
CRCookie getCookie() {
return cookie;
}
+
+ ContainerId getAffinitizedContainer() {
+ return affinitizedContainerId;
+ }
}
public TaskScheduler(TaskSchedulerAppCallback appClient,
@@ -836,6 +857,46 @@ public class TaskScheduler extends AbstractService
CookieContainerRequest request = new CookieContainerRequest(
capability, hosts, racks, priority, cookie);
+ addRequestAndTrigger(task, request, hosts, racks);
+ }
+
+ @Override
+ public synchronized void allocateTask(
+ Object task,
+ Resource capability,
+ ContainerId containerId,
+ Priority priority,
+ Object containerSignature,
+ Object clientCookie) {
+
+ HeldContainer heldContainer = heldContainers.get(containerId);
+ String[] hosts = null;
+ String[] racks = null;
+ if (heldContainer != null) {
+ Container container = heldContainer.getContainer();
+ if (canFit(capability, container.getResource())) {
+ // just specify node and use YARN's soft locality constraint for the rest
+ hosts = new String[1];
+ hosts[0] = container.getNodeId().getHost();
+ priorityHasAffinity.add(priority);
+ } else {
+ LOG.warn("Matching requested to container: " + containerId +
+ " but requested capability: " + capability +
+ " does not fit in container resource: " + container.getResource());
+ }
+ } else {
+ LOG.warn("Matching requested to unknown container: " + containerId);
+ }
+
+ CRCookie cookie = new CRCookie(task, clientCookie, containerSignature);
+ CookieContainerRequest request = new CookieContainerRequest(
+ capability, containerId, hosts, racks, priority, cookie);
+
+ addRequestAndTrigger(task, request, hosts, racks);
+ }
+
+ private void addRequestAndTrigger(Object task, CookieContainerRequest request,
+ String[] hosts, String[] racks) {
addTaskRequest(task, request);
// See if any of the delayedContainers can be used for this task.
delayedContainerManager.triggerScheduling(true);
@@ -919,6 +980,18 @@ public class TaskScheduler extends AbstractService
return null;
}
+ boolean canFit(Resource arg0, Resource arg1) {
+ int mem0 = arg0.getMemory();
+ int mem1 = arg1.getMemory();
+ int cpu0 = arg0.getVirtualCores();
+ int cpu1 = arg1.getVirtualCores();
+
+ if(mem0 <= mem1 && cpu0 <= cpu1) {
+ return true;
+ }
+ return false;
+ }
+
void preemptIfNeeded() {
ContainerId preemptedContainer = null;
synchronized (this) {
@@ -1084,21 +1157,66 @@ public class TaskScheduler extends AbstractService
private CookieContainerRequest getMatchingRequestWithoutPriority(
Container container,
- String location) {
+ String location,
+ boolean considerContainerAffinity) {
Resource capability = container.getResource();
List<? extends Collection<CookieContainerRequest>> pRequestsList =
amRmClient.getMatchingRequestsForTopPriority(location, capability);
+ if (considerContainerAffinity &&
+ !priorityHasAffinity.contains(amRmClient.getTopPriority())) {
+ considerContainerAffinity = false;
+ }
if (pRequestsList == null || pRequestsList.isEmpty()) {
return null;
}
+ CookieContainerRequest firstMatch = null;
for (Collection<CookieContainerRequest> requests : pRequestsList) {
for (CookieContainerRequest cookieContainerRequest : requests) {
- if (canAssignTaskToContainer(cookieContainerRequest, container)) {
- return cookieContainerRequest;
+ if (firstMatch == null || // we dont have a match. So look for one
+ // we have a match but are looking for a better container level match.
+ // skip the expensive canAssignTaskToContainer() if the request is
+ // not affinitized to the container
+ container.getId().equals(cookieContainerRequest.getAffinitizedContainer())
+ ) {
+ if (canAssignTaskToContainer(cookieContainerRequest, container)) {
+ // request matched to container
+ if (!considerContainerAffinity) {
+ return cookieContainerRequest;
+ }
+ ContainerId affCId = cookieContainerRequest.getAffinitizedContainer();
+ boolean canMatchTaskWithAffinity = true;
+ if (affCId == null ||
+ !heldContainers.containsKey(affCId) ||
+ inUseContainers.contains(affCId)) {
+ // affinity not specified
+ // affinitized container is no longer held
+ // affinitized container is in use
+ canMatchTaskWithAffinity = false;
+ }
+ if (canMatchTaskWithAffinity) {
+ if (container.getId().equals(
+ cookieContainerRequest.getAffinitizedContainer())) {
+ // container level match
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Matching with affinity for request: "
+ + cookieContainerRequest + " container: " + affCId);
+ }
+ return cookieContainerRequest;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Skipping request for container " + container.getId()
+ + " due to affinity. Request: " + cookieContainerRequest
+ + " affContainer: " + affCId);
+ }
+ } else {
+ firstMatch = cookieContainerRequest;
+ }
+ }
}
}
}
- return null;
+
+ return firstMatch;
}
private boolean canAssignTaskToContainer(
@@ -1162,6 +1280,7 @@ public class TaskScheduler extends AbstractService
Container result = taskAllocations.put(task, container);
assert result == null;
+ inUseContainers.add(container.getId());
containerAssignments.put(container.getId(), task);
HeldContainer heldContainer = heldContainers.get(container.getId());
if (!shouldReuseContainers && heldContainer == null) {
@@ -1232,6 +1351,7 @@ public class TaskScheduler extends AbstractService
if (container == null) {
return null;
}
+ inUseContainers.remove(container.getId());
return container;
}
@@ -1245,6 +1365,7 @@ public class TaskScheduler extends AbstractService
}
Container container = taskAllocations.remove(task);
assert container != null;
+ inUseContainers.remove(containerId);
if(releaseIfFound) {
releaseContainer(containerId);
}
@@ -1438,7 +1559,7 @@ public class TaskScheduler extends AbstractService
boolean honorLocality) {
String location = container.getNodeId().getHost();
CookieContainerRequest assigned = getMatchingRequestWithoutPriority(
- container, location);
+ container, location, true);
doBookKeepingForAssignedContainer(assigned, container, location, true);
return assigned;
@@ -1470,7 +1591,7 @@ public class TaskScheduler extends AbstractService
String location = RackResolver.resolve(container.getNodeId().getHost())
.getNetworkLocation();
CookieContainerRequest assigned = getMatchingRequestWithoutPriority(
- container, location);
+ container, location, false);
doBookKeepingForAssignedContainer(assigned, container, location,
honorLocality);
return assigned;
@@ -1500,7 +1621,7 @@ public class TaskScheduler extends AbstractService
if (!honorLocality) {
String location = ResourceRequest.ANY;
CookieContainerRequest assigned = getMatchingRequestWithoutPriority(
- container, location);
+ container, location, false);
doBookKeepingForAssignedContainer(assigned, container, location,
honorLocality);
return assigned;
@@ -1533,7 +1654,7 @@ public class TaskScheduler extends AbstractService
// used for testing only
@VisibleForTesting
- AtomicBoolean drainedDelayedContainers = null;
+ volatile AtomicBoolean drainedDelayedContainersForTest = null;
DelayedContainerManager() {
super.setName("DelayedContainerManager");
@@ -1553,10 +1674,11 @@ public class TaskScheduler extends AbstractService
// locality at this point.
if (delayedContainers.peek() == null) {
try {
- if (drainedDelayedContainers != null) {
- drainedDelayedContainers.set(true);
- synchronized (drainedDelayedContainers) {
- drainedDelayedContainers.notifyAll();
+ // test only signaling to make TestTaskScheduler work
+ if (drainedDelayedContainersForTest != null) {
+ drainedDelayedContainersForTest.set(true);
+ synchronized (drainedDelayedContainersForTest) {
+ drainedDelayedContainersForTest.notifyAll();
}
}
synchronized(this) {
@@ -1568,6 +1690,14 @@ public class TaskScheduler extends AbstractService
LOG.info("AllocatedContainerManager Thread interrupted");
}
} else {
+ // test only sleep to prevent tight loop cycling that makes tests stall
+ if (drainedDelayedContainersForTest != null) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
HeldContainer delayedContainer = delayedContainers.peek();
if (delayedContainer == null) {
continue;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/aa3b7720/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index bc1d1b7..865b192 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -205,35 +205,6 @@ public class TaskSchedulerEventHandler extends AbstractService
}
private void handleTAUnsuccessfulEnd(AMSchedulerEventTAEnded event) {
- /*MRxTaskAttemptID aId = event.getAttemptID();
- attemptToLaunchRequestMap.remove(aId);
- // TODO XXX: This remove may need to be deferred. Possible for a SUCCESSFUL taskAttempt to fail,
- // which means the scheduler needs to remember taskAttempt to container assignments for a longer time.
- boolean removed = pendingReduces.remove(aId);
- if (!removed) {
- removed = scheduledRequests.remove(aId);
- if (!removed) {
- // Maybe assigned.
- ContainerId containerId = assignedRequests.remove(aId);
- if (containerId != null) {
- // Ask the container to stop.
- sendEvent(new AMContainerEvent(containerId,
- AMContainerEventType.C_STOP_REQUEST));
- // Inform the Node - the task has asked to be STOPPED / has already
- // stopped.
- sendEvent(new AMNodeEventTaskAttemptEnded(containerMap
- .get(containerId).getContainer().getNodeId(), containerId,
- event.getAttemptID(), event.getState() == TaskAttemptState.FAILED));
- } else {
- LOG.warn("Received a STOP request for absent taskAttempt: "
- + event.getAttemptID());
- // This could be generated in case of recovery, with unhealthy nodes/
- // fetch failures. Can be ignored, since Recovered containers don't
- // need to be stopped.
- }
- }
- }*/
-
TaskAttempt attempt = event.getAttempt();
boolean wasContainerAllocated = taskScheduler.deallocateTask(attempt, false);
// use stored value of container id in case the scheduler has removed this
@@ -264,23 +235,6 @@ public class TaskSchedulerEventHandler extends AbstractService
}
private void handleTASucceeded(AMSchedulerEventTAEnded event) {
- /*
- // TODO XXX Remember the assigned containerId even after task success.
- // Required for TOO_MANY_FETCH_FAILURES
- attemptToLaunchRequestMap.remove(event.getAttemptID());
- ContainerId containerId = assignedRequests.remove(event.getAttemptID());
- if (containerId != null) { // TODO Should not be null. Confirm.
- sendEvent(new AMContainerTASucceededEvent(containerId,
- event.getAttemptID()));
- sendEvent(new AMNodeEventTaskAttemptSucceeded(containerMap
- .get(containerId).getContainer().getNodeId(), containerId,
- event.getAttemptID()));
- containerAvailable(containerId);
- } else {
- LOG.warn("Received TaskAttemptSucceededEvent for unmapped TaskAttempt: "
- + event.getAttemptID() + ". Full event: " + event);
- }*/
-
TaskAttempt attempt = event.getAttempt();
ContainerId usedContainerId = event.getUsedContainerId();
@@ -302,46 +256,19 @@ public class TaskSchedulerEventHandler extends AbstractService
}
private void handleTaLaunchRequest(AMSchedulerEventTALaunchRequest event) {
- /**
- // Add to queue of pending tasks.
- recalculateReduceSchedule = true;
- attemptToLaunchRequestMap.put(event.getAttemptID(), event);
- if (event.getAttemptID().getTaskID().getTaskType() == TaskType.MAP) {
- mapResourceReqt = maybeComputeNormalizedRequestForType(event,
- TaskType.MAP, mapResourceReqt);
- event.getCapability().setMemory(mapResourceReqt);
- scheduledRequests.addMap(event);
- } else { // Reduce
- reduceResourceReqt = maybeComputeNormalizedRequestForType(event,
- TaskType.REDUCE, reduceResourceReqt);
- event.getCapability().setMemory(reduceResourceReqt);
- if (event.isRescheduled()) {
- pendingReduces.addFirst(new ContainerRequestInfo(new ContainerRequest(
- event.getCapability(), event.getHosts(), event.getRacks(),
- PRIORITY_REDUCE), event));
- } else {
- pendingReduces.addLast(new ContainerRequestInfo(new ContainerRequest(
- event.getCapability(), event.getHosts(), event.getRacks(),
- PRIORITY_REDUCE), event));
- }
- }
- */
- // TODO resource adjustment needs to move into dag
- /*Resource mapResourceReqt = maybeComputeNormalizedRequestForType(event,
- TaskType.MAP, mapResourceReqt);
- event.getCapability().setMemory(mapResourceReqt);*/
TaskAttempt taskAttempt = event.getTaskAttempt();
TaskLocationHint locationHint = event.getLocationHint();
String hosts[] = null;
String racks[] = null;
- // Until TEZ-1039 is done we need to translate container affinity to the node
- // for that container
if (locationHint != null) {
if (locationHint.getAffinitizedContainer() != null) {
- ContainerId containerId = locationHint.getAffinitizedContainer();
- AMContainer container = appContext.getAllContainers().get(containerId);
- hosts = new String[1];
- hosts[0] = container.getContainer().getNodeId().getHost();
+ taskScheduler.allocateTask(taskAttempt,
+ event.getCapability(),
+ locationHint.getAffinitizedContainer(),
+ event.getPriority(),
+ event.getContainerContext(),
+ event);
+ return;
} else {
hosts = (locationHint.getDataLocalHosts() != null) ? locationHint
.getDataLocalHosts().toArray(
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/aa3b7720/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerInterface.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerInterface.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerInterface.java
index 5b6f179..1c35492 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerInterface.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerInterface.java
@@ -40,7 +40,14 @@ public interface TaskSchedulerInterface {
public abstract void allocateTask(Object task, Resource capability,
String[] hosts, String[] racks, Priority priority,
Object containerSignature, Object clientCookie);
-
+
+ /**
+ * Allocate affinitized to a specific container
+ */
+ public abstract void allocateTask(Object task, Resource capability,
+ ContainerId containerId, Priority priority, Object containerSignature,
+ Object clientCookie);
+
public abstract boolean deallocateTask(Object task, boolean taskSucceeded);
public abstract Object deallocateContainer(ContainerId containerId);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/aa3b7720/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
index 580062d..2e89b66 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
@@ -157,7 +157,7 @@ public class TestContainerReuse {
taskScheduler.getDrainableAppCallback();
AtomicBoolean drainNotifier = new AtomicBoolean(false);
- taskScheduler.delayedContainerManager.drainedDelayedContainers = drainNotifier;
+ taskScheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier;
Resource resource = Resource.newInstance(1024, 1);
Priority priority = Priority.newInstance(5);
@@ -293,7 +293,7 @@ public class TestContainerReuse {
taskScheduler.getDrainableAppCallback();
AtomicBoolean drainNotifier = new AtomicBoolean(false);
- taskScheduler.delayedContainerManager.drainedDelayedContainers = drainNotifier;
+ taskScheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier;
Resource resource = Resource.newInstance(1024, 1);
Priority priority = Priority.newInstance(5);
@@ -389,7 +389,7 @@ public class TestContainerReuse {
.getSpyTaskScheduler();
TaskSchedulerAppCallbackDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback();
AtomicBoolean drainNotifier = new AtomicBoolean(false);
- taskScheduler.delayedContainerManager.drainedDelayedContainers = drainNotifier;
+ taskScheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier;
Resource resource1 = Resource.newInstance(1024, 1);
String[] host1 = {"host1"};
@@ -537,7 +537,7 @@ public class TestContainerReuse {
TaskSchedulerAppCallbackDrainable drainableAppCallback =
taskScheduler.getDrainableAppCallback();
AtomicBoolean drainNotifier = new AtomicBoolean(false);
- taskScheduler.delayedContainerManager.drainedDelayedContainers = drainNotifier;
+ taskScheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier;
Resource resource1 = Resource.newInstance(1024, 1);
String [] emptyHosts = new String[0];
@@ -661,7 +661,7 @@ public class TestContainerReuse {
TaskSchedulerAppCallbackDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback();
AtomicBoolean drainNotifier = new AtomicBoolean(false);
- taskScheduler.delayedContainerManager.drainedDelayedContainers = drainNotifier;
+ taskScheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier;
Resource resource1 = Resource.newInstance(1024, 1);
String[] host1 = {"host1"};
@@ -775,7 +775,7 @@ public class TestContainerReuse {
.getSpyTaskScheduler();
TaskSchedulerAppCallbackDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback();
AtomicBoolean drainNotifier = new AtomicBoolean(false);
- taskScheduler.delayedContainerManager.drainedDelayedContainers = drainNotifier;
+ taskScheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier;
Resource resource1 = Resource.newInstance(1024, 1);
String[] host1 = {"host1"};
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/aa3b7720/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
index bf428ff..9652da4 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
@@ -547,6 +547,7 @@ public class TestTaskScheduler {
final Priority mockPriority2 = Priority.newInstance(2);
final Priority mockPriority3 = Priority.newInstance(3);
final Priority mockPriority4 = Priority.newInstance(4);
+ final Priority mockPriority5 = Priority.newInstance(5);
Object mockTask2 = mock(Object.class);
when(mockTask2.toString()).thenReturn("task2");
Object mockCookie2 = mock(Object.class);
@@ -708,7 +709,7 @@ public class TestTaskScheduler {
});
AtomicBoolean drainNotifier = new AtomicBoolean(false);
- scheduler.delayedContainerManager.drainedDelayedContainers = drainNotifier;
+ scheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier;
scheduler.onContainersAllocated(containers);
TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
@@ -865,6 +866,95 @@ public class TestTaskScheduler {
scheduler.unblacklistNode(badNodeId);
verify(mockRMClient, times(1)).removeNodeFromBlacklist(badNodeId);
assertEquals(0, scheduler.blacklistedNodes.size());
+
+ // verify container level matching
+ // fudge the top level priority to prevent containers from being released
+ // if top level priority is higher than newly allocated containers then
+ // they will not be released
+ final AtomicBoolean fudgePriority = new AtomicBoolean(true);
+ when(mockRMClient.getTopPriority()).then(
+ new Answer<Priority>() {
+ @Override
+ public Priority answer(
+ InvocationOnMock invocation) throws Throwable {
+ if (fudgePriority.get()) {
+ return mockPriority4;
+ }
+ return mockPriority5;
+ }
+ });
+ // add a dummy task to prevent release of allocated containers
+ Object mockTask5 = mock(Object.class);
+ when(mockTask5.toString()).thenReturn("task5");
+ Object mockCookie5 = mock(Object.class);
+ scheduler.allocateTask(mockTask5, mockCapability, hosts,
+ racks, mockPriority5, null, mockCookie5);
+ verify(mockRMClient, times(6)).addContainerRequest(requestCaptor.capture());
+ CookieContainerRequest request6 = requestCaptor.getValue();
+ drainableAppCallback.drain();
+ // add containers so that we can reference one of them for container specific
+ // allocation
+ containers.clear();
+ Container mockContainer7 = mock(Container.class, RETURNS_DEEP_STUBS);
+ when(mockContainer7.getNodeId().getHost()).thenReturn("host5");
+ ContainerId mockCId7 = mock(ContainerId.class);
+ when(mockContainer7.toString()).thenReturn("container7");
+ when(mockCId7.toString()).thenReturn("container7");
+ when(mockContainer7.getId()).thenReturn(mockCId7);
+ when(mockContainer7.getPriority()).thenReturn(mockPriority5);
+ containers.add(mockContainer7);
+ Container mockContainer8 = mock(Container.class, RETURNS_DEEP_STUBS);
+ when(mockContainer8.getNodeId().getHost()).thenReturn("host5");
+ ContainerId mockCId8 = mock(ContainerId.class);
+ when(mockContainer8.toString()).thenReturn("container8");
+ when(mockCId8.toString()).thenReturn("container8");
+ when(mockContainer8.getId()).thenReturn(mockCId8);
+ when(mockContainer8.getPriority()).thenReturn(mockPriority5);
+ containers.add(mockContainer8);
+ drainNotifier.set(false);
+ scheduler.onContainersAllocated(containers);
+ drainableAppCallback.drain();
+ verify(mockRMClient, times(5)).releaseAssignedContainer((ContainerId) any());
+ Object mockTask6 = mock(Object.class);
+ when(mockTask6.toString()).thenReturn("task6");
+ Object mockCookie6 = mock(Object.class);
+ // allocate request with container affinity
+ scheduler.allocateTask(mockTask6, mockCapability, mockCId7, mockPriority5, null, mockCookie6);
+ drainableAppCallback.drain();
+ verify(mockRMClient, times(7)).addContainerRequest(requestCaptor.capture());
+ CookieContainerRequest request7 = requestCaptor.getValue();
+ hostContainers.clear();
+ hostContainers.add(request6);
+ hostContainers.add(request7);
+
+ when(
+ mockRMClient.getMatchingRequestsForTopPriority(eq("host5"),
+ (Resource) any())).thenAnswer(
+ new Answer<List<? extends Collection<CookieContainerRequest>>>() {
+ @Override
+ public List<? extends Collection<CookieContainerRequest>> answer(
+ InvocationOnMock invocation) throws Throwable {
+ return hostList;
+ }
+
+ });
+ // stop fudging top priority
+ fudgePriority.set(false);
+ TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
+ drainableAppCallback.drain();
+ verify(mockApp, times(6)).taskAllocated(any(), any(), (Container) any());
+ // container7 allocated to the task with affinity for it
+ verify(mockApp).taskAllocated(mockTask6, mockCookie6, mockContainer7);
+ // deallocate allocated task
+ assertTrue(scheduler.deallocateTask(mockTask5, true));
+ assertTrue(scheduler.deallocateTask(mockTask6, true));
+ drainableAppCallback.drain();
+ verify(mockApp).containerBeingReleased(mockCId7);
+ verify(mockApp).containerBeingReleased(mockCId8);
+ verify(mockRMClient).releaseAssignedContainer(mockCId7);
+ verify(mockRMClient).releaseAssignedContainer(mockCId8);
+ verify(mockRMClient, times(7)).releaseAssignedContainer((ContainerId) any());
+
float progress = 0.5f;
when(mockApp.getProgress()).thenReturn(progress);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/aa3b7720/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
index a9fdaab..5a0856b 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
@@ -371,8 +371,8 @@ class TestTaskSchedulerHelpers {
static void waitForDelayedDrainNotify(AtomicBoolean drainNotifier)
throws InterruptedException {
- while (!drainNotifier.get()) {
- synchronized (drainNotifier) {
+ synchronized (drainNotifier) {
+ while (!drainNotifier.get()) {
drainNotifier.wait();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/aa3b7720/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
index 5a972e8..0cd6aaf 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
@@ -19,8 +19,6 @@
package org.apache.tez.mapreduce.examples;
import java.io.IOException;
-import java.util.List;
-import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
@@ -35,6 +33,9 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.tez.client.AMConfiguration;
import org.apache.tez.client.TezClient;
import org.apache.tez.client.TezClientUtils;
@@ -47,6 +48,7 @@ import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
@@ -59,6 +61,9 @@ import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
+import org.apache.tez.runtime.common.objectregistry.ObjectLifeCycle;
+import org.apache.tez.runtime.common.objectregistry.ObjectRegistry;
+import org.apache.tez.runtime.common.objectregistry.ObjectRegistryFactory;
import org.apache.tez.runtime.library.api.KeyValueReader;
import org.apache.tez.runtime.library.api.KeyValueWriter;
import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput;
@@ -78,6 +83,15 @@ public class BroadcastAndOneToOneExample extends Configured implements Tool {
.next();
KeyValueWriter kvWriter = (KeyValueWriter) output.getWriter();
kvWriter.write(word, new IntWritable(getContext().getTaskIndex()));
+ byte[] userPayload = getContext().getUserPayload();
+ if (userPayload != null) {
+ boolean doLocalityCheck = userPayload[0] > 0 ? true : false;
+ if (doLocalityCheck) {
+ ObjectRegistry objectRegistry = ObjectRegistryFactory.getObjectRegistry();
+ String entry = String.valueOf(getContext().getTaskIndex());
+ objectRegistry.add(ObjectLifeCycle.DAG, entry, entry);
+ }
+ }
}
}
@@ -97,27 +111,29 @@ public class BroadcastAndOneToOneExample extends Configured implements Tool {
while (inputKvReader.next()) {
sum += ((IntWritable) inputKvReader.getCurrentValue()).get();
}
- System.out.println("Index: " + getContext().getTaskIndex() + " sum: " + sum);
- int taskIndex = getContext().getTaskIndex();
- switch (taskIndex) {
- case 0:
- Preconditions.checkState((sum == 1), "Sum = " + sum);
- break;
- case 1:
- Preconditions.checkState((sum == 2), "Sum = " + sum);
- break;
- case 2:
- Preconditions.checkState((sum == 3), "Sum = " + sum);
- break;
- default:
- throw new TezUncheckedException("Unexpected taskIndex: " + taskIndex);
+ boolean doLocalityCheck = getContext().getUserPayload()[0] > 0 ? true : false;
+ int broadcastSum = getContext().getUserPayload()[1];
+ int expectedSum = broadcastSum + getContext().getTaskIndex();
+ System.out.println("Index: " + getContext().getTaskIndex() +
+ " sum: " + sum + " expectedSum: " + expectedSum + " broadcastSum: " + broadcastSum);
+ Preconditions.checkState((sum == expectedSum), "Sum = " + sum);
+
+ if (doLocalityCheck) {
+ ObjectRegistry objectRegistry = ObjectRegistryFactory.getObjectRegistry();
+ String index = (String) objectRegistry.get(String.valueOf(getContext().getTaskIndex()));
+ if (index == null || Integer.valueOf(index).intValue() != getContext().getTaskIndex()) {
+ String msg = "Did not find expected local producer "
+ + getContext().getTaskIndex() + " in the same JVM";
+ System.out.println(msg);
+ throw new TezUncheckedException(msg);
+ }
}
}
}
private DAG createDAG(FileSystem fs, TezConfiguration tezConf,
- Path stagingDir) throws IOException {
+ Path stagingDir, boolean doLocalityCheck) throws IOException, YarnException {
Configuration kvInputConf = new JobConf((Configuration)tezConf);
kvInputConf.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS,
Text.class.getName());
@@ -134,22 +150,39 @@ public class BroadcastAndOneToOneExample extends Configured implements Tool {
MRHelpers.doJobClientMagic(kvInputConf);
MRHelpers.doJobClientMagic(kvOneToOneConf);
+ int numBroadcastTasks = 2;
+ int numOneToOneTasks = 3;
+ if (doLocalityCheck) {
+ YarnClient yarnClient = YarnClient.createYarnClient();
+ yarnClient.init(tezConf);
+ yarnClient.start();
+ int numNMs = yarnClient.getNodeReports(NodeState.RUNNING).size();
+ yarnClient.stop();
+ // create enough 1-1 tasks to run in parallel
+ numOneToOneTasks = numNMs - numBroadcastTasks - 1;// 1 AM
+ if (numOneToOneTasks < 1) {
+ numOneToOneTasks = 1;
+ }
+ }
+ byte[] procPayload = {(byte) (doLocalityCheck ? 1 : 0), 1};
+
+ System.out.println("Using " + numOneToOneTasks + " 1-1 tasks");
+
byte[] kvInputPayload = MRHelpers.createUserPayloadFromConf(kvInputConf);
Vertex broadcastVertex = new Vertex("Broadcast", new ProcessorDescriptor(
InputProcessor.class.getName()),
- 2, MRHelpers.getMapResource(kvInputConf));
+ numBroadcastTasks, MRHelpers.getMapResource(kvInputConf));
broadcastVertex.setJavaOpts(MRHelpers.getMapJavaOpts(kvInputConf));
- int numOneToOneTasks = 3;
Vertex inputVertex = new Vertex("Input", new ProcessorDescriptor(
- InputProcessor.class.getName()),
+ InputProcessor.class.getName()).setUserPayload(procPayload),
numOneToOneTasks, MRHelpers.getMapResource(kvInputConf));
inputVertex.setJavaOpts(MRHelpers.getMapJavaOpts(kvInputConf));
-
+
byte[] kvOneToOnePayload = MRHelpers.createUserPayloadFromConf(kvOneToOneConf);
Vertex oneToOneVertex = new Vertex("OneToOne",
new ProcessorDescriptor(
- OneToOneProcessor.class.getName()),
+ OneToOneProcessor.class.getName()).setUserPayload(procPayload),
numOneToOneTasks, MRHelpers.getReduceResource(kvOneToOneConf));
oneToOneVertex.setJavaOpts(
MRHelpers.getReduceJavaOpts(kvOneToOneConf)).setVertexManagerPlugin(
@@ -180,7 +213,7 @@ public class BroadcastAndOneToOneExample extends Configured implements Tool {
private Credentials credentials = new Credentials();
- public boolean run(Configuration conf) throws Exception {
+ public boolean run(Configuration conf, boolean doLocalityCheck) throws Exception {
System.out.println("Running BroadcastAndOneToOneExample");
// conf and UGI
TezConfiguration tezConf;
@@ -189,6 +222,7 @@ public class BroadcastAndOneToOneExample extends Configured implements Tool {
} else {
tezConf = new TezConfiguration();
}
+ tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, true);
UserGroupInformation.setConfiguration(tezConf);
String user = UserGroupInformation.getCurrentUser().getShortUserName();
@@ -229,7 +263,7 @@ public class BroadcastAndOneToOneExample extends Configured implements Tool {
DAGClient dagClient = null;
try {
- DAG dag = createDAG(fs, tezConf, stagingDir);
+ DAG dag = createDAG(fs, tezConf, stagingDir, doLocalityCheck);
tezSession.waitTillReady();
dagClient = tezSession.submitDAG(dag);
@@ -249,9 +283,28 @@ public class BroadcastAndOneToOneExample extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
- boolean status = run(getConf());
+ boolean doLocalityCheck = true;
+ if (args.length == 1) {
+ if (args[0].equals(skipLocalityCheck)) {
+ doLocalityCheck = false;
+ } else {
+ printUsage();
+ throw new TezException("Invalid command line");
+ }
+ } else if (args.length > 1) {
+ printUsage();
+ throw new TezException("Invalid command line");
+ }
+ boolean status = run(getConf(), doLocalityCheck);
return status ? 0 : 1;
}
+
+ private static void printUsage() {
+ System.err.println("broadcastAndOneToOneExample " + skipLocalityCheck);
+ ToolRunner.printGenericCommandUsage(System.err);
+ }
+
+ static String skipLocalityCheck = "-skipLocalityCheck";
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/aa3b7720/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
index 2f9cde2..636e83b 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
@@ -81,6 +81,8 @@ public class ExampleDriver {
"Word Count with words sorted on frequency");
pgd.addClass("unionexample", UnionExample.class,
"Union example");
+ pgd.addClass("broadcastAndOneToOneExample", BroadcastAndOneToOneExample.class,
+ "BroadcastAndOneToOneExample example");
pgd.addClass("filterLinesByWord", FilterLinesByWord.class,
"Filters lines by the specified word using broadcast edge");
pgd.addClass("filterLinesByWordOneToOne", FilterLinesByWordOneToOne.class,
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/aa3b7720/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
index b690a11..d5fd04d 100644
--- a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
+++ b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
@@ -698,7 +698,7 @@ public class TestMRRJobsDAGApi {
public void testBroadcastAndOneToOne() throws Exception {
LOG.info("Running BroadcastAndOneToOne Test");
BroadcastAndOneToOneExample job = new BroadcastAndOneToOneExample();
- if (job.run(mrrTezCluster.getConfig())) {
+ if (job.run(mrrTezCluster.getConfig(), true)) {
LOG.info("Success BroadcastAndOneToOne Test");
} else {
throw new TezUncheckedException("BroadcastAndOneToOne Test Failed");