Posted to issues@spark.apache.org by "Yuming Wang (Jira)" <ji...@apache.org> on 2020/08/17 06:04:00 UTC
[jira] [Comment Edited] (SPARK-32628) Use bloom filter to improve dynamicPartitionPruning
[ https://issues.apache.org/jira/browse/SPARK-32628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17178511#comment-17178511 ]
Yuming Wang edited comment on SPARK-32628 at 8/17/20, 6:03 AM:
---------------------------------------------------------------
Benchmark:
{code:scala}
import org.apache.spark.sql.functions.col

// Fact table: 20 billion rows, partitioned into 2000 partitions by k.
spark.range(20000000000L)
  .select(col("id"), col("id").%(2000).as("k"))
  .write
  .partitionBy("k")
  .mode("overwrite")
  .saveAsTable("df1")

// Dimension table: 2 billion unpartitioned rows, with k = id.
spark.range(2000000000L)
  .select(col("id"), col("id").as("k"))
  .write
  .mode("overwrite")
  .saveAsTable("df2")

spark.sql("CREATE TABLE t_result1 USING parquet AS SELECT df1.id, df2.k FROM df1 JOIN df2 ON df1.k = df2.k AND df2.id > 1500 AND df2.id < 1000000000L")
{code}
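The three result columns below correspond to runs with roughly the following settings (assumed for illustration; the exact session configuration is not shown here):
{code:scala}
// Column 1: dynamic partition pruning disabled entirely.
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "false")

// Columns 2 and 3: DPP enabled but not restricted to reusing broadcast
// results, so the DPP subquery collects its keys through the driver.
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly", "false")
{code}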
|Disable DPP (seconds)|Enable DPP and disable reuseBroadcastOnly, before this patch|Enable DPP and disable reuseBroadcastOnly, after this patch (seconds)|
|222|SparkException|108|
{noformat}
Caused by: java.util.concurrent.ExecutionException: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 107 tasks (1032.7 MiB) is bigger than spark.driver.maxResultSize (1024.0 MiB)
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:312)
... 110 more
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 107 tasks (1032.7 MiB) is bigger than spark.driver.maxResultSize (1024.0 MiB)
{noformat}
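For reference, the direction of the fix can be sketched with Spark's built-in sketch library. This is a minimal illustration of the idea, not the actual patch; the {{expectedNumItems}} and {{fpp}} values are placeholders that would have to come from statistics:
{code:scala}
import org.apache.spark.sql.functions._

// Summarize the pruning-side join keys with a Bloom filter instead of
// collecting every key value to the driver.
val dim = spark.table("df2").where(col("id") > 1500 && col("id") < 1000000000L)
val bf = dim.stat.bloomFilter("k", 1000000L, 0.03) // placeholder capacity/fpp

// The filter is a fixed-size bitmap (determined by expectedNumItems and fpp),
// so the driver no longer has to hold gigabytes of raw key values.
val mightContainK = udf((k: Long) => bf.mightContain(k))

// Prune the partitioned side; Bloom-filter false positives are harmless
// because the join itself still filters exact matches.
val pruned = spark.table("df1").where(mightContainK(col("k")))
{code}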
> Use bloom filter to improve dynamicPartitionPruning
> ---------------------------------------------------
>
> Key: SPARK-32628
> URL: https://issues.apache.org/jira/browse/SPARK-32628
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 3.1.0
> Reporter: Yuming Wang
> Priority: Major
>
> It throws an exception when {{spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly}} is disabled:
> {code:sql}
> select catalog_sales.* from catalog_sales join catalog_returns where cr_order_number = cs_sold_date_sk and cr_returned_time_sk < 40000;
> {code}
> {noformat}
> 20/08/16 06:44:42 ERROR TaskSetManager: Total size of serialized results of 494 tasks (1225.3 MiB) is bigger than spark.driver.maxResultSize (1024.0 MiB)
> {noformat}
> We can improve this by collecting only the minimum, the maximum, and a Bloom filter of the join keys, which keeps the serialized results small.
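> As a rough illustration of the min/max half of the idea (a hypothetical sketch, not the actual change; the key columns are assumed to be LONG):
> {code:scala}
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.functions._
>
> // Collect only two values (the min and max join keys) from the filtering
> // side instead of every distinct key.
> val Row(lo: Long, hi: Long) = spark.table("catalog_returns")
>   .where(col("cr_returned_time_sk") < 40000)
>   .agg(min("cr_order_number"), max("cr_order_number"))
>   .head()
>
> // A two-value range predicate prunes catalog_sales with negligible driver
> // memory; a Bloom filter can then tighten the remaining false positives.
> val pruned = spark.table("catalog_sales")
>   .where(col("cs_sold_date_sk").between(lo, hi))
> {code}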
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org