Posted to issues@iceberg.apache.org by "huyuanfeng2018 (via GitHub)" <gi...@apache.org> on 2023/04/21 06:08:57 UTC

[GitHub] [iceberg] huyuanfeng2018 opened a new issue, #7393: The serialization problem caused by Flink shuffling design

huyuanfeng2018 opened a new issue, #7393:
URL: https://github.com/apache/iceberg/issues/7393

   ### Feature Request / Improvement
   
   This issue follows from #6303; I am opening a new issue to discuss it.
   
   I am very interested in this project. We currently have a serious data skew problem when writing with Iceberg. I have been following the progress of this module, and I would like to put forward some of my ideas.
   
   I took a close look at https://github.com/apache/iceberg/pull/6382 and https://github.com/apache/iceberg/pull/7269
   
   I think there are some problems. I completed a simple implementation based on these two PRs on my own branch, but the throughput of the job dropped significantly, almost to the point of being unusable. So I think we should stop and consider whether this solution is suitable.
   
   From my observation, the problem lies in the DataStatisticsOperator. When output.collect is called there, Flink's serialization is triggered, but DataStatisticsOrRecord falls back to Kryo during serialization, resulting in a performance drop of more than 4 times.
   <img width="1433" alt="image" src="https://user-images.githubusercontent.com/40817998/233132909-209f9b69-1197-4088-8572-d30e2bbe7ea4.png">
   Too many computing resources are spent on serialization, so I think we may need to seriously reconsider the feasibility of this proposal.
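
   For illustration only, the following sketch contrasts a generic serializer that discovers the object layout at runtime with a hand-written one that knows the layout in advance. It uses Java's built-in `ObjectOutputStream` as a stand-in for the generic path, not Kryo or Flink's serializer stack, and `LogEvent` and `SerializationCostSketch` are hypothetical names that do not appear in Iceberg or Flink. The byte counts say nothing about the 4x figure above; they only show the kind of overhead a type-specific serializer avoids.

   ```java
   import java.io.ByteArrayOutputStream;
   import java.io.DataOutputStream;
   import java.io.IOException;
   import java.io.ObjectOutputStream;
   import java.io.Serializable;
   import java.io.UncheckedIOException;

   // Hypothetical stand-in for a small record travelling between operators.
   final class LogEvent implements Serializable {
     private static final long serialVersionUID = 1L;
     final String platform;
     final long timestampMillis;

     LogEvent(String platform, long timestampMillis) {
       this.platform = platform;
       this.timestampMillis = timestampMillis;
     }
   }

   final class SerializationCostSketch {
     // Generic path: the layout is discovered at runtime and class metadata is written too.
     static byte[] genericBytes(LogEvent event) {
       ByteArrayOutputStream buffer = new ByteArrayOutputStream();
       try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
         out.writeObject(event);
       } catch (IOException e) {
         throw new UncheckedIOException(e);
       }
       return buffer.toByteArray();
     }

     // Type-specific path: the layout is known up front, so only field values are written.
     static byte[] handWrittenBytes(LogEvent event) {
       ByteArrayOutputStream buffer = new ByteArrayOutputStream();
       try (DataOutputStream out = new DataOutputStream(buffer)) {
         out.writeUTF(event.platform);
         out.writeLong(event.timestampMillis);
       } catch (IOException e) {
         throw new UncheckedIOException(e);
       }
       return buffer.toByteArray();
     }
   }
   ```

   Calling both methods on the same `LogEvent` and comparing the array lengths shows the difference in encoded size.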
   
   @stevenzwu @hililiwei
   
   ### Query engine
   
   Flink


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] huyuanfeng2018 commented on issue #7393: The serialization problem caused by Flink shuffling design

Posted by "huyuanfeng2018 (via GitHub)" <gi...@apache.org>.
huyuanfeng2018 commented on issue #7393:
URL: https://github.com/apache/iceberg/issues/7393#issuecomment-1521090778

   Adding the throughput without the enumeration partition, using the same resources:
   
   <img width="1003" alt="image" src="https://user-images.githubusercontent.com/40817998/234163440-911fc7ad-f886-4dd6-95b8-8cc8adc5c2eb.png">
   




Re: [I] The serialization problem caused by Flink shuffling design [iceberg]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on issue #7393:
URL: https://github.com/apache/iceberg/issues/7393#issuecomment-1826152438

   This issue has been closed because it has not received any activity in the last 14 days since being marked as 'stale'




[GitHub] [iceberg] stevenzwu commented on issue #7393: The serialization problem caused by Flink shuffling design

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on issue #7393:
URL: https://github.com/apache/iceberg/issues/7393#issuecomment-1521105434

   > Why doesn't the `DataStatisticsOrRecord` object directly implement Flink `RowData`? The generic type here is already fixed to `RowData` when using Flink to write
   
   This is a good point. We probably should change it.




[GitHub] [iceberg] stevenzwu commented on issue #7393: The serialization problem caused by Flink shuffling design

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on issue #7393:
URL: https://github.com/apache/iceberg/issues/7393#issuecomment-1519324779

   @huyuanfeng2018 thanks for the experiment. `DataStatisticsOrRecord` is the only way to pass statistics to the custom partitioner. I agree with you that Kryo serialization will be slower. We will need to provide a type serializer for the type, which we had in our internal PoC implementation and testing.
   
   > resulting in a performance drop of more than 4 times
   
   Can you elaborate on the observation of the 4x slowdown? What is the A/B test setup?
   
   In the benchmark with the internal PoC implementation, we observed 60% more CPU overhead for a simple job reading from Kafka and writing to an Iceberg table partitioned by event time. As expected, the bulk of the overhead comes from serdes and network I/O.
   
   cc @yegangy0718 
   




[GitHub] [iceberg] xccui commented on issue #7393: The serialization problem caused by Flink shuffling design

Posted by "xccui (via GitHub)" <gi...@apache.org>.
xccui commented on issue #7393:
URL: https://github.com/apache/iceberg/issues/7393#issuecomment-1529261839

   > @xccui I thought we can't make `DataStatisticsOrRecord` a true POJO, because `RowData` is not a POJO. We will provide `TypeSerializer` implementations for `DataStatistics` and `DataStatisticsOrRecord`.
   
   Ah, yes, it's not that trivial. For `RowData`, `org.apache.flink.table.runtime.typeutils.InternalTypeInfo` can help, but overall, customized `TypeSerializers` would be nice to have 👍 




[GitHub] [iceberg] xccui commented on issue #7393: The serialization problem caused by Flink shuffling design

Posted by "xccui (via GitHub)" <gi...@apache.org>.
xccui commented on issue #7393:
URL: https://github.com/apache/iceberg/issues/7393#issuecomment-1527001510

   Came across this issue. As a workaround, you can try defining your own `com.esotericsoftware.kryo.Serializer` for `DataStatisticsOrRecord` and register it via `env.registerTypeWithKryoSerializer()`.
   
   @stevenzwu I feel that at least we can make `DataStatisticsOrRecord` a POJO, right?




[GitHub] [iceberg] hililiwei commented on issue #7393: The serialization problem caused by Flink shuffling design

Posted by "hililiwei (via GitHub)" <gi...@apache.org>.
hililiwei commented on issue #7393:
URL: https://github.com/apache/iceberg/issues/7393#issuecomment-1519289669

   @huyuanfeng2018 Thank you for bringing this up. I don't have any more data to share at the moment. I'll run a quick test.
   
   @stevenzwu @yegangy0718 you seem to have more detailed data. What do you think about this?
   
   
   




[GitHub] [iceberg] stevenzwu commented on issue #7393: The serialization problem caused by Flink shuffling design

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on issue #7393:
URL: https://github.com/apache/iceberg/issues/7393#issuecomment-1529236266

   @xccui I thought we can't make `DataStatisticsOrRecord` a true POJO, because `RowData` is not a POJO. We will provide `TypeSerializer` implementations for `DataStatistics` and `DataStatisticsOrRecord`.
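
   A minimal sketch of what a type-specific serializer for a "statistics or record" union might look like, assuming a one-byte tag followed by the payload. It uses only `java.io` and is not Flink's `TypeSerializer` API or the implementation planned for Iceberg; `StatsOrRecord`, `StatsOrRecordCodec`, the string record, and the key-to-count statistics map are simplifying assumptions standing in for `DataStatisticsOrRecord`, `RowData`, and `DataStatistics`.

   ```java
   import java.io.ByteArrayInputStream;
   import java.io.ByteArrayOutputStream;
   import java.io.DataInputStream;
   import java.io.DataOutputStream;
   import java.io.IOException;
   import java.io.UncheckedIOException;
   import java.util.Map;
   import java.util.TreeMap;

   // Hypothetical union type: exactly one of the two fields is non-null.
   final class StatsOrRecord {
     final Map<String, Long> statistics;
     final String record;

     private StatsOrRecord(Map<String, Long> statistics, String record) {
       this.statistics = statistics;
       this.record = record;
     }

     static StatsOrRecord ofStatistics(Map<String, Long> statistics) {
       return new StatsOrRecord(statistics, null);
     }

     static StatsOrRecord ofRecord(String record) {
       return new StatsOrRecord(null, record);
     }

     boolean hasStatistics() {
       return statistics != null;
     }
   }

   final class StatsOrRecordCodec {
     private static final byte TAG_STATISTICS = 0;
     private static final byte TAG_RECORD = 1;

     // Writes one tag byte, then only the payload for the branch that is present.
     static byte[] serialize(StatsOrRecord value) {
       ByteArrayOutputStream buffer = new ByteArrayOutputStream();
       try (DataOutputStream out = new DataOutputStream(buffer)) {
         if (value.hasStatistics()) {
           out.writeByte(TAG_STATISTICS);
           out.writeInt(value.statistics.size());
           for (Map.Entry<String, Long> entry : value.statistics.entrySet()) {
             out.writeUTF(entry.getKey());
             out.writeLong(entry.getValue());
           }
         } else {
           out.writeByte(TAG_RECORD);
           out.writeUTF(value.record);
         }
       } catch (IOException e) {
         throw new UncheckedIOException(e);
       }
       return buffer.toByteArray();
     }

     // Reads the tag first and then decodes the matching branch.
     static StatsOrRecord deserialize(byte[] bytes) {
       try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
         byte tag = in.readByte();
         if (tag == TAG_STATISTICS) {
           int size = in.readInt();
           Map<String, Long> statistics = new TreeMap<>();
           for (int i = 0; i < size; i++) {
             String key = in.readUTF();
             statistics.put(key, in.readLong());
           }
           return StatsOrRecord.ofStatistics(statistics);
         } else if (tag == TAG_RECORD) {
           return StatsOrRecord.ofRecord(in.readUTF());
         }
         throw new IllegalStateException("Unknown tag: " + tag);
       } catch (IOException e) {
         throw new UncheckedIOException(e);
       }
     }
   }
   ```

   In a real job, the same tag-then-payload layout would presumably sit behind Flink's serializer interfaces, with the record branch delegating to a `RowData` serializer.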




[GitHub] [iceberg] huyuanfeng2018 commented on issue #7393: The serialization problem caused by Flink shuffling design

Posted by "huyuanfeng2018 (via GitHub)" <gi...@apache.org>.
huyuanfeng2018 commented on issue #7393:
URL: https://github.com/apache/iceberg/issues/7393#issuecomment-1519739497

   Hi @stevenzwu @hililiwei,
   Thank you for your reply!
   My scenario is writing server logs to Iceberg in real time. At peak, the real-time data volume is about 1.0M/s:
   <img width="456" alt="image" src="https://user-images.githubusercontent.com/40817998/233956036-25ef3566-3e6a-4d11-b8df-b7b7b171177a.png">
   We currently partition by day, hour, and an enumeration field. There are about 70 enumeration values, and two of them account for more than 70% of the total data, so the current Iceberg write mode cannot meet our requirements. We currently run 200 parallel writers online and shuffle by specifying, for each enumeration value, its share of the total data volume, like this:
   
   'distribution-balance-column-ratio' = 'sysdk_android:0.0005,_wap:0.0003,android_tv:0.003.......'
   
   However, the proportion of each enumeration value changes over time, so there is still skew in certain time periods, which causes a backlog in my job.
   
   So I tried to implement automatic balancing, but under the same cluster configuration my processing throughput was 4 times lower, about 200~300k/s. I have attached the flame graph to the issue; most of the processing time is spent serializing the statistics record. If you re-implement the serialization interface of the record, can you give me a sample so that I can test it in my scenario and see how much improvement there is? In addition, I am happy to help as much as I can if needed.
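
   The ratio-based shuffle described above can be sketched as follows, assuming the option value is a comma-separated list of `key:ratio` pairs and that each key receives a number of writer subtasks proportional to its ratio, with a minimum of one. `distribution-balance-column-ratio` is the author's own option; `RatioConfigSketch`, its methods, and the rounding rule are hypothetical and only illustrate the idea.

   ```java
   import java.util.LinkedHashMap;
   import java.util.Map;

   final class RatioConfigSketch {
     // Parses "key:ratio,key:ratio" into an insertion-ordered map.
     static Map<String, Double> parseRatios(String config) {
       Map<String, Double> ratios = new LinkedHashMap<>();
       for (String entry : config.split(",")) {
         int separator = entry.lastIndexOf(':');
         String key = entry.substring(0, separator).trim();
         double ratio = Double.parseDouble(entry.substring(separator + 1).trim());
         ratios.put(key, ratio);
       }
       return ratios;
     }

     // Gives each key a subtask count proportional to its ratio, but never fewer than one.
     // The counts are not forced to add up to the parallelism (assumption for brevity).
     static Map<String, Integer> subtasksPerKey(Map<String, Double> ratios, int parallelism) {
       Map<String, Integer> result = new LinkedHashMap<>();
       for (Map.Entry<String, Double> entry : ratios.entrySet()) {
         long share = Math.round(entry.getValue() * parallelism);
         result.put(entry.getKey(), (int) Math.max(1L, share));
       }
       return result;
     }
   }
   ```

   With a parallelism of 200, a key whose ratio is 0.0005 would still get one subtask under this rule, since 0.0005 × 200 rounds to 0.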




[GitHub] [iceberg] stevenzwu commented on issue #7393: The serialization problem caused by Flink shuffling design

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on issue #7393:
URL: https://github.com/apache/iceberg/issues/7393#issuecomment-1520391187

   > I think if you re-implement the serialization interface of the record, can you give me a sample and I can test it in my scenario to see how much improvement there is.
   
   So the 4x slowdown is measured by throughput (not CPU overhead). Unless you are seeing 4x CPU overhead, this will likely not solve your problem.
   
   We haven't finished the MVP version with a custom range partitioner. How did you actually shuffle the records using the data statistics? Was the traffic relatively evenly distributed across writer tasks?




Re: [I] The serialization problem caused by Flink shuffling design [iceberg]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on issue #7393:
URL: https://github.com/apache/iceberg/issues/7393#issuecomment-1783948782

   This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.




[GitHub] [iceberg] huyuanfeng2018 commented on issue #7393: The serialization problem caused by Flink shuffling design

Posted by "huyuanfeng2018 (via GitHub)" <gi...@apache.org>.
huyuanfeng2018 commented on issue #7393:
URL: https://github.com/apache/iceberg/issues/7393#issuecomment-1521084658

   1. Regarding the 4x slowdown, this is indeed a four-fold decrease in throughput. As a user, I am more concerned about the throughput under the same resources, so this does not mean that CPU consumption increased 4x.
   
   2. Regarding the custom range partitioner, I implemented one myself. As described in my previous reply, we specify the proportion of each enumeration partition relative to the total data volume before the job starts (distribution-balance-column-ratio), and then use the custom range partitioner to calculate which streamWrite subtasks the data of each enumeration value is distributed to. Following #6382 and #7269, I then implemented dynamic adjustment of that proportion (distribution-balance-column-ratio) based on the statistics from each checkpoint, so as to achieve dynamic allocation. According to the metrics in the Flink UI, my data shuffling is very uniform. From the monitoring point of view, the CPU usage of all my TaskManagers is 100%. The following two pictures show some of my monitoring data at this time:
   <img width="623" alt="image" src="https://user-images.githubusercontent.com/40817998/234160124-aeb88a2d-953c-45be-a299-fc5fb4189c78.png">
   
   <img width="1093" alt="image" src="https://user-images.githubusercontent.com/40817998/234160387-baeaddcd-1697-4b5d-9d0c-22ba8a915257.png">
   3. In fact, when I doubled the parallelism, I found that my processing throughput also doubled, as shown here, so it is very likely that the bottleneck is the CPU rather than network I/O:
   <img width="1364" alt="image" src="https://user-images.githubusercontent.com/40817998/234161033-47b26d7d-81a0-4205-948c-f0544d32c203.png">
   
   4. There is one thing I have doubts about. Why doesn't the `DataStatisticsOrRecord` object directly implement Flink `RowData`? The generic type here is already fixed to `RowData` when using Flink to write:
   ```
   public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData>
   ```
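
   The dynamic variant described in point 2 can be sketched as follows, assuming per-key record counts from the latest checkpoint interval replace the static ratios and each key is mapped to a contiguous range of writer subtasks. `DynamicRangeSketch` and its methods are hypothetical names, and this is neither the author's partitioner nor the one in #6382 or #7269. Integer arithmetic on the counts is used so that the range boundaries do not depend on floating-point rounding.

   ```java
   import java.util.LinkedHashMap;
   import java.util.Map;

   final class DynamicRangeSketch {
     // Maps each key to a range [start, end) of writer subtasks, sized by its share of the counts.
     static Map<String, int[]> assignRanges(Map<String, Long> counts, int parallelism) {
       long total = 0;
       for (long count : counts.values()) {
         total += count;
       }
       if (total <= 0 || parallelism <= 0) {
         throw new IllegalArgumentException("counts and parallelism must be positive");
       }
       Map<String, int[]> ranges = new LinkedHashMap<>();
       long seen = 0;
       for (Map.Entry<String, Long> entry : counts.entrySet()) {
         // Floor of the cumulative share before this key, capped at the last subtask.
         int start = (int) Math.min(parallelism - 1, seen * parallelism / total);
         seen += entry.getValue();
         // Ceiling of the cumulative share after this key.
         int end = (int) ((seen * parallelism + total - 1) / total);
         ranges.put(entry.getKey(), new int[] {start, Math.max(end, start + 1)});
       }
       return ranges;
     }

     // Spreads the records of one key round-robin over that key's subtask range.
     static int selectSubtask(int[] range, long recordSequence) {
       return range[0] + (int) (recordSequence % (range[1] - range[0]));
     }
   }
   ```

   When a boundary falls inside a subtask, the two neighbouring keys share that subtask. Recomputing the ranges from fresh counts at each checkpoint is what would make the allocation follow changes in the data distribution.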




[GitHub] [iceberg] yegangy0718 commented on issue #7393: The serialization problem caused by Flink shuffling design

Posted by "yegangy0718 (via GitHub)" <gi...@apache.org>.
yegangy0718 commented on issue #7393:
URL: https://github.com/apache/iceberg/issues/7393#issuecomment-1519392273

   Hi @huyuanfeng2018  Thanks for showing interest in the project. 
   
   We do have a plan to add a custom serializer for `DataStatisticsOrRecord`, as @stevenzwu commented at https://github.com/apache/iceberg/pull/7269#discussion_r1157718810.
   
   We have done a perf test with the internal PoC implementation. The results were published at https://www.slideshare.net/FlinkForward/tame-the-small-files-problem-and-optimize-data-layout-for-streaming-ingestion-to-iceberg from slide 44 to the end. We observed that CPU usage increased from 35% to 57% for the simplest streaming job (consuming from Kafka and writing to Iceberg) after applying shuffling. This is expected, since we trade more CPU usage for better file sizes and data clustering.
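
   For reference, the relative increase implied by these two CPU figures works out as follows, which is in line with the roughly 60% additional CPU overhead quoted elsewhere in this thread:

   $$\frac{57 - 35}{35} = \frac{22}{35} \approx 0.63$$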
   
   We may need more information about the test cases you ran, such as the Flink DAG structure, the data distribution, and so on, to analyze the perf impact you are seeing.




Re: [I] The serialization problem caused by Flink shuffling design [iceberg]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] closed issue #7393: The serialization problem caused by Flink shuffling  design
URL: https://github.com/apache/iceberg/issues/7393

