You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2014/11/17 20:22:43 UTC
[34/50] [abbrv] tez git commit: TEZ-1742 addendum patch for follow up
comments
TEZ-1742 addendum patch for follow up comments
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/c6c08c18
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/c6c08c18
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/c6c08c18
Branch: refs/heads/TEZ-8
Commit: c6c08c18b77870e71298adb2dc51908473434768
Parents: 0127a90
Author: Bikas Saha <bi...@apache.org>
Authored: Sat Nov 8 11:55:05 2014 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Sat Nov 8 11:55:22 2014 -0800
----------------------------------------------------------------------
.../dag/app/rm/YarnTaskSchedulerService.java | 16 +++++++++------
.../tez/dag/app/rm/TestTaskScheduler.java | 21 +++++++++++++++++++-
2 files changed, 30 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/c6c08c18/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
index 5941a45..f8fbd53 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
@@ -1051,8 +1051,8 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
return false;
}
- private int scaleDownByPreemptionPercentage(int original) {
- return (original + (preemptionPercentage - 1)) / preemptionPercentage;
+ static int scaleDownByPreemptionPercentage(int original, int percent) {
+ return (int) Math.ceil((original * percent)/100.f);
}
void preemptIfNeeded() {
@@ -1103,8 +1103,9 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
// free up some more
// TODO this is subject to error wrt RM resource normalization
- numPendingRequestsToService = scaleDownByPreemptionPercentage(numHighestPriRequests);
-
+ numPendingRequestsToService = scaleDownByPreemptionPercentage(numHighestPriRequests,
+ preemptionPercentage);
+
if (numPendingRequestsToService < 1) {
return;
}
@@ -1223,7 +1224,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
}
if(preemptedTaskPriority != null) {
int newNumPendingRequestsToService = scaleDownByPreemptionPercentage(Math.min(
- numEntriesAtPreemptedPriority, numHighestPriRequests));
+ numEntriesAtPreemptedPriority, numHighestPriRequests), preemptionPercentage);
numPendingRequestsToService = Math.min(newNumPendingRequestsToService,
numPendingRequestsToService);
if (numPendingRequestsToService < 1) {
@@ -1237,8 +1238,11 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
preemptedContainers = new ContainerId[numPendingRequestsToService];
int currIndex = 0;
for (Map.Entry<Object, Container> entry : taskAllocations.entrySet()) {
+ HeldContainer heldContainer = heldContainers.get(entry.getValue().getId());
+ CookieContainerRequest lastTaskInfo = heldContainer.getLastTaskInfo();
+ Priority taskPriority = lastTaskInfo.getPriority();
Container container = entry.getValue();
- if (preemptedTaskPriority.equals(container.getPriority())) {
+ if (preemptedTaskPriority.equals(taskPriority)) {
// taskAllocations map will iterate from oldest to newest assigned containers
// keep the N newest containersIds with the matching priority
preemptedContainers[currIndex++ % numPendingRequestsToService] = container.getId();
http://git-wip-us.apache.org/repos/asf/tez/blob/c6c08c18/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
index 3afff7c..5782d01 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
@@ -1328,7 +1328,7 @@ public class TestTaskScheduler {
Container mockContainer3B = mock(Container.class, RETURNS_DEEP_STUBS);
when(mockContainer3B.getNodeId().getHost()).thenReturn("host1");
when(mockContainer3B.getResource()).thenReturn(taskAsk);
- when(mockContainer3B.getPriority()).thenReturn(pri6);
+ when(mockContainer3B.getPriority()).thenReturn(pri2); // high priority container
ContainerId mockCId3B = mock(ContainerId.class);
when(mockContainer3B.getId()).thenReturn(mockCId3B);
containers.add(mockContainer3B);
@@ -1391,6 +1391,8 @@ public class TestTaskScheduler {
scheduler.taskAllocations.get(mockTask3).getId());
Assert.assertEquals(mockCId3A,
scheduler.taskAllocations.get(mockTask3KillA).getId());
+ // high priority container assigned to lower pri task. This task should still be preempted
+ // because the task priority is relevant for preemption and not the container priority
Assert.assertEquals(mockCId3B,
scheduler.taskAllocations.get(mockTask3KillB).getId());
@@ -1460,6 +1462,9 @@ public class TestTaskScheduler {
drainableAppCallback.drain();
// mockTaskPri3KillB gets preempted to clear 10% of outstanding running preemptable tasks
+ // this is also a higher priority container than the pending task priority but was running a
+ // lower priority task. Task priority is relevant for preemption and not container priority as
+ // containers can run tasks of different priorities
scheduler.getProgress();
drainableAppCallback.drain();
verify(mockRMClient, times(2)).releaseAssignedContainer((ContainerId)any());
@@ -1633,6 +1638,20 @@ public class TestTaskScheduler {
when(appClient.getFinalAppStatus()).thenReturn(finalStatus);
taskScheduler.close();
}
+
+ @Test (timeout=5000)
+ public void testScaleDownPercentage() {
+ Assert.assertEquals(100, YarnTaskSchedulerService.scaleDownByPreemptionPercentage(100, 100));
+ Assert.assertEquals(70, YarnTaskSchedulerService.scaleDownByPreemptionPercentage(100, 70));
+ Assert.assertEquals(50, YarnTaskSchedulerService.scaleDownByPreemptionPercentage(100, 50));
+ Assert.assertEquals(10, YarnTaskSchedulerService.scaleDownByPreemptionPercentage(100, 10));
+ Assert.assertEquals(5, YarnTaskSchedulerService.scaleDownByPreemptionPercentage(100, 5));
+ Assert.assertEquals(1, YarnTaskSchedulerService.scaleDownByPreemptionPercentage(100, 1));
+ Assert.assertEquals(1, YarnTaskSchedulerService.scaleDownByPreemptionPercentage(5, 5));
+ Assert.assertEquals(1, YarnTaskSchedulerService.scaleDownByPreemptionPercentage(1, 10));
+ Assert.assertEquals(1, YarnTaskSchedulerService.scaleDownByPreemptionPercentage(1, 70));
+ Assert.assertEquals(1, YarnTaskSchedulerService.scaleDownByPreemptionPercentage(1, 1));
+ }
private Container createContainer(int id, String host, Resource resource,
Priority priority) {