Posted to dev@flink.apache.org by "Leonard Xu (Jira)" <ji...@apache.org> on 2020/10/29 09:33:00 UTC

[jira] [Created] (FLINK-19878) Improve watermark ChangelogNormalize for upsertSource

Leonard Xu created FLINK-19878:
----------------------------------

             Summary: Improve watermark ChangelogNormalize for upsertSource
                 Key: FLINK-19878
                 URL: https://issues.apache.org/jira/browse/FLINK-19878
             Project: Flink
          Issue Type: Sub-task
          Components: Table SQL / Planner
            Reporter: Leonard Xu


Currently, for an upsert source such as upsert-kafka, the WatermarkAssigner is placed after the ChangelogNormalize node, and it may return Long.MaxValue as the watermark if some parallel instances receive no data.
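Parallel instances without data matter because a downstream task combines the watermarks of its parallel inputs by taking their minimum. The class below is a simplified, hypothetical model of that rule, not Flink's actual implementation. `WatermarkCombiner`, `onWatermark` and `combined` are illustrative names, and the model ignores idle-input handling. It shows three cases. An input with no data holds the combined watermark back. An input that reports Long.MAX_VALUE stops constraining it. Once every input reports Long.MAX_VALUE, the combined watermark becomes Long.MAX_VALUE.

```java
import java.util.Arrays;

/**
 * Hypothetical, simplified model of how a downstream task combines
 * watermarks from its parallel input channels (not Flink's real class).
 */
public class WatermarkCombiner {
    private final long[] channelWatermarks;

    public WatermarkCombiner(int numChannels) {
        if (numChannels <= 0) {
            throw new IllegalArgumentException("need at least one channel");
        }
        channelWatermarks = new long[numChannels];
        // No watermark has been seen on any channel yet.
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from one channel; a channel's watermark never moves backwards. */
    public void onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
    }

    /** The combined watermark is the minimum over all input channels. */
    public long combined() {
        return Arrays.stream(channelWatermarks).min().getAsLong();
    }

    public static void main(String[] args) {
        WatermarkCombiner c = new WatermarkCombiner(2);
        c.onWatermark(0, 1_000L);
        // Channel 1 has seen no data, so it holds the combined watermark back.
        System.out.println(c.combined()); // prints -9223372036854775808 (Long.MIN_VALUE)
        c.onWatermark(1, Long.MAX_VALUE);
        // Channel 1 no longer constrains the minimum.
        System.out.println(c.combined()); // prints 1000
        c.onWatermark(0, Long.MAX_VALUE);
        // Every channel reports Long.MAX_VALUE, so the combined watermark jumps to it.
        System.out.println(c.combined()); // prints 9223372036854775807
    }
}
```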

As an improvement, we can move the WatermarkAssigner to directly after the TableSourceScan node, so that watermarks are generated the same way as for a general source.

 
{code:java}
   +- Exchange(distribution=[hash[currency]], changelogMode=[I,UA,D])
      +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime], changelogMode=[I,UA,D])
         +- ChangelogNormalize(key=[currency], changelogMode=[I,UA,D])
            +- Exchange(distribution=[hash[currency]], changelogMode=[UA,D])
               +- TableSourceScan(table=[[default_catalog, default_database, rates_history]], fields=[currency, rate, rowtime], changelogMode=[UA,D])
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)