Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/07/07 18:18:25 UTC

[GitHub] [iceberg] dramaticlly opened a new issue, #5224: Missing DynamicFileFilter when use iceberg in Spark 3.2

dramaticlly opened a new issue, #5224:
URL: https://github.com/apache/iceberg/issues/5224

   Hey Iceberg Community:
   
   We recently migrated from Iceberg 0.13 on Spark 3.1 to Spark 3.2 and noticed that some existing SQL delete jobs produce far more shuffle data than they did on Spark 3.1. When we explained the SQL statement and inspected the logical plan, we found that https://github.com/apache/iceberg/blob/master/spark/v3.1/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DynamicFileFilter.scala is missing from the Spark 3.2 extensions, and we would like help understanding why.
   
   It looks like the dynamic file filter was introduced in https://github.com/apache/iceberg/pull/3415/ on 10/31/2021,
   while initial Spark 3.2 support was merged in https://github.com/apache/iceberg/pull/3335/files on 10/22/2021, so we want to check whether the ordering of those two changes explains the gap.
   
   delete SQL
   ```sql
   DELETE FROM $table1
   WHERE $table1.date >= '20211228' AND $table1.date <= '20220627'
   AND upper($table1.$column1) IN (SELECT * FROM $table2)
   ```
   
   Spark logical plan screenshot
   ![privacy-deletion-job_-_Details_for_Query_3_and_privacy-deletion-job_-_Details_for_Query_3](https://user-images.githubusercontent.com/5961173/177842058-574c871d-9f45-43b1-81b1-77f94bf9ac89.jpg)
   
   
   Iceberg version: 0.13.0 (merge-on-read is not turned on for this yet)
   
   Spark version: 3.2.0 (too much shuffle data) vs. 3.1.1 (works as expected with DynamicFileFilter)
   
   Appreciate your help!
   CC @szehon-ho @rdblue @aokolnychyi @wypoon 
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on issue #5224: Missing DynamicFileFilter when use iceberg in Spark 3.2

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on issue #5224:
URL: https://github.com/apache/iceberg/issues/5224#issuecomment-1178051840

   We are using another approach for dynamic filtering in 3.2 that does not involve `DynamicFileFilter`.
   See `RowLevelCommandDynamicPruning` for more details.
   Can you share `EXPLAIN EXTENDED` output? Is dynamic partition pruning enabled in Spark? 




[GitHub] [iceberg] dramaticlly closed issue #5224: Missing DynamicFileFilter when use iceberg in Spark 3.2

Posted by GitBox <gi...@apache.org>.
dramaticlly closed issue #5224: Missing DynamicFileFilter when use iceberg in Spark 3.2
URL: https://github.com/apache/iceberg/issues/5224




[GitHub] [iceberg] dramaticlly commented on issue #5224: Missing DynamicFileFilter when use iceberg in Spark 3.2

Posted by GitBox <gi...@apache.org>.
dramaticlly commented on issue #5224:
URL: https://github.com/apache/iceberg/issues/5224#issuecomment-1178418612

   Thank you @aokolnychyi for jumping in so quickly. Here is the detailed explain output I got on Spark 3.2.
   
   This is the particular delete query we are looking at:
   ```sql
   DELETE FROM privacy_dev.ussr_interaction_f_test_spark32 tgt
   WHERE tgt.request_dateint >= '20220320' AND tgt.request_dateint <= '20220321'
   AND upper(tgt.instance_id) IN (SELECT * FROM trigger_speech_ids)
   ```
   
   - the table `privacy_dev.ussr_interaction_f_test_spark32` is aliased as `tgt` and partitioned by the column `request_dateint`
   - the secondary table `trigger_speech_ids` in the subquery is not partitioned and has only a single column
   
   I do see a `dynamicpruning` filter being used, but I don't see the one you specifically mentioned.
   
   I also double-checked that `spark.sql.optimizer.dynamicPartitionPruning.enabled` is not explicitly set in our application. It is enabled by default in OSS Spark and in our Spark distribution, so I assume it is turned on. I also tried to reproduce with this Spark conf explicitly enabled, but the job still generates TB-level shuffle writes, so a second pair of eyes would help here.
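   
   A minimal sketch of how this can be checked from a Spark SQL session, assuming the session resolves the same catalog; the table and view names are the ones from the delete query above, and `EXPLAIN` only plans the statement:
   
   ```sql
   -- Show the current value of the dynamic partition pruning flag.
   SET spark.sql.optimizer.dynamicPartitionPruning.enabled;
   
   -- Set it explicitly for this session before re-running the job.
   SET spark.sql.optimizer.dynamicPartitionPruning.enabled = true;
   
   -- Print the parsed, analyzed, optimized, and physical plans for the delete.
   EXPLAIN EXTENDED
   DELETE FROM privacy_dev.ussr_interaction_f_test_spark32 tgt
   WHERE tgt.request_dateint >= '20220320' AND tgt.request_dateint <= '20220321'
   AND upper(tgt.instance_id) IN (SELECT * FROM trigger_speech_ids);
   ```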
   
   I redacted some irrelevant columns below. Hope it helps!
   
   ---
   
   ## == Parsed Logical Plan ==
   ```
   'DeleteFromTable ((('tgt.request_dateint >= 20220320) AND ('tgt.request_dateint <= 20220321)) AND 'upper('tgt.instance_id) IN (list#267 []))
   :  +- 'Project [*]
   :     +- 'UnresolvedRelation [trigger_speech_ids], [], false
   +- 'SubqueryAlias tgt
      +- 'UnresolvedRelation [privacy_dev, ussr_interaction_f_test_spark32], [], false
   ```
   
   ## == Analyzed Logical Plan ==
   ```
   DeleteFromTable (((request_dateint#670 >= cast(20220320 as int)) AND (request_dateint#670 <= cast(20220321 as int))) AND upper(instance_id#682) IN (list#267 []))
   :  +- Project [speech_id#258]
   :     +- SubqueryAlias trigger_speech_ids
   :        +- View (`trigger_speech_ids`, [speech_id#258])
   :           +- Deduplicate [speech_id#258]
   :              +- Project [speech_id#258]
   :                 +- Filter ((date#741 >= 20220605) AND (date#741 <= 20220705))
   :                    +- Project [upper(speech_id)#252 AS speech_id#258, date#741]
   :                       +- Project [upper(speech_id#739) AS upper(speech_id)#252, date#741]
   :                          +- RelationV2[id#738, speech_id#739, ] spark_catalog.privacy_management.deletion_trigger_prod
   :- RelationV2[request_dateint#670, instance_id#682, ] spark_catalog.privacy_dev.ussr_interaction_f_test_spark32
   +- ReplaceData RelationV2[request_dateint#670, instance_id#682, ] spark_catalog.privacy_dev.ussr_interaction_f_test_spark32
      +- Filter NOT ((((request_dateint#670 >= cast(20220320 as int)) AND (request_dateint#670 <= cast(20220321 as int))) AND upper(instance_id#682) IN (list#267 [])) <=> true)
         :  +- Project [speech_id#258]
         :     +- SubqueryAlias trigger_speech_ids
         :        +- View (`trigger_speech_ids`, [speech_id#258])
         :           +- Deduplicate [speech_id#258]
         :              +- Project [speech_id#258]
         :                 +- Filter ((date#239 >= 20220605) AND (date#239 <= 20220705))
         :                    +- Project [upper(speech_id)#252 AS speech_id#258, date#239]
         :                       +- Project [upper(speech_id#237) AS upper(speech_id)#252, date#239]
         :                          +- RelationV2[id#236, speech_id#237, ] spark_catalog.privacy_management.deletion_trigger_prod
         +- RelationV2[request_dateint#670, instance_id#682, _file#736, _pos#737L] spark_catalog.privacy_dev.ussr_interaction_f_test_spark32
   ```
   
   ## == Optimized Logical Plan ==
   ```
   ReplaceData RelationV2[request_dateint#670, instance_id#682, ] spark_catalog.privacy_dev.ussr_interaction_f_test_spark32, IcebergWrite(table=spark_catalog.privacy_dev.ussr_interaction_f_test_spark32, format=PARQUET)
   +- Project [request_dateint#670, instance_id#682, ]
      +- Sort [request_dateint#670 ASC NULLS FIRST, _file#736 ASC NULLS FIRST, _pos#737L ASC NULLS FIRST], false
         +- RepartitionByExpression [_file#736], 960
            +- Project [request_dateint#670, instance_id#682, _file#736, _pos#737L]
               +- Filter NOT ((((request_dateint#670 >= 20220320) AND (request_dateint#670 <= 20220321)) AND exists#967) <=> true)
                  +- Join ExistenceJoin(exists#967), (upper(instance_id#682) = speech_id#258)
                     :- Filter dynamicpruning#1034 [_file#736]
                     :  :  +- Project [_file#1032]
                     :  :     +- Join LeftSemi, (upper(instance_id#980) = speech_id#258)
                     :  :        :- Project [instance_id#980, _file#1032]
                     :  :        :  +- Filter ((isnotnull(request_dateint#968) AND (request_dateint#968 >= 20220320)) AND (request_dateint#968 <= 20220321))
                     :  :        :     +- RelationV2[request_dateint#968,  instance_id#980, ] spark_catalog.privacy_dev.ussr_interaction_f_test_spark32
                     :  :        +- InMemoryRelation [speech_id#258], StorageLevel(disk, memory, deserialized, 1 replicas)
                     :  :              +- *(2) HashAggregate(keys=[speech_id#258], functions=[], output=[speech_id#258])
                     :  :                 +- Exchange hashpartitioning(speech_id#258, 960), ENSURE_REQUIREMENTS, [id=#89]
                     :  :                    +- *(1) HashAggregate(keys=[speech_id#258], functions=[], output=[speech_id#258])
                     :  :                       +- *(1) Project [upper(speech_id#237) AS speech_id#258]
                     :  :                          +- *(1) Filter ((isnotnull(date#239) AND (date#239 >= 20220605)) AND (date#239 <= 20220705))
                     :  :                             +- BatchScan[speech_id#237, date#239] spark_catalog.privacy_management.deletion_trigger_prod [filters=date IS NOT NULL, date >= '20220605', date <= '20220705'] RuntimeFilters: []
                     :  +- RelationV2[request_dateint#670, instance_id#682, _file#736, _pos#737L] spark_catalog.privacy_dev.ussr_interaction_f_test_spark32
                     +- InMemoryRelation [speech_id#258], StorageLevel(disk, memory, deserialized, 1 replicas)
                           +- *(2) HashAggregate(keys=[speech_id#258], functions=[], output=[speech_id#258])
                              +- Exchange hashpartitioning(speech_id#258, 960), ENSURE_REQUIREMENTS, [id=#89]
                                 +- *(1) HashAggregate(keys=[speech_id#258], functions=[], output=[speech_id#258])
                                    +- *(1) Project [upper(speech_id#237) AS speech_id#258]
                                       +- *(1) Filter ((isnotnull(date#239) AND (date#239 >= 20220605)) AND (date#239 <= 20220705))
                                          +- BatchScan[speech_id#237, date#239] spark_catalog.privacy_management.deletion_trigger_prod [filters=date IS NOT NULL, date >= '20220605', date <= '20220705'] RuntimeFilters: []
   ```
   
   ## == Physical Plan ==
   ```
   ReplaceData IcebergWrite(table=spark_catalog.privacy_dev.ussr_interaction_f_test_spark32, format=PARQUET)
   +- *(2) Project [request_dateint#670, instance_id#682, ]
      +- *(2) Sort [request_dateint#670 ASC NULLS FIRST, _file#736 ASC NULLS FIRST, _pos#737L ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(_file#736, 960), REPARTITION_BY_NUM, [id=#469]
            +- *(1) Project [request_dateint#670, instance_id#682, _file#736, _pos#737L]
               +- *(1) Filter NOT ((((request_dateint#670 >= 20220320) AND (request_dateint#670 <= 20220321)) AND exists#967) <=> true)
                  +- *(1) BroadcastHashJoin [upper(instance_id#682)], [speech_id#258], ExistenceJoin(exists#967), BuildRight, false
                     :- *(1) Project [request_dateint#670, instance_id#682, _file#736, _pos#737L]
                     :  +- BatchScan[request_dateint#670, instance_id#682, _file#736, _pos#737L] spark_catalog.privacy_dev.ussr_interaction_f_test_spark32 [filters=request_dateint >= 20220320, request_dateint <= 20220321] RuntimeFilters: [dynamicpruningexpression(_file#736 IN subquery#1051)]
                     :        +- Subquery subquery#1051, [id=#408]
                     :           +- *(2) HashAggregate(keys=[_file#1032#1050], functions=[], output=[_file#1032#1050])
                     :              +- Exchange hashpartitioning(_file#1032#1050, 960), ENSURE_REQUIREMENTS, [id=#404]
                     :                 +- *(1) HashAggregate(keys=[_file#1032 AS _file#1032#1050], functions=[], output=[_file#1032#1050])
                     :                    +- *(1) Project [_file#1032]
                     :                       +- *(1) BroadcastHashJoin [upper(instance_id#980)], [speech_id#258], LeftSemi, BuildRight, false
                     :                          :- *(1) Project [instance_id#980, _file#1032]
                     :                          :  +- *(1) Filter ((isnotnull(request_dateint#968) AND (request_dateint#968 >= 20220320)) AND (request_dateint#968 <= 20220321))
                     :                          :     +- BatchScan[request_dateint#968,  instance_id#980, ] spark_catalog.privacy_dev.ussr_interaction_f_test_spark32 [filters=request_dateint >= 20220320, request_dateint <= 20220321] RuntimeFilters: []
                     :                          +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]),false), [id=#389]
                     :                             +- InMemoryTableScan [speech_id#258]
                     :                                   +- InMemoryRelation [speech_id#258], StorageLevel(disk, memory, deserialized, 1 replicas)
                     :                                         +- *(2) HashAggregate(keys=[speech_id#258], functions=[], output=[speech_id#258])
                     :                                            +- Exchange hashpartitioning(speech_id#258, 960), ENSURE_REQUIREMENTS, [id=#89]
                     :                                               +- *(1) HashAggregate(keys=[speech_id#258], functions=[], output=[speech_id#258])
                     :                                                  +- *(1) Project [upper(speech_id#237) AS speech_id#258]
                     :                                                     +- *(1) Filter ((isnotnull(date#239) AND (date#239 >= 20220605)) AND (date#239 <= 20220705))
                     :                                                        +- BatchScan[speech_id#237, date#239] spark_catalog.privacy_management.deletion_trigger_prod [filters=date IS NOT NULL, date >= '20220605', date <= '20220705'] RuntimeFilters: []
                     +- ReusedExchange [speech_id#258], BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]),false), [id=#389]
   ```




[GitHub] [iceberg] aokolnychyi commented on issue #5224: Missing DynamicFileFilter when use iceberg in Spark 3.2

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on issue #5224:
URL: https://github.com/apache/iceberg/issues/5224#issuecomment-1179215916

   I see dynamic filtering working correctly:
   
   ```
   RuntimeFilters: [dynamicpruningexpression(_file#736 IN subquery#1051)]
   ```
   
   The only non-broadcast shuffle happens later, to cluster records by `_file`.
   Since the joins themselves are already broadcast joins, you can disable that shuffle by setting `write.delete.distribution-mode` to `none`.
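   
   A minimal sketch of that change in Spark SQL, assuming the setting is applied as a table property on the target table from this thread (`privacy_dev.ussr_interaction_f_test_spark32`); substitute the table name as needed:
   
   ```sql
   -- Skip the shuffle that clusters rewritten records by _file before the write.
   ALTER TABLE privacy_dev.ussr_interaction_f_test_spark32
   SET TBLPROPERTIES ('write.delete.distribution-mode' = 'none');
   
   -- Confirm the property is now set on the table.
   SHOW TBLPROPERTIES privacy_dev.ussr_interaction_f_test_spark32 ('write.delete.distribution-mode');
   ```
   
   Removing the property again with `ALTER TABLE ... UNSET TBLPROPERTIES ('write.delete.distribution-mode')` should return the table to its default distribution mode.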




[GitHub] [iceberg] dramaticlly commented on issue #5224: Missing DynamicFileFilter when use iceberg in Spark 3.2

Posted by GitBox <gi...@apache.org>.
dramaticlly commented on issue #5224:
URL: https://github.com/apache/iceberg/issues/5224#issuecomment-1179251349

   Thank you @aokolnychyi for all of your help. After setting `write.delete.distribution-mode` to `none` instead of the default `hash`, our job on Spark 3.2 / Iceberg 0.13 works as expected. We really appreciate your insights!

