You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by jl...@apache.org on 2017/08/04 20:23:10 UTC
tez git commit: TEZ-3803. Tasks can get killed due to insufficient progress while waiting for shuffle inputs to complete. Contributed by Kuhu Shukla
Repository: tez
Updated Branches:
refs/heads/master 614937c5d -> 8dcf8a121
TEZ-3803. Tasks can get killed due to insufficient progress while waiting for shuffle inputs to complete. Contributed by Kuhu Shukla
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/8dcf8a12
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/8dcf8a12
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/8dcf8a12
Branch: refs/heads/master
Commit: 8dcf8a121f5961e2974ef1121ec9d0200cbdc0ae
Parents: 614937c
Author: Jason Lowe <jl...@yahoo-inc.com>
Authored: Fri Aug 4 15:21:54 2017 -0500
Committer: Jason Lowe <jl...@yahoo-inc.com>
Committed: Fri Aug 4 15:21:54 2017 -0500
----------------------------------------------------------------------
.../common/shuffle/impl/ShuffleManager.java | 9 +++---
.../orderedgrouped/ShuffleScheduler.java | 33 +++++++++++---------
.../common/shuffle/impl/TestShuffleManager.java | 21 +++++++++++++
.../orderedgrouped/TestShuffleScheduler.java | 24 ++++++++++++++
4 files changed, 69 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/8dcf8a12/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index e1b7f99..24fb12b 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -37,6 +37,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
@@ -320,10 +321,10 @@ public class ShuffleManager implements FetcherCallback {
while (!isShutdown.get() && numCompletedInputs.get() < numInputs) {
lock.lock();
try {
- if (runningFetchers.size() >= numFetchers || pendingHosts.isEmpty()) {
- if (numCompletedInputs.get() < numInputs) {
- wakeLoop.await();
- }
+ while ((runningFetchers.size() >= numFetchers || pendingHosts.isEmpty())
+ && numCompletedInputs.get() < numInputs) {
+ inputContext.notifyProgress();
+ boolean ret = wakeLoop.await(1000, TimeUnit.MILLISECONDS);
}
} finally {
lock.unlock();
http://git-wip-us.apache.org/repos/asf/tez/blob/8dcf8a12/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index b223c1a..981e224 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -1120,7 +1120,7 @@ class ShuffleScheduler {
if (LOG.isDebugEnabled()) {
LOG.debug("PendingHosts=" + pendingHosts);
}
- wait();
+ waitAndNotifyProgress();
}
if (!pendingHosts.isEmpty()) {
@@ -1360,19 +1360,19 @@ class ShuffleScheduler {
protected Void callInternal() throws InterruptedException {
while (!isShutdown.get() && remainingMaps.get() > 0) {
synchronized (ShuffleScheduler.this) {
- if (runningFetchers.size() >= numFetchers || pendingHosts.isEmpty()) {
- if (remainingMaps.get() > 0) {
- try {
- ShuffleScheduler.this.wait();
- } catch (InterruptedException e) {
- if (isShutdown.get()) {
- LOG.info(srcNameTrimmed + ": " +
- "Interrupted while waiting for fetchers to complete and hasBeenShutdown. Breaking out of ShuffleSchedulerCallable loop");
- Thread.currentThread().interrupt();
- break;
- } else {
- throw e;
- }
+ while ((runningFetchers.size() >= numFetchers || pendingHosts.isEmpty())
+ && remainingMaps.get() > 0) {
+ try {
+ waitAndNotifyProgress();
+ } catch (InterruptedException e) {
+ if (isShutdown.get()) {
+ LOG.info(srcNameTrimmed + ": " +
+ "Interrupted while waiting for fetchers to complete" +
+ "and hasBeenShutdown. Breaking out of ShuffleSchedulerCallable loop");
+ Thread.currentThread().interrupt();
+ break;
+ } else {
+ throw e;
}
}
}
@@ -1446,6 +1446,11 @@ class ShuffleScheduler {
}
}
+ private synchronized void waitAndNotifyProgress() throws InterruptedException {
+ inputContext.notifyProgress();
+ wait(1000);
+ }
+
@VisibleForTesting
FetcherOrderedGrouped constructFetcherForHost(MapHost mapHost) {
return new FetcherOrderedGrouped(httpConnectionParams, ShuffleScheduler.this, allocator,
http://git-wip-us.apache.org/repos/asf/tez/blob/8dcf8a12/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java
index f361dc7..23248ed 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@@ -193,6 +194,26 @@ public class TestShuffleManager {
verify(inputContext).createTezFrameworkExecutorService(anyInt(), anyString());
}
+ @Test (timeout = 20000)
+ public void testProgressWithEmptyPendingHosts() throws Exception {
+ InputContext inputContext = createInputContext();
+ final ShuffleManager shuffleManager = spy(createShuffleManager(inputContext, 1));
+ Thread schedulerGetHostThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ shuffleManager.run();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ schedulerGetHostThread.start();
+ Thread.currentThread().sleep(1000 * 3 + 1000);
+ schedulerGetHostThread.interrupt();
+ verify(inputContext, atLeast(3)).notifyProgress();
+ }
+
private ShuffleManagerForTest createShuffleManager(
InputContext inputContext, int expectedNumOfPhysicalInputs)
throws IOException {
http://git-wip-us.apache.org/repos/asf/tez/blob/8dcf8a12/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
index c61391c..381ad85 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
@@ -812,6 +812,28 @@ public class TestShuffleScheduler {
assertFalse("Host identifier mismatch", (host.getHost() + ":" + host.getPort() + ":" + host.getPartitionId()).equalsIgnoreCase("host0:10000"));
}
+ @Test (timeout = 20000)
+ public void testProgressDuringGetHostWait() throws IOException, InterruptedException {
+ long startTime = System.currentTimeMillis();
+ Configuration conf = new TezConfiguration();
+ Shuffle shuffle = mock(Shuffle.class);
+ final ShuffleSchedulerForTest scheduler = createScheduler(startTime, 1, shuffle, conf);
+ Thread schedulerGetHostThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ scheduler.getHost();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ schedulerGetHostThread.start();
+ Thread.currentThread().sleep(1000 * 3 + 1000);
+ schedulerGetHostThread.interrupt();
+ verify(scheduler.inputContext, atLeast(3)).notifyProgress();
+ }
+
@Test(timeout = 5000)
public void testShutdown() throws Exception {
InputContext inputContext = createTezInputContext();
@@ -964,6 +986,7 @@ public class TestShuffleScheduler {
private final AtomicInteger numFetchersCreated = new AtomicInteger(0);
private final boolean fetcherShouldWait;
private final ExceptionReporter reporter;
+ private final InputContext inputContext;
public ShuffleSchedulerForTest(InputContext inputContext, Configuration conf,
int numberOfInputs,
@@ -989,6 +1012,7 @@ public class TestShuffleScheduler {
ifileReadAhead, ifileReadAheadLength, srcNameTrimmed);
this.fetcherShouldWait = fetcherShouldWait;
this.reporter = shuffle;
+ this.inputContext = inputContext;
}
@Override