Posted to reviews@spark.apache.org by "JacobZheng0927 (via GitHub)" <gi...@apache.org> on 2024/02/29 02:31:08 UTC

[PR] [SPARK-47146][CORE] Possible thread leak when doing sort merge join [spark]

JacobZheng0927 opened a new pull request, #45327:
URL: https://github.com/apache/spark/pull/45327

   ### What changes were proposed in this pull request?
   Modify the implementation of newDaemonSingleThreadExecutor in ThreadUtils to automatically close the thread pool using FinalizableDelegatedExecutorService.
   
   
   ### Why are the changes needed?
   SPARK-39591 modified the implementation of newDaemonSingleThreadExecutor. As a result, threads can leak wherever newDaemonSingleThreadExecutor is used and the thread pool is not explicitly shut down. One example is a sort merge join that spills.
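
To make the failure mode concrete, here is a minimal sketch in plain Java (not Spark code) of a single-thread executor whose worker stays alive until the pool is shut down explicitly. The class `ExecutorLeakSketch`, the helper `newNamedDaemonExecutor`, and the thread name `read-ahead-sketch` are assumptions made up for this illustration; they do not reproduce `ThreadUtils`.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

class ExecutorLeakSketch {
  // Hypothetical thread name, chosen only for this example.
  static final String THREAD_NAME = "read-ahead-sketch";
  // Remembers the worker thread so the example can check whether it is still alive.
  static final AtomicReference<Thread> WORKER = new AtomicReference<>();

  // Local helper (an assumption, not Spark's ThreadUtils): a single-thread daemon executor.
  static ExecutorService newNamedDaemonExecutor() {
    return Executors.newSingleThreadExecutor(runnable -> {
      Thread t = new Thread(runnable, THREAD_NAME);
      t.setDaemon(true);
      WORKER.set(t);
      return t;
    });
  }

  // Returns {alive while the pool is open, alive after shutdownNow()}.
  static boolean[] demo() {
    try {
      ExecutorService pool = newNamedDaemonExecutor();
      pool.submit(() -> { }).get();          // forces the worker thread to start
      boolean aliveWhileOpen = WORKER.get().isAlive();
      pool.shutdownNow();                    // the explicit shutdown that a leaking code path skips
      WORKER.get().join(5000);               // wait for the worker to exit
      boolean aliveAfterShutdown = WORKER.get().isAlive();
      return new boolean[] {aliveWhileOpen, aliveAfterShutdown};
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  public static void main(String[] args) {
    boolean[] result = demo();
    System.out.println("alive while pool is open: " + result[0]);
    System.out.println("alive after shutdownNow: " + result[1]);
  }
}
```

If the `shutdownNow()` call is never reached, the idle worker simply keeps waiting for work, which is the kind of leftover thread this PR is about.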
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   Unit test
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47146][CORE] Possible thread leak when doing sort merge join [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #45327:
URL: https://github.com/apache/spark/pull/45327#issuecomment-1970413864

   Also, +CC @HeartSaVioR who reviewed the previous PR


Re: [PR] [SPARK-47146][CORE] Possible thread leak when doing sort merge join [spark]

Posted by "JacobZheng0927 (via GitHub)" <gi...@apache.org>.
JacobZheng0927 commented on PR #45327:
URL: https://github.com/apache/spark/pull/45327#issuecomment-1970433417

   > Thanks for surfacing this issue!
   > 
   > To fix this, I would suggest changing `ReadAheadInputStream` to take `TaskContext` as a parameter - and adding a task completion listener - which calls `close`.
   > 
   > That would fix the issue, release resources more proactively (instead of waiting for finalization to kick in), and also future-proof this code (for when support for finalize is removed from the JDK).
   
   
   
   > To fix this, I would suggest changing `ReadAheadInputStream` to take `TaskContext` as a parameter - and adding a task completion listener - which calls `close`.
   
   Thanks, that's a very good point. I will try this modification.


Re: [PR] [SPARK-47146][CORE] Possible thread leak when doing sort merge join [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #45327:
URL: https://github.com/apache/spark/pull/45327#discussion_r1509964794


##########
sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala:
##########
@@ -1737,3 +1738,33 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
     }
   }
 }
+
+class ThreadLeakInSortMergeJoinSuite
+  extends QueryTest
+    with SharedSparkSession
+    with AdaptiveSparkPlanHelper {
+
+  setupTestData()
+  override protected def createSparkSession: TestSparkSession = {
+    SparkSession.cleanupAnyExistingSession()
+    new TestSparkSession(
+      sparkConf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, 20))
+  }
+
+  test("SPARK-47146: thread leak when doing SortMergeJoin (with spill)") {
+
+    withSQLConf(
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1") {
+
+      assertSpilled(sparkContext, "inner join") {
+        sql("SELECT * FROM testData JOIN testData2 ON key = a").collect()

Review Comment:
   Sounds good, does that happen here though? What I am trying to make sure is: does this test actually fail in master and pass after this PR? (Not at my desktop, so I can't run the test against master to check :-) )



Re: [PR] [SPARK-47146][CORE] Possible thread leak when doing sort merge join [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #45327:
URL: https://github.com/apache/spark/pull/45327#discussion_r1507120685


##########
sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala:
##########
@@ -1737,3 +1738,34 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
     }
   }
 }
+
+class ThreadLeakInSortMergeJoinSuite
+  extends QueryTest
+    with SharedSparkSession
+    with AdaptiveSparkPlanHelper {
+
+  setupTestData()
+  override protected def createSparkSession: TestSparkSession = {
+    SparkSession.cleanupAnyExistingSession()
+    new TestSparkSession(
+      sparkConf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, 20))
+  }
+
+  test("SPARK-47146: thread leak when doing SortMergeJoin (with spill)") {
+
+    withSQLConf(
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1") {
+
+      assertSpilled(sparkContext, "inner join") {
+        sql("SELECT * FROM testData JOIN testData2 ON key = a").collect()
+        System.gc()
+      }
+
+      val readAheadThread = Thread.getAllStackTraces.keySet().asScala
+        .find {
+          _.getName.startsWith("read-ahead")
+        }
+      assert(readAheadThread.isEmpty)

Review Comment:
   I am not sure whether this test could end up being flaky, in case some other test leaves a read-ahead thread hanging around (for example, if a task from a previous test is still running).
   
   +CC @dongjoon-hyun for thoughts on how to test this more robustly (I am wondering if this will end up with flakiness similar to the other recent memory manager PR, where tests were interacting).
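
One possible way to narrow such a check, sketched here in plain Java, is to compare the matching threads before and after the code under test instead of asserting that none exist at all. This is only an illustration and not what the PR's test does; `ThreadSnapshotSketch`, its helpers, and the `snapshot-sketch` name prefix are hypothetical. It ignores threads that already existed when the check started, but it would not help if a leftover task from an earlier test started a new thread while the check was running.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;

class ThreadSnapshotSketch {
  // Live threads whose name starts with the given prefix.
  static Set<Thread> liveThreads(String prefix) {
    return Thread.getAllStackTraces().keySet().stream()
        .filter(t -> t.getName().startsWith(prefix))
        .collect(Collectors.toCollection(HashSet::new));
  }

  // Matching threads that exist after the action but did not exist before it.
  static Set<Thread> newThreadsDuring(String prefix, Runnable action) {
    Set<Thread> before = liveThreads(prefix);
    action.run();
    Set<Thread> after = liveThreads(prefix);
    after.removeAll(before);
    return after;
  }

  // Simulates a leak: the action starts a daemon thread and never stops it.
  static int demoLeakCount() {
    CountDownLatch started = new CountDownLatch(1);
    CountDownLatch release = new CountDownLatch(1);
    Set<Thread> leaked = newThreadsDuring("snapshot-sketch", () -> {
      Thread t = new Thread(() -> {
        started.countDown();
        try {
          release.await();                  // park until the demo lets the thread go
        } catch (InterruptedException ignored) {
          // exit quietly; this thread exists only for the demo
        }
      }, "snapshot-sketch-worker");
      t.setDaemon(true);
      t.start();
      try {
        started.await();                    // make sure the thread is running before the snapshot
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    release.countDown();                    // clean up the simulated leak
    return leaked.size();
  }

  public static void main(String[] args) {
    System.out.println("threads left behind by the action: " + demoLeakCount());
  }
}
```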



##########
core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java:
##########
@@ -82,6 +85,14 @@ public UnsafeSorterSpillReader(
       Closeables.close(bs, /* swallowIOException = */ true);
       throw e;
     }
+    taskContext.addTaskCompletionListener(context -> {
+      try {
+        close();
+      } catch (IOException e) {
+        logger.error("error while closing UnsafeSorterSpillReader", e);
+        throw new RuntimeException(e);

Review Comment:
   We can log at `info` and drop the exception - in the 'normal' flow, that is what ends up happening when `close` throws an exception.
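
As a rough illustration of the "log and drop" handling suggested here, the following plain-Java sketch closes a resource, logs any `IOException` at info level, and does not rethrow it. `QuietCloseSketch` and `closeQuietly` are hypothetical names, and `java.util.logging` stands in for whatever logger the real class uses.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

class QuietCloseSketch {
  private static final Logger LOGGER = Logger.getLogger(QuietCloseSketch.class.getName());

  // Closes the resource; an IOException is logged at INFO and not rethrown.
  // Returns true when close() completed without an exception.
  static boolean closeQuietly(Closeable resource) {
    try {
      resource.close();
      return true;
    } catch (IOException e) {
      LOGGER.log(Level.INFO, "error while closing resource", e);
      return false;
    }
  }

  public static void main(String[] args) {
    Closeable failing = () -> { throw new IOException("simulated close failure"); };
    System.out.println("clean close succeeded: " + closeQuietly(() -> { }));
    System.out.println("failing close succeeded: " + closeQuietly(failing));
  }
}
```

The caller keeps running in both cases; only the log records that the close failed.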



##########
sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala:
##########
@@ -1737,3 +1738,34 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
     }
   }
 }
+
+class ThreadLeakInSortMergeJoinSuite
+  extends QueryTest
+    with SharedSparkSession
+    with AdaptiveSparkPlanHelper {
+
+  setupTestData()
+  override protected def createSparkSession: TestSparkSession = {
+    SparkSession.cleanupAnyExistingSession()
+    new TestSparkSession(
+      sparkConf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, 20))
+  }
+
+  test("SPARK-47146: thread leak when doing SortMergeJoin (with spill)") {
+
+    withSQLConf(
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1") {
+
+      assertSpilled(sparkContext, "inner join") {
+        sql("SELECT * FROM testData JOIN testData2 ON key = a").collect()
+        System.gc()

Review Comment:
   We don't need the `gc` anymore.



Re: [PR] [SPARK-47146][CORE] Possible thread leak when doing sort merge join [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #45327:
URL: https://github.com/apache/spark/pull/45327#issuecomment-1977986879

   @JacobZheng0927, it might be a good idea to backport this to 3.5 as well - will you be able to create a backport PR?
   (I ran into an issue locally when trying to merge to branch-3.5 and bailed.)


Re: [PR] [SPARK-47146][CORE] Possible thread leak when doing sort merge join [spark]

Posted by "JacobZheng0927 (via GitHub)" <gi...@apache.org>.
JacobZheng0927 commented on code in PR #45327:
URL: https://github.com/apache/spark/pull/45327#discussion_r1508908677


##########
sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala:
##########
@@ -1737,3 +1738,33 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
     }
   }
 }
+
+class ThreadLeakInSortMergeJoinSuite
+  extends QueryTest
+    with SharedSparkSession
+    with AdaptiveSparkPlanHelper {
+
+  setupTestData()
+  override protected def createSparkSession: TestSparkSession = {
+    SparkSession.cleanupAnyExistingSession()
+    new TestSparkSession(
+      sparkConf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, 20))
+  }
+
+  test("SPARK-47146: thread leak when doing SortMergeJoin (with spill)") {
+
+    withSQLConf(
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1") {
+
+      assertSpilled(sparkContext, "inner join") {
+        sql("SELECT * FROM testData JOIN testData2 ON key = a").collect()

Review Comment:
   As an example, suppose sort merge join performs an inner join with the keys [1, 2, 3, 4, 5, 6] on the left and [1, 2, 3] on the right. The left and right iterators take turns advancing. The right iterator hits the end first, and at that point there is no reason to keep advancing the left iterator, so the left iterator is never read to the end.
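
The example above can be traced with a small plain-Java sketch. It is a simplified merge over two ascending key lists without duplicates, not Spark's sort merge join; `MergeJoinSketch`, `Outcome`, and `innerJoin` are hypothetical names used only here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class MergeJoinSketch {
  // Join result plus whether each input still had unconsumed keys when the join stopped.
  static final class Outcome {
    final List<Integer> matches = new ArrayList<>();
    boolean leftHasUnread;
    boolean rightHasUnread;
  }

  // Inner join of two ascending key lists without duplicates, advancing one side at a time.
  static Outcome innerJoin(List<Integer> leftKeys, List<Integer> rightKeys) {
    Outcome out = new Outcome();
    Iterator<Integer> left = leftKeys.iterator();
    Iterator<Integer> right = rightKeys.iterator();
    Integer l = left.hasNext() ? left.next() : null;
    Integer r = right.hasNext() ? right.next() : null;
    while (l != null && r != null) {
      int cmp = Integer.compare(l, r);
      if (cmp == 0) {
        out.matches.add(l);
        l = left.hasNext() ? left.next() : null;
        r = right.hasNext() ? right.next() : null;
      } else if (cmp < 0) {
        l = left.hasNext() ? left.next() : null;
      } else {
        r = right.hasNext() ? right.next() : null;
      }
    }
    // Once either side is exhausted the join stops; the other side is not drained.
    out.leftHasUnread = l != null || left.hasNext();
    out.rightHasUnread = r != null || right.hasNext();
    return out;
  }

  public static void main(String[] args) {
    Outcome o = innerJoin(Arrays.asList(1, 2, 3, 4, 5, 6), Arrays.asList(1, 2, 3));
    System.out.println("matches: " + o.matches);
    System.out.println("left side still has unread keys: " + o.leftHasUnread);
    System.out.println("right side still has unread keys: " + o.rightHasUnread);
  }
}
```

Any cleanup that only runs when an input is read to its end would not run for the left side in this scenario.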



Re: [PR] [SPARK-47146][CORE] Possible thread leak when doing sort merge join [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #45327:
URL: https://github.com/apache/spark/pull/45327#discussion_r1507995571


##########
sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala:
##########
@@ -1737,3 +1738,34 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
     }
   }
 }
+
+class ThreadLeakInSortMergeJoinSuite
+  extends QueryTest
+    with SharedSparkSession
+    with AdaptiveSparkPlanHelper {
+
+  setupTestData()
+  override protected def createSparkSession: TestSparkSession = {
+    SparkSession.cleanupAnyExistingSession()
+    new TestSparkSession(
+      sparkConf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, 20))
+  }
+
+  test("SPARK-47146: thread leak when doing SortMergeJoin (with spill)") {
+
+    withSQLConf(
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1") {
+
+      assertSpilled(sparkContext, "inner join") {
+        sql("SELECT * FROM testData JOIN testData2 ON key = a").collect()
+        System.gc()
+      }
+
+      val readAheadThread = Thread.getAllStackTraces.keySet().asScala
+        .find {
+          _.getName.startsWith("read-ahead")
+        }
+      assert(readAheadThread.isEmpty)

Review Comment:
   I was referring to [this PR](https://github.com/apache/spark/pull/45052#discussion_r1501924992) @dongjoon-hyun - the root cause for that was a task from a previous test interfering with the current one (due to delays in killing tasks).



Re: [PR] [SPARK-47146][CORE] Possible thread leak when doing sort merge join [spark]

Posted by "JacobZheng0927 (via GitHub)" <gi...@apache.org>.
JacobZheng0927 commented on code in PR #45327:
URL: https://github.com/apache/spark/pull/45327#discussion_r1508404458


##########
core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java:
##########
@@ -82,6 +85,13 @@ public UnsafeSorterSpillReader(
       Closeables.close(bs, /* swallowIOException = */ true);
       throw e;
     }
+    taskContext.addTaskCompletionListener(context -> {
+      try {
+        close();
+      } catch (IOException e) {

Review Comment:
   I don't think there should be a leak: `executorService.shutdownNow()` will always be called.



Re: [PR] [SPARK-47146][CORE] Possible thread leak when doing sort merge join [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #45327:
URL: https://github.com/apache/spark/pull/45327#issuecomment-1977985641

   Merged to master.
   Thanks for fixing this @JacobZheng0927 !


Re: [PR] [SPARK-47146][CORE] Possible thread leak when doing sort merge join [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #45327:
URL: https://github.com/apache/spark/pull/45327#discussion_r1507888706


##########
sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala:
##########
@@ -1737,3 +1738,34 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
     }
   }
 }
+
+class ThreadLeakInSortMergeJoinSuite
+  extends QueryTest
+    with SharedSparkSession
+    with AdaptiveSparkPlanHelper {
+
+  setupTestData()
+  override protected def createSparkSession: TestSparkSession = {
+    SparkSession.cleanupAnyExistingSession()
+    new TestSparkSession(
+      sparkConf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, 20))
+  }
+
+  test("SPARK-47146: thread leak when doing SortMergeJoin (with spill)") {
+
+    withSQLConf(
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1") {
+
+      assertSpilled(sparkContext, "inner join") {
+        sql("SELECT * FROM testData JOIN testData2 ON key = a").collect()
+        System.gc()
+      }
+
+      val readAheadThread = Thread.getAllStackTraces.keySet().asScala
+        .find {
+          _.getName.startsWith("read-ahead")
+        }
+      assert(readAheadThread.isEmpty)

Review Comment:
   I'm not aware of any flakiness in `JoinSuite` these days, @mridulm.



Re: [PR] [SPARK-47146][CORE] Possible thread leak when doing sort merge join [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on PR #45327:
URL: https://github.com/apache/spark/pull/45327#issuecomment-1971549366

   Hi, @viirya. Did you see any thread leaks while you were working on Join (or shuffle)?


Re: [PR] [SPARK-47146][CORE] Possible thread leak when doing sort merge join [spark]

Posted by "zml1206 (via GitHub)" <gi...@apache.org>.
zml1206 commented on code in PR #45327:
URL: https://github.com/apache/spark/pull/45327#discussion_r1506931887


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncLogPurge.scala:
##########
@@ -35,8 +38,11 @@ trait AsyncLogPurge extends Logging {
 
   protected val sparkSession: SparkSession
 
-  private val asyncPurgeExecutorService
-    = ThreadUtils.newDaemonSingleThreadExecutor("async-log-purge")
+  private val asyncPurgeExecutorService: ThreadPoolExecutor = {
+    val threadFactory =
+      new ThreadFactoryBuilder().setDaemon(true).setNameFormat("async-log-purge").build()

Review Comment:
   How about adding an additional function to ThreadUtils?



Re: [PR] [SPARK-47146][CORE] Possible thread leak when doing sort merge join [spark]

Posted by "JacobZheng0927 (via GitHub)" <gi...@apache.org>.
JacobZheng0927 commented on code in PR #45327:
URL: https://github.com/apache/spark/pull/45327#discussion_r1514197639


##########
core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java:
##########
@@ -36,6 +38,7 @@
  * of the file format).
  */
 public final class UnsafeSorterSpillReader extends UnsafeSorterIterator implements Closeable {
+  private static final Logger logger = LoggerFactory.getLogger(ReadAheadInputStream.class);

Review Comment:
   It was careless of me, thank you very much! I'll fix it right away.



Re: [PR] [SPARK-47146][CORE] Possible thread leak when doing sort merge join [spark]

Posted by "JacobZheng0927 (via GitHub)" <gi...@apache.org>.
JacobZheng0927 commented on code in PR #45327:
URL: https://github.com/apache/spark/pull/45327#discussion_r1509977846


##########
sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala:
##########
@@ -1737,3 +1738,33 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
     }
   }
 }
+
+class ThreadLeakInSortMergeJoinSuite
+  extends QueryTest
+    with SharedSparkSession
+    with AdaptiveSparkPlanHelper {
+
+  setupTestData()
+  override protected def createSparkSession: TestSparkSession = {
+    SparkSession.cleanupAnyExistingSession()
+    new TestSparkSession(
+      sparkConf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, 20))
+  }
+
+  test("SPARK-47146: thread leak when doing SortMergeJoin (with spill)") {
+
+    withSQLConf(
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1") {
+
+      assertSpilled(sparkContext, "inner join") {
+        sql("SELECT * FROM testData JOIN testData2 ON key = a").collect()

Review Comment:
   Yes, this test will fail in master and pass after this PR. 



Re: [PR] [SPARK-47146][CORE] Possible thread leak when doing sort merge join [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #45327:
URL: https://github.com/apache/spark/pull/45327#discussion_r1513264898


##########
sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala:
##########
@@ -1737,3 +1738,34 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
     }
   }
 }
+
+class ThreadLeakInSortMergeJoinSuite
+  extends QueryTest
+    with SharedSparkSession
+    with AdaptiveSparkPlanHelper {
+
+  setupTestData()
+  override protected def createSparkSession: TestSparkSession = {
+    SparkSession.cleanupAnyExistingSession()
+    new TestSparkSession(
+      sparkConf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, 20))
+  }
+
+  test("SPARK-47146: thread leak when doing SortMergeJoin (with spill)") {
+
+    withSQLConf(
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1") {
+
+      assertSpilled(sparkContext, "inner join") {
+        sql("SELECT * FROM testData JOIN testData2 ON key = a").collect()
+        System.gc()
+      }
+
+      val readAheadThread = Thread.getAllStackTraces.keySet().asScala
+        .find {
+          _.getName.startsWith("read-ahead")
+        }
+      assert(readAheadThread.isEmpty)

Review Comment:
   Thanks for confirming, that was my assessment as well - but I wanted to double check, given we faced it very recently :-)



Re: [PR] [SPARK-47146][CORE] Possible thread leak when doing sort merge join [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #45327:
URL: https://github.com/apache/spark/pull/45327#issuecomment-1970412770

   Thanks for surfacing this issue!
   
   To fix this, I would suggest changing `ReadAheadInputStream` to take `TaskContext` as a parameter - and adding a task completion listener - which calls `close`.
   
   That would fix the issue, release resources more proactively (instead of waiting for finalization to kick in), and also future-proof this code (for when support for finalize is removed from the JDK).
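
The shape of this suggestion can be sketched in plain Java as follows. `MiniTaskContext` and `ReadAheadLikeStream` are hypothetical stand-ins written for this illustration; they are not Spark's `TaskContext` or `ReadAheadInputStream`, and the listener signature is simplified to a `Runnable`.

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CompletionListenerSketch {
  // Hypothetical stand-in for a task context; NOT Spark's TaskContext.
  static final class MiniTaskContext {
    private final List<Runnable> listeners = new ArrayList<>();

    void addTaskCompletionListener(Runnable listener) {
      listeners.add(listener);
    }

    // Called by the (imaginary) task runner when the task finishes, however it finishes.
    void markTaskCompleted() {
      listeners.forEach(Runnable::run);
    }
  }

  // Hypothetical stand-in for a stream that owns a background executor.
  static final class ReadAheadLikeStream implements Closeable {
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
      Thread t = new Thread(r, "read-ahead-sketch");
      t.setDaemon(true);
      return t;
    });

    ReadAheadLikeStream(MiniTaskContext context) {
      // Register cleanup up front, so it runs even if the stream is never read to the end.
      context.addTaskCompletionListener(this::close);
    }

    @Override
    public void close() {
      executor.shutdownNow();   // safe to call more than once
    }

    boolean isShutdown() {
      return executor.isShutdown();
    }
  }

  // Opens a stream, abandons it without reading, then completes the task.
  static boolean closedAfterTaskCompletion() {
    MiniTaskContext context = new MiniTaskContext();
    ReadAheadLikeStream stream = new ReadAheadLikeStream(context);
    boolean closedBefore = stream.isShutdown();
    context.markTaskCompleted();
    return !closedBefore && stream.isShutdown();
  }

  public static void main(String[] args) {
    System.out.println("closed by completion listener: " + closedAfterTaskCompletion());
  }
}
```

The point of the design is that cleanup is tied to the end of the task rather than to the stream being fully consumed or to garbage collection.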


Re: [PR] [SPARK-47146][CORE] Possible thread leak when doing sort merge join [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #45327:
URL: https://github.com/apache/spark/pull/45327#discussion_r1507892803


##########
core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java:
##########
@@ -82,6 +85,13 @@ public UnsafeSorterSpillReader(
       Closeables.close(bs, /* swallowIOException = */ true);
       throw e;
     }
+    taskContext.addTaskCompletionListener(context -> {
+      try {
+        close();
+      } catch (IOException e) {

Review Comment:
   For my understanding, what happens if we fail to close here? Do thread leaks still happen?



Re: [PR] [SPARK-47146][CORE] Possible thread leak when doing sort merge join [spark]

Posted by "JacobZheng0927 (via GitHub)" <gi...@apache.org>.
JacobZheng0927 commented on code in PR #45327:
URL: https://github.com/apache/spark/pull/45327#discussion_r1508399568


##########
sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala:
##########
@@ -1737,3 +1738,33 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
     }
   }
 }
+
+class ThreadLeakInSortMergeJoinSuite
+  extends QueryTest
+    with SharedSparkSession
+    with AdaptiveSparkPlanHelper {
+
+  setupTestData()
+  override protected def createSparkSession: TestSparkSession = {
+    SparkSession.cleanupAnyExistingSession()
+    new TestSparkSession(
+      sparkConf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, 20))
+  }
+
+  test("SPARK-47146: thread leak when doing SortMergeJoin (with spill)") {
+
+    withSQLConf(
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1") {
+
+      assertSpilled(sparkContext, "inner join") {
+        sql("SELECT * FROM testData JOIN testData2 ON key = a").collect()

Review Comment:
   No, this test fails in master: it doesn't read the iterator completely, so there is a thread leak.



Re: [PR] [SPARK-47146][CORE] Possible thread leak when doing sort merge join [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #45327:
URL: https://github.com/apache/spark/pull/45327#discussion_r1507996728


##########
sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala:
##########
@@ -1737,3 +1738,33 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
     }
   }
 }
+
+class ThreadLeakInSortMergeJoinSuite
+  extends QueryTest
+    with SharedSparkSession
+    with AdaptiveSparkPlanHelper {
+
+  setupTestData()
+  override protected def createSparkSession: TestSparkSession = {
+    SparkSession.cleanupAnyExistingSession()
+    new TestSparkSession(
+      sparkConf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, 20))
+  }
+
+  test("SPARK-47146: thread leak when doing SortMergeJoin (with spill)") {
+
+    withSQLConf(
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1") {
+
+      assertSpilled(sparkContext, "inner join") {
+        sql("SELECT * FROM testData JOIN testData2 ON key = a").collect()

Review Comment:
   This test will work in current master as well, right? Since it reads the iterator entirely, the reader will get closed.
   The test should fail in master and pass with this fix, @JacobZheng0927.




Re: [PR] [SPARK-47146][CORE] Possible thread leak when doing sort merge join [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm closed pull request #45327: [SPARK-47146][CORE] Possible thread leak when doing sort merge join
URL: https://github.com/apache/spark/pull/45327



Re: [PR] [SPARK-47146][CORE] Possible thread leak when doing sort merge join [spark]

Posted by "sunchao (via GitHub)" <gi...@apache.org>.
sunchao commented on code in PR #45327:
URL: https://github.com/apache/spark/pull/45327#discussion_r1513080294


##########
sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala:
##########
@@ -1737,3 +1738,34 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
     }
   }
 }
+
+class ThreadLeakInSortMergeJoinSuite
+  extends QueryTest
+    with SharedSparkSession
+    with AdaptiveSparkPlanHelper {
+
+  setupTestData()
+  override protected def createSparkSession: TestSparkSession = {
+    SparkSession.cleanupAnyExistingSession()
+    new TestSparkSession(
+      sparkConf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, 20))
+  }
+
+  test("SPARK-47146: thread leak when doing SortMergeJoin (with spill)") {
+
+    withSQLConf(
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1") {
+
+      assertSpilled(sparkContext, "inner join") {
+        sql("SELECT * FROM testData JOIN testData2 ON key = a").collect()
+        System.gc()
+      }
+
+      val readAheadThread = Thread.getAllStackTraces.keySet().asScala
+        .find {
+          _.getName.startsWith("read-ahead")
+        }
+      assert(readAheadThread.isEmpty)

Review Comment:
   @mridulm I guess this case should be fine. In the memory manager PR, the `JobCancellationSuite` specifically sets some tasks to sleep for a certain amount of time, which causes them to not finish even after the test itself has finished. I don't see a similar pattern in the `JoinSuite`.
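
The check in the quoted test, which looks for any live thread whose name starts with `read-ahead`, can be sketched in Java as follows. This is a minimal illustration rather than the test from the PR: the class name, the `sketch-worker` prefix, and the latch-controlled demo thread are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;

final class ThreadNameCheckSketch {
    // Hypothetical prefix used only by this sketch.
    static final String PREFIX = "sketch-worker";

    // Names of all live threads whose name starts with the given prefix.
    static List<String> liveThreadNames(String prefix) {
        return Thread.getAllStackTraces().keySet().stream()
            .map(Thread::getName)
            .filter(name -> name.startsWith(prefix))
            .sorted()
            .collect(Collectors.toList());
    }

    // Returns {found while the thread is running, found after it has exited}.
    static boolean[] demo() {
        CountDownLatch release = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            try {
                release.await();                    // stay alive until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, PREFIX + "-0");
        worker.setDaemon(true);
        worker.start();

        boolean foundWhileRunning = !liveThreadNames(PREFIX).isEmpty();

        release.countDown();                        // let the thread finish
        try {
            worker.join();                          // wait until it has fully exited
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        boolean foundAfterExit = !liveThreadNames(PREFIX).isEmpty();

        return new boolean[] { foundWhileRunning, foundAfterExit };
    }

    public static void main(String[] args) {
        boolean[] result = demo();
        System.out.println("found while running: " + result[0]);
        System.out.println("found after exit:    " + result[1]);
    }
}
```

A check like this only sees threads that exist at the moment it runs, so a task that is still sleeping in the background (the `JobCancellationSuite` pattern mentioned above) would make it fail for reasons unrelated to a leak.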




Re: [PR] [SPARK-47146][CORE] Possible thread leak when doing sort merge join [spark]

Posted by "viirya (via GitHub)" <gi...@apache.org>.
viirya commented on PR #45327:
URL: https://github.com/apache/spark/pull/45327#issuecomment-1971599378

   > Hi, @viirya. Did you see any thread leaks while you were working on Join (or shuffle)?
   
   Hi, @dongjoon-hyun. No, I haven't seen any. Maybe that is because our tests are not long-running, so they have no chance to show symptoms of a noticeable leak.



Re: [PR] [SPARK-47146][CORE] Possible thread leak when doing sort merge join [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #45327:
URL: https://github.com/apache/spark/pull/45327#discussion_r1508880886


##########
sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala:
##########
@@ -1737,3 +1738,33 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
     }
   }
 }
+
+class ThreadLeakInSortMergeJoinSuite
+  extends QueryTest
+    with SharedSparkSession
+    with AdaptiveSparkPlanHelper {
+
+  setupTestData()
+  override protected def createSparkSession: TestSparkSession = {
+    SparkSession.cleanupAnyExistingSession()
+    new TestSparkSession(
+      sparkConf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, 20))
+  }
+
+  test("SPARK-47146: thread leak when doing SortMergeJoin (with spill)") {
+
+    withSQLConf(
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1") {
+
+      assertSpilled(sparkContext, "inner join") {
+        sql("SELECT * FROM testData JOIN testData2 ON key = a").collect()

Review Comment:
   Can you explain why it won't read the iterator fully? The `collect` should drain it entirely, shouldn't it?




Re: [PR] [SPARK-47146][CORE] Possible thread leak when doing sort merge join [spark]

Posted by "zml1206 (via GitHub)" <gi...@apache.org>.
zml1206 commented on code in PR #45327:
URL: https://github.com/apache/spark/pull/45327#discussion_r1506923273


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncLogPurge.scala:
##########
@@ -17,13 +17,16 @@
 
 package org.apache.spark.sql.execution.streaming
 
-import java.util.concurrent.atomic.AtomicBoolean
+import com.google.common.util.concurrent.ThreadFactoryBuilder
 
+import java.util.concurrent.atomic.AtomicBoolean
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.ThreadUtils
 
+import java.util.concurrent.{Executors, ThreadPoolExecutor}

Review Comment:
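   Scala style error: the imports are out of order (the `java.util.concurrent` imports should be grouped together, ahead of the third-party and Spark imports).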




Re: [PR] [SPARK-47146][CORE] Possible thread leak when doing sort merge join [spark]

Posted by "JacobZheng0927 (via GitHub)" <gi...@apache.org>.
JacobZheng0927 commented on PR #45327:
URL: https://github.com/apache/spark/pull/45327#issuecomment-1978625585

   > @JacobZheng0927, it might be a good idea to backport this to 3.5 as well. Will you be able to create a backport PR? (I ran into some issues locally when trying to merge to branch-3.5 and bailed.)
   
   I created this PR: https://github.com/apache/spark/pull/45390



Re: [PR] [SPARK-47146][CORE] Possible thread leak when doing sort merge join [spark]

Posted by "sunchao (via GitHub)" <gi...@apache.org>.
sunchao commented on PR #45327:
URL: https://github.com/apache/spark/pull/45327#issuecomment-1979101387

   > Any thoughts on https://github.com/apache/spark/pull/45327#discussion_r1507120685, @dongjoon-hyun, @sunchao?
   
   Sorry for the delay, @mridulm. I posted my reply. This PR LGTM too.



Re: [PR] [SPARK-47146][CORE] Possible thread leak when doing sort merge join [spark]

Posted by "pan3793 (via GitHub)" <gi...@apache.org>.
pan3793 commented on code in PR #45327:
URL: https://github.com/apache/spark/pull/45327#discussion_r1513846126


##########
core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java:
##########
@@ -36,6 +38,7 @@
  * of the file format).
  */
 public final class UnsafeSorterSpillReader extends UnsafeSorterIterator implements Closeable {
+  private static final Logger logger = LoggerFactory.getLogger(ReadAheadInputStream.class);

Review Comment:
   Is the logger's name correct? It is created with `ReadAheadInputStream.class`, but this class is `UnsafeSorterSpillReader`.


