Posted to user@spark.apache.org by Mohamadreza Rostami <mo...@gmail.com> on 2021/12/04 14:41:33 UTC

[Spark CORE][Spark SQL][Advanced]: Why dynamic partition pruning optimization does not work in this scenario?

Hello all,

We use Apache Spark 3.2.0 and our data is stored on Apache Hadoop in Parquet format. To speed up our queries, we are trying different scenarios. We found out that Spark supports dynamic partition pruning (DPP) in versions 3.0.0 and later. So, to test the improvement from the DPP feature, we defined two tables, sales and products, and a query. The code that initializes the environment is below:
# First Run
val salesSeq = Seq((1, 1), (1, 2), (1, 3), (1, 4), (1, 5), (1, 6), (1, 7), (1, 8), (2, 1), (2, 2), (2, 3), (2, 4), (2, 5), (2, 6), (2, 7), (2, 8))
val productSeq = Seq((1, "A"), (2, "B"))
sc.parallelize(salesSeq).toDF("pId","q").write.mode("overwrite").parquet("hdfs://test/spark-optimization/sales.parquet")
sc.parallelize(productSeq).toDF("Id","name").write.mode("overwrite").parquet("hdfs://test/spark-optimization/products.parquet")
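
For reference, DPP can only skip whole partitions, so it needs the fact table to be laid out in partitions on the join column. A minimal sketch of a partitioned variant of the sales table (illustrative only, with a hypothetical path; not the layout written above):
// write sales partitioned by the join column so there are partitions for DPP to prune
sc.parallelize(salesSeq).toDF("pId","q")
  .write.mode("overwrite")
  .partitionBy("pId")
  .parquet("hdfs://test/spark-optimization/sales_partitioned.parquet")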

Then we run another Scala snippet to execute the query; you can find the second run below:
# Second Run
val salesDF = spark.read.parquet("hdfs://test/spark-optimization/sales.parquet")
val productDF = spark.read.parquet("hdfs://test/spark-optimization/products.parquet")
salesDF.createOrReplaceTempView("sales")
productDF.createOrReplaceTempView("products")
sql("SELECT * FROM sales JOIN products ON sales.pId = products.id and products.name = 'A'").explain()

Based on the DPP feature, we expected the filter to be pushed down to the file scan level so that unnecessary partitions are not read. See the attached query plan:
<13551396-1591032620843.png>

But instead, Spark does not push the filter down to the file scan layer and uses a broadcast join without filtering the partitions. See the attached query plan:
<13551394-1591032607773.png>

To better understand the situation, please have a look at this article: https://dzone.com/articles/dynamic-partition-pruning-in-spark-30
We checked that the DPP and adaptive query execution features are enabled in our Spark cluster. So my question is: how can I debug this and find the root cause of the problem?
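
A minimal way to confirm those switches from the shell (the conf names below are standard Spark 3.x settings):
spark.conf.get("spark.sql.optimizer.dynamicPartitionPruning.enabled") // "true" when DPP is on
spark.conf.get("spark.sql.adaptive.enabled")                          // adaptive query execution
spark.conf.get("spark.sql.autoBroadcastJoinThreshold")                // broadcast join threshold, 10 MB by default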


Cheers,

Re: [Spark CORE][Spark SQL][Advanced]: Why dynamic partition pruning optimization does not work in this scenario?

Posted by Mohamadreza Rostami <mo...@gmail.com>.
Thank you for your response. "Under the broadcast join threshold" was a good point. We tested it on real data sets with tables that are terabytes in size, but Spark then uses a sort-merge join without DPP. Anyway, are you saying that DPP is not implemented for broadcast joins? If so, I wonder how DPP can be beneficial without broadcasting the index tables (the tables containing the dynamic filters), because the implementation of DPP that I have in mind is something like the following (see also the confs sketched after this list):
1. Spark reads the dynamic filters from the index (dimension) tables.
2. It broadcasts the dynamic filters to all nodes.
3. Executors push the dynamic filters down to the file scan layer and run the query.
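
(For reference, in Spark's implementation the dynamic filter is normally taken from a reused broadcast of the dimension side; two internal Spark 3.x confs govern this, a hedged sketch with names from SQLConf, defaults may vary by version:)
spark.conf.get("spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly") // if true, only prune when a broadcast result can be reused
spark.conf.get("spark.sql.optimizer.dynamicPartitionPruning.useStats")           // use table statistics when deciding whether pruning pays off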

Thank you so much for your attention and participation.

> On Dec 4, 2021, at 20:44, Russell Spitzer <ru...@gmail.com> wrote:
> 
> This is probably because your data size is well under the broadcastJoin threshold, so at the planning phase it decides to do a BroadcastJoin instead of a join that could take advantage of dynamic partition pruning. For testing like this you can always disable that with spark.sql.autoBroadcastJoinThreshold=-1.
> 
> In a real data scenario the size of the join tables would probably be much larger than the default (10 MB) and trigger dynamic partition pruning, although I can see it may be beneficial to implement dynamic partition pruning for broadcast joins as well...
> 
> 


Re: [Spark CORE][Spark SQL][Advanced]: Why dynamic partition pruning optimization does not work in this scenario?

Posted by Russell Spitzer <ru...@gmail.com>.
This is probably because your data size is well under the broadcastJoin threshold, so at the planning phase it decides to do a BroadcastJoin instead of a join that could take advantage of dynamic partition pruning. For testing like this you can always disable that with spark.sql.autoBroadcastJoinThreshold=-1.

In a real data scenario the size of the join tables would probably be much larger than the default (10 MB) and trigger dynamic partition pruning, although I can see it may be beneficial to implement dynamic partition pruning for broadcast joins as well...
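
A sketch of how that suggestion can be applied for a quick test (the conf name is the standard Spark SQL setting; a session-level set is simplest):
// disable the size-based broadcast join so the planner can pick a join that exercises DPP
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
// or at submit time: spark-submit --conf spark.sql.autoBroadcastJoinThreshold=-1 ...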

