You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/07/26 03:44:58 UTC

[GitHub] [iceberg] hameizi opened a new pull request #2867: Flink: Auto compact file

hameizi opened a new pull request #2867:
URL: https://github.com/apache/iceberg/pull/2867


   This add one feature that flink write iceberg auto compact small files. And add config "write.auto-compact-files"


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on pull request #2867: Flink: Auto compact file

Posted by GitBox <gi...@apache.org>.
kbendick commented on pull request #2867:
URL: https://github.com/apache/iceberg/pull/2867#issuecomment-886470356


   > This add one feature that flink write iceberg auto compact small files.
   
   Possibly I'm missing something, but I don't see how file size is taken into account here. This appears to be compacting regardless. In some cases, the cost of opening and rewriting provides less value than leaving the data as is. Can we account for this like we do in some other places?
   
   This would be a good topic to consider discussing in the mentioned GitHub issue :) 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] hameizi commented on pull request #2867: Flink: Auto compact file

Posted by GitBox <gi...@apache.org>.
hameizi commented on pull request #2867:
URL: https://github.com/apache/iceberg/pull/2867#issuecomment-886492617


   > Hi @hameizi! Thank you for your contribution and interest in Iceberg!
   > 
   > This is a non-trivial functionality addition. Would it be possible for you to create a GitHub issue for us to track and discuss this? Github issues are how we normally track new work etc, and I think given that this functionality has not been discussed etc, it would benefit from going through the standard process of having an issue.
   > 
   > By no means do you need to close the PR, but it would help to follow the normal workflow (and also provides the benefit that people can search the issues to see the discussion). 🙂
   
   I add one issue https://github.com/apache/iceberg/issues/2869


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on pull request #2867: Flink: Auto compact file

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on pull request #2867:
URL: https://github.com/apache/iceberg/pull/2867#issuecomment-892178217


   I share the same concern as @rdblue. It seems to me that this impl basically have a single committer task/thread that reads all all rows from a CombinedScanTask (files batched by BaseRewriteDataFilesAction) and writes them out. How is different to just configure the StreamFileWriter with parallelism of 1?
   
   if we make it a truly parallel rewrite/compaction action, I am a little concerned about the complexity we are adding to the Flink streaming ingestion path.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick edited a comment on pull request #2867: Flink: Auto compact file

Posted by GitBox <gi...@apache.org>.
kbendick edited a comment on pull request #2867:
URL: https://github.com/apache/iceberg/pull/2867#issuecomment-886470356


   > This add one feature that flink write iceberg auto compact small files.
   
   Possibly I'm missing something, but I don't see any accounting for files that might be already near, or close to the optimal size. It's late and my eyes may deceive me, but this appears to be compacting all files to be ideally the target file size bytes, regardless of their existing size etc. In some cases, the cost of opening and rewriting provides less value than leaving the data as is. Can we account for this like we do in some other places? Or am I just missing the fact that that functionality is hidden elsewhere?
   
   This would be a good topic to consider discussing in the mentioned GitHub issue :) 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] hameizi commented on pull request #2867: Flink: Auto compact file

Posted by GitBox <gi...@apache.org>.
hameizi commented on pull request #2867:
URL: https://github.com/apache/iceberg/pull/2867#issuecomment-892517896


   > it is likely the compactor can't keep up with the workload.
   
   @stevenzwu In theory it's will occur but the impl in this PR is similar with the custom policy in hive-commiter of flink. And user offen use custom policy to compact hive files so am i. But in my most scene there are no problem, and i am working for stress testing for this PR and not found cleraly errors up to now.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] hameizi commented on pull request #2867: Flink: Auto compact file

Posted by GitBox <gi...@apache.org>.
hameizi commented on pull request #2867:
URL: https://github.com/apache/iceberg/pull/2867#issuecomment-886502378


   > > This add one feature that flink write iceberg auto compact small files.
   > 
   > Possibly I'm missing something, but I don't see any accounting for files that might be already near, or close to the optimal size. It's late and my eyes may deceive me, but this appears to be compacting all files to be ideally the target file size bytes, regardless of their existing size etc. In some cases, the cost of opening and rewriting provides less value than leaving the data as is. Can we account for this like we do in some other places? Or am I just missing the fact that that functionality is hidden elsewhere?
   > 
   > This would be a good topic to consider discussing in the mentioned GitHub issue :)
   
   It will compact files who is the result by partition filter, so it will compact all files when there  is  no partitions. But in my work sence, there is no problem because we compact every transction that will generate little files. So compact files in every transcation is quickly and small cost and the time compact files will not more than transaction time too.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] hameizi edited a comment on pull request #2867: Flink: Auto compact file

Posted by GitBox <gi...@apache.org>.
hameizi edited a comment on pull request #2867:
URL: https://github.com/apache/iceberg/pull/2867#issuecomment-918739609


   > I think the parallel commit proposal that @rdblue proposed could work
   
   @jackye1995 In this PR the rewriteAction of flink is parallel, it will not make data deal slow down. Because when the function snapshot success flink will continue deal data but not wait the result of notifyCheckpointComplete.
   
   > I wonder what is the initial drive behind this implementation.
   
   Auto compact file every checkpoint in flink will solve several question. 
   1. It will make query iceberg table fastly every time, because in our sence we find query table slowly although we have schedule  compact file every day, but it is not enough.
   2. It will slove the bug of there is duplicate rows in iceberg primary table when we compact file https://github.com/apache/iceberg/issues/2308 . Because we strict commit one snashot and then compact file in order, so we will not cause there is one more snapshot is commit when we are compacting file. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on pull request #2867: Flink: Auto compact file

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #2867:
URL: https://github.com/apache/iceberg/pull/2867#issuecomment-890557948


   @hameizi, could you update the PR description with details about this feature? Auto-compaction is not very specific so I'd like to hear how you implemented it and what that means.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] hameizi edited a comment on pull request #2867: Flink: Auto compact file

Posted by GitBox <gi...@apache.org>.
hameizi edited a comment on pull request #2867:
URL: https://github.com/apache/iceberg/pull/2867#issuecomment-892309582


   > How is different to just configure the StreamFileWriter with parallelism of 1?
   
   @stevenzwu 
   I think there is twodifferent point: 
   1.As i said before the compaction will not block handle data in flink job because the data channel will be not block just when snapshot function completed so for handle data that compaction is in parallel in this PR. So more parallelism of StreamFileWriter will benefit fast handle data in running time, but compaction just work in the time of notifyCheckpointComplete that not affect handle data.
   2. In the scene that produce little data in every checkpoint will generate small files, but if we compact files in every checkpoint will all the small files for the table as result.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on pull request #2867: Flink: Auto compact file

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on pull request #2867:
URL: https://github.com/apache/iceberg/pull/2867#issuecomment-893145473


   @hameizi can you try some setup like this for the stress testing with your auto compaction change here?
   - a simple job just read data from Kafka and writes to Iceberg (e.g. partitioned by ingestion/processing time). 
   - writer: parallelism of 10 (higher can be better)
   - I would expect each of the 10 writer tasks probably can write data in the order of MBs/sec. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] hameizi commented on pull request #2867: Flink: Auto compact file

Posted by GitBox <gi...@apache.org>.
hameizi commented on pull request #2867:
URL: https://github.com/apache/iceberg/pull/2867#issuecomment-898161450


   > > @hameizi can you try some setup like this for the stress testing with your auto compaction change here?
   > > 
   > > * a simple job just read data from Kafka and writes to Iceberg (e.g. partitioned by ingestion/processing time).
   > > * writer: parallelism of 10 (higher can be better)
   > > * I would expect each of the 10 writer tasks probably can write data in the order of MBs/sec.
   > 
   > I'm sorry that maybe my test can't up to the standard as you say. I can test one more later.
   
   @stevenzwu I test one case, parallelism of 10, 30000+/sec, partition by hour, 3GB memory of per taskmanager. It's no problem for me.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] hameizi commented on pull request #2867: Flink: Auto compact file

Posted by GitBox <gi...@apache.org>.
hameizi commented on pull request #2867:
URL: https://github.com/apache/iceberg/pull/2867#issuecomment-892032453


   I have been make compaction in parallel formerly ,but i abort it because i think it will cause maybe be there is not just one compaction are executing because of current compaction overtime. In addition in this PR the compaction will not block handle data in flink job because the data channel will be not block just when snapshot function completed so for handle data compaction is in parallel in this PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on pull request #2867: Flink: Auto compact file

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #2867:
URL: https://github.com/apache/iceberg/pull/2867#issuecomment-891125532


   @hameizi, could you please be more specific? How does this determine which files to rewrite? In what tasks are they rewritten? Does this introduce new operators? Are files rewritten before initial commit or afterward in a replace commit?
   
   There are a lot of details for a feature like this that need to be clear.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on pull request #2867: Flink: Auto compact file

Posted by GitBox <gi...@apache.org>.
kbendick commented on pull request #2867:
URL: https://github.com/apache/iceberg/pull/2867#issuecomment-886469034


   Hi @hameizi! Thank you for your contribution and interest in Iceberg!
   
   This is a non-trivial functionality addition. Would it be possible for you to create a GitHub issue for us to track and discuss this? Github issues are how we normally track new work etc, and I think given that this functionality has not been discussed etc, it would benefit from going through the standard process of having an issue.
   
   By no means do you need to close the PR, but it would help to follow the normal workflow (and also provides the benefit that people can search the issues to see the discussion). 🙂 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on pull request #2867: Flink: Auto compact file

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on pull request #2867:
URL: https://github.com/apache/iceberg/pull/2867#issuecomment-893145473






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on pull request #2867: Flink: Auto compact file

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on pull request #2867:
URL: https://github.com/apache/iceberg/pull/2867#issuecomment-892373055


   if the writers are parallel (like 100) and the compactor is single parallelism, it is likely the compactor can't keep up with the workload. Even though compaction is running asynchronously with snapshotState(), it will eventually back up/block the threads executing notifyCheckpointComplete().
   
   In the streaming ingestion path, here are a few things we can do or improve to mitigate the small files
   1. writer parallelism
   2. for partitioned tables, we can use DistributionMode to improve data clustering so that we can avoid that every writer task write to many partitions at the same time. Right now, Flink writer only has support for HASH mode. In the future, RANGE or BINPACKING might be more useful and general. They can also improve query performance with better data clustering.
   
   Even with above changes, Flink streaming ingestion can still generate small files. The parallel compactors and 2nd committer that Ryan mentioned might be able to keep up with the throughput. However, personally I would rather not over-complicate the streaming ingestion path and make it less stable. Let's get the data into long-term data storage (like Iceberg tables) first.  Other optimizations (like compaction or sorting) can happen in the background with scheduled (Spark) batch jobs.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] hameizi edited a comment on pull request #2867: Flink: Auto compact file

Posted by GitBox <gi...@apache.org>.
hameizi edited a comment on pull request #2867:
URL: https://github.com/apache/iceberg/pull/2867#issuecomment-898161450


   > > @hameizi can you try some setup like this for the stress testing with your auto compaction change here?
   > > 
   > > * a simple job just read data from Kafka and writes to Iceberg (e.g. partitioned by ingestion/processing time).
   > > * writer: parallelism of 10 (higher can be better)
   > > * I would expect each of the 10 writer tasks probably can write data in the order of MBs/sec.
   > 
   > I'm sorry that maybe my test can't up to the standard as you say. I can test one more later.
   
   @stevenzwu I test one case, parallelism of 10, 30000+ records/sec(max 60000+/sec), partition by hour, 3GB memory of per taskmanager. It's no problem for me.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] hameizi commented on pull request #2867: Flink: Auto compact file

Posted by GitBox <gi...@apache.org>.
hameizi commented on pull request #2867:
URL: https://github.com/apache/iceberg/pull/2867#issuecomment-887985252


   @openinx Could you help review?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] hameizi commented on pull request #2867: Flink: Auto compact file

Posted by GitBox <gi...@apache.org>.
hameizi commented on pull request #2867:
URL: https://github.com/apache/iceberg/pull/2867#issuecomment-892517896






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on pull request #2867: Flink: Auto compact file

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #2867:
URL: https://github.com/apache/iceberg/pull/2867#issuecomment-891978223


   From the description, it sounds like the rewrite happens in the committer task rather than in parallel. Is there a good way to make this happen in parallel instead?
   
   What we discussed elsewhere was doing a compaction by adding a new parallel stage and second committer after the initial committer. The current commit task would output committed `DataFile` instances after the commit succeeds. Then those would be sent to compaction writers using `keyBy` and the partition. Once a compacted data file is large enough, the compaction writer will emit it as a `DataFile` along with the `DataFile` instances that were compacted. Those would be collected by the compaction committer, which would commit a rewrite every checkpoint where there is at least one compacted file.
   
   I think that we should plan on having some parallelism here, or else this is not going to be a very useful feature.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] hameizi commented on pull request #2867: Flink: Auto compact file

Posted by GitBox <gi...@apache.org>.
hameizi commented on pull request #2867:
URL: https://github.com/apache/iceberg/pull/2867#issuecomment-891459373


   > @hameizi, could you please be more specific? How does this determine which files to rewrite? In what tasks are they rewritten? Does this introduce new operators? Are files rewritten before initial commit or afterward in a replace commit?
   > 
   > There are a lot of details for a feature like this that need to be clear.
   
   @rdblue Hi, i have update the PR description.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] hameizi commented on pull request #2867: Flink: Auto compact file

Posted by GitBox <gi...@apache.org>.
hameizi commented on pull request #2867:
URL: https://github.com/apache/iceberg/pull/2867#issuecomment-892309582


   > How is different to just configure the StreamFileWriter with parallelism of 1?
   
   I think there is twodifferent point: 
   1.As i said before the compaction will not block handle data in flink job because the data channel will be not block just when snapshot function completed so for handle data that compaction is in parallel in this PR. So more parallelism of StreamFileWriter will benefit fast handle data in running time, but compaction just work in the time of notifyCheckpointComplete that not affect handle data.
   2. In the sence that produce little data in every checkpoint will generate small files, but if we compact files in every checkpoint will all the small files for the table as result.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jackye1995 commented on pull request #2867: Flink: Auto compact file

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on pull request #2867:
URL: https://github.com/apache/iceberg/pull/2867#issuecomment-918632602


   I have also been following this thread although I did no make any comment. Let me add some thoughts since I see you are making some new changes.
   
   I am mostly on the same line of thought as @stevenzwu, I am a bit worried about the scalability of the current implementation, and I think the parallel commit proposal that @rdblue proposed could work, but in the end running compaction in streaming pipeline is likely unnecessary complication. 
   
   So far we have been advocating for streaming pipelines to just commit new files to storage, and use a separated process to handle compaction at the same time. Having the streaming pipeline also do compaction would mean that there might be 2 compaction processes competing with each other. This becomes especially complicated and prone to error when you have both batch jobs and streaming pipelines running at the same time (e.g. normal streaming + daily loading of corrected and late data). I understand it is likely a good optimization for simple use cases, but I would expect it to be a feature with a lot of in-depth knowledge to use safely and correctly if we open it for general usage.
   
   I wonder what is the initial drive behind this implementation. Do you just want to avoid a separated Spark cluster to run compaction in Spark? If we have Flink actions specifically for `RewriteDataFiles` and `RewriteDeleteFiles` that you can schedule on the same Flink cluster, would that solve the issue?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] hameizi commented on pull request #2867: Flink: Auto compact file

Posted by GitBox <gi...@apache.org>.
hameizi commented on pull request #2867:
URL: https://github.com/apache/iceberg/pull/2867#issuecomment-890654658


   > @hameizi, could you update the PR description with details about this feature? Auto-compaction is not very specific so I'd like to hear how you implemented it and what that means.
   
   @rdblue Hi, i have update the PR description.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on pull request #2867: Flink: Auto compact file

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on pull request #2867:
URL: https://github.com/apache/iceberg/pull/2867#issuecomment-919272970


   > In this PR the rewriteAction of flink is parallel, it will not make data deal slow down.
   
   @hameizi by parallel, we meant multiple executors/tasks executing the rewrite. Last time I checked, this PR runs the whole rewrite action in the single committer task synchronously. that is the main scalability concern we have.
   
   Also notifyCheckpointComplete (and snapshotState) executes in the mailbox thread. if it takes a long time to finish the notifyCheckpointComplete/rewrite, it can delay the checkpoint execution.
   
   I share the same philosophy as Jack on keep the streaming ingestion simple and stable. It is critical to **reliably** ingest data into long-term data storage (like Iceberg) first, as streaming input (like Kafka) typically has short retention.
   
   > [Handle the case that RewriteFiles and RowDelta commit the transaction at the same time #2308](https://github.com/apache/iceberg/issues/2308)
   
   regarding this issue, I agree that the lock steps of commit + compaction can avoid the problem. But it is not a solution for the general problem, because other users probably have compaction jobs like Spark. There are other more sophisticated compaction/rewrite actions that probably can't be supported by single-task rewrite action at scale.
   
    


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] hameizi edited a comment on pull request #2867: Flink: Auto compact file

Posted by GitBox <gi...@apache.org>.
hameizi edited a comment on pull request #2867:
URL: https://github.com/apache/iceberg/pull/2867#issuecomment-898161450


   > > @hameizi can you try some setup like this for the stress testing with your auto compaction change here?
   > > 
   > > * a simple job just read data from Kafka and writes to Iceberg (e.g. partitioned by ingestion/processing time).
   > > * writer: parallelism of 10 (higher can be better)
   > > * I would expect each of the 10 writer tasks probably can write data in the order of MBs/sec.
   > 
   > I'm sorry that maybe my test can't up to the standard as you say. I can test one more later.
   
   @stevenzwu I test one case, parallelism of 10, 30000+ records/sec, partition by hour, 3GB memory of per taskmanager. It's no problem for me.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on pull request #2867: Flink: Auto compact file

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on pull request #2867:
URL: https://github.com/apache/iceberg/pull/2867#issuecomment-893146950


   > his appears to be compacting all files to be ideally the target file size bytes, regardless of their existing size etc. In some cases, the cost of opening and rewriting provides less value than leaving the data as is.
   
   I would also echo @kbendick's comment above. Currently, we are reading everything in (regardless the file sizes). This assumes all/most files are small and can benefit from a compaction rewrite. But I am not sure if the assumption is valid for broad use cases


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] hameizi commented on pull request #2867: Flink: Auto compact file

Posted by GitBox <gi...@apache.org>.
hameizi commented on pull request #2867:
URL: https://github.com/apache/iceberg/pull/2867#issuecomment-918739609


   > I think the parallel commit proposal that @rdblue proposed could work
   
   In this PR the rewriteAction of flink is parallel, it will not make data deal slow down. Because when the function snapshot success flink will continue deal data but not wait the result of notifyCheckpointComplete.
   
   > I wonder what is the initial drive behind this implementation.
   
   Auto compact file every checkpoint in flink will solve several question. 
   1. It will make query iceberg table fastly every time, because in our sence we find query table slowly although we have schedule  compact file every day, but it is not enough.
   2. It will slove the bug of there is duplicate rows in iceberg primary table when we compact file https://github.com/apache/iceberg/issues/2308 . Because we strict commit one snashot and then compact file in order, so we will not cause there is one more snapshot is commit when we are compacting file. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] hameizi commented on pull request #2867: Flink: Auto compact file

Posted by GitBox <gi...@apache.org>.
hameizi commented on pull request #2867:
URL: https://github.com/apache/iceberg/pull/2867#issuecomment-893244612


   > @hameizi can you try some setup like this for the stress testing with your auto compaction change here?
   > 
   > * a simple job just read data from Kafka and writes to Iceberg (e.g. partitioned by ingestion/processing time).
   > * writer: parallelism of 10 (higher can be better)
   > * I would expect each of the 10 writer tasks probably can write data in the order of MBs/sec.
   
   I'm sorry that maybe my test can't up to the standard as you say. I can test one more later. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org