You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by mr...@apache.org on 2024/03/06 02:36:14 UTC

(spark) branch branch-3.4 updated: [SPARK-47146][CORE][3.5] Possible thread leak when doing sort merge join

This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 6ebfadf29bb6 [SPARK-47146][CORE][3.5] Possible thread leak when doing sort merge join
6ebfadf29bb6 is described below

commit 6ebfadf29bb68274127a2e06b4f6e8314ac4ef13
Author: JacobZheng0927 <zs...@163.com>
AuthorDate: Tue Mar 5 20:35:42 2024 -0600

    [SPARK-47146][CORE][3.5] Possible thread leak when doing sort merge join
    
    This PR backports https://github.com/apache/spark/pull/45327 to branch-3.5.
    
    ### What changes were proposed in this pull request?
    Add a TaskCompletionListener that closes the input stream, avoiding the thread leak caused by an unclosed ReadAheadInputStream.
    
    ### Why are the changes needed?
    To fix the issue SPARK-47146
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Unit test
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #45390 from JacobZheng0927/SPARK-47146-3.5.
    
    Authored-by: JacobZheng0927 <zs...@163.com>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
    (cherry picked from commit e9f7d36797c4344295556463da16f891bb96d8ac)
    Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
---
 .../unsafe/sort/UnsafeSorterSpillReader.java       | 12 ++++++++
 .../scala/org/apache/spark/sql/JoinSuite.scala     | 33 +++++++++++++++++++++-
 2 files changed, 44 insertions(+), 1 deletion(-)

diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
index db79efd00853..8bd44c8c52c1 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
@@ -28,6 +28,8 @@ import org.apache.spark.io.ReadAheadInputStream;
 import org.apache.spark.serializer.SerializerManager;
 import org.apache.spark.storage.BlockId;
 import org.apache.spark.unsafe.Platform;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.*;
 
@@ -36,6 +38,7 @@ import java.io.*;
   * of the file format).
   */
  public final class UnsafeSorterSpillReader extends UnsafeSorterIterator implements Closeable {
+  private static final Logger logger = LoggerFactory.getLogger(UnsafeSorterSpillReader.class);
   public static final int MAX_BUFFER_SIZE_BYTES = 16777216; // 16 mb
 
   private InputStream in;
@@ -82,6 +85,15 @@ public final class UnsafeSorterSpillReader extends UnsafeSorterIterator implemen
       Closeables.close(bs, /* swallowIOException = */ true);
       throw e;
     }
+    if (taskContext != null) { // close at task end to avoid leaking read-ahead threads
+      taskContext.addTaskCompletionListener(context -> {
+        try {
+          close();
+        } catch (IOException e) {
+          logger.info("error while closing UnsafeSorterSpillReader", e);
+        }
+      });
+    }
   }
 
   @Override
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 5125708be32d..7f062bfb899c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -25,6 +25,7 @@ import scala.collection.mutable.ListBuffer
 import org.mockito.Mockito._
 
 import org.apache.spark.TestUtils.{assertNotSpilled, assertSpilled}
+import org.apache.spark.internal.config.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.expressions.{Ascending, GenericRow, SortOrder}
@@ -35,7 +36,7 @@ import org.apache.spark.sql.execution.exchange.{ShuffleExchangeExec, ShuffleExch
 import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.execution.python.BatchEvalPythonExec
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.test.{SharedSparkSession, TestSparkSession}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.tags.SlowSQLTest
 
@@ -1591,3 +1592,33 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
     }
   }
 }
+
+class ThreadLeakInSortMergeJoinSuite
+  extends QueryTest
+    with SharedSparkSession
+    with AdaptiveSparkPlanHelper {
+  // Regression test for SPARK-47146 (read-ahead thread leak in sort merge join with spills).
+  setupTestData()
+  override protected def createSparkSession: TestSparkSession = { // force spilling
+    SparkSession.cleanupAnyExistingSession()
+    new TestSparkSession(
+      sparkConf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, 20))
+  }
+
+  test("SPARK-47146: thread leak when doing SortMergeJoin (with spill)") {
+    // Rule out broadcast joins so the query is planned as a sort merge join.
+    withSQLConf(
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1") {
+
+      assertSpilled(sparkContext, "inner join") {
+        sql("SELECT * FROM testData JOIN testData2 ON key = a").collect()
+      }
+      // Spill readers close on task completion, so no read-ahead thread should remain.
+      val readAheadThread = Thread.getAllStackTraces.keySet().asScala
+        .find {
+          _.getName.startsWith("read-ahead")
+        }
+      assert(readAheadThread.isEmpty) // NOTE(review): thread exit may lag close(); watch flakes
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org