You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2016/11/11 18:10:20 UTC
tez git commit: TEZ-3534. Differentiate thread names on Fetchers,
minor changes to shuffle shutdown code. (sseth)
Repository: tez
Updated Branches:
refs/heads/master a93dbf0b2 -> 0d5984426
TEZ-3534. Differentiate thread names on Fetchers, minor changes to
shuffle shutdown code. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/0d598442
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/0d598442
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/0d598442
Branch: refs/heads/master
Commit: 0d598442640ff3cb6ca52f2077bfddb62b54e628
Parents: a93dbf0
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri Nov 11 10:11:26 2016 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Nov 11 10:11:26 2016 -0800
----------------------------------------------------------------------
CHANGES.txt | 2 ++
.../common/shuffle/impl/ShuffleManager.java | 10 ++++--
.../common/shuffle/orderedgrouped/Shuffle.java | 8 ++++-
.../orderedgrouped/ShuffleScheduler.java | 33 ++++++++++++++++----
.../orderedgrouped/TestShuffleScheduler.java | 4 ---
5 files changed, 44 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/0d598442/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index af83c73..8128c7b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3534. Differentiate thread names on Fetchers, minor changes to shuffle shutdown code.
TEZ-3491. Tez job can hang due to container priority inversion.
TEZ-3533. ShuffleScheduler should shutdown threadpool on exit.
TEZ-3477. MRInputHelpers generateInputSplitsToMem public API modified
@@ -146,6 +147,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3534. Differentiate thread names on Fetchers, minor changes to shuffle shutdown code.
TEZ-3491. Tez job can hang due to container priority inversion.
TEZ-3533. ShuffleScheduler should shutdown threadpool on exit.
TEZ-3493. DAG submit timeout cannot be set to a month
http://git-wip-us.apache.org/repos/asf/tez/blob/0d598442/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index 1ebd3a4..d034b2e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -234,7 +234,7 @@ public class ShuffleManager implements FetcherCallback {
ExecutorService fetcherRawExecutor = Executors.newFixedThreadPool(
numFetchers,
new ThreadFactoryBuilder().setDaemon(true)
- .setNameFormat("Fetcher {" + srcNameTrimmed + "} #%d").build());
+ .setNameFormat("Fetcher_B {" + srcNameTrimmed + "} #%d").build());
this.fetcherExecutor = MoreExecutors.listeningDecorator(fetcherRawExecutor);
ExecutorService schedulerRawExecutor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
@@ -812,7 +812,13 @@ public class ShuffleManager implements FetcherCallback {
try {
wakeLoop.signal(); // signal the fetch-scheduler
for (Fetcher fetcher : runningFetchers) {
- fetcher.shutdown(); // This could be parallelized.
+ try {
+ fetcher.shutdown(); // This could be parallelized.
+ } catch (Exception e) {
+ LOG.warn(
+ "Error while stopping fetcher during shutdown. Ignoring and continuing. Message={}",
+ e.getMessage());
+ }
}
} finally {
lock.unlock();
http://git-wip-us.apache.org/repos/asf/tez/blob/0d598442/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
index 5a18959..e5f4e5c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
@@ -378,7 +378,13 @@ public class Shuffle implements ExceptionReporter {
if (eventHandler != null) {
eventHandler.logProgress(true);
}
- cleanupShuffleSchedulerIgnoreErrors();
+ try {
+ cleanupShuffleSchedulerIgnoreErrors();
+ } catch (Exception e) {
+ LOG.warn(
+ "Error cleaning up shuffle scheduler. Ignoring and continuing with shutdown. Message={}",
+ e.getMessage());
+ }
cleanupMerger(true);
} catch (Throwable t) {
LOG.info(srcNameTrimmed + ": " + "Error in cleaning up.., ", t);
http://git-wip-us.apache.org/repos/asf/tez/blob/0d598442/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index 09518e5..3d2c1ad 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -376,7 +376,7 @@ class ShuffleScheduler {
ExecutorService fetcherRawExecutor = Executors.newFixedThreadPool(numFetchers,
new ThreadFactoryBuilder().setDaemon(true)
- .setNameFormat("Fetcher {" + srcNameTrimmed + "} #%d").build());
+ .setNameFormat("Fetcher_O {" + srcNameTrimmed + "} #%d").build());
this.fetcherExecutor = MoreExecutors.listeningDecorator(fetcherRawExecutor);
this.maxFailedUniqueFetches = Math.min(numberOfInputs, 5);
@@ -429,10 +429,15 @@ class ShuffleScheduler {
schedulerCallable.call();
}
- public void close() throws InterruptedException {
+ public void close() {
try {
if (!isShutdown.getAndSet(true)) {
- logProgress();
+ try {
+ logProgress();
+ } catch (Exception e) {
+ LOG.warn("Failed log progress while closing, ignoring and continuing shutdown. Message={}",
+ e.getMessage());
+ }
// Notify and interrupt the waiting scheduler thread
synchronized (this) {
@@ -450,12 +455,28 @@ class ShuffleScheduler {
// Interrupt the fetchers.
for (FetcherOrderedGrouped fetcher : runningFetchers) {
- fetcher.shutDown();
+ try {
+ fetcher.shutDown();
+ } catch (Exception e) {
+ LOG.warn(
+ "Error while shutting down fetcher. Ignoring and continuing shutdown. Message={}",
+ e.getMessage());
+ }
}
// Kill the Referee thread.
- referee.interrupt();
- referee.join();
+ try {
+ referee.interrupt();
+ referee.join();
+ } catch (InterruptedException e) {
+ LOG.warn(
+ "Interrupted while shutting down referee. Ignoring and continuing shutdown");
+ Thread.currentThread().interrupt();
+ } catch (Exception e) {
+ LOG.warn(
+ "Error while shutting down referee. Ignoring and continuing shutdown. Message={}",
+ e.getMessage());
+ }
}
} finally {
long startTime = System.currentTimeMillis();
http://git-wip-us.apache.org/repos/asf/tez/blob/0d598442/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
index 5b6c59f..31da4d0 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
@@ -881,11 +881,7 @@ public class TestShuffleScheduler {
// Close the scheduler on different thread to trigger interrupt
Thread thread = new Thread(new Runnable() {
@Override public void run() {
- try {
scheduler.close();
- } catch (InterruptedException e) {
- //ignore
- }
}
});
thread.start();