Posted to issues@iceberg.apache.org by "bowenliang123 (via GitHub)" <gi...@apache.org> on 2023/06/28 10:13:30 UTC

[GitHub] [iceberg] bowenliang123 opened a new pull request, #7932: [WIP] Use Rebalance instead of Repartition for distribution

bowenliang123 opened a new pull request, #7932:
URL: https://github.com/apache/iceberg/pull/7932

   - Use Rebalance instead of Repartition for distribution in Spark 3.2 and 3.3, so that AQE decides the number of shuffle partitions and small files in table partitions are avoided.
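
A simplified model of the difference, assuming Spark's adaptive coalescing rule: AQE leaves the partition count of a `REPARTITION_BY_NUM` shuffle untouched, but may merge adjacent small partitions of a `REBALANCE_PARTITIONS_BY_COL` shuffle up to a target size (`spark.sql.adaptive.advisoryPartitionSizeInBytes`; the real target also depends on settings such as `spark.sql.adaptive.coalescePartitions.parallelismFirst`). The greedy packing and the function names below are illustrative, not Spark's code, and skew splitting is not modeled.

```python
from typing import List

# Shuffle origins assumed to be eligible for AQE coalescing in Spark 3.2/3.3.
# REPARTITION_BY_NUM is deliberately absent: the requested count is kept.
COALESCABLE_ORIGINS = {
    "ENSURE_REQUIREMENTS",
    "REPARTITION_BY_COL",
    "REBALANCE_PARTITIONS_BY_NONE",
    "REBALANCE_PARTITIONS_BY_COL",
}


def coalesce(sizes: List[int], target: int) -> List[List[int]]:
    """Greedily pack adjacent shuffle partitions until adding the next one
    would exceed `target` bytes. Returns groups of partition ids."""
    groups: List[List[int]] = []
    current: List[int] = []
    current_size = 0
    for pid, size in enumerate(sizes):
        if current and current_size + size > target:
            groups.append(current)
            current, current_size = [], 0
        current.append(pid)
        current_size += size
    if current:
        groups.append(current)
    return groups


def final_partition_count(origin: str, sizes: List[int], target: int) -> int:
    """Number of write tasks after AQE, under the assumptions above."""
    if origin in COALESCABLE_ORIGINS:
        return len(coalesce(sizes, target))
    return len(sizes)


if __name__ == "__main__":
    mb = 1024 * 1024
    sizes = [mb] * 200   # 200 shuffle partitions of about 1 MB each
    target = 64 * mb     # assumed advisory target size
    print(final_partition_count("REPARTITION_BY_NUM", sizes, target))           # 200
    print(final_partition_count("REBALANCE_PARTITIONS_BY_COL", sizes, target))  # 4
```

In this model, 200 small shuffle partitions stay as 200 write tasks under `REPARTITION_BY_NUM` but collapse to 4 under the rebalance; fewer, larger write tasks is what this change relies on to avoid small files.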
   
   Before: 
   The plan has `REPARTITION_BY_NUM` in `+- Exchange hashpartitioning(lab_numr#270, busi_date#271, 200), REPARTITION_BY_NUM,`
   
   ```
   +----------------------------------------------------+
   |                        plan                        |
   +----------------------------------------------------+
   | == Parsed Logical Plan ==
   'InsertIntoStatement 'UnresolvedRelation [gfpersonas_platform, t_ptr_label_ice_bowen], [], false, false, false
   +- 'Project [*]
      +- 'UnresolvedRelation [gfpersonas_platform, t_ptr_label_ice], [], false
   
   == Analyzed Logical Plan ==
   AppendData RelationV2[obj_id#46, lab_val#47, lab_numr#48, busi_date#49] spark_catalog.gfpersonas_platform.t_ptr_label_ice_bowen, false
   +- Project [obj_id#42, lab_val#43, lab_numr#44, busi_date#45]
      +- SubqueryAlias spark_catalog.gfpersonas_platform.t_ptr_label_ice
         +- RelationV2[obj_id#42, lab_val#43, lab_numr#44, busi_date#45] spark_catalog.gfpersonas_platform.t_ptr_label_ice
   
   == Optimized Logical Plan ==
   AppendData RelationV2[obj_id#46, lab_val#47, lab_numr#48, busi_date#49] spark_catalog.gfpersonas_platform.t_ptr_label_ice_bowen, false, IcebergWrite(table=spark_catalog.gfpersonas_platform.t_ptr_label_ice_bowen, format=PARQUET)
   +- Sort [lab_numr#44 ASC NULLS FIRST, busi_date#45 ASC NULLS FIRST], false
      +- RebalancePartitions [lab_numr#44, busi_date#45]
         +- RelationV2[obj_id#42, lab_val#43, lab_numr#44, busi_date#45] spark_catalog.gfpersonas_platform.t_ptr_label_ice
   
   == Physical Plan ==
   AppendData org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$3367/659079940@629fd732, IcebergWrite(table=spark_catalog.gfpersonas_platform.t_ptr_label_ice_bowen, format=PARQUET)
   +- AdaptiveSparkPlan isFinalPlan=false
      +- Sort [lab_numr#44 ASC NULLS FIRST, busi_date#45 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(lab_numr#44, busi_date#45, 200), REBALANCE_PARTITIONS_BY_COL, [plan_id=49]
            +- BatchScan[obj_id#42, lab_val#43, lab_numr#44, busi_date#45] spark_catalog.gfpersonas_platform.t_ptr_label_ice (branch=null) [filters=, groupedBy=] RuntimeFilters: []
    |
   +----------------------------------------------------+
   ```
   
   After: 
   The plan has `REBALANCE_PARTITIONS_BY_COL` in `Exchange hashpartitioning(lab_numr#44, busi_date#45, 200), REBALANCE_PARTITIONS_BY_COL,`
   
   ```
   +----------------------------------------------------+
   |                        plan                        |
   +----------------------------------------------------+
   | == Parsed Logical Plan ==
   'InsertIntoStatement 'UnresolvedRelation [gfpersonas_platform, t_ptr_label_ice_bowen], [], false, false, false
   +- 'Project [*]
      +- 'UnresolvedRelation [gfpersonas_platform, t_ptr_label_ice], [], false
   
   == Analyzed Logical Plan ==
   AppendData RelationV2[obj_id#46, lab_val#47, lab_numr#48, busi_date#49] spark_catalog.gfpersonas_platform.t_ptr_label_ice_bowen, false
   +- Project [obj_id#42, lab_val#43, lab_numr#44, busi_date#45]
      +- SubqueryAlias spark_catalog.gfpersonas_platform.t_ptr_label_ice
         +- RelationV2[obj_id#42, lab_val#43, lab_numr#44, busi_date#45] spark_catalog.gfpersonas_platform.t_ptr_label_ice
   
   == Optimized Logical Plan ==
   AppendData RelationV2[obj_id#46, lab_val#47, lab_numr#48, busi_date#49] spark_catalog.gfpersonas_platform.t_ptr_label_ice_bowen, false, IcebergWrite(table=spark_catalog.gfpersonas_platform.t_ptr_label_ice_bowen, format=PARQUET)
   +- Sort [lab_numr#44 ASC NULLS FIRST, busi_date#45 ASC NULLS FIRST], false
      +- RebalancePartitions [lab_numr#44, busi_date#45]
         +- RelationV2[obj_id#42, lab_val#43, lab_numr#44, busi_date#45] spark_catalog.gfpersonas_platform.t_ptr_label_ice
   
   == Physical Plan ==
   AppendData org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$3367/659079940@629fd732, IcebergWrite(table=spark_catalog.gfpersonas_platform.t_ptr_label_ice_bowen, format=PARQUET)
   +- AdaptiveSparkPlan isFinalPlan=false
      +- Sort [lab_numr#44 ASC NULLS FIRST, busi_date#45 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(lab_numr#44, busi_date#45, 200), REBALANCE_PARTITIONS_BY_COL, [plan_id=49]
            +- BatchScan[obj_id#42, lab_val#43, lab_numr#44, busi_date#45] spark_catalog.gfpersonas_platform.t_ptr_label_ice (branch=null) [filters=, groupedBy=] RuntimeFilters: []
    |
   +----------------------------------------------------+
   ```
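
The Before/After comparison comes down to the shuffle origin printed at the end of the `Exchange` node. A small, hypothetical Python helper that pulls the partitioning, its expressions, the partition count and the shuffle origin out of one line of `EXPLAIN` output; the regex only covers the `name(args, N), ORIGIN` form shown in these plans.

```python
import re
from typing import List, NamedTuple, Optional


class ExchangeInfo(NamedTuple):
    partitioning: str    # e.g. "hashpartitioning"
    columns: List[str]   # partitioning expressions, e.g. ["lab_numr#44", ...]
    num_partitions: int  # shuffle partition count before AQE
    origin: str          # e.g. "REPARTITION_BY_NUM" or "REBALANCE_PARTITIONS_BY_COL"


# Matches lines such as:
#   +- Exchange hashpartitioning(a#1, b#2, 200), REBALANCE_PARTITIONS_BY_COL, [plan_id=49]
EXCHANGE_RE = re.compile(
    r"Exchange\s+(?P<kind>\w+)\((?P<args>[^)]*)\),\s*(?P<origin>[A-Z_]+)"
)


def parse_exchange(plan_line: str) -> Optional[ExchangeInfo]:
    """Return the exchange details found in one plan line, or None."""
    m = EXCHANGE_RE.search(plan_line)
    if m is None:
        return None
    args = [a.strip() for a in m.group("args").split(",")]
    # The last argument of hashpartitioning(...) is the partition count.
    *columns, num = args
    return ExchangeInfo(m.group("kind"), columns, int(num), m.group("origin"))


line = ("+- Exchange hashpartitioning(lab_numr#44, busi_date#45, 200), "
        "REBALANCE_PARTITIONS_BY_COL, [plan_id=49]")
info = parse_exchange(line)
print(info.origin, info.num_partitions, info.columns)
# REBALANCE_PARTITIONS_BY_COL 200 ['lab_numr#44', 'busi_date#45']
```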


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] namrathamyske commented on pull request #7932: Spark 3.2 and 3.3: Use Rebalance instead of Repartition for distribution in SparkWrite

Posted by "namrathamyske (via GitHub)" <gi...@apache.org>.
namrathamyske commented on PR #7932:
URL: https://github.com/apache/iceberg/pull/7932#issuecomment-1615265754

   @bowenliang123 It looks like `REBALANCE_PARTITIONS_BY_COL` does not support a range partitioner.
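
For context, a range partitioner picks sorted key boundaries (Spark samples the data to find them) and sends each row to the partition whose key range contains it, so every partition holds a contiguous, ordered slice of keys; Iceberg's `range` write distribution mode depends on this. A toy Python comparison with hash partitioning, assuming integer keys and computing exact quantiles instead of sampling; it is not Spark's `RangePartitioner`.

```python
import bisect
from typing import List, Sequence


def range_bounds(keys: Sequence[int], num_partitions: int) -> List[int]:
    """Pick num_partitions - 1 upper bounds from the sorted keys.
    Assumes len(keys) >= num_partitions; Spark samples instead of sorting all keys."""
    ordered = sorted(keys)
    step = len(ordered) / num_partitions
    return [ordered[int(step * i) - 1] for i in range(1, num_partitions)]


def range_partition(key: int, bounds: List[int]) -> int:
    """Partition id = number of bounds strictly smaller than the key."""
    return bisect.bisect_left(bounds, key)


def hash_partition(key: int, num_partitions: int) -> int:
    """Hash partitioning ignores key order (Python's hash stands in for Murmur3)."""
    return hash(key) % num_partitions


keys = list(range(1, 13))
bounds = range_bounds(keys, 3)
by_range = {p: [k for k in keys if range_partition(k, bounds) == p] for p in range(3)}
by_hash = {p: [k for k in keys if hash_partition(k, 3) == p] for p in range(3)}
print(by_range)  # {0: [1, 2, 3, 4], 1: [5, 6, 7, 8], 2: [9, 10, 11, 12]}
print(by_hash)   # {0: [3, 6, 9, 12], 1: [1, 4, 7, 10], 2: [2, 5, 8, 11]}
```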


[GitHub] [iceberg] ConeyLiu commented on pull request #7932: Spark 3.2 and 3.3: Use Rebalance instead of Repartition for distribution in SparkWrite

Posted by "ConeyLiu (via GitHub)" <gi...@apache.org>.
ConeyLiu commented on PR #7932:
URL: https://github.com/apache/iceberg/pull/7932#issuecomment-1614218480

   > Before:
   Having REPARTITION_BY_NUM in +- Exchange hashpartitioning(lab_numr#270, busi_date#271, 200), REPARTITION_BY_NUM
   
   Judging from the plan, it seems to be `REBALANCE_PARTITIONS_BY_COL` as well.
   
   ```
   == Physical Plan ==
   AppendData org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$3367/659079940@629fd732, IcebergWrite(table=spark_catalog.gfpersonas_platform.t_ptr_label_ice_bowen, format=PARQUET)
   +- AdaptiveSparkPlan isFinalPlan=false
      +- Sort [lab_numr#44 ASC NULLS FIRST, busi_date#45 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(lab_numr#44, busi_date#45, 200), REBALANCE_PARTITIONS_BY_COL, [plan_id=49]
            +- BatchScan[obj_id#42, lab_val#43, lab_numr#44, busi_date#45] spark_catalog.gfpersonas_platform.t_ptr_label_ice (branch=null) [filters=, groupedBy=] RuntimeFilters: []
    |
   +----------------------------------------------------+
   ```


[GitHub] [iceberg] bowenliang123 commented on pull request #7932: Spark 3.2 and 3.3: Use Rebalance instead of Repartition for distribution in SparkWrite

Posted by "bowenliang123 (via GitHub)" <gi...@apache.org>.
bowenliang123 commented on PR #7932:
URL: https://github.com/apache/iceberg/pull/7932#issuecomment-1631680989

   I don't know how to fix the failures in the GitHub Actions (GA) tests, or where and why they fail. I may need some help with this.


[GitHub] [iceberg] bowenliang123 commented on pull request #7932: Spark 3.2 and 3.3: Use Rebalance instead of Repartition for distribution in SparkWrite

Posted by "bowenliang123 (via GitHub)" <gi...@apache.org>.
bowenliang123 commented on PR #7932:
URL: https://github.com/apache/iceberg/pull/7932#issuecomment-1621211436

   > @bowenliang123 @ConeyLiu I understand `REBALANCE_PARTITIONS_BY_COL` adds an adaptive coalesce (AQE), which just coalesces the partitions local to an executor (hence reducing the number of files written). Is this effective if the partitions are spread across different workers, since the partitions won't be local anymore (for coalesce to work)?
   
   Screenshot: https://github.com/apache/iceberg/assets/1935105/04947fd7-425c-4253-a20e-eaddf46afdbe
   Since `RebalancePartitions` introduces a shuffle read stage, I think it works for partitions spread across worker nodes as well.
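
A simplified model of why executor placement should not matter for the coalescing step, assuming Spark's usual shuffle design: each map task writes one block per reducer partition, AQE sums block sizes per reducer partition over all map tasks before deciding how to coalesce, and a coalesced read task then fetches the blocks for its reducer range from every map task, whether local or remote. The names and data below are hypothetical.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

# (executor, map_task_id) -> bytes written for each reducer partition
MapOutputs = Dict[Tuple[str, int], List[int]]


def bytes_by_partition(map_outputs: MapOutputs, num_reducers: int) -> List[int]:
    """Sum block sizes per reducer partition over all map tasks,
    regardless of which executor ran them."""
    totals = [0] * num_reducers
    for sizes in map_outputs.values():
        for rid, size in enumerate(sizes):
            totals[rid] += size
    return totals


def blocks_to_fetch(map_outputs: MapOutputs, start: int,
                    end: int) -> Dict[str, List[Tuple[int, int]]]:
    """Non-empty blocks (map_task_id, reducer_id) that a coalesced task covering
    reducers [start, end) must read, grouped by the executor holding them."""
    by_executor: Dict[str, List[Tuple[int, int]]] = defaultdict(list)
    for (executor, map_id), sizes in sorted(map_outputs.items()):
        for rid in range(start, end):
            if sizes[rid] > 0:
                by_executor[executor].append((map_id, rid))
    return dict(by_executor)


outputs = {
    ("exec-1", 0): [1, 2, 0, 3],
    ("exec-2", 1): [2, 0, 4, 1],
}
print(bytes_by_partition(outputs, 4))  # [3, 2, 4, 4]
print(blocks_to_fetch(outputs, 0, 4))
# {'exec-1': [(0, 0), (0, 1), (0, 3)], 'exec-2': [(1, 0), (1, 2), (1, 3)]}
```

In this model, a single coalesced task covering all four reducer partitions reads blocks from both executors.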


[GitHub] [iceberg] namrathamyske commented on pull request #7932: Spark 3.2 and 3.3: Use Rebalance instead of Repartition for distribution in SparkWrite

Posted by "namrathamyske (via GitHub)" <gi...@apache.org>.
namrathamyske commented on PR #7932:
URL: https://github.com/apache/iceberg/pull/7932#issuecomment-1629672983

   @bowenliang123 Can we merge this into master?


[GitHub] [iceberg] bowenliang123 commented on pull request #7932: Spark 3.2 and 3.3: Use Rebalance instead of Repartition for distribution in SparkWrite

Posted by "bowenliang123 (via GitHub)" <gi...@apache.org>.
bowenliang123 commented on PR #7932:
URL: https://github.com/apache/iceberg/pull/7932#issuecomment-1617780801

   > @bowenliang123 Looks like REBALANCE_PARTITIONS_BY_COL does not have range partitioner support
   
   Yes, you are right. RebalancePartitions only supports RoundRobinPartitioning and HashPartitioning.
   
   I opened this PR as a workaround for my case. It may not fully satisfy range support or the semantics of `Distribution`.
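
The two partitionings that `RebalancePartitions` supports, per the comment above, differ in what they guarantee: hash partitioning on the clustering columns (here `lab_numr`, `busi_date`) sends every row with the same key to the same partition, while round-robin only spreads rows evenly. Since each task that sees a key typically writes its own file for that table partition, this matters for file counts. A toy Python comparison; Python's `hash` stands in for Spark's hash function and the row values are made up.

```python
from typing import Dict, List, Set, Tuple

Row = Tuple[int, str]  # (lab_numr, busi_date); hypothetical sample values


def hash_assign(rows: List[Row], n: int) -> Dict[int, List[Row]]:
    """Rows with equal keys always land in the same partition."""
    out: Dict[int, List[Row]] = {p: [] for p in range(n)}
    for row in rows:
        out[hash(row) % n].append(row)
    return out


def round_robin_assign(rows: List[Row], n: int) -> Dict[int, List[Row]]:
    """Rows are dealt out in turn; equal keys may be split across partitions."""
    out: Dict[int, List[Row]] = {p: [] for p in range(n)}
    for i, row in enumerate(rows):
        out[i % n].append(row)
    return out


def partitions_per_key(assignment: Dict[int, List[Row]]) -> Dict[Row, int]:
    """How many partitions (write tasks) each key is spread over."""
    seen: Dict[Row, Set[int]] = {}
    for pid, rows in assignment.items():
        for row in rows:
            seen.setdefault(row, set()).add(pid)
    return {key: len(pids) for key, pids in seen.items()}


rows = [(1, "2023-06-01")] * 4 + [(2, "2023-06-01")] * 4
print(partitions_per_key(hash_assign(rows, 4)))         # every key: 1 partition
print(partitions_per_key(round_robin_assign(rows, 4)))  # every key: 4 partitions
```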


[GitHub] [iceberg] bowenliang123 commented on pull request #7932: Spark 3.2 and 3.3: Use Rebalance instead of Repartition for distribution in SparkWrite

Posted by "bowenliang123 (via GitHub)" <gi...@apache.org>.
bowenliang123 commented on PR #7932:
URL: https://github.com/apache/iceberg/pull/7932#issuecomment-1631675253

   > @bowenliang123 Can we merge this to master by having a flag called "strictDistributionRequired" Similar to https://github.com/apache/spark/blob/453300b418bc03511ad9167bbaad49e0f1f1c090/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DistributionAndOrderingUtils.scala#L63 for rebalance to be applied?
   
   Yes, I have noticed these changes in Spark 3.4. Backporting them to 3.3 is an approach worth considering.
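
A hypothetical Python model of the choice such a flag would drive: when the writer says its distribution is strictly required, plan a repartition with a fixed number of partitions (which AQE leaves alone); otherwise plan a rebalance that AQE may coalesce or split. The function and parameter names are illustrative and do not mirror Spark's or Iceberg's classes; see the linked `DistributionAndOrderingUtils.scala` for the real logic.

```python
from typing import Sequence


def plan_shuffle(clustering: Sequence[str],
                 strict_distribution_required: bool,
                 shuffle_partitions: int = 200) -> str:
    """Return the logical node and shuffle origin this model would pick."""
    if not clustering:
        return "no shuffle"
    if strict_distribution_required:
        # Fixed partition count (spark.sql.shuffle.partitions): AQE keeps it.
        return f"RepartitionByExpression({shuffle_partitions}) -> REPARTITION_BY_NUM"
    # No fixed count: AQE may coalesce small or split skewed partitions.
    return "RebalancePartitions -> REBALANCE_PARTITIONS_BY_COL"


print(plan_shuffle(["lab_numr", "busi_date"], strict_distribution_required=True))
print(plan_shuffle(["lab_numr", "busi_date"], strict_distribution_required=False))
```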


[GitHub] [iceberg] namrathamyske commented on pull request #7932: Spark 3.2 and 3.3: Use Rebalance instead of Repartition for distribution in SparkWrite

Posted by "namrathamyske (via GitHub)" <gi...@apache.org>.
namrathamyske commented on PR #7932:
URL: https://github.com/apache/iceberg/pull/7932#issuecomment-1620976658

   @bowenliang123 I understand this adds an adaptive coalesce (AQE), which just coalesces the partitions local to an executor (hence reducing the number of files written). Is this effective if the partitions are spread across different workers, since the partitions won't be local anymore (for coalesce to work)?


[GitHub] [iceberg] bowenliang123 commented on pull request #7932: Spark 3.2 and 3.3: Use Rebalance instead of Repartition for distribution in SparkWrite

Posted by "bowenliang123 (via GitHub)" <gi...@apache.org>.
bowenliang123 commented on PR #7932:
URL: https://github.com/apache/iceberg/pull/7932#issuecomment-1617770515

   > > Before:
   > > Having REPARTITION_BY_NUM in +- Exchange hashpartitioning(lab_numr#270, busi_date#271, 200), REPARTITION_BY_NUM
   > 
   > It seems it is `REBALANCE_PARTITIONS_BY_COL ` as well from the plan.
   > 
   > ```
   > == Physical Plan ==
   > AppendData org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$3367/659079940@629fd732, IcebergWrite(table=spark_catalog.gfpersonas_platform.t_ptr_label_ice_bowen, format=PARQUET)
   > +- AdaptiveSparkPlan isFinalPlan=false
   >    +- Sort [lab_numr#44 ASC NULLS FIRST, busi_date#45 ASC NULLS FIRST], false, 0
   >       +- Exchange hashpartitioning(lab_numr#44, busi_date#45, 200), REBALANCE_PARTITIONS_BY_COL, [plan_id=49]
   >          +- BatchScan[obj_id#42, lab_val#43, lab_numr#44, busi_date#45] spark_catalog.gfpersonas_platform.t_ptr_label_ice (branch=null) [filters=, groupedBy=] RuntimeFilters: []
   >  |
   > +----------------------------------------------------+
   > ```
   
   Sorry, I pasted the wrong plan as the `before` plan. I have updated it with the correct one.


[GitHub] [iceberg] namrathamyske commented on pull request #7932: Spark 3.2 and 3.3: Use Rebalance instead of Repartition for distribution in SparkWrite

Posted by "namrathamyske (via GitHub)" <gi...@apache.org>.
namrathamyske commented on PR #7932:
URL: https://github.com/apache/iceberg/pull/7932#issuecomment-1621236434

   Correct, but as we can see, the first step (the exchange) has 20 partitions, a number controlled by the shuffle partitions config (`spark.sql.shuffle.partitions`). The next step is `AQEShuffleRead`, which coalesces the 20 partitions into 1 partition. In the plan above, maybe all 20 partitions were processed by the same executor, so all of them were coalesced into one partition. But if the 20 partitions were spread across more executors, the number of coalesced partitions might increase, since coalescing only happens locally per executor.

