You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ha...@apache.org on 2020/05/27 00:13:53 UTC
[tez] branch master updated: TEZ-4087 : Shuffle: Fix shuffle cleanup to prevent thread leaks (Rajesh Balamohan via Prasanth J, Ashutosh Chauhan)
This is an automated email from the ASF dual-hosted git repository.
hashutosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/master by this push:
new 7659726 TEZ-4087 : Shuffle: Fix shuffle cleanup to prevent thread leaks (Rajesh Balamohan via Prasanth J, Ashutosh Chauhan)
7659726 is described below
commit 7659726a1ed877d1f5303fc3673e4399bab33b65
Author: Rajesh Balamohan <rb...@apache.org>
AuthorDate: Tue May 26 17:13:00 2020 -0700
TEZ-4087 : Shuffle: Fix shuffle cleanup to prevent thread leaks (Rajesh Balamohan via Prasanth J, Ashutosh Chauhan)
Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
---
.../shuffle/orderedgrouped/MergeManager.java | 23 +++++++++++++++++-----
.../common/shuffle/orderedgrouped/MergeThread.java | 18 +++++++++++++++--
.../shuffle/orderedgrouped/ShuffleScheduler.java | 1 +
3 files changed, 35 insertions(+), 7 deletions(-)
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
index 2e5cc20..70f9e55 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
@@ -314,6 +314,16 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
this.onDiskMerger = new OnDiskMerger(this);
}
+ void setupParentThread(Thread shuffleSchedulerThread) {
+ LOG.info("Setting merger's parent thread to "
+ + shuffleSchedulerThread.getName());
+ if (this.memToMemMerger != null) {
+ memToMemMerger.setParentThread(shuffleSchedulerThread);
+ }
+ this.inMemoryMerger.setParentThread(shuffleSchedulerThread);;
+ this.onDiskMerger.setParentThread(shuffleSchedulerThread);
+ }
+
@Private
void configureAndStart() {
if (this.memToMemMerger != null) {
@@ -714,7 +724,8 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
int mergeFactor) {
super(manager, mergeFactor, exceptionReporter);
setName("MemToMemMerger [" + TezUtilsInternal
- .cleanVertexName(inputContext.getSourceVertexName()) + "]");
+ .cleanVertexName(inputContext.getSourceVertexName())
+ + "_" + inputContext.getUniqueIdentifier() + "]");
setDaemon(true);
}
@@ -831,8 +842,9 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
public InMemoryMerger(MergeManager manager) {
super(manager, Integer.MAX_VALUE, exceptionReporter);
- setName("MemtoDiskMerger [" + TezUtilsInternal
- .cleanVertexName(inputContext.getSourceVertexName()) + "]");
+ setName("MemtoDiskMerger [" + TezUtilsInternal
+ .cleanVertexName(inputContext.getSourceVertexName())
+ + "_" + inputContext.getUniqueIdentifier() + "]");
setDaemon(true);
}
@@ -952,8 +964,9 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
public OnDiskMerger(MergeManager manager) {
super(manager, ioSortFactor, exceptionReporter);
- setName("DiskToDiskMerger [" + TezUtilsInternal
- .cleanVertexName(inputContext.getSourceVertexName()) + "]");
+ setName("DiskToDiskMerger [" + TezUtilsInternal
+ .cleanVertexName(inputContext.getSourceVertexName())
+ + "_" + inputContext.getUniqueIdentifier() + "]");
setDaemon(true);
}
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeThread.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeThread.java
index 52b4c5b..c0af90f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeThread.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeThread.java
@@ -36,6 +36,8 @@ abstract class MergeThread<T> extends Thread {
private final ExceptionReporter reporter;
private boolean closed = false;
private final int mergeFactor;
+
+ private Thread shuffleSchedulerThread;
public MergeThread(MergeManager manager, int mergeFactor,
ExceptionReporter reporter) {
@@ -60,6 +62,10 @@ abstract class MergeThread<T> extends Thread {
}
}
+ public void setParentThread(Thread shuffleSchedulerThread) {
+ this.shuffleSchedulerThread = shuffleSchedulerThread;
+ }
+
public synchronized boolean isInProgress() {
return inProgress;
}
@@ -81,7 +87,11 @@ abstract class MergeThread<T> extends Thread {
public synchronized void waitForMerge() throws InterruptedException {
while (inProgress) {
- wait();
+ if (shuffleSchedulerThread != null
+ && !shuffleSchedulerThread.isAlive()) {
+ return;
+ }
+ wait(5000);
}
}
@@ -91,7 +101,11 @@ abstract class MergeThread<T> extends Thread {
// Wait for notification to start the merge...
synchronized (this) {
while (!inProgress) {
- wait();
+ if (shuffleSchedulerThread != null
+ && !shuffleSchedulerThread.isAlive()) {
+ return;
+ }
+ wait(5000);
}
}
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index ff07e91..0954a76 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -445,6 +445,7 @@ class ShuffleScheduler {
public void start() throws Exception {
shuffleSchedulerThread = Thread.currentThread();
+ mergeManager.setupParentThread(shuffleSchedulerThread);
ShuffleSchedulerCallable schedulerCallable = new ShuffleSchedulerCallable();
schedulerCallable.call();
}