You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2019/11/12 17:13:55 UTC

[spark] branch branch-2.4 updated: [SPARK-29850][SQL] sort-merge-join an empty table should not memory leak

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 7459353  [SPARK-29850][SQL] sort-merge-join an empty table should not memory leak
7459353 is described below

commit 74593534daeb6cabd54dc3362c22e27778ea0b9d
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Wed Nov 13 01:00:30 2019 +0800

    [SPARK-29850][SQL] sort-merge-join an empty table should not memory leak
    
    When whole stage codegen `HashAggregateExec`, create the hash map when we begin to process inputs.
    
    Sort-merge join completes directly if the left side table is empty. If there is an aggregate in the right side, the aggregate will not be triggered at all, but its hash map is created during codegen and can't be released.
    
    No
    
    a new test
    
    Closes #26471 from cloud-fan/memory.
    
    Authored-by: Wenchen Fan <we...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 414cade01112bc86a9e66a5928399dc78495b6e4)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../execution/aggregate/HashAggregateExec.scala    | 34 +++++++++++++---------
 .../scala/org/apache/spark/sql/JoinSuite.scala     |  9 ++++++
 2 files changed, 29 insertions(+), 14 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
index 3838777..794b0b7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
@@ -591,9 +591,9 @@ case class HashAggregateExec(
 
     val thisPlan = ctx.addReferenceObj("plan", this)
 
-    // Create a name for the iterator from the fast hash map.
-    val iterTermForFastHashMap = if (isFastHashMapEnabled) {
-      // Generates the fast hash map class and creates the fash hash map term.
+    // Create a name for the iterator from the fast hash map, and the code to create fast hash map.
+    val (iterTermForFastHashMap, createFastHashMap) = if (isFastHashMapEnabled) {
+      // Generates the fast hash map class and creates the fast hash map term.
       val fastHashMapClassName = ctx.freshName("FastHashMap")
       if (isVectorizedHashMapEnabled) {
         val generatedMap = new VectorizedHashMapGenerator(ctx, aggregateExpressions,
@@ -601,25 +601,30 @@ case class HashAggregateExec(
         ctx.addInnerClass(generatedMap)
 
         // Inline mutable state since not many aggregation operations in a task
-        fastHashMapTerm = ctx.addMutableState(fastHashMapClassName, "vectorizedHastHashMap",
-          v => s"$v = new $fastHashMapClassName();", forceInline = true)
-        ctx.addMutableState(s"java.util.Iterator<InternalRow>", "vectorizedFastHashMapIter",
+        fastHashMapTerm = ctx.addMutableState(
+          fastHashMapClassName, "vectorizedFastHashMap", forceInline = true)
+        val iter = ctx.addMutableState(
+          "java.util.Iterator<InternalRow>",
+          "vectorizedFastHashMapIter",
           forceInline = true)
+        val create = s"$fastHashMapTerm = new $fastHashMapClassName();"
+        (iter, create)
       } else {
         val generatedMap = new RowBasedHashMapGenerator(ctx, aggregateExpressions,
           fastHashMapClassName, groupingKeySchema, bufferSchema, bitMaxCapacity).generate()
         ctx.addInnerClass(generatedMap)
 
         // Inline mutable state since not many aggregation operations in a task
-        fastHashMapTerm = ctx.addMutableState(fastHashMapClassName, "fastHashMap",
-          v => s"$v = new $fastHashMapClassName(" +
-            s"$thisPlan.getTaskMemoryManager(), $thisPlan.getEmptyAggregationBuffer());",
-          forceInline = true)
-        ctx.addMutableState(
+        fastHashMapTerm = ctx.addMutableState(
+          fastHashMapClassName, "fastHashMap", forceInline = true)
+        val iter = ctx.addMutableState(
           "org.apache.spark.unsafe.KVIterator<UnsafeRow, UnsafeRow>",
           "fastHashMapIter", forceInline = true)
+        val create = s"$fastHashMapTerm = new $fastHashMapClassName(" +
+          s"$thisPlan.getTaskMemoryManager(), $thisPlan.getEmptyAggregationBuffer());"
+        (iter, create)
       }
-    }
+    } else ("", "")
 
     // Create a name for the iterator from the regular hash map.
     // Inline mutable state since not many aggregation operations in a task
@@ -627,8 +632,7 @@ case class HashAggregateExec(
       "mapIter", forceInline = true)
     // create hashMap
     val hashMapClassName = classOf[UnsafeFixedWidthAggregationMap].getName
-    hashMapTerm = ctx.addMutableState(hashMapClassName, "hashMap",
-      v => s"$v = $thisPlan.createHashMap();", forceInline = true)
+    hashMapTerm = ctx.addMutableState(hashMapClassName, "hashMap", forceInline = true)
     sorterTerm = ctx.addMutableState(classOf[UnsafeKVExternalSorter].getName, "sorter",
       forceInline = true)
 
@@ -728,6 +732,8 @@ case class HashAggregateExec(
     s"""
      if (!$initAgg) {
        $initAgg = true;
+       $createFastHashMap
+       $hashMapTerm = $thisPlan.createHashMap();
        long $beforeAgg = System.nanoTime();
        $doAggFuncName();
        $aggTime.add((System.nanoTime() - $beforeAgg) / 1000000);
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index d159a8b..02a5c4a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -958,4 +958,13 @@ class JoinSuite extends QueryTest with SharedSQLContext {
       checkAnswer(res, Row(0, 0, 0))
     }
   }
+
+  test("SPARK-29850: sort-merge-join an empty table should not memory leak") {
+    val df1 = spark.range(10).select($"id", $"id" % 3 as 'p)
+      .repartition($"id").groupBy($"id").agg(Map("p" -> "max"))
+    val df2 = spark.range(0)
+    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+      assert(df2.join(df1, "id").collect().isEmpty)
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org