Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/10/18 13:07:47 UTC

[GitHub] [spark] xuanyuanking opened a new pull request #26164: [SPARK-21492][SQL] Fix memory leak in SortMergeJoin

URL: https://github.com/apache/spark/pull/26164
 
 
   ### What changes were proposed in this pull request?
   This PR adds a mechanism by which a downstream operator can notify its upstream (child) operators that their output data streams are no longer needed and the associated resources may be released. The mechanism is implemented as follows:
   - Add a `cleanupResources` function to SparkPlan, which by default calls `cleanupResources` on all children. An operator that needs its own resource cleanup should override this function, perform its cleanup, and then call `super.cleanupResources`, as SortExec does in this PR.
   - Add logic on the trigger side (SortMergeJoinExec in this PR), which calls `cleanupResources` to run the cleanup job for all of its upstream (child) operators.
   - Add a conf `spark.sql.sortMergeJoinExec.eagerCleanupResources` to guard this behavior for safety; the default value is true.
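   The propagation pattern described above can be sketched roughly as below. This is a hypothetical, simplified illustration in Python rather than the actual Scala sources; the class and method names (`PlanNode`, `SortNode`, `JoinNode`, `consume_finished`) are invented stand-ins for SparkPlan, SortExec, and SortMergeJoinExec.

   ```python
   # Hypothetical sketch of the eager-cleanup mechanism; not Spark source code.
   class PlanNode:
       """Stand-in for SparkPlan."""
       def __init__(self, *children):
           self.children = children

       def cleanup_resources(self):
           # Default behavior: propagate cleanup to all child (upstream) operators.
           for child in self.children:
               child.cleanup_resources()

   class SortNode(PlanNode):
       """Stand-in for SortExec, which holds an external sorter's buffers."""
       def __init__(self, child):
           super().__init__(child)
           self.sorter_open = True  # models the external sorter's spill buffers

       def cleanup_resources(self):
           self.sorter_open = False      # release this operator's own resources...
           super().cleanup_resources()   # ...then propagate cleanup to children

   class JoinNode(PlanNode):
       """Stand-in for SortMergeJoinExec, the trigger side."""
       def consume_finished(self):
           # Once the join has fully consumed its inputs, eagerly ask all
           # upstream operators to release their resources.
           self.cleanup_resources()
   ```

   The key design choice is that the default implementation only recurses into children, so operators holding no resources need no changes; only resource-owning operators like SortExec override the hook.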
   
   ### Why are the changes needed?
   This fixes a memory leak in SortMergeJoin and introduces a general framework for SparkPlan resource cleanup.
   
   ### Does this PR introduce any user-facing change?
   No.
   
   ### How was this patch tested?
   UT: Added a new test suite, JoinWithResourceCleanSuite, covering both the standard and code-generation scenarios.
   
   Integration test: Tested with default driver/executor memory set to 1g, in local mode with 10 threads. With `spark.sql.sortMergeJoinExec.eagerCleanupResources=false`, the test below (thanks @taosaildrone for providing it [here](https://github.com/apache/spark/pull/23762#issuecomment-463303175)) fails with an OOM; with the conf enabled, it passes.
   
   ```
   from pyspark.sql.functions import rand, col
   
   spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
   spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
   # spark.conf.set("spark.sql.sortMergeJoinExec.eagerCleanupResources", "true")
   
   r1 = spark.range(1, 1001).select(col("id").alias("timestamp1"))
   r1 = r1.withColumn('value', rand())
   r2 = spark.range(1000, 1001).select(col("id").alias("timestamp2"))
   r2 = r2.withColumn('value2', rand())
   joined = r1.join(r2, r1.timestamp1 == r2.timestamp2, "inner")
   joined = joined.coalesce(1)
   joined.explain()
   joined.show()
   ```
   
