Posted to issues@spark.apache.org by "Yuming Wang (Jira)" <ji...@apache.org> on 2020/08/17 06:04:00 UTC

[jira] [Comment Edited] (SPARK-32628) Use bloom filter to improve dynamicPartitionPruning

    [ https://issues.apache.org/jira/browse/SPARK-32628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17178511#comment-17178511 ] 

Yuming Wang edited comment on SPARK-32628 at 8/17/20, 6:03 AM:
---------------------------------------------------------------

Benchmark:
{code:scala}
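import org.apache.spark.sql.functions.col

// Partitioned side: 20 billion rows, partitioned by k = id % 2000 (2000 partitions).
spark.range(20000000000L)
  .select(col("id"), col("id").%(2000).as("k"))
  .write
  .partitionBy("k")
  .mode("overwrite")
  .saveAsTable("df1")

// Filtering side: 2 billion rows, unpartitioned, with k equal to id.
spark.range(2000000000L)
  .select(col("id"), col("id").as("k"))
  .write
  .mode("overwrite")
  .saveAsTable("df2")

// Join on the partition column; the range predicates on df2.id make df2 the filtering side.
spark.sql("CREATE TABLE t_result1 USING parquet as SELECT df1.id, df2.k FROM df1 JOIN df2 ON df1.k = df2.k AND df2.id > 1500 AND df2.id < 1000000000L")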

{code}
 
|Disable DPP (seconds)|Enable DPP and disable reuseBroadcastOnly, before this patch (seconds)|Enable DPP and disable reuseBroadcastOnly, after this patch (seconds)|
|222|SparkException|108|
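
For reference, the configurations named in the table headers could be set roughly as follows. This is a sketch that assumes the same {{spark}} session used by the benchmark code above; it only toggles the two DPP settings and is not taken from the patch itself.

```scala
// Column 1: DPP disabled entirely.
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "false")

// Columns 2 and 3: DPP enabled, and pruning no longer restricted to
// cases where a broadcast exchange can be reused.
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly", "false")
```

The settings are per session, so the CREATE TABLE statement would be rerun after each change (dropping or renaming t_result1 in between).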


{noformat}
Caused by: java.util.concurrent.ExecutionException: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 107 tasks (1032.7 MiB) is bigger than spark.driver.maxResultSize (1024.0 MiB)
	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:312)
	... 110 more
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 107 tasks (1032.7 MiB) is bigger than spark.driver.maxResultSize (1024.0 MiB)
{noformat}



was (Author: q79969786):
Benchmark:

{code:scala}
spark.range(20000000000L)
  .select(col("id"), col("id").%(2000).as("k"))
  .write
  .partitionBy("k")
  .mode("overwrite")
  .saveAsTable("df1")

spark.range(2000000000L)
  .select(col("id"), col("id").as("k"))
  .write
  .mode("overwrite")
  .saveAsTable("df2")

spark.sql("CREATE TABLE t_result1 USING parquet as SELECT df1.id, df2.k FROM df1 JOIN df2 ON df1.k = df2.k AND df2.id > 1500 AND df2.id < 1000000000L")

{code}


> Use bloom filter to improve dynamicPartitionPruning
> ---------------------------------------------------
>
>                 Key: SPARK-32628
>                 URL: https://issues.apache.org/jira/browse/SPARK-32628
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.1.0
>            Reporter: Yuming Wang
>            Priority: Major
>
> The following query throws an exception when {{spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly}} is disabled:
> {code:sql}
> select catalog_sales.* from  catalog_sales join catalog_returns  where cr_order_number = cs_sold_date_sk and cr_returned_time_sk < 40000;
> {code}
> {noformat}
> 20/08/16 06:44:42 ERROR TaskSetManager: Total size of serialized results of 494 tasks (1225.3 MiB) is bigger than spark.driver.maxResultSize (1024.0 MiB)
> {noformat}
> We can improve this by using the minimum and maximum values together with a Bloom filter to reduce the size of the serialized results.
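
To illustrate the idea in the description, here is a minimal, self-contained sketch of a min/max plus Bloom filter summary of join keys. It is not the code from the patch: {{KeySummary}}, its parameters and the hashing scheme are hypothetical, and the key ranges in the demo are small stand-ins loosely modelled on the benchmark (partition values 0 until 2000).

```scala
import java.util.BitSet
import scala.util.hashing.MurmurHash3

// Hypothetical summary of the filtering side's join keys: min, max and a Bloom filter.
// numBits and numHashes are illustrative values, not tuned settings from the patch.
final class KeySummary(numBits: Int, numHashes: Int) {
  private val bits = new BitSet(numBits)
  private var min = Long.MaxValue
  private var max = Long.MinValue

  // Hash a Long key with the given seed using MurmurHash3 mixing steps.
  private def hash(key: Long, seed: Int): Int = {
    val h = MurmurHash3.mix(MurmurHash3.mix(seed, key.toInt), (key >>> 32).toInt)
    MurmurHash3.finalizeHash(h, 2)
  }

  // Double hashing: the i-th bit position is derived from two base hashes.
  private def position(key: Long, i: Int): Int =
    Math.floorMod(hash(key, 0) + i * hash(key, 1), numBits)

  def add(key: Long): Unit = {
    min = math.min(min, key)
    max = math.max(max, key)
    (0 until numHashes).foreach(i => bits.set(position(key, i)))
  }

  // False positives are possible; false negatives are not.
  def mightContain(key: Long): Boolean =
    key >= min && key <= max &&
      (0 until numHashes).forall(i => bits.get(position(key, i)))
}

object KeySummaryDemo {
  def main(args: Array[String]): Unit = {
    val summary = new KeySummary(numBits = 1 << 16, numHashes = 3)
    // Stand-in for the filtered df2 keys: even values from 1502 to 1998.
    (1502L to 1998L by 2L).foreach(summary.add)
    // Stand-in for df1's partition values.
    val partitions = 0L until 2000L
    val kept = partitions.filter(summary.mightContain)
    println(s"scan ${kept.size} of ${partitions.size} partitions")
  }
}
```

The summary has a fixed size (here a 65536-bit set plus two longs) no matter how many keys are added, which is the property that would keep the result collected on the driver well below {{spark.driver.maxResultSize}}. The cost is that a few partitions may be scanned unnecessarily because of false positives.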


