You are viewing a plain-text version of this content. The canonical link for it is "here" (the hyperlink itself is not preserved in this plain-text rendering).
Posted to commits@tez.apache.org by ss...@apache.org on 2016/11/11 18:10:20 UTC

tez git commit: TEZ-3534. Differentiate thread names on Fetchers, minor changes to shuffle shutdown code. (sseth)

Repository: tez
Updated Branches:
  refs/heads/master a93dbf0b2 -> 0d5984426


TEZ-3534. Differentiate thread names on Fetchers, minor changes to
shuffle shutdown code. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/0d598442
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/0d598442
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/0d598442

Branch: refs/heads/master
Commit: 0d598442640ff3cb6ca52f2077bfddb62b54e628
Parents: a93dbf0
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri Nov 11 10:11:26 2016 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Nov 11 10:11:26 2016 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 ++
 .../common/shuffle/impl/ShuffleManager.java     | 10 ++++--
 .../common/shuffle/orderedgrouped/Shuffle.java  |  8 ++++-
 .../orderedgrouped/ShuffleScheduler.java        | 33 ++++++++++++++++----
 .../orderedgrouped/TestShuffleScheduler.java    |  4 ---
 5 files changed, 44 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/0d598442/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index af83c73..8128c7b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3534. Differentiate thread names on Fetchers, minor changes to shuffle shutdown code.
   TEZ-3491. Tez job can hang due to container priority inversion.
   TEZ-3533. ShuffleScheduler should shutdown threadpool on exit.
   TEZ-3477. MRInputHelpers generateInputSplitsToMem public API modified
@@ -146,6 +147,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3534. Differentiate thread names on Fetchers, minor changes to shuffle shutdown code.
   TEZ-3491. Tez job can hang due to container priority inversion.
   TEZ-3533. ShuffleScheduler should shutdown threadpool on exit.
   TEZ-3493. DAG submit timeout cannot be set to a month

http://git-wip-us.apache.org/repos/asf/tez/blob/0d598442/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index 1ebd3a4..d034b2e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -234,7 +234,7 @@ public class ShuffleManager implements FetcherCallback {
     ExecutorService fetcherRawExecutor = Executors.newFixedThreadPool(
         numFetchers,
         new ThreadFactoryBuilder().setDaemon(true)
-            .setNameFormat("Fetcher {" + srcNameTrimmed + "} #%d").build());
+            .setNameFormat("Fetcher_B {" + srcNameTrimmed + "} #%d").build());
     this.fetcherExecutor = MoreExecutors.listeningDecorator(fetcherRawExecutor);
     
     ExecutorService schedulerRawExecutor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
@@ -812,7 +812,13 @@ public class ShuffleManager implements FetcherCallback {
       try {
         wakeLoop.signal(); // signal the fetch-scheduler
         for (Fetcher fetcher : runningFetchers) {
-          fetcher.shutdown(); // This could be parallelized.
+          try {
+            fetcher.shutdown(); // This could be parallelized.
+          } catch (Exception e) {
+            LOG.warn(
+                "Error while stopping fetcher during shutdown. Ignoring and continuing. Message={}",
+                e.getMessage());
+          }
         }
       } finally {
         lock.unlock();

http://git-wip-us.apache.org/repos/asf/tez/blob/0d598442/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
index 5a18959..e5f4e5c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
@@ -378,7 +378,13 @@ public class Shuffle implements ExceptionReporter {
       if (eventHandler != null) {
         eventHandler.logProgress(true);
       }
-      cleanupShuffleSchedulerIgnoreErrors();
+      try {
+        cleanupShuffleSchedulerIgnoreErrors();
+      } catch (Exception e) {
+        LOG.warn(
+            "Error cleaning up shuffle scheduler. Ignoring and continuing with shutdown. Message={}",
+            e.getMessage());
+      }
       cleanupMerger(true);
     } catch (Throwable t) {
       LOG.info(srcNameTrimmed + ": " + "Error in cleaning up.., ", t);

http://git-wip-us.apache.org/repos/asf/tez/blob/0d598442/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index 09518e5..3d2c1ad 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -376,7 +376,7 @@ class ShuffleScheduler {
 
     ExecutorService fetcherRawExecutor = Executors.newFixedThreadPool(numFetchers,
         new ThreadFactoryBuilder().setDaemon(true)
-            .setNameFormat("Fetcher {" + srcNameTrimmed + "} #%d").build());
+            .setNameFormat("Fetcher_O {" + srcNameTrimmed + "} #%d").build());
     this.fetcherExecutor = MoreExecutors.listeningDecorator(fetcherRawExecutor);
 
     this.maxFailedUniqueFetches = Math.min(numberOfInputs, 5);
@@ -429,10 +429,15 @@ class ShuffleScheduler {
     schedulerCallable.call();
   }
 
-  public void close() throws InterruptedException {
+  public void close() {
     try {
       if (!isShutdown.getAndSet(true)) {
-        logProgress();
+        try {
+          logProgress();
+        } catch (Exception e) {
+          LOG.warn("Failed log progress while closing, ignoring and continuing shutdown. Message={}",
+              e.getMessage());
+        }
 
         // Notify and interrupt the waiting scheduler thread
         synchronized (this) {
@@ -450,12 +455,28 @@ class ShuffleScheduler {
 
         // Interrupt the fetchers.
         for (FetcherOrderedGrouped fetcher : runningFetchers) {
-          fetcher.shutDown();
+          try {
+            fetcher.shutDown();
+          } catch (Exception e) {
+            LOG.warn(
+                "Error while shutting down fetcher. Ignoring and continuing shutdown. Message={}",
+                e.getMessage());
+          }
         }
 
         // Kill the Referee thread.
-        referee.interrupt();
-        referee.join();
+        try {
+          referee.interrupt();
+          referee.join();
+        } catch (InterruptedException e) {
+          LOG.warn(
+              "Interrupted while shutting down referee. Ignoring and continuing shutdown");
+          Thread.currentThread().interrupt();
+        } catch (Exception e) {
+          LOG.warn(
+              "Error while shutting down referee. Ignoring and continuing shutdown. Message={}",
+              e.getMessage());
+        }
       }
     } finally {
       long startTime = System.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/tez/blob/0d598442/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
index 5b6c59f..31da4d0 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
@@ -881,11 +881,7 @@ public class TestShuffleScheduler {
       // Close the scheduler on different thread to trigger interrupt
       Thread thread = new Thread(new Runnable() {
         @Override public void run() {
-          try {
             scheduler.close();
-          } catch (InterruptedException e) {
-            //ignore
-          }
         }
       });
       thread.start();