You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2015/09/18 23:55:59 UTC
tez git commit: TEZ-2816. Preemption sometimes does not respect
heartbeats between preemptions (bikas)
Repository: tez
Updated Branches:
refs/heads/master 04571cc53 -> a06cd76d8
TEZ-2816. Preemption sometimes does not respect heartbeats between preemptions (bikas)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a06cd76d
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a06cd76d
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a06cd76d
Branch: refs/heads/master
Commit: a06cd76d8ddcee5f7f939cf72e3eeb3cc59033d0
Parents: 04571cc
Author: Bikas Saha <bi...@apache.org>
Authored: Fri Sep 18 14:55:27 2015 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Fri Sep 18 14:55:27 2015 -0700
----------------------------------------------------------------------
CHANGES.txt | 4 +++
.../dag/app/rm/YarnTaskSchedulerService.java | 38 +++++++++++---------
.../tez/dag/app/rm/TestTaskScheduler.java | 34 ++++++++++++++----
.../org/apache/tez/analyzer/TestAnalyzer.java | 4 +--
4 files changed, 54 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/a06cd76d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9c34cca..b4a5db4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.8.1: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2816. Preemption sometimes does not respect heartbeats between preemptions
TEZ-814. Improve heuristic for determining a task has failed outputs
TEZ-2832. Support tests for both SimpleHistory logging and ATS logging
TEZ-2840. MRInputLegacy.init should set splitInfoViaEvents.
@@ -184,6 +185,7 @@ Release 0.7.1: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2816. Preemption sometimes does not respect heartbeats between preemptions
TEZ-814. Improve heuristic for determining a task has failed outputs
TEZ-2840. MRInputLegacy.init should set splitInfoViaEvents.
TEZ-2829. Tez UI: minor fixes to in-progress update of UI from AM
@@ -441,6 +443,7 @@ Release 0.6.3: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2816. Preemption sometimes does not respect heartbeats between preemptions
TEZ-814. Improve heuristic for determining a task has failed outputs
TEZ-2809. Minimal distribution compiled on 2.6 fails to run on 2.7
TEZ-2768. Log a useful error message when the summary stream cannot be closed when shutting
@@ -662,6 +665,7 @@ INCOMPATIBLE CHANGES
TEZ-2552. CRC errors can cause job to run for very long time in large jobs.
ALL CHANGES:
+ TEZ-2816. Preemption sometimes does not respect heartbeats between preemptions
TEZ-814. Improve heuristic for determining a task has failed outputs
TEZ-2768. Log a useful error message when the summary stream cannot be closed when shutting
down an AM.
http://git-wip-us.apache.org/repos/asf/tez/blob/a06cd76d/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
index 3c1cf3f..5a5464f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
@@ -892,7 +892,9 @@ public class YarnTaskSchedulerService extends TaskScheduler
synchronized (this) {
numHeartbeats++;
- preemptIfNeeded();
+ if (preemptIfNeeded()) {
+ heartbeatAtLastPreemption = numHeartbeats;
+ }
}
return getContext().getProgress();
@@ -1127,10 +1129,10 @@ public class YarnTaskSchedulerService extends TaskScheduler
" heartbeats: " + numHeartbeats + " lastPreemptionHeartbeat: " + heartbeatAtLastPreemption;
}
- void preemptIfNeeded() {
+ boolean preemptIfNeeded() {
if (preemptionPercentage == 0) {
// turned off
- return;
+ return true;
}
ContainerId[] preemptedContainers = null;
int numPendingRequestsToService = 0;
@@ -1162,7 +1164,7 @@ public class YarnTaskSchedulerService extends TaskScheduler
if (highestPriRequest == null) {
// nothing pending
- return;
+ return true;
}
if(fitsIn(highestPriRequest.getCapability(), freeResources)) {
@@ -1173,7 +1175,7 @@ public class YarnTaskSchedulerService extends TaskScheduler
LOG.info(highestPriRequest + " fits in free resources");
}
}
- return;
+ return true;
}
// highest priority request will not fit in existing free resources
// free up some more
@@ -1183,7 +1185,8 @@ public class YarnTaskSchedulerService extends TaskScheduler
preemptionPercentage);
if (numPendingRequestsToService < 1) {
- return;
+ // nothing to preempt - reset preemption last heartbeat
+ return true;
}
if (LOG.isDebugEnabled()) {
@@ -1191,7 +1194,7 @@ public class YarnTaskSchedulerService extends TaskScheduler
+ numHighestPriRequests + " pending requests at pri: "
+ highestPriRequest.getPriority());
}
-
+
for (int i=0; i<numPendingRequestsToService; ++i) {
// This request must have been considered for matching with all existing
// containers when request was made.
@@ -1204,7 +1207,7 @@ public class YarnTaskSchedulerService extends TaskScheduler
LOG.debug("Reused container exists. Wait for assignment loop to release it. "
+ heldContainer.getContainer().getId());
}
- return;
+ return true;
}
if (heldContainer.geNumAssignmentAttempts() < 3) {
// we havent tried to assign this container at node/rack/ANY
@@ -1212,7 +1215,7 @@ public class YarnTaskSchedulerService extends TaskScheduler
LOG.debug("Brand new container. Wait for assignment loop to match it. "
+ heldContainer.getContainer().getId());
}
- return;
+ return true;
}
Container container = heldContainer.getContainer();
if (lowestPriNewContainer == null ||
@@ -1257,18 +1260,14 @@ public class YarnTaskSchedulerService extends TaskScheduler
}
if (numPendingRequestsToService < 1) {
- return;
+ return true;
}
// there are no reused or new containers to release. try to preempt running containers
// this assert will be a no-op in production but can help identify
// invalid assumptions during testing
assert delayedContainerManager.delayedContainers.isEmpty();
-
- if ((numHeartbeats - heartbeatAtLastPreemption) <= numHeartbeatsBetweenPreemptions) {
- return;
- }
-
+
Priority preemptedTaskPriority = null;
int numEntriesAtPreemptedPriority = 0;
for(Map.Entry<Object, Container> entry : taskAllocations.entrySet()) {
@@ -1304,7 +1303,12 @@ public class YarnTaskSchedulerService extends TaskScheduler
numPendingRequestsToService = Math.min(newNumPendingRequestsToService,
numPendingRequestsToService);
if (numPendingRequestsToService < 1) {
- return;
+ return true;
+ }
+ // wait for enough heartbeats since this request became active for preemption
+ if ((numHeartbeats - heartbeatAtLastPreemption) < numHeartbeatsBetweenPreemptions) {
+ // returning false keeps the caller from advancing heartbeatAtLastPreemption while we wait
+ return false;
}
LOG.info("Trying to service " + numPendingRequestsToService + " out of total "
+ numHighestPriRequests + " pending requests at pri: "
@@ -1331,7 +1335,6 @@ public class YarnTaskSchedulerService extends TaskScheduler
// upcall outside locks
if (preemptedContainers != null) {
- heartbeatAtLastPreemption = numHeartbeats;
for(int i=0; i<numPendingRequestsToService; ++i) {
ContainerId cId = preemptedContainers[i];
if (cId != null) {
@@ -1340,6 +1343,7 @@ public class YarnTaskSchedulerService extends TaskScheduler
}
}
}
+ return true;
}
private boolean fitsIn(Resource toFit, Resource resource) {
http://git-wip-us.apache.org/repos/asf/tez/blob/a06cd76d/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
index 5efea48..1fc5092 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
@@ -1508,6 +1508,7 @@ public class TestTaskScheduler {
Configuration conf = new Configuration();
conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, false);
+ conf.setInt(TezConfiguration.TEZ_AM_PREEMPTION_HEARTBEATS_BETWEEN_PREEMPTIONS, 3);
TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(appHost, appPort, appUrl, false,
null, null, new PreemptionMatcher(), conf);
@@ -1692,7 +1693,10 @@ public class TestTaskScheduler {
scheduler.getProgress();
drainableAppCallback.drain();
verify(mockRMClient, times(0)).releaseAssignedContainer((ContainerId)any());
+ // no need for task preemption until now - so they should match
+ Assert.assertEquals(scheduler.numHeartbeats, scheduler.heartbeatAtLastPreemption);
+ // add a pending request that cannot be allocated until resources free up
Object mockTask3WaitCookie = new Object();
scheduler.allocateTask(mockTask3Wait, taskAsk, null,
null, pri6, obj3, mockTask3WaitCookie);
@@ -1711,6 +1715,7 @@ public class TestTaskScheduler {
containers.clear();
containers.add(mockContainer4);
+ // new lower pri container added that won't be matched and will eventually be preempted
// Fudge new container being present in delayed allocation list due to race
HeldContainer heldContainer = new HeldContainer(mockContainer4, -1, -1, null,
containerSignatureMatcher);
@@ -1719,6 +1724,9 @@ public class TestTaskScheduler {
scheduler.getProgress();
drainableAppCallback.drain();
verify(mockRMClient, times(0)).releaseAssignedContainer((ContainerId)any());
+ // no need for task preemption until now - so they should match
+ Assert.assertEquals(scheduler.numHeartbeats, scheduler.heartbeatAtLastPreemption);
+
heldContainer.incrementAssignmentAttempts();
// no preemption - container assignment attempts < 3
scheduler.getProgress();
@@ -1740,12 +1748,18 @@ public class TestTaskScheduler {
// remove fudging.
scheduler.delayedContainerManager.delayedContainers.clear();
-
+
+ // no need for task preemption until now - so they should match
+ Assert.assertEquals(scheduler.numHeartbeats, scheduler.heartbeatAtLastPreemption);
+
scheduler.allocateTask(mockTask3Retry, taskAsk, null,
null, pri5, obj3, null);
// no preemption - higher pri. exact match
scheduler.getProgress();
+ // no need for task preemption until now - so they should match
drainableAppCallback.drain();
+ // no need for task preemption until now - so they should match
+ Assert.assertEquals(scheduler.numHeartbeats, scheduler.heartbeatAtLastPreemption);
verify(mockRMClient, times(1)).releaseAssignedContainer((ContainerId)any());
for (int i=0; i<11; ++i) {
@@ -1758,16 +1772,22 @@ public class TestTaskScheduler {
// this is also a higher priority container than the pending task priority but was running a
// lower priority task. Task priority is relevant for preemption and not container priority as
// containers can run tasks of different priorities
- scheduler.getProgress();
+ scheduler.getProgress(); // first heartbeat
+ Assert.assertTrue(scheduler.numHeartbeats > scheduler.heartbeatAtLastPreemption);
+ drainableAppCallback.drain();
+ scheduler.getProgress(); // second heartbeat
+ drainableAppCallback.drain();
+ verify(mockRMClient, times(1)).releaseAssignedContainer((ContainerId)any());
+ scheduler.getProgress(); // third heartbeat
drainableAppCallback.drain();
verify(mockRMClient, times(2)).releaseAssignedContainer((ContainerId)any());
verify(mockRMClient, times(1)).releaseAssignedContainer(mockCId3B);
- // next 3 heartbeats do nothing, waiting for the RM to act on the last released resources
- scheduler.getProgress();
- scheduler.getProgress();
- scheduler.getProgress();
+ Assert.assertEquals(scheduler.numHeartbeats, scheduler.heartbeatAtLastPreemption);
+ // there are pending preemptions.
+ scheduler.getProgress(); // first heartbeat
+ scheduler.getProgress(); // second heartbeat
verify(mockRMClient, times(2)).releaseAssignedContainer((ContainerId) any());
- scheduler.getProgress();
+ scheduler.getProgress(); // third heartbeat
drainableAppCallback.drain();
// Next oldest mockTaskPri3KillA gets preempted to clear 10% of outstanding running preemptable tasks
verify(mockRMClient, times(3)).releaseAssignedContainer((ContainerId)any());
http://git-wip-us.apache.org/repos/asf/tez/blob/a06cd76d/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java
index ca9250d..7ea5a39 100644
--- a/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java
+++ b/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java
@@ -168,7 +168,7 @@ public class TestAnalyzer {
remoteStagingDir.toString());
tezConf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false);
- tezSession = TezClient.create("TestFaultTolerance", tezConf, true);
+ tezSession = TezClient.create("TestAnalyzer", tezConf, true);
tezSession.start();
}
@@ -821,4 +821,4 @@ public class TestAnalyzer {
return Collections.singletonList(check);
}
-}
\ No newline at end of file
+}