Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/11/23 07:23:24 UTC

[GitHub] [iceberg] hililiwei opened a new pull request, #6253: Flink: Write watermark to the snapshot summary

hililiwei opened a new pull request, #6253:
URL: https://github.com/apache/iceberg/pull/6253

   In some scenarios, a task needs to determine, based on the watermark, that all data for a certain period has been written.
   
   This PR writes the task's watermark to the snapshot summary, in the same way as the Flink job ID.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on pull request #6253: Flink: Write watermark to the snapshot summary

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on PR #6253:
URL: https://github.com/apache/iceberg/pull/6253#issuecomment-1330006258

   
   
   
   
   
   
   How could downstream consumers know when a new partition shows up?
   
   > Watermarks alone are not friendly enough.
   
   Can you elaborate on the differences between a watermark and a newly completed partition for downstream consumers?




[GitHub] [iceberg] hililiwei commented on pull request #6253: Flink: Write watermark to the snapshot summary

Posted by GitBox <gi...@apache.org>.
hililiwei commented on PR #6253:
URL: https://github.com/apache/iceberg/pull/6253#issuecomment-1328934746

   Hi @rdblue, thank you so much for your feedback.
   Before I answer your question, I'd like to give some background.
   
   What I actually want to solve is a problem that is common in streaming scenarios: if a downstream application reads in micro-batches rather than reading incremental data in real time, how does it know that all the data for a given period has been written? A watermark is the obvious solution. Once downstream applications have the watermark, they can tell whether the event time or processing time of the data has reached a given threshold. But that alone is not enough.
   
   If the community accepts the approach in this PR, I would like to do one more thing: support time-based partition commit. In some scenarios, the downstream application needs to be notified when a new partition has been written. For example, once all the data for a partition has been written, commit that partition to Iceberg, just as Flink does for [Hive/filesystem](https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/filesystem/#partition-commit). I took part in developing that part of Flink, and I hope to bring it to the Iceberg sink. Because of Iceberg's snapshot management, we may be able to do better than Hive/filesystem.
   
   The current Iceberg Flink sink can only commit on checkpoints. Once time-based commit is implemented, it will provide a partition commit feature that allows configuring custom policies. Commit actions are based on a combination of triggers and policies.
   
   Back to your question.
   1. In this PR, the committer caches the watermark of the current data stream and writes it to the summary when the snapshot is committed. Strictly speaking, it does not represent the watermark of the Iceberg table if other applications are writing to the table at the same time, as shown in the second figure. If there is only one writing application, we can treat it as the table's watermark.
   
   ![image](https://user-images.githubusercontent.com/59213263/204259097-55623449-9a38-4d85-b93c-b380154eb8f0.png)
   
   2. Where does the watermark come from?
   Flink provides an interface method that lets us capture it. Its value depends on the low watermark of the upstream data.
   
   ![image](https://user-images.githubusercontent.com/59213263/204259514-b257455b-67ee-4f2e-9b7d-3d979ac4d72b.png)
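The caching behaviour described in point 1 can be modelled in a few lines. This is only a sketch in plain Python, not the PR's code: the `Committer` class, the `process_watermark` method and the `flink.watermark` summary key are hypothetical names chosen for illustration, and the snapshot log is a plain list standing in for the Iceberg table.

```python
from dataclasses import dataclass, field

# Hypothetical summary key; the thread does not say which key the PR uses.
WATERMARK_KEY = "flink.watermark"
# Sentinel meaning "no watermark has been observed yet".
NO_WATERMARK = -(2**63)


@dataclass
class Committer:
    job_id: str
    current_watermark: int = NO_WATERMARK
    snapshots: list = field(default_factory=list)  # stand-in for the table's snapshot log

    def process_watermark(self, watermark_ms: int) -> None:
        # Only cache the value here; nothing is persisted until commit().
        self.current_watermark = max(self.current_watermark, watermark_ms)

    def commit(self, data_files: list) -> dict:
        # Summary values are kept as strings, like the job id entry.
        summary = {"flink.job-id": self.job_id}
        if self.current_watermark != NO_WATERMARK:
            summary[WATERMARK_KEY] = str(self.current_watermark)
        snapshot = {"files": list(data_files), "summary": summary}
        self.snapshots.append(snapshot)
        return snapshot
```

Because the value is written per snapshot, it describes the job that produced that snapshot, which is why it only equals the table's watermark when a single job writes to the table.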
   
   




[GitHub] [iceberg] rdblue commented on pull request #6253: Flink: Write watermark to the snapshot summary

Posted by GitBox <gi...@apache.org>.
rdblue commented on PR #6253:
URL: https://github.com/apache/iceberg/pull/6253#issuecomment-1328118241

   @hililiwei, I really like the idea of having watermarks written. Can you explain in more detail what you're doing here and where the watermark comes from?




[GitHub] [iceberg] stevenzwu commented on pull request #6253: Flink: Write watermark to the snapshot summary

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on PR #6253:
URL: https://github.com/apache/iceberg/pull/6253#issuecomment-1329430785

   > If our community accepts this PR solution, I would like to do one more thing, which is to support time-based partition commit. In some scenarios, when a new partition is written, it is usually necessary to notify the downstream application. For example, When all the data for this partition is written, commit this partition to iceberg, just as flink does for [hive\filesystem](https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/filesystem/#partition-commit). I once participated in the development of this part of Flink, and I hope to introduce it to iceberg sink. Because of iceberg's snapshot management feature, we may be able to do better than hive\filesystem.
   
   > The current iceberg flink sink can only commit based on checkpoint. When the time-based commit is complete, it will provides a partition commit feature that allows configuring custom policies. Commit actions are based on a combination of triggers and policies.
   
   @hililiwei, time-based partition commit seems quite complicated. I'm trying to understand its value. With watermark info marking data completeness, downstream consumers can decide which partition (hourly or daily) has complete data and when it is OK to trigger processing of the completed hour or day.
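As an illustration of the consumer-side check described above, here is a minimal Python sketch. It assumes the watermark is available as epoch milliseconds and that partitions are hourly in UTC; the function names are hypothetical. The comparison is deliberately conservative: an hour counts as complete only once the watermark has reached the end of that hour.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
HOUR = timedelta(hours=1)


def watermark_to_datetime(watermark_ms: int) -> datetime:
    # Integer arithmetic avoids float rounding on millisecond timestamps.
    return EPOCH + timedelta(milliseconds=watermark_ms)


def is_hour_complete(partition_start: datetime, watermark_ms: int) -> bool:
    # The hour [start, start + 1h) is treated as complete once the
    # watermark has reached the end of the hour.
    return watermark_to_datetime(watermark_ms) >= partition_start + HOUR


def latest_complete_hour(watermark_ms: int) -> datetime:
    # Start of the most recent hourly partition that is_hour_complete accepts.
    wm = watermark_to_datetime(watermark_ms)
    return wm.replace(minute=0, second=0, microsecond=0) - HOUR
```

With a watermark at 13:05 UTC, the 12:00 partition is reported complete and the 13:00 partition is not.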




[GitHub] [iceberg] hililiwei commented on pull request #6253: Flink: Write watermark to the snapshot summary

Posted by GitBox <gi...@apache.org>.
hililiwei commented on PR #6253:
URL: https://github.com/apache/iceberg/pull/6253#issuecomment-1330024532

   Hi @stevenzwu 
   
   > What do you mean round robin?
   
   I mean that downstream applications need to poll the table's watermark at intervals to decide whether to start processing, for example by fetching the latest watermark every five minutes.
   
   > How could downstream consumers know when a new partition show up?
   
   The writer determines whether to commit new partitions to the table based on a combination of triggers and policies. Once a new partition is committed, it means that the writer considers the new partition's data ready. For details: 
    https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/filesystem/#partition-commit.
   
   Downstream applications read the table using an incremental scan. When the upstream commits a new snapshot (containing a new partition and its data), the downstream can pick it up.
   
   > can you elaborate the differences btw watermark and new complete partition for downstream consumers?
   
   Take a scenario from our production environment as an example. My table is partitioned by hour, and if the data in [12:00, 13:00) has not been completely written, I do not want consumers to see it when they query the table. The query may be an ad hoc query or a statistics job. I want to commit the snapshot after all the data in [12:00, 13:00) has been written, instead of committing a snapshot at each checkpoint.
   
   Thx
   Liwei
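To make the [12:00, 13:00) example concrete, the following Python sketch models a partition-time trigger: files are buffered per hourly partition and a partition is committed only once the watermark has passed the end of that hour plus an optional delay. It is a toy model under stated assumptions, not the proposed Iceberg sink: `PartitionTimeCommitter`, `add_file`, `on_checkpoint` and `delay_ms` are hypothetical names, and commit policies are omitted.

```python
from collections import defaultdict

HOUR_MS = 3_600_000


class PartitionTimeCommitter:
    def __init__(self, delay_ms: int = 0):
        self.delay_ms = delay_ms
        self.pending = defaultdict(list)  # partition start (ms) -> buffered file paths
        self.committed = []               # one (partition_start_ms, files) entry per commit

    def add_file(self, event_time_ms: int, path: str) -> None:
        # Assign the file to its hourly partition by flooring the event time.
        partition_start = event_time_ms - event_time_ms % HOUR_MS
        self.pending[partition_start].append(path)

    def on_checkpoint(self, watermark_ms: int) -> list:
        # Commit only partitions whose hour (plus delay) lies behind the watermark;
        # everything else stays buffered and invisible to readers.
        ready = sorted(
            p for p in self.pending
            if watermark_ms >= p + HOUR_MS + self.delay_ms
        )
        for p in ready:
            self.committed.append((p, self.pending.pop(p)))
        return ready
```

In this model a checkpoint at 12:30 commits nothing, while the first checkpoint whose watermark has reached 13:00 commits the whole 12:00 partition at once, so a reader never observes a half-written hour.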




[GitHub] [iceberg] hililiwei commented on pull request #6253: Flink: Write watermark to the snapshot summary

Posted by GitBox <gi...@apache.org>.
hililiwei commented on PR #6253:
URL: https://github.com/apache/iceberg/pull/6253#issuecomment-1330000575

   > @hililiwei time-based partition commit seems quite complicated. trying to understand its value. With watermark info to mark the data completeness, downstream can decide which partition (hourly or daily) has the complete data and it is ok to trigger the processing of the completed hour or day.
   
   Hi @stevenzwu 
   1. If time-based commit is supported, a partition's data is ready as soon as the partition is visible. Downstream applications can use incremental reads directly instead of round robin.
   2. In some scenarios, we do not want downstream applications to see a partition whose data has not been completely written. Watermarks alone are not friendly enough.
   
   




[GitHub] [iceberg] stevenzwu commented on pull request #6253: Flink: Write watermark to the snapshot summary

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on PR #6253:
URL: https://github.com/apache/iceberg/pull/6253#issuecomment-1329418892

   There was a previous attempt at writing the watermark (data completeness) to the Iceberg table: https://github.com/apache/iceberg/pull/2109/files.
   
   Let me summarize the questions/points. It would be great to gather some input. @rdblue @pvary @dixingxing0 @zhangjun0x01 @liubo1022126 
   
   1. Should we write the Flink watermark or use the min value of some timestamp column (typically the one associated with the time-partitioned table)? It is easier to just write the Flink watermark, but it might be more meaningful to use the timestamp column value, as it is used for table partitioning.
   2. Should we write the metadata to the snapshot summary or to table properties? Writing it to the snapshot summary is a minimal change, as shown in this PR or PR #2109. Writing it as a table property is a slightly bigger change (e.g. using a transaction), but it would be easier for consumers to extract the info.
   3. We need to be able to support multiple Flink jobs (e.g. from multiple regions) writing to the same Iceberg table.
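One way to think about point 3 is that a table-level watermark is the minimum over the latest watermark reported by each writing job. The Python sketch below illustrates that idea on plain dictionaries standing in for snapshot summaries, listed oldest to newest. The `flink.watermark` key and the `table_watermark` function are assumptions for illustration only.

```python
def table_watermark(summaries, job_key="flink.job-id", watermark_key="flink.watermark"):
    """Return the min over jobs of each job's latest watermark, or None.

    `summaries` is an iterable of snapshot summary dicts, oldest first.
    Snapshots without a job id or watermark (e.g. compactions) are skipped.
    """
    latest = {}
    for summary in summaries:
        job = summary.get(job_key)
        wm = summary.get(watermark_key)
        if job is not None and wm is not None:
            latest[job] = int(wm)  # later snapshots overwrite earlier ones
    return min(latest.values()) if latest else None
```

A job that has stopped writing would hold this minimum back indefinitely, so a real design would also need a way to know which jobs are still active.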
   
   
   
   




[GitHub] [iceberg] hililiwei commented on pull request #6253: Flink: Write watermark to the snapshot summary

Posted by GitBox <gi...@apache.org>.
hililiwei commented on PR #6253:
URL: https://github.com/apache/iceberg/pull/6253#issuecomment-1364513120

   >should we write the metadata as snapshot summary or table properties? It is minimal change to write as snapshot summary as shown in this PR or PR https://github.com/apache/iceberg/pull/2109. It is a little bigger change (like using transaction) to write as a table property, but it will be easier for consumer to extract the info.
   
   I think these are two different things with different meanings. I put it in the summary because it does not represent the watermark of the table; it only represents the watermark of the current Flink task that generated the snapshot.
   




[GitHub] [iceberg] huyuanfeng2018 commented on pull request #6253: Flink: Write watermark to the snapshot summary

Posted by "huyuanfeng2018 (via GitHub)" <gi...@apache.org>.
huyuanfeng2018 commented on PR #6253:
URL: https://github.com/apache/iceberg/pull/6253#issuecomment-1707952709

   In our experience, if the Flink job enables unaligned checkpoints, the watermark may advance too far.
   @hililiwei 
   

