You are viewing a plain-text version of this content. (The hyperlink to the canonical version was not preserved in this plain-text copy.)
Posted to commits@tez.apache.org by hi...@apache.org on 2016/02/19 23:05:46 UTC
tez git commit: TEZ-3123. Containers can get re-used even with conflicting local resources. (hitesh)
Repository: tez
Updated Branches:
refs/heads/master de3a0748f -> 941d1990f
TEZ-3123. Containers can get re-used even with conflicting local resources. (hitesh)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/941d1990
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/941d1990
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/941d1990
Branch: refs/heads/master
Commit: 941d1990f68e8a98b4ab2cd25688a117e29dd697
Parents: de3a074
Author: Hitesh Shah <hi...@apache.org>
Authored: Fri Feb 19 14:05:33 2016 -0800
Committer: Hitesh Shah <hi...@apache.org>
Committed: Fri Feb 19 14:05:33 2016 -0800
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../dag/app/rm/YarnTaskSchedulerService.java | 7 ++-
.../tez/dag/app/rm/TestContainerReuse.java | 64 ++++++++++++++++++--
3 files changed, 66 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/941d1990/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d10b47a..48eca4d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
TEZ-3029. Add an onError method to service plugin contexts.
ALL CHANGES:
+ TEZ-3123. Containers can get re-used even with conflicting local resources.
TEZ-3117. Deadlock in Edge and Vertex code
TEZ-3103. Shuffle can hang when memory to memory merging enabled
TEZ-3107. tez-tools: Log warn msgs in case ATS has wrong values (e.g startTime > finishTime).
@@ -335,6 +336,7 @@ INCOMPATIBLE CHANGES
TEZ-2949. Allow duplicate dag names within session for Tez.
ALL CHANGES
+ TEZ-3123. Containers can get re-used even with conflicting local resources.
TEZ-3117. Deadlock in Edge and Vertex code
TEZ-3103. Shuffle can hang when memory to memory merging enabled
TEZ-3107. tez-tools: Log warn msgs in case ATS has wrong values (e.g startTime > finishTime).
http://git-wip-us.apache.org/repos/asf/tez/blob/941d1990/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
index c1c363b..bd4ac2f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
@@ -2337,11 +2337,12 @@ public class YarnTaskSchedulerService extends TaskScheduler
// Merge the container signatures to account for any changes to the container
// footprint. For example, re-localization of additional resources will
// cause the held container's signature to change.
- lastAssignedContainerSignature = taskInfo.getCookie().getContainerSignature();
- if (lastTaskInfo != null && lastTaskInfo.getCookie().getContainerSignature() != null) {
+ if (lastAssignedContainerSignature != null) {
lastAssignedContainerSignature = signatureMatcher.union(
- lastTaskInfo.getCookie().getContainerSignature(),
+ lastAssignedContainerSignature,
taskInfo.getCookie().getContainerSignature());
+ } else {
+ lastAssignedContainerSignature = taskInfo.getCookie().getContainerSignature();
}
lastTaskInfo = taskInfo;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/941d1990/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
index 78dc8fd..99c85ab 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
@@ -1173,7 +1173,8 @@ public class TestContainerReuse {
TaskAttempt ta111 = mock(TaskAttempt.class);
doReturn(taID111).when(ta111).getID();
doReturn("Mock for TA " + taID111.toString()).when(ta111).toString();
- AMSchedulerEventTALaunchRequest lrEvent11 = createLaunchRequestEvent(taID111, ta111, resource1, host1, racks, priority1, v11LR);
+ AMSchedulerEventTALaunchRequest lrEvent11 = createLaunchRequestEvent(
+ taID111, ta111, resource1, host1, racks, priority1, v11LR);
Map<String, LocalResource> v12LR = Maps.newHashMap();
v12LR.put(rsrc1, lr1);
@@ -1184,7 +1185,24 @@ public class TestContainerReuse {
TaskAttempt ta112 = mock(TaskAttempt.class);
doReturn(taID112).when(ta112).getID();
doReturn("Mock for TA " + taID112.toString()).when(ta112).toString();
- AMSchedulerEventTALaunchRequest lrEvent12 = createLaunchRequestEvent(taID112, ta112, resource1, host1, racks, priority1, v12LR);
+ AMSchedulerEventTALaunchRequest lrEvent12 = createLaunchRequestEvent(
+ taID112, ta112, resource1, host1, racks, priority1, v12LR);
+
+ //Vertex 1, Task 3, Attempt 1, host1
+ TezTaskAttemptID taID113 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID11, 3), 1);
+ TaskAttempt ta113 = mock(TaskAttempt.class);
+ doReturn(taID113).when(ta113).getID();
+ doReturn("Mock for TA " + taID113.toString()).when(ta113).toString();
+ AMSchedulerEventTALaunchRequest lrEvent13 = createLaunchRequestEvent(
+ taID113, ta113, resource1, host1, racks, priority1, new HashMap<String, LocalResource>());
+ //Vertex 1, Task 4, Attempt 1, host1
+ TezTaskAttemptID taID114 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID11, 4), 1);
+ TaskAttempt ta114 = mock(TaskAttempt.class);
+ doReturn(taID114).when(ta114).getID();
+ doReturn("Mock for TA " + taID114.toString()).when(ta114).toString();
+ AMSchedulerEventTALaunchRequest lrEvent14 = createLaunchRequestEvent(
+ taID114, ta114, resource1, host1, racks, priority1, new HashMap<String, LocalResource>());
+
drainNotifier.set(false);
taskSchedulerManager.handleEvent(lrEvent11);
@@ -1220,14 +1238,52 @@ public class TestContainerReuse {
// Task assigned to container completed successfully.
// Verify reuse across hosts.
- taskSchedulerManager.handleEvent(new AMSchedulerEventTAEnded(ta112, container1.getId(), TaskAttemptState.SUCCEEDED, null, null, 0));
+ taskSchedulerManager.handleEvent(new AMSchedulerEventTAEnded(ta112, container1.getId(),
+ TaskAttemptState.SUCCEEDED, null, null, 0));
drainableAppCallback.drain();
verifyDeAllocateTask(taskScheduler, ta112, true, null, null);
verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
eventHandler.reset();
- // Setup DAG2 with additional resources. Make sure the container, even without all resources, is reused.
+ // Task 3
+ drainNotifier.set(false);
+ taskSchedulerManager.handleEvent(lrEvent13);
+ TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
+
+ verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta113), any(Object.class), eq(container1));
+ verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
+ eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
+ eventHandler.reset();
+
+ taskSchedulerManager.handleEvent(new AMSchedulerEventTAEnded(ta113, container1.getId(),
+ TaskAttemptState.SUCCEEDED, null, null, 0));
+ drainableAppCallback.drain();
+ verifyDeAllocateTask(taskScheduler, ta113, true, null, null);
+ verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
+ eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
+ eventHandler.reset();
+
+ // Task 4
+ drainNotifier.set(false);
+ taskSchedulerManager.handleEvent(lrEvent14);
+ TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
+
+ verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta114), any(Object.class), eq(container1));
+ verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
+ eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
+ eventHandler.reset();
+
+ taskSchedulerManager.handleEvent(new AMSchedulerEventTAEnded(ta114, container1.getId(),
+ TaskAttemptState.SUCCEEDED, null, null, 0));
+ drainableAppCallback.drain();
+ verifyDeAllocateTask(taskScheduler, ta114, true, null, null);
+ verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
+ eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
+ eventHandler.reset();
+
+
+ // Setup DAG2 with different resources.
TezDAGID dagID2 = TezDAGID.getInstance("0", 2, 0);
dagIDAnswer.setDAGID(dagID2);