You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/11/28 22:06:13 UTC

[GitHub] [iceberg] stevenzwu opened a new issue, #6303: Flink: add more sink shuffling support

stevenzwu opened a new issue, #6303:
URL: https://github.com/apache/iceberg/issues/6303

   ### Feature Request / Improvement
   
   Today, Flink Iceberg sink only supports simple keyBy hash distribution on partition columns. In practice, keyBy shuffle on partition values doesn't work very well.
   
   We can make the following shuffling enhancements in Flink streaming writer. More details can be found in the [design doc](https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo). This is an uber issue for tracking purpose. Here are the rough phases.
   
   1. [hash distribution] custom partitioner on bucket values. [PR 4228](https://github.com/apache/iceberg/pull/4228) demonstrated that keyBy on low-cardinality partitioning buckets resulted in skewed traffic distribution. Flink sink can add a custom partitioner that directly map the bucket value (integer) to the downstream writer tasks (integer) in round-robin fashion (mod). This is a relatively simple case.
   
   This is a case when `write.distribution-mode=hash` and there is a bucketing partition column. Other partition columns (like hourly partition) will be ignored regarding shuffling. The assumption is that bucketing column is where we want to distribute/cluster the rows.
   
   2. [hash distribution] bin packing based on traffic distribution statistics. This works well for skewed data on partition columns (like event time). This requires calculating traffic distribution statistics across partition columns and use the statistics to guide shuffling decision.
   
   This is a case when `write.distribution-mode=hash` and there is NO bucketing partition column. 
   
   3. [range distribution] range partition based on traffic distribution statistics. It is a variant of 2 above. This works well for "sorting" non-partition columns (e.g. country code, event type). It can improve data clustering by creating data files with narrow value ranges. Note that Flink streaming writer probably won't sort rows within a file, as that would be very expensive (not impossible). Even without rows sorted within a file, the improved data clustering can result in effective file pruning. We just can't get the additional benefits of row group level skipping (for Parquet) with rows sorted within a file.
   
   This is a case when `write.distribution-mode=range` and `SortOrder` is defined for non-partition columns. partition columns will be ignored for range shuffling as the assumption is that non-partition sort columns are what matter here.
   
   4. [high cardinality columns] 2 and 3 above are mostly for low-cardinality columns (e.g. unique values in hundreds), where a simple dictionary of count per value can be used to track traffic distribution statistics. For high-cardinality column (like device or user id), we would need to use probabilistic data sketches algorithm to calculate traffic distribution.
   
   ### Query engine
   
   Flink


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] huyuanfeng2018 commented on issue #6303: Flink: add more sink shuffling support

Posted by "huyuanfeng2018 (via GitHub)" <gi...@apache.org>.
huyuanfeng2018 commented on issue #6303:
URL: https://github.com/apache/iceberg/issues/6303#issuecomment-1514965996

   hi, @stevenzwu 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on issue #6303: Flink: add more sink shuffling support

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on issue #6303:
URL: https://github.com/apache/iceberg/issues/6303#issuecomment-1329866356

   Created a new project as this is a relatively large scope overall: https://github.com/apache/iceberg/projects/27


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] huyuanfeng2018 commented on issue #6303: Flink: add more sink shuffling support

Posted by "huyuanfeng2018 (via GitHub)" <gi...@apache.org>.
huyuanfeng2018 commented on issue #6303:
URL: https://github.com/apache/iceberg/issues/6303#issuecomment-1514984945

   Hi,I am very interested in the fast project. At present, we have a serious tilt problem in the process of using iceberg to write. I have been paying attention to the progress of this module. Now I want to put forward some of my ideas.
   
   I took a close look at https://github.com/apache/iceberg/pull/6382 and https://github.com/apache/iceberg/pull/7269
   
   I think there are some problems. I completed the following simple implementation based on these two PRs on my own branch, but the throughput of the program has dropped significantly, almost reaching the point of being unusable, so I think, should we Stop and think about whether this solution is suitable
   
   From my observation, the problem lies in the DataStatisticsOperator. When output.collect is called here, Flink’s serialization will be forced to be triggered, but DataStatisticsOrRecord will degenerate into kryo mode during serialization, resulting in a performance drop of more than 4 times 
   <img width="1433" alt="image" src="https://user-images.githubusercontent.com/40817998/233132909-209f9b69-1197-4088-8572-d30e2bbe7ea4.png">
   Spent too many computing resources in serialization, So I think we may need to seriously consider the feasibility of this option
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] huyuanfeng2018 commented on issue #6303: Flink: add more sink shuffling support

Posted by "huyuanfeng2018 (via GitHub)" <gi...@apache.org>.
huyuanfeng2018 commented on issue #6303:
URL: https://github.com/apache/iceberg/issues/6303#issuecomment-1514980946

   hi,I am very interested in the fast project. At present, we have a serious tilt problem in the process of using iceberg to write. I have been paying attention to the progress of this module. Now I want to put forward some of my ideas.
   
   
   I took a close look at #6382 and #7269
   
   I think there are some problems. I completed the following simple implementation based on these two PRs on my own branch, but the throughput of the program has dropped significantly, almost reaching the point of being unusable, so I think, should we Stop and think about whether this solution is suitable
   
   From my observation, the problem lies in the DataStatisticsOperator. When output.collect is called here, Flink’s serialization will be forced to be triggered, but DataStatisticsOrRecord will degenerate into kryo mode during serialization, resulting in a performance drop of more than 4 times


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] hililiwei commented on issue #6303: Flink: add more sink shuffling support

Posted by "hililiwei (via GitHub)" <gi...@apache.org>.
hililiwei commented on issue #6303:
URL: https://github.com/apache/iceberg/issues/6303#issuecomment-1453092256

   Great design! I think we can continue adding new issues so that guys can choose the tasks they want to work on.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] huyuanfeng2018 commented on issue #6303: Flink: add more sink shuffling support

Posted by "huyuanfeng2018 (via GitHub)" <gi...@apache.org>.
huyuanfeng2018 commented on issue #6303:
URL: https://github.com/apache/iceberg/issues/6303#issuecomment-1514968044

   hi,我对快项目非常感兴趣,目前我们在使用iceberg写入过程中也出现了比较严重的倾斜问题,我一直在关注这个模块的进展,现在我想提出我的一些想法


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org