Posted to issues@flink.apache.org by "Leonard Xu (Jira)" <ji...@apache.org> on 2020/10/29 09:33:00 UTC
[jira] [Created] (FLINK-19878) Improve watermark ChangelogNormalize for upsertSource
Leonard Xu created FLINK-19878:
----------------------------------
Summary: Improve watermark ChangelogNormalize for upsertSource
Key: FLINK-19878
URL: https://issues.apache.org/jira/browse/FLINK-19878
Project: Flink
Issue Type: Sub-task
Components: Table SQL / Planner
Reporter: Leonard Xu
Currently, for an upsert source such as upsert-kafka, the WatermarkAssigner node is placed after the ChangelogNormalize node, and it may return Long.MaxValue as the watermark if some parallel subtasks do not receive any data.
As an improvement, we can move the WatermarkAssigner to directly after the SourceScan node, so that watermarks are produced as they are for a general source.
{code:java}
+- Exchange(distribution=[hash[currency]], changelogMode=[I,UA,D])
   +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime], changelogMode=[I,UA,D])
      +- ChangelogNormalize(key=[currency], changelogMode=[I,UA,D])
         +- Exchange(distribution=[hash[currency]], changelogMode=[UA,D])
            +- TableSourceScan(table=[[default_catalog, default_database, rates_history]], fields=[currency, rate, rowtime], changelogMode=[UA,D])
{code}
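The plan above shows the current node order, with the WatermarkAssigner sitting above ChangelogNormalize. For comparison, here is a rough sketch of how the plan might look after the proposed change. It is illustrative only and not output from a real planner run. It assumes that only the WatermarkAssigner moves, that the remaining nodes stay as they are, and that the moved node would carry the source's changelogMode=[UA,D]. The actual optimized plan may differ.
{code}
+- Exchange(distribution=[hash[currency]], changelogMode=[I,UA,D])
   +- ChangelogNormalize(key=[currency], changelogMode=[I,UA,D])
      +- Exchange(distribution=[hash[currency]], changelogMode=[UA,D])
         +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime], changelogMode=[UA,D])
            +- TableSourceScan(table=[[default_catalog, default_database, rates_history]], fields=[currency, rate, rowtime], changelogMode=[UA,D])
{code}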