You are viewing a plain-text version of this content. (The link to the canonical version did not survive in this plain-text copy.)
Posted to commits@tez.apache.org by ha...@apache.org on 2020/05/27 00:13:53 UTC

[tez] branch master updated: TEZ-4087 : Shuffle: Fix shuffle cleanup to prevent thread leaks (Rajesh Balamohan via Prasanth J, Ashutosh Chauhan)

This is an automated email from the ASF dual-hosted git repository.

hashutosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git


The following commit(s) were added to refs/heads/master by this push:
     new 7659726  TEZ-4087 : Shuffle: Fix shuffle cleanup to prevent thread leaks (Rajesh Balamohan via Prasanth J, Ashutosh Chauhan)
7659726 is described below

commit 7659726a1ed877d1f5303fc3673e4399bab33b65
Author: Rajesh Balamohan <rb...@apache.org>
AuthorDate: Tue May 26 17:13:00 2020 -0700

    TEZ-4087 : Shuffle: Fix shuffle cleanup to prevent thread leaks (Rajesh Balamohan via Prasanth J, Ashutosh Chauhan)
    
    Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
---
 .../shuffle/orderedgrouped/MergeManager.java       | 23 +++++++++++++++++-----
 .../common/shuffle/orderedgrouped/MergeThread.java | 18 +++++++++++++++--
 .../shuffle/orderedgrouped/ShuffleScheduler.java   |  1 +
 3 files changed, 35 insertions(+), 7 deletions(-)

diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
index 2e5cc20..70f9e55 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
@@ -314,6 +314,16 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
       this.onDiskMerger = new OnDiskMerger(this);
   }
 
+  void setupParentThread(Thread shuffleSchedulerThread) { // lets mergers exit if it dies
+    LOG.info("Setting merger's parent thread to "
+        + shuffleSchedulerThread.getName());
+    if (this.memToMemMerger != null) {
+      memToMemMerger.setParentThread(shuffleSchedulerThread);
+    }
+    this.inMemoryMerger.setParentThread(shuffleSchedulerThread);
+    this.onDiskMerger.setParentThread(shuffleSchedulerThread);
+  }
+
   @Private
   void configureAndStart() {
     if (this.memToMemMerger != null) {
@@ -714,7 +724,8 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
                                             int mergeFactor) {
       super(manager, mergeFactor, exceptionReporter);
       setName("MemToMemMerger [" + TezUtilsInternal
-          .cleanVertexName(inputContext.getSourceVertexName()) + "]");
+          .cleanVertexName(inputContext.getSourceVertexName())
+          + "_" + inputContext.getUniqueIdentifier() + "]");
       setDaemon(true);
     }
 
@@ -831,8 +842,9 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
 
     public InMemoryMerger(MergeManager manager) {
       super(manager, Integer.MAX_VALUE, exceptionReporter);
-      setName("MemtoDiskMerger [" + TezUtilsInternal
-          .cleanVertexName(inputContext.getSourceVertexName()) + "]");
+      setName("MemtoDiskMerger [" +  TezUtilsInternal
+          .cleanVertexName(inputContext.getSourceVertexName())
+          + "_" + inputContext.getUniqueIdentifier()  + "]");
       setDaemon(true);
     }
     
@@ -952,8 +964,9 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
 
     public OnDiskMerger(MergeManager manager) {
       super(manager, ioSortFactor, exceptionReporter);
-      setName("DiskToDiskMerger [" + TezUtilsInternal
-          .cleanVertexName(inputContext.getSourceVertexName()) + "]");
+      setName("DiskToDiskMerger [" +  TezUtilsInternal
+          .cleanVertexName(inputContext.getSourceVertexName())
+          + "_" + inputContext.getUniqueIdentifier() + "]");
       setDaemon(true);
     }
     
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeThread.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeThread.java
index 52b4c5b..c0af90f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeThread.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeThread.java
@@ -36,6 +36,8 @@ abstract class MergeThread<T> extends Thread {
   private final ExceptionReporter reporter;
   private boolean closed = false;
   private final int mergeFactor;
+
+  private Thread shuffleSchedulerThread;
   
   public MergeThread(MergeManager manager, int mergeFactor,
                      ExceptionReporter reporter) {
@@ -60,6 +62,10 @@ abstract class MergeThread<T> extends Thread {
     }
   }
 
+  public synchronized void setParentThread(Thread shuffleSchedulerThread) {
+    this.shuffleSchedulerThread = shuffleSchedulerThread; // read under this monitor
+  }
+
   public synchronized boolean isInProgress() {
     return inProgress;
   }
@@ -81,7 +87,11 @@ abstract class MergeThread<T> extends Thread {
 
   public synchronized void waitForMerge() throws InterruptedException {
     while (inProgress) {
-      wait();
+      if (shuffleSchedulerThread != null
+          && !shuffleSchedulerThread.isAlive()) {
+        return;
+      }
+      wait(5000);
     }
   }
 
@@ -91,7 +101,11 @@ abstract class MergeThread<T> extends Thread {
         // Wait for notification to start the merge...
         synchronized (this) {
           while (!inProgress) {
-            wait();
+            if (shuffleSchedulerThread != null
+                && !shuffleSchedulerThread.isAlive()) {
+              return;
+            }
+            wait(5000);
           }
         }
 
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index ff07e91..0954a76 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -445,6 +445,7 @@ class ShuffleScheduler {
 
   public void start() throws Exception {
     shuffleSchedulerThread = Thread.currentThread();
+    mergeManager.setupParentThread(shuffleSchedulerThread);
     ShuffleSchedulerCallable schedulerCallable = new ShuffleSchedulerCallable();
     schedulerCallable.call();
   }