Posted to dev@flink.apache.org by Shengkai Fang <fs...@gmail.com> on 2020/09/04 06:58:14 UTC

Merge SupportsComputedColumnPushDown and SupportsWatermarkPushDown

   Hi, all. Currently, the design uses two separate interfaces,
SupportsComputedColumnPushDown and SupportsWatermarkPushDown. The
interface SupportsWatermarkPushDown relies on
SupportsComputedColumnPushDown when the watermark is defined on a
computed column. During the implementation, we found that the method in
SupportsWatermarkPushDown uses an out-of-date interface,
WatermarkProvider, and that SupportsComputedColumnPushDown duplicates
SupportsProjectionPushDown. Therefore, we propose a new version of the
SupportsWatermarkPushDown interface to solve these problems.


*Problems of SupportsComputedColumnPushDown and SupportsWatermarkPushDown*

Problems of SupportsWatermarkPushDown

SupportsWatermarkPushDown currently uses an inner interface named
WatermarkProvider to register a WatermarkGenerator with the
DynamicTableSource. However, since FLIP-126 the community uses
org.apache.flink.api.common.eventtime.WatermarkStrategy to create
watermark generators. WatermarkStrategy is a factory for
TimestampAssigner and WatermarkGenerator, and FlinkKafkaConsumer uses
the method assignTimestampsAndWatermarks(WatermarkStrategy) to generate
Kafka-partition-aware watermarks. The original WatermarkProvider, in
contrast, only produces the deprecated AssignerWithPeriodicWatermarks
and PunctuatedWatermarkAssignerProvider. Therefore, we think
WatermarkProvider is no longer suitable.


Problems of SupportsComputedColumnPushDown

There are two problems when using SupportsComputedColumnPushDown alone.

First, the planner transforms both computed columns and query
expressions such as select a+b into a LogicalProject. By the
optimization phase, we have no means to distinguish whether a RexNode
in the projection comes from a computed column or from the query. So
SupportsComputedColumnPushDown in reality pushes down not only the
computed columns but also the calculations in the query.

Second, when a plan matches the rule
PushComputedColumnIntoTableSourceScanRule, we have to build a new
RowData to hold all the fields we require. However, the two rules
PushComputedColumnIntoTableSourceScanRule and
PushProjectIntoTableSourceScanRule do the same work of pruning the
records read from the source, so we effectively have two duplicate
rules in the planner. If we did not need watermark push-down, we should
prefer PushProjectIntoTableSourceScanRule over
PushComputedColumnIntoTableSourceScanRule: it is much lighter, and it
lets us read pruned data directly from the source instead of applying a
map function in Flink.

Therefore, we think it is better to use one interface rather than two.


*New Proposal*

First of all, let us cover some background on pushing watermarks into
the table source scan. There are two plan structures that we need to
consider; we list an example of each below.

structure 1:

LogicalWatermarkAssigner(rowtime=[c], watermark=[-($2, 5000:INTERVAL SECOND)])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])

structure 2:

LogicalWatermarkAssigner(rowtime=[d], watermark=[-($3, 5000:INTERVAL SECOND)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[+($2, 5000:INTERVAL SECOND)])
   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])

As we can see, structure 2 is more complicated than structure 1. For
structure 1, we can use the rows from the table scan to generate
watermarks directly. For structure 2, we first need to evaluate the
rowtime expression in the LogicalProject and then use the result to
generate watermarks. Considering that WatermarkStrategy is able to
extract the timestamp from a row, we propose to push only the
WatermarkStrategy into the scan.
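For concreteness, table definitions along the following lines
(reconstructed for illustration; not taken from the thread) would
produce the two structures above:

```sql
-- structure 1: the watermark is defined directly on the physical column c
CREATE TABLE MyTable (
  a INT,
  b INT,
  c TIMESTAMP(3),
  WATERMARK FOR c AS c - INTERVAL '5' SECOND
) WITH (
  ...
);

-- structure 2: the watermark is defined on the computed column d
CREATE TABLE MyTable (
  a INT,
  b INT,
  c TIMESTAMP(3),
  d AS c + INTERVAL '5' SECOND,
  WATERMARK FOR d AS d - INTERVAL '5' SECOND
) WITH (
  ...
);
```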


Push WatermarkStrategy to Scan

With this interface, we only push the WatermarkStrategy into the
DynamicTableSource:

public interface SupportsWatermarkPushDown {

    void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy);
}
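To sketch what a connector implementation might look like, here is a
minimal, self-contained toy: MySource is hypothetical, and
WatermarkStrategy is reduced to a plain timestamp extractor over a
long[] row instead of the real Flink factory type.

```java
public class Example {

    // Simplified stand-in for WatermarkStrategy<RowData>: in real Flink it
    // is a factory for both the TimestampAssigner and the
    // WatermarkGenerator; here it is reduced to timestamp extraction.
    interface WatermarkStrategy<T> {
        long extractTimestamp(T row);
    }

    interface SupportsWatermarkPushDown {
        void applyWatermark(WatermarkStrategy<long[]> watermarkStrategy);
    }

    // Hypothetical source: the planner calls applyWatermark() during
    // optimization; the source uses the pushed strategy at runtime.
    static class MySource implements SupportsWatermarkPushDown {
        private WatermarkStrategy<long[]> strategy;

        @Override
        public void applyWatermark(WatermarkStrategy<long[]> watermarkStrategy) {
            this.strategy = watermarkStrategy;
        }

        // e.g. a bounded-out-of-orderness generator with a 5s delay
        long watermarkFor(long[] row) {
            return strategy.extractTimestamp(row) - 5_000L;
        }
    }

    public static void main(String[] args) {
        MySource source = new MySource();
        // The planner pushes a strategy that reads the rowtime from field 2.
        source.applyWatermark(row -> row[2]);
        System.out.println(source.watermarkFor(new long[]{1L, 2L, 10_000L}));
        // prints 5000
    }
}
```

The point of the narrow contract is exactly this: the source receives
one object and never needs to know whether the rowtime came from a
physical or a computed column.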

The advantage of the new API is that it is very simple for connector
developers: they only need to take the WatermarkStrategy into account
and do not have to deal with extra information such as the
ComputedColumnConverter in SupportsComputedColumnPushDown. It has one
disadvantage, though: the rowtime expression must be calculated again
in the LogicalProject, because the scan does not build a new row to
store the calculated timestamp. However, we can replace the rowtime
calculation in the LogicalProject with a reference that reads the
previously calculated timestamp through the StreamRecord's getter,
which eliminates the duplicate calculation. This optimization still has
one limitation: it relies on computed columns not being defined on
other computed columns. For nested computed columns, we have no place
to save the intermediate result.
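A toy illustration of that reuse (the StreamRecord here is a simplified
stand-in for Flink's runtime StreamRecord, and the field layout is made
up for the example):

```java
public class RowtimeReuse {

    // Simplified stand-in for Flink's StreamRecord: a value plus a
    // timestamp slot.
    static class StreamRecord<T> {
        final T value;
        long timestamp;
        StreamRecord(T value) { this.value = value; }
    }

    public static void main(String[] args) {
        long[] row = {1L, 2L, 10_000L};       // physical fields a, b, c

        // In the source: evaluate the rowtime expression d = c + 5s once,
        // store it on the record, and use it for watermark generation.
        StreamRecord<long[]> record = new StreamRecord<>(row);
        record.timestamp = row[2] + 5_000L;

        // In the projection: instead of evaluating c + 5s a second time,
        // read the timestamp back from the record (the proposed reference).
        long d = record.timestamp;
        System.out.println(d); // prints 15000
    }
}
```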

One problem remains: when we push a UDF into the source, we need a
context as powerful as FunctionContext to open the UDF. The current
WatermarkGeneratorSupplier.Context only supports the method
getMetricGroup and misses the methods getCachedFile, getJobParameter
and getExternalResourceInfos, which means we cannot convert a
WatermarkGeneratorSupplier.Context into a FunctionContext safely.
Considering that the UDF is only used to generate watermarks, we
suggest throwing UnsupportedOperationException when a method is invoked
that exists in FunctionContext but not in
WatermarkGeneratorSupplier.Context. We have to admit there is some risk
in doing so, because we have no guarantee that the UDF will not invoke
these methods.
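The suggested bridge could look roughly like this (both context
interfaces are reduced to a few illustrative methods; the real Flink
signatures differ, and getExternalResourceInfos is omitted for
brevity):

```java
import java.io.File;

public class ContextAdapter {

    // Simplified stand-in for WatermarkGeneratorSupplier.Context.
    interface WatermarkGeneratorSupplierContext {
        Object getMetricGroup();
    }

    // Simplified stand-in for FunctionContext (only the methods discussed).
    interface FunctionContext {
        Object getMetricGroup();
        File getCachedFile(String name);
        String getJobParameter(String key, String defaultValue);
    }

    // Forward what the supplier context supports; throw for the rest.
    static FunctionContext adapt(WatermarkGeneratorSupplierContext ctx) {
        return new FunctionContext() {
            public Object getMetricGroup() {
                return ctx.getMetricGroup();
            }
            public File getCachedFile(String name) {
                throw new UnsupportedOperationException(
                        "getCachedFile is not available during watermark generation");
            }
            public String getJobParameter(String key, String defaultValue) {
                throw new UnsupportedOperationException(
                        "getJobParameter is not available during watermark generation");
            }
        };
    }

    public static void main(String[] args) {
        FunctionContext fc = adapt(() -> "metric-group");
        System.out.println(fc.getMetricGroup()); // prints metric-group
        try {
            fc.getCachedFile("dictionary");
        } catch (UnsupportedOperationException e) {
            System.out.println("unsupported"); // prints unsupported
        }
    }
}
```

This is exactly where the risk lives: a UDF that happens to call one of
the throwing methods fails at runtime rather than at planning time.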


*Summary*

We have described the whole problem and solution in detail. In
conclusion, I think the new interface avoids the problems mentioned
above. It has a clear definition, as its name suggests, and has nothing
in common with SupportsProjectionPushDown. As for its disadvantage, I
think it is acceptable to calculate the rowtime column twice, and we
have a plan to improve its efficiency as a follow-up if it causes
performance problems.

Re: Merge SupportsComputedColumnPushDown and SupportsWatermarkPushDown

Posted by Shengkai Fang <fs...@gmail.com>.
Hi, Timo.
I agree with you that the concepts of watermark and computed column
should be separated. However, what we are actually merging is closer to
SupportsCalcPushDown plus SupportsWatermarkPushDown: by the
optimization phase, the concept of a computed column has already
disappeared.
As for the drawback you mentioned, I have already given a solution: we
can place the calculated timestamp column on the StreamRecord and
replace the calculation in the LogicalProject with a call to
StreamRecordTimestampSqlFunction, which reads the timestamp from the
StreamRecord.


Regards,
Shengkai.

Timo Walther <tw...@apache.org> wrote on Tue, Sep 8, 2020 at 8:51 PM:

> Hi Jark, Hi Shengkai,
>
> "shall we push the expressions in the following Projection too?"
>
> This is something that we should at least consider.
>
> I also don't find a strong use case. But what I see is that we are
> merging concepts that actually can be separated. And we are executing
> the same code twice. Regardless of what kind of code is executed (simple
> timestamp casting or more complex stuff).
>
> In any case, we cannot model ingestion time with the merged interfaces.
> Because the computed timestamp column is evaluated twice. If a function
> in the computed column is not deterministic, a watermark_rowtime !=
> projection_rowtime mismatch is very hard to debug.
>
> Regards,
> Timo
>
>
>
> On 08.09.20 14:11, Shengkai Fang wrote:
> > Hi Timo and Jark. Thanks for your replies.
> >
> > Maybe I don't explain clearly in doc. I think the main reason behind is
> we
> > have no means to distinguish the calc in LogicalProject. Let me give you
> an
> > example to illustrate the reason. Assume we have 2 cases:
> >
> >
> > case 1:
> >
> > create table MyTable (
> >
> > a int,
> >
> > b int
> >
> > ) with (
> >
> > ...
> >
> > )
> >
> >
> > we use sql "select a, b, a+b as c from MyTable" to get the results.
> >
> >
> > and
> >
> >
> > case 2:
> >
> > create table MyTableWithComputedColumn (
> >
> > a int,
> >
> > b int,
> >
> > c as a + b
> >
> > ) with (
> >
> > ...
> >
> > )
> >
> >
> > we use sql "select a, b, c from MyTableWithComputedColumn" to get the
> > results.
> >
> >
> > When coming to planner, the two sqls will have the same plan, which means
> > we will also push calculation from query to scan if we support computed
> > column push down.
> >
> >
> > As a supplement of Jark's response, currently
> >
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase#assignTimestampsAndWatermarks(WatermarkStrategy)
> > uses WatermarkStrategy to register watermark generator supplier. I think
> > it's ok to use WatermarkStrategy directly because FLIP-126 has been
> > finished.
> >
> >
> >
Jark Wu <im...@gmail.com> wrote on Tue, Sep 8, 2020 at 7:38 PM:
> >
> >> Hi Timo,
> >>
> >> Regarding "pushing other computed columns into source, e.g. encrypted
> >> records/columns, performing checksum checks, reading metadata etc.",
> >> I'm not sure about this.
> >> 1. the planner doesn't know which computed column should be pushed into
> >> source
> >> 2. it seems that we can't improve performances if we pushdown complex
> logic
> >> into source, we still need to calculate them anyway.
> >> 3. the computed column is a regular expression, if the computed column
> >> should be pushed down, then shall we push the expressions in the
> following
> >> Projection too?
> >>      If yes, then the name of "SupportsComputedColumnPushDown" might be
> not
> >> correct.
> >> 4. regarding reading metadata, according to FLIP-107, we don't use the
> >> existing SupportsComputedColumnPushDown, but a new interface.
> >>
> >> Therefore, I don't find a strong use case that needs this interface so
> far.
> >>
> >> Best,
> >> Jark
> >>
> >>
> >>
> >>
> >> On Tue, 8 Sep 2020 at 17:13, Timo Walther <tw...@apache.org> wrote:
> >>
> >>> Hi Shengkai,
> >>>
> >>> first off, I would not consider the section "Problems of
> >>> SupportsWatermarkPushDown" as a "problem". It was planned to update the
> >>> WatermarkProvider once the interfaces are ready. See the comment in
> >>> WatermarkProvider:
> >>>
> >>> // marker interface that will be filled after FLIP-126:
> >>> // WatermarkGenerator<RowData> getWatermarkGenerator();
> >>>
> >>> So far we had no sources that actually implement WatermarkStrategy.
> >>>
> >>> Second, for generating watermarks I don't see a problem in merging the
> >>> two mentioned interfaces SupportsWatermarkPushDown and
> >>> SupportsComputedColumnPushDown into one. The described design sounds
> >>> reasonable to me and the impact on performance should not be too large.
> >>>
> >>> However, by merging these two interfaces we are also merging two
> >>> completely separate concepts. Computed columns are not always used for
> >>> generating a rowtime or watermark. Users can and certainly will
> >>> implement more complex logic in there. One example could be decrypting
> >>> encrypted records/columns, performing checksum checks, reading metadata
> >>> etc.
> >>>
> >>> So in any case we should still provide two interfaces:
> >>>
> >>> SupportsWatermarkPushDown (functionality of computed columns +
> >> watermarks)
> >>>
> >>>
> >>> SupportsComputedColumnPushDown (functionality of computed columns only)
> >>>
> >>> I'm fine with such a design, but it is also confusing for implementers
> >>> that SupportsWatermarkPushDown includes the functionality of the other
> >>> interface.
> >>>
> >>> What do you think?
> >>>
> >>> Regards,
> >>> Timo
> >>>
> >>>
> >>> On 08.09.20 04:32, Jark Wu wrote:
> >>>> Thanks to Shengkai for summarizing the problems on the FLIP-95
> >> interfaces
> >>>> and solutions.
> >>>>
> >>>> I think the new proposal, i.e. only pushing the "WatermarkStrategy" is
> >>> much
> >>>> cleaner and easier to develop than before.
> >>>> So I'm +1 to the proposal.
> >>>>
> >>>> Best,
> >>>> Jark
> >>>>
> >>>> On Sat, 5 Sep 2020 at 13:44, Shengkai Fang <fs...@gmail.com> wrote:
> >>>>
> >>>>> Hi, all. It seems the format is not normal. So I add a google doc in
> >>>>> link[1]. This discussion is about the interfaces in FLIP-95:  New
> >> Table
> >>>>> Source And Table Sink and propose to merge two interfaces
> >>>>> SupportsWatermarkPushDown and SupportsComputedColumnPushDown.
> >>>>>
> >>>>> I am looking forward to any opinions and suggestions from the
> >> community.
> >>>>>
> >>>>> Regards,
> >>>>> Shengkai
> >>>>>
> >>>>> [1]
> >>>>>
> >>>>>
> >>>
> >>
> https://docs.google.com/document/d/1sIT8lFZ_MeNIh_GLE3hi7Y4pgzN90Ahw_LoBFni-GT4/edit#

Re: Merge SupportsComputedColumnPushDown and SupportsWatermarkPushDown

Posted by Timo Walther <tw...@apache.org>.
Hi Jark, Hi Shengkai,

"shall we push the expressions in the following Projection too?"

This is something that we should at least consider.

I also don't find a strong use case. But what I see is that we are 
merging concepts that actually can be separated. And we are executing 
the same code twice. Regardless of what kind of code is executed (simple 
timestamp casting or more complex stuff).

In any case, we cannot model ingestion time with the merged interfaces. 
Because the computed timestamp column is evaluated twice. If a function 
in the computed column is not deterministic, a watermark_rowtime != 
projection_rowtime mismatch is very hard to debug.

Regards,
Timo



On 08.09.20 14:11, Shengkai Fang wrote:
> Hi Timo and Jark.Thanks for your replies.
> 
> Maybe I don't explain clearly in doc. I think the main reason behind is we
> have no means to distinguish the calc in LogicalProject. Let me give you an
> example to illustrate the reason. Assume we have 2 cases:
> 
> 
> case 1:
> 
> create table MyTable (
> 
> int a,
> 
> int b
> 
> ) with (
> 
> ...
> 
> )
> 
> 
> we use sql "select a, b, a+b as c from MyTable" to get the results.
> 
> 
> and
> 
> 
> case 2:
> 
> create table MyTableWithComputedColumn (
> 
> a int,
> 
> b int,
> 
> c as a + b
> 
> ) with (
> 
> ...
> 
> )
> 
> 
> we use sql "select a, b, c from MyTableWithComputedColumn" to get the
> results.
> 
> 
> When coming to planner, the two sqls will have the same plan, which means
> we will also push calculation from query to scan if we support computed
> column push down.
> 
> 
> As a supplement of Jark's response, currently
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase#assignTimestampAndWatermarks(WatermarkStrategy)
> uses WatermarkStrategy to register watermark generator supplier. I think
> it's ok to use WatermarkStrategy directly because FLIP-126 has been
> finished.
> 
> 
> 
> Jark Wu <im...@gmail.com> 于2020年9月8日周二 下午7:38写道:
> 
>> Hi Timo,
>>
>> Regarding "pushing other computed columns into source, e.g. encrypted
>> records/columns, performing checksum checks, reading metadata etc.",
>> I'm not sure about this.
>> 1. the planner don't know which computed column should be pushed into
>> source
>> 2. it seems that we can't improve performances if we pushdown complex logic
>> into source, we still need to calculate them anyway.
>> 3. the computed column is a regular expression, if the computed column
>> should be pushed down, then shall we push the expressions in the following
>> Projection too?
>>      If yes, then the name of "SupportsComputedColumnPushDown" might be not
>> correct.
>> 4. regarding reading metadata, according to FLIP-107, we don't use the
>> existing SupportsComputedColumnPushDown, but a new interface.
>>
>> Therefore, I don't find a strong use case that needs this interface so far.
>>
>> Best,
>> Jark
>>
>>
>>
>>
>> On Tue, 8 Sep 2020 at 17:13, Timo Walther <tw...@apache.org> wrote:
>>
>>> Hi Shengkai,
>>>
>>> first of I would not consider the section Problems of
>>> SupportsWatermarkPushDown" as a "problem". It was planned to update the
>>> WatermarkProvider once the interfaces are ready. See the comment in
>>> WatermarkProvider:
>>>
>>> // marker interface that will be filled after FLIP-126:
>>> // WatermarkGenerator<RowData> getWatermarkGenerator();
>>>
>>> So far we had no sources that actually implement WatermarkStrategy.
>>>
>>> Second, for generating watermarks I don't see a problem in merging the
>>> two mentioned interfaces SupportsWatermarkPushDown and
>>> SupportsComputedColumnPushDown into one. The descibed design sounds
>>> reasonable to me and the impact on performance should not be too large.
>>>
>>> However, by merging these two interfaces we are also merging two
>>> completely separate concepts. Computed columns are not always used for
>>> generating a rowtime or watermark. Users can and certainly will
>>> implement more complex logic in there. One example could be decrypting
>>> encrypted records/columns, performing checksum checks, reading metadata
>>> etc.
>>>
>>> So in any case we should still provide two interfaces:
>>>
>>> SupportsWatermarkPushDown (functionality of computed columns +
>> watermarks)
>>>
>>>
>>> SupportsComputedColumnPushDown (functionality of computed columns only)
>>>
>>> I'm fine with such a design, but it is also confusing for implementers
>>> that SupportsWatermarkPushDown includes the functionality of the other
>>> interface.
>>>
>>> What do you think?
>>>
>>> Regards,
>>> Timo
>>>
>>>
>>> On 08.09.20 04:32, Jark Wu wrote:
>>>> Thanks to Shengkai for summarizing the problems on the FLIP-95
>> interfaces
>>>> and solutions.
>>>>
>>>> I think the new proposal, i.e. only pushing the "WatermarkStrategy" is
>>> much
>>>> cleaner and easier to develop than before.
>>>> So I'm +1 to the proposal.
>>>>
>>>> Best,
>>>> Jark
>>>>
>>>> On Sat, 5 Sep 2020 at 13:44, Shengkai Fang <fs...@gmail.com> wrote:
>>>>
>>>>> Hi, all. It seems the format is not normal. So I add a google doc in
>>>>> link[1]. This discussion is about the interfaces in FLIP-95:  New
>> Table
>>>>> Source And Table Sink and propose to merge two interfaces
>>>>> SupportsWatermarkPushDown and SupportsComputedColumnPushDown.
>>>>>
>>>>> I am looking forward to any opinions and suggestions from the
>> community.
>>>>>
>>>>> Regards,
>>>>> Shengkai
>>>>>
>>>>> [1]
>>>>>
>>>>>
>>>
>> https://docs.google.com/document/d/1sIT8lFZ_MeNIh_GLE3hi7Y4pgzN90Ahw_LoBFni-GT4/edit#
>>>>>
>>>>> Shengkai Fang <fs...@gmail.com> 于2020年9月4日周五 下午2:58写道:
>>>>>
>>>>>>      Hi, all. Currently, we use two seperated interfaces
>>>>>> SupportsComputedColumnPushDown and SupportsWatermarkPushDown in
>> design.
>>>>> The
>>>>>> interface SupportsWatermarkPushDown relies on
>>>>>> SupportsComputedColumnPushDown when watermark is defined on a
>> computed
>>>>>> column. During the implementation, we find the method in
>>>>>> SupportsWatermarkPushDown uses an out-of-date interface
>>> WatermarkProvider
>>>>>> and the duplication of SupportsComputedColumnPushDown and
>>>>>> SupportsProjectionPushDown. Therefore, we decide to propose a new
>>>>>> interface of SupportsWatermarkPushDown to solve the problems we
>>>>> mentioned.
>>>>>>
>>>>>>
>>>>>> *Problems of SupportsComputedColumnPushDown and
>>>>> SupportsWatermarkPushDown*Problems
>>>>>> of SupportsWatermarkPushDown
>>>>>>
>>>>>> SupportsWatermarkPushDown uses an inner interface named
>>>>> WatermarkProvider to
>>>>>> register WatermarkGenerator into DynamicTableSource now. However, the
>>>>>> community uses
>> org.apache.flink.api.common.eventtime.WatermarkStrategy
>>> to
>>>>>> create watermark generators in FLIP-126. WatermarkStrategy is a
>> factory
>>>>>> of TimestampAssigner and WatermarkGeneartor and FlinkKafkaConsumer
>> uses
>>>>>> the method assignTimestampsAndWatermarks(WatermarkStrategy) to
>> generate
>>>>>> Kafka-partition-aware watermarks. As for the origin
>> WatermarkProvider,
>>> it
>>>>>> is used to generate deprecated AssignerWithPeriodicWatermarks and
>>>>>> PunctuatedWatermarkAssignerProvider. Therefore, we think it's not
>>>>>> suitable to use the WatermarkProvider any more.
>>>>>>
>>>>>>
>>>>>> Problems of SupportsComputedColumnPushDown
>>>>>>
>>>>>> There are two problems around when using
>> SupportsComputedColumnPushDown
>>>>>>    alone.
>>>>>>
>>>>>> First, planner will transform the computed column and query such as
>>>>> select
>>>>>> a+b to a LogicalProject. When it comes to the optimization phase, we
>>> have
>>>>>> no means to distinguish whether the Rexnode in the projection is from
>>>>>> computed columns or query. So SupportsComputedColumnPushDown in
>> reality
>>>>>> will push not only the computed column but also the calculation in
>> the
>>>>>> query.
>>>>>>
>>>>>> Second, when a plan matches the rule
>>>>>> PushComputedColumnIntoTableSourceScanRule, we have to build a new
>>>>> RowData to
>>>>>> place all fields we require. However, both two rules
>>>>>> PushComputedColumnIntoTableSourceScanRule and
>>>>>> PushProjectIntoTableSourceScanRule will do the same work that prune
>> the
>>>>>> records that read from source. It seems that we have two duplicate
>>> rules
>>>>> in
>>>>>> planner. But I think we should use the rule
>>>>>> PushProjectIntoTableSourceScanRule rather than
>>>>>> PushComputedColumnIntoTableSourceScanRule if we don't support
>> watermark
>>>>>> push down. Compared to PushComputedColumnIntoTableSourceScanRule,
>>>>>> PushProjectIntoTableSourceScanRule is much lighter and we can read
>>> pruned
>>>>>> data from source rather than use a map function in flink.
>>>>>>
>>>>>> Therefore, we think it's not a good idea to use two interfaces rather
>>>>> than
>>>>>> one.
>>>>>>
>>>>>>
>>>>>> *New Proposal*
>>>>>>
>>>>>> First of all, let us address some background when pushing watermarks
>>> into
>>>>>> table source scan. There are two structures that we need to consider.
>>> We
>>>>>> list two examples below for discussion.
>>>>>>
>>>>>> structure 1:LogicalWatermarkAssigner(rowtime=[c], watermark=[-($2,
>>>>> 5000:INTERVAL SECOND)])+- LogicalTableScan(table=[[default_catalog,
>>>>> default_database, MyTable]])structure
>>>>> 2:LogicalWatermarkAssigner(rowtime=[d], watermark=[-($3, 5000:INTERVAL
>>>>> SECOND)])+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[+($2,
>>> 5000:INTERVAL
>>>>> SECOND)])   +- LogicalTableScan(table=[[default_catalog,
>>> default_database,
>>>>> MyTable]])
>>>>>>
>>>>>> As we can see, structure 2 is much more complicated than structure 1.
>>> For
>>>>>> structure 1, we can use the row from table scan to generate
>> watermarks
>>>>>> directly. But for structure 2, we need to calculate the rowtime
>>>>> expression
>>>>>> in LogicalProject and use the result of calculation to generate
>>>>>> watermarks. Considering that WatermarkStrategy has the ability to
>>> extract
>>>>>> timestamp from row, we have a proposal to push only WatermarkStrategy
>>> to
>>>>>> scan.
>>>>>>
>>>>>>
>>>>>> Push WatermarkStrategy to Scan
>>>>>>
>>>>>> In this interface, we will only push WatermarkStrategy to
>>>>>> DynamicTableSource.
>>>>>>
>>>>>> public interface SupportsWatermarkPushDown {        void
>>>>> applyWatermark(WatermarkStrategy<RowData> watermarkStrategy);    }
>>>>>>
>>>>>> The advantage of the new api is that it's very simple for the
>>> developers
>>>>>> of Connector. They only need to take WatermarkStrategy into
>>> consideration
>>>>>> and don't need to deal with other infos such as
>> ComputedColumnConverter
>>>>>> in SupportsComputedColumnPushDown. But it also has one disadvantage
>>> that
>>>>>> it needs to calculate the rowtime expression again in
>> LogicalProjection
>>>>>> because we don't build a new row in scan to store the calculated
>>>>> timestamp.
>>>>>> However, we can replace the calculation of rowtime in
>> LogicalProjection
>>>>>> with a reference to eliminate duplicate calculation, which will use
>> the
>>>>>> StreamRecord's getter to read the timestamp that is calculated
>> before.
>>>>> But
>>>>>> this optimization still has one limitation that it relies on computed
>>>>>> columns are not defined on other computed columns. For nested
>> computed
>>>>>> columns, we have no place to save the intermediate result.
>>>>>>
>>>>>> But we still have a problem: when we push a UDF into the source, we
>>>>>> need a context as powerful as FunctionContext to open the UDF. The
>>>>>> current WatermarkGeneratorSupplier.Context only supports the method
>>>>>> getMetricGroup and misses the methods getCachedFile, getJobParameter
>>>>>> and getExternalResourceInfos, which means we can't convert a
>>>>>> WatermarkGeneratorSupplier.Context to a FunctionContext safely.
>>>>>> Considering that the UDF is only used to generate watermarks, we
>>>>>> suggest throwing UnsupportedOperationException when invoking the
>>>>>> methods that exist in FunctionContext but not in
>>>>>> WatermarkGeneratorSupplier.Context. But we have to admit that there are
>>>>>> risks in doing so, because we have no promise that the UDF will not
>>>>>> invoke these methods.
>>>>>>
>>>>>>
>>>>>> *Summary*
>>>>>>
>>>>>> We have addressed the whole problem and solution in detail. In
>>>>>> conclusion, I think the new interface avoids the problems we mentioned
>>>>>> before. It has a clear definition, as its name tells, and has nothing
>>>>>> in common with SupportsProjectionPushDown. As for its disadvantage, I
>>>>>> think it's acceptable to calculate the rowtime column twice, and we
>>>>>> also have a plan to improve its efficiency as a follow-up if it brings
>>>>>> performance problems.
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>>
>>
> 
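The suggestion above — throwing for the FunctionContext methods that WatermarkGeneratorSupplier.Context cannot offer — can be sketched as follows. The two context interfaces below are minimal stand-ins for illustration, not Flink's real classes, and the method set shown is reduced.

```java
import java.io.File;

public class ContextAdapterSketch {
    // Minimal stand-in for WatermarkGeneratorSupplier.Context: only metrics.
    interface WatermarkGeneratorSupplierContext {
        Object getMetricGroup();
    }

    // Minimal stand-in for FunctionContext: a superset of the above.
    interface FunctionContext {
        Object getMetricGroup();
        File getCachedFile(String name);
        String getJobParameter(String key, String defaultValue);
    }

    // Adapter: delegate what exists, throw for what doesn't.
    static FunctionContext adapt(WatermarkGeneratorSupplierContext ctx) {
        return new FunctionContext() {
            @Override public Object getMetricGroup() {
                return ctx.getMetricGroup();
            }
            @Override public File getCachedFile(String name) {
                throw new UnsupportedOperationException(
                        "getCachedFile is not available in watermark push-down");
            }
            @Override public String getJobParameter(String key, String defaultValue) {
                throw new UnsupportedOperationException(
                        "getJobParameter is not available in watermark push-down");
            }
        };
    }

    public static void main(String[] args) {
        FunctionContext fc = adapt(() -> "metric-group");
        System.out.println(fc.getMetricGroup());   // prints "metric-group"
        try {
            fc.getCachedFile("f");
        } catch (UnsupportedOperationException e) {
            System.out.println("unsupported");     // prints "unsupported"
        }
    }
}
```

This makes the risk explicit: a UDF that only touches metrics works unchanged, while any other FunctionContext call fails at runtime.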


Re: Merge SupportsComputedColumnPushDown and SupportsWatermarkPushDown

Posted by Shengkai Fang <fs...@gmail.com>.
Hi Timo and Jark. Thanks for your replies.

Maybe I didn't explain it clearly in the doc. The main reason is that we
have no means to distinguish the calculations in a LogicalProject. Let me
give you an example to illustrate. Assume we have two cases:


case 1:

create table MyTable (
  a INT,
  b INT
) with (
  ...
)


we use the SQL "select a, b, a+b as c from MyTable" to get the results.


and


case 2:

create table MyTableWithComputedColumn (
  a INT,
  b INT,
  c AS a + b
) with (
  ...
)


we use the SQL "select a, b, c from MyTableWithComputedColumn" to get the
results.


When they reach the planner, the two queries have the same plan, which means
we would also push the calculation from the query into the scan if we
supported computed column push down.
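For instance, both queries optimize to a projection of roughly the following shape (illustrative, in the same plan notation used earlier in the thread; only the table name differs between the two cases):

```
LogicalProject(a=[$0], b=[$1], c=[+($0, $1)])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
```

Nothing in this plan records whether `+($0, $1)` came from the DDL or from the query.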


As a supplement to Jark's response: currently,
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase#assignTimestampsAndWatermarks(WatermarkStrategy)
uses WatermarkStrategy to register the watermark generator supplier. I think
it's OK to use WatermarkStrategy directly because FLIP-126 has been
finished.
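To sketch why pushing only the WatermarkStrategy suffices even for the computed-column case: the strategy the planner builds can evaluate the rowtime expression itself before assigning a timestamp. The interfaces below are simplified stand-ins for Flink's org.apache.flink.api.common.eventtime types, and the row layout (a long[] with a rowtime base at index 2) is an assumption for illustration only.

```java
public class WatermarkStrategySketch {
    // Simplified stand-in for Flink's TimestampAssigner<T>.
    interface TimestampAssigner<T> {
        long extractTimestamp(T element);
    }

    // Simplified stand-in for WatermarkStrategy<T>: a factory for assigners.
    interface WatermarkStrategy<T> {
        TimestampAssigner<T> createTimestampAssigner();
    }

    public static void main(String[] args) {
        // Planner-generated strategy for a computed rowtime such as
        // "d AS c + INTERVAL '5' SECOND": it evaluates the expression
        // from the raw scan row itself, so the source never needs the
        // computed column pushed down.
        WatermarkStrategy<long[]> strategy = () -> row -> row[2] + 5000L;

        TimestampAssigner<long[]> assigner = strategy.createTimestampAssigner();
        long[] rawRow = {1L, 2L, 10_000L};          // fields a, b, c
        long rowtime = assigner.extractTimestamp(rawRow);
        System.out.println(rowtime);                 // prints 15000
    }
}
```

The connector only hands rows to the assigner; the expression logic stays inside the pushed strategy.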



Jark Wu <im...@gmail.com> 于2020年9月8日周二 下午7:38写道:

> Hi Timo,
>
> Regarding "pushing other computed columns into source, e.g. encrypted
> records/columns, performing checksum checks, reading metadata etc.",
> I'm not sure about this.
> 1. the planner don't know which computed column should be pushed into
> source
> 2. it seems that we can't improve performances if we pushdown complex logic
> into source, we still need to calculate them anyway.
> 3. the computed column is a regular expression, if the computed column
> should be pushed down, then shall we push the expressions in the following
> Projection too?
>     If yes, then the name of "SupportsComputedColumnPushDown" might be not
> correct.
> 4. regarding reading metadata, according to FLIP-107, we don't use the
> existing SupportsComputedColumnPushDown, but a new interface.
>
> Therefore, I don't find a strong use case that needs this interface so far.
>
> Best,
> Jark
>
>
>
>
> On Tue, 8 Sep 2020 at 17:13, Timo Walther <tw...@apache.org> wrote:
>
> > Hi Shengkai,
> >
> > first of I would not consider the section Problems of
> > SupportsWatermarkPushDown" as a "problem". It was planned to update the
> > WatermarkProvider once the interfaces are ready. See the comment in
> > WatermarkProvider:
> >
> > // marker interface that will be filled after FLIP-126:
> > // WatermarkGenerator<RowData> getWatermarkGenerator();
> >
> > So far we had no sources that actually implement WatermarkStrategy.
> >
> > Second, for generating watermarks I don't see a problem in merging the
> > two mentioned interfaces SupportsWatermarkPushDown and
> > SupportsComputedColumnPushDown into one. The descibed design sounds
> > reasonable to me and the impact on performance should not be too large.
> >
> > However, by merging these two interfaces we are also merging two
> > completely separate concepts. Computed columns are not always used for
> > generating a rowtime or watermark. Users can and certainly will
> > implement more complex logic in there. One example could be decrypting
> > encrypted records/columns, performing checksum checks, reading metadata
> > etc.
> >
> > So in any case we should still provide two interfaces:
> >
> > SupportsWatermarkPushDown (functionality of computed columns +
> watermarks)
> >
> >
> > SupportsComputedColumnPushDown (functionality of computed columns only)
> >
> > I'm fine with such a design, but it is also confusing for implementers
> > that SupportsWatermarkPushDown includes the functionality of the other
> > interface.
> >
> > What do you think?
> >
> > Regards,
> > Timo
> >
> >
> > On 08.09.20 04:32, Jark Wu wrote:
> > > Thanks to Shengkai for summarizing the problems on the FLIP-95
> interfaces
> > > and solutions.
> > >
> > > I think the new proposal, i.e. only pushing the "WatermarkStrategy" is
> > much
> > > cleaner and easier to develop than before.
> > > So I'm +1 to the proposal.
> > >
> > > Best,
> > > Jark
> > >
> > > On Sat, 5 Sep 2020 at 13:44, Shengkai Fang <fs...@gmail.com> wrote:
> > >
> > >> Hi, all. It seems the format is not normal. So I add a google doc in
> > >> link[1]. This discussion is about the interfaces in FLIP-95:  New
> Table
> > >> Source And Table Sink and propose to merge two interfaces
> > >> SupportsWatermarkPushDown and SupportsComputedColumnPushDown.
> > >>
> > >> I am looking forward to any opinions and suggestions from the
> community.
> > >>
> > >> Regards,
> > >> Shengkai
> > >>
> > >> [1]
> > >>
> > >>
> >
> https://docs.google.com/document/d/1sIT8lFZ_MeNIh_GLE3hi7Y4pgzN90Ahw_LoBFni-GT4/edit#
> > >>
> > >> Shengkai Fang <fs...@gmail.com> 于2020年9月4日周五 下午2:58写道:
> > >>
> > >>>     Hi, all. Currently, we use two seperated interfaces
> > >>> SupportsComputedColumnPushDown and SupportsWatermarkPushDown in
> design.
> > >> The
> > >>> interface SupportsWatermarkPushDown relies on
> > >>> SupportsComputedColumnPushDown when watermark is defined on a
> computed
> > >>> column. During the implementation, we find the method in
> > >>> SupportsWatermarkPushDown uses an out-of-date interface
> > WatermarkProvider
> > >>> and the duplication of SupportsComputedColumnPushDown and
> > >>> SupportsProjectionPushDown. Therefore, we decide to propose a new
> > >>> interface of SupportsWatermarkPushDown to solve the problems we
> > >> mentioned.
> > >>>
> > >>>
> > >>> *Problems of SupportsComputedColumnPushDown and
> > >> SupportsWatermarkPushDown*Problems
> > >>> of SupportsWatermarkPushDown
> > >>>
> > >>> SupportsWatermarkPushDown uses an inner interface named
> > >> WatermarkProvider to
> > >>> register WatermarkGenerator into DynamicTableSource now. However, the
> > >>> community uses
> org.apache.flink.api.common.eventtime.WatermarkStrategy
> > to
> > >>> create watermark generators in FLIP-126. WatermarkStrategy is a
> factory
> > >>> of TimestampAssigner and WatermarkGeneartor and FlinkKafkaConsumer
> uses
> > >>> the method assignTimestampsAndWatermarks(WatermarkStrategy) to
> generate
> > >>> Kafka-partition-aware watermarks. As for the origin
> WatermarkProvider,
> > it
> > >>> is used to generate deprecated AssignerWithPeriodicWatermarks and
> > >>> PunctuatedWatermarkAssignerProvider. Therefore, we think it's not
> > >>> suitable to use the WatermarkProvider any more.
> > >>>
> > >>>
> > >>> Problems of SupportsComputedColumnPushDown
> > >>>
> > >>> There are two problems around when using
> SupportsComputedColumnPushDown
> > >>>   alone.
> > >>>
> > >>> First, planner will transform the computed column and query such as
> > >> select
> > >>> a+b to a LogicalProject. When it comes to the optimization phase, we
> > have
> > >>> no means to distinguish whether the Rexnode in the projection is from
> > >>> computed columns or query. So SupportsComputedColumnPushDown in
> reality
> > >>> will push not only the computed column but also the calculation in
> the
> > >>> query.
> > >>>
> > >>> Second, when a plan matches the rule
> > >>> PushComputedColumnIntoTableSourceScanRule, we have to build a new
> > >> RowData to
> > >>> place all fields we require. However, both two rules
> > >>> PushComputedColumnIntoTableSourceScanRule and
> > >>> PushProjectIntoTableSourceScanRule will do the same work that prune
> the
> > >>> records that read from source. It seems that we have two duplicate
> > rules
> > >> in
> > >>> planner. But I think we should use the rule
> > >>> PushProjectIntoTableSourceScanRule rather than
> > >>> PushComputedColumnIntoTableSourceScanRule if we don't support
> watermark
> > >>> push down. Compared to PushComputedColumnIntoTableSourceScanRule,
> > >>> PushProjectIntoTableSourceScanRule is much lighter and we can read
> > pruned
> > >>> data from source rather than use a map function in flink.
> > >>>
> > >>> Therefore, we think it's not a good idea to use two interfaces rather
> > >> than
> > >>> one.
> > >>>
> > >>>
> > >>> *New Proposal*
> > >>>
> > >>> First of all, let us address some background when pushing watermarks
> > into
> > >>> table source scan. There are two structures that we need to consider.
> > We
> > >>> list two examples below for discussion.
> > >>>
> > >>> structure 1:
> > >>> LogicalWatermarkAssigner(rowtime=[c], watermark=[-($2, 5000:INTERVAL SECOND)])
> > >>> +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
> > >>>
> > >>> structure 2:
> > >>> LogicalWatermarkAssigner(rowtime=[d], watermark=[-($3, 5000:INTERVAL SECOND)])
> > >>> +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[+($2, 5000:INTERVAL SECOND)])
> > >>>    +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
> > >>>
> > >>> As we can see, structure 2 is much more complicated than structure 1.
> > For
> > >>> structure 1, we can use the row from table scan to generate
> watermarks
> > >>> directly. But for structure 2, we need to calculate the rowtime
> > >> expression
> > >>> in LogicalProject and use the result of calculation to generate
> > >>> watermarks. Considering that WatermarkStrategy has the ability to
> > extract
> > >>> timestamp from row, we have a proposal to push only WatermarkStrategy
> > to
> > >>> scan.
> > >>>
> > >>>
> > >>> Push WatermarkStrategy to Scan
> > >>>
> > >>> In this interface, we will only push WatermarkStrategy to
> > >>> DynamicTableSource.
> > >>>
> > >>> public interface SupportsWatermarkPushDown {
> > >>>     void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy);
> > >>> }
> > >>>
> > >>> The advantage of the new api is that it's very simple for the
> > developers
> > >>> of Connector. They only need to take WatermarkStrategy into
> > consideration
> > >>> and don't need to deal with other infos such as
> ComputedColumnConverter
> > >>> in SupportsComputedColumnPushDown. But it also has one disadvantage
> > that
> > >>> it needs to calculate the rowtime expression again in
> LogicalProjection
> > >>> because we don't build a new row in scan to store the calculated
> > >> timestamp.
> > >>> However, we can replace the calculation of rowtime in
> LogicalProjection
> > >>> with a reference to eliminate duplicate calculation, which will use
> the
> > >>> StreamRecord's getter to read the timestamp that is calculated
> before.
> > >> But
> > >>> this optimization still has one limitation that it relies on computed
> > >>> columns are not defined on other computed columns. For nested
> computed
> > >>> columns, we have no place to save the intermediate result.
> > >>>
> > >>> But we still have a problem that when we push an udf into the source,
> > we
> > >>> need a context as powerful as FunctionContext to open the udf. But
> the
> > >>> current WatermarkGeneratorSupplier.Context only supports method
> > >>> getMetricGroup and misses methods getCachedFile, getJobParameter and
> > >>> getExternalResourceInfos, which means we can't convert the
> > >>> WatermarkGeneratorSupplier.Context to FunctionContext safely.
> > Considering
> > >>> that the udf is only used to generate watermark, we suggest to throw
> > >>> UnsupportedException when invoking the methods exist in
> FunctionContext
> > >>> but don't exist in WatermarkGeneratorSupplier.Context. But we have to
> > >>> admit that there are risks in doing so because we have no promise
> that
> > >> the
> > >>> udf will not invoke these methods.
> > >>>
> > >>>
> > >>> *Summary*
> > >>>
> > >>> We have addressed the whole problem and solution in detail. As a
> > >>> conclusion, I think the new interface avoids the problems we
> mentioned
> > >>> before. It has a clear definition as its name tells and has nothing
> in
> > >>> common with SupportProjectionPushDown. As for its disadvantage, I
> think
> > >>> it's acceptable to calculate the rowtime column twice and we also
> have
> > a
> > >>> plan to improve its efficiency as a follow up if it brings some
> > >> performance
> > >>> problems.
> > >>>
> > >>>
> > >>>
> > >>
> > >
> >
> >
>

Re: Merge SupportsComputedColumnPushDown and SupportsWatermarkPushDown

Posted by Jark Wu <im...@gmail.com>.
Hi Timo,

Regarding "pushing other computed columns into source, e.g. encrypted
records/columns, performing checksum checks, reading metadata etc.",
I'm not sure about this.
1. the planner doesn't know which computed column should be pushed into source
2. it seems that we can't improve performance if we push down complex logic
into the source; we still need to calculate it anyway.
3. the computed column is a regular expression; if the computed column
should be pushed down, then shall we push the expressions in the following
Projection too?
    If yes, then the name "SupportsComputedColumnPushDown" might not be
correct.
4. regarding reading metadata, according to FLIP-107, we don't use the
existing SupportsComputedColumnPushDown, but a new interface.

Therefore, I haven't found a strong use case that needs this interface so far.

Best,
Jark




On Tue, 8 Sep 2020 at 17:13, Timo Walther <tw...@apache.org> wrote:

> Hi Shengkai,
>
> first of I would not consider the section Problems of
> SupportsWatermarkPushDown" as a "problem". It was planned to update the
> WatermarkProvider once the interfaces are ready. See the comment in
> WatermarkProvider:
>
> // marker interface that will be filled after FLIP-126:
> // WatermarkGenerator<RowData> getWatermarkGenerator();
>
> So far we had no sources that actually implement WatermarkStrategy.
>
> Second, for generating watermarks I don't see a problem in merging the
> two mentioned interfaces SupportsWatermarkPushDown and
> SupportsComputedColumnPushDown into one. The descibed design sounds
> reasonable to me and the impact on performance should not be too large.
>
> However, by merging these two interfaces we are also merging two
> completely separate concepts. Computed columns are not always used for
> generating a rowtime or watermark. Users can and certainly will
> implement more complex logic in there. One example could be decrypting
> encrypted records/columns, performing checksum checks, reading metadata
> etc.
>
> So in any case we should still provide two interfaces:
>
> SupportsWatermarkPushDown (functionality of computed columns + watermarks)
>
>
> SupportsComputedColumnPushDown (functionality of computed columns only)
>
> I'm fine with such a design, but it is also confusing for implementers
> that SupportsWatermarkPushDown includes the functionality of the other
> interface.
>
> What do you think?
>
> Regards,
> Timo
>
>
> On 08.09.20 04:32, Jark Wu wrote:
> > Thanks to Shengkai for summarizing the problems on the FLIP-95 interfaces
> > and solutions.
> >
> > I think the new proposal, i.e. only pushing the "WatermarkStrategy" is
> much
> > cleaner and easier to develop than before.
> > So I'm +1 to the proposal.
> >
> > Best,
> > Jark
> >
> > On Sat, 5 Sep 2020 at 13:44, Shengkai Fang <fs...@gmail.com> wrote:
> >
> >> Hi, all. It seems the format is not normal. So I add a google doc in
> >> link[1]. This discussion is about the interfaces in FLIP-95:  New Table
> >> Source And Table Sink and propose to merge two interfaces
> >> SupportsWatermarkPushDown and SupportsComputedColumnPushDown.
> >>
> >> I am looking forward to any opinions and suggestions from the community.
> >>
> >> Regards,
> >> Shengkai
> >>
> >> [1]
> >>
> >>
> https://docs.google.com/document/d/1sIT8lFZ_MeNIh_GLE3hi7Y4pgzN90Ahw_LoBFni-GT4/edit#
> >>
> >> Shengkai Fang <fs...@gmail.com> 于2020年9月4日周五 下午2:58写道:
> >>
> >>>     Hi, all. Currently, we use two seperated interfaces
> >>> SupportsComputedColumnPushDown and SupportsWatermarkPushDown in design.
> >> The
> >>> interface SupportsWatermarkPushDown relies on
> >>> SupportsComputedColumnPushDown when watermark is defined on a computed
> >>> column. During the implementation, we find the method in
> >>> SupportsWatermarkPushDown uses an out-of-date interface
> WatermarkProvider
> >>> and the duplication of SupportsComputedColumnPushDown and
> >>> SupportsProjectionPushDown. Therefore, we decide to propose a new
> >>> interface of SupportsWatermarkPushDown to solve the problems we
> >> mentioned.
> >>>
> >>>
> >>> *Problems of SupportsComputedColumnPushDown and
> >> SupportsWatermarkPushDown*Problems
> >>> of SupportsWatermarkPushDown
> >>>
> >>> SupportsWatermarkPushDown uses an inner interface named
> >> WatermarkProvider to
> >>> register WatermarkGenerator into DynamicTableSource now. However, the
> >>> community uses org.apache.flink.api.common.eventtime.WatermarkStrategy
> to
> >>> create watermark generators in FLIP-126. WatermarkStrategy is a factory
> >>> of TimestampAssigner and WatermarkGeneartor and FlinkKafkaConsumer uses
> >>> the method assignTimestampsAndWatermarks(WatermarkStrategy) to generate
> >>> Kafka-partition-aware watermarks. As for the origin WatermarkProvider,
> it
> >>> is used to generate deprecated AssignerWithPeriodicWatermarks and
> >>> PunctuatedWatermarkAssignerProvider. Therefore, we think it's not
> >>> suitable to use the WatermarkProvider any more.
> >>>
> >>>
> >>> Problems of SupportsComputedColumnPushDown
> >>>
> >>> There are two problems around when using SupportsComputedColumnPushDown
> >>>   alone.
> >>>
> >>> First, planner will transform the computed column and query such as
> >> select
> >>> a+b to a LogicalProject. When it comes to the optimization phase, we
> have
> >>> no means to distinguish whether the Rexnode in the projection is from
> >>> computed columns or query. So SupportsComputedColumnPushDown in reality
> >>> will push not only the computed column but also the calculation in the
> >>> query.
> >>>
> >>> Second, when a plan matches the rule
> >>> PushComputedColumnIntoTableSourceScanRule, we have to build a new
> >> RowData to
> >>> place all fields we require. However, both two rules
> >>> PushComputedColumnIntoTableSourceScanRule and
> >>> PushProjectIntoTableSourceScanRule will do the same work that prune the
> >>> records that read from source. It seems that we have two duplicate
> rules
> >> in
> >>> planner. But I think we should use the rule
> >>> PushProjectIntoTableSourceScanRule rather than
> >>> PushComputedColumnIntoTableSourceScanRule if we don't support watermark
> >>> push down. Compared to PushComputedColumnIntoTableSourceScanRule,
> >>> PushProjectIntoTableSourceScanRule is much lighter and we can read
> pruned
> >>> data from source rather than use a map function in flink.
> >>>
> >>> Therefore, we think it's not a good idea to use two interfaces rather
> >> than
> >>> one.
> >>>
> >>>
> >>> *New Proposal*
> >>>
> >>> First of all, let us address some background when pushing watermarks
> into
> >>> table source scan. There are two structures that we need to consider.
> We
> >>> list two examples below for discussion.
> >>>
> >>> structure 1:
> >>> LogicalWatermarkAssigner(rowtime=[c], watermark=[-($2, 5000:INTERVAL SECOND)])
> >>> +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
> >>>
> >>> structure 2:
> >>> LogicalWatermarkAssigner(rowtime=[d], watermark=[-($3, 5000:INTERVAL SECOND)])
> >>> +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[+($2, 5000:INTERVAL SECOND)])
> >>>    +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
> >>>
> >>> As we can see, structure 2 is much more complicated than structure 1.
> For
> >>> structure 1, we can use the row from table scan to generate watermarks
> >>> directly. But for structure 2, we need to calculate the rowtime
> >> expression
> >>> in LogicalProject and use the result of calculation to generate
> >>> watermarks. Considering that WatermarkStrategy has the ability to
> extract
> >>> timestamp from row, we have a proposal to push only WatermarkStrategy
> to
> >>> scan.
> >>>
> >>>
> >>> Push WatermarkStrategy to Scan
> >>>
> >>> In this interface, we will only push WatermarkStrategy to
> >>> DynamicTableSource.
> >>>
> >>> public interface SupportsWatermarkPushDown {
> >>>     void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy);
> >>> }
> >>>
> >>> The advantage of the new api is that it's very simple for the
> developers
> >>> of Connector. They only need to take WatermarkStrategy into
> consideration
> >>> and don't need to deal with other infos such as ComputedColumnConverter
> >>> in SupportsComputedColumnPushDown. But it also has one disadvantage
> that
> >>> it needs to calculate the rowtime expression again in LogicalProjection
> >>> because we don't build a new row in scan to store the calculated
> >> timestamp.
> >>> However, we can replace the calculation of rowtime in LogicalProjection
> >>> with a reference to eliminate duplicate calculation, which will use the
> >>> StreamRecord's getter to read the timestamp that is calculated before.
> >> But
> >>> this optimization still has one limitation that it relies on computed
> >>> columns are not defined on other computed columns. For nested computed
> >>> columns, we have no place to save the intermediate result.
> >>>
> >>> But we still have a problem that when we push an udf into the source,
> we
> >>> need a context as powerful as FunctionContext to open the udf. But the
> >>> current WatermarkGeneratorSupplier.Context only supports method
> >>> getMetricGroup and misses methods getCachedFile, getJobParameter and
> >>> getExternalResourceInfos, which means we can't convert the
> >>> WatermarkGeneratorSupplier.Context to FunctionContext safely.
> Considering
> >>> that the udf is only used to generate watermark, we suggest to throw
> >>> UnsupportedException when invoking the methods exist in FunctionContext
> >>> but don't exist in WatermarkGeneratorSupplier.Context. But we have to
> >>> admit that there are risks in doing so because we have no promise that
> >> the
> >>> udf will not invoke these methods.
> >>>
> >>>
> >>> *Summary*
> >>>
> >>> We have addressed the whole problem and solution in detail. As a
> >>> conclusion, I think the new interface avoids the problems we mentioned
> >>> before. It has a clear definition as its name tells and has nothing in
> >>> common with SupportProjectionPushDown. As for its disadvantage, I think
> >>> it's acceptable to calculate the rowtime column twice and we also have
> a
> >>> plan to improve its efficiency as a follow up if it brings some
> >> performance
> >>> problems.
> >>>
> >>>
> >>>
> >>
> >
>
>

Re: Merge SupportsComputedColumnPushDown and SupportsWatermarkPushDown

Posted by Timo Walther <tw...@apache.org>.
Hi Shengkai,

first of all, I would not consider the section "Problems of
SupportsWatermarkPushDown" a "problem". It was planned to update the
WatermarkProvider once the interfaces are ready. See the comment in 
WatermarkProvider:

// marker interface that will be filled after FLIP-126:
// WatermarkGenerator<RowData> getWatermarkGenerator();

So far we had no sources that actually implement WatermarkStrategy.

Second, for generating watermarks I don't see a problem in merging the 
two mentioned interfaces SupportsWatermarkPushDown and 
SupportsComputedColumnPushDown into one. The described design sounds
reasonable to me and the impact on performance should not be too large.

However, by merging these two interfaces we are also merging two 
completely separate concepts. Computed columns are not always used for 
generating a rowtime or watermark. Users can and certainly will 
implement more complex logic in there. One example could be decrypting 
encrypted records/columns, performing checksum checks, reading metadata etc.

So in any case we should still provide two interfaces:

SupportsWatermarkPushDown (functionality of computed columns + watermarks)


SupportsComputedColumnPushDown (functionality of computed columns only)

I'm fine with such a design, but it is also confusing for implementers 
that SupportsWatermarkPushDown includes the functionality of the other 
interface.
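To make the implementer's side concrete, under the merged design a source would only need to accept and store the pushed strategy. The sketch below uses simplified stand-in types (the real method takes org.apache.flink.api.common.eventtime.WatermarkStrategy&lt;RowData&gt;), and MyKafkaLikeSource is a hypothetical connector, not an existing class.

```java
public class MergedDesignSketch {
    // Stand-in for Flink's WatermarkStrategy<RowData>; opaque to the source.
    interface WatermarkStrategy<T> { }

    // The proposed merged interface: the only hook a connector implements.
    interface SupportsWatermarkPushDown {
        void applyWatermark(WatermarkStrategy<Object[]> watermarkStrategy);
    }

    // Hypothetical connector: it just stores the strategy and would later
    // hand it to its runtime reader, e.g. via
    // assignTimestampsAndWatermarks(...) on a Kafka consumer.
    static class MyKafkaLikeSource implements SupportsWatermarkPushDown {
        WatermarkStrategy<Object[]> pushedStrategy;

        @Override
        public void applyWatermark(WatermarkStrategy<Object[]> watermarkStrategy) {
            this.pushedStrategy = watermarkStrategy;
        }
    }

    public static void main(String[] args) {
        MyKafkaLikeSource source = new MyKafkaLikeSource();
        WatermarkStrategy<Object[]> strategy = new WatermarkStrategy<Object[]>() { };
        source.applyWatermark(strategy);
        System.out.println(source.pushedStrategy == strategy);  // prints true
    }
}
```

The point of the merged design is visible here: computed-column evaluation lives inside the opaque strategy, so the connector code never distinguishes the two concepts.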

What do you think?

Regards,
Timo


On 08.09.20 04:32, Jark Wu wrote:
> Thanks to Shengkai for summarizing the problems on the FLIP-95 interfaces
> and solutions.
> 
> I think the new proposal, i.e. only pushing the "WatermarkStrategy" is much
> cleaner and easier to develop than before.
> So I'm +1 to the proposal.
> 
> Best,
> Jark
> 
> On Sat, 5 Sep 2020 at 13:44, Shengkai Fang <fs...@gmail.com> wrote:
> 
>> Hi, all. It seems the format is not normal. So I add a google doc in
>> link[1]. This discussion is about the interfaces in FLIP-95:  New Table
>> Source And Table Sink and propose to merge two interfaces
>> SupportsWatermarkPushDown and SupportsComputedColumnPushDown.
>>
>> I am looking forward to any opinions and suggestions from the community.
>>
>> Regards,
>> Shengkai
>>
>> [1]
>>
>> https://docs.google.com/document/d/1sIT8lFZ_MeNIh_GLE3hi7Y4pgzN90Ahw_LoBFni-GT4/edit#
>>
>> Shengkai Fang <fs...@gmail.com> 于2020年9月4日周五 下午2:58写道:
>>
>>>     Hi, all. Currently, we use two seperated interfaces
>>> SupportsComputedColumnPushDown and SupportsWatermarkPushDown in design.
>> The
>>> interface SupportsWatermarkPushDown relies on
>>> SupportsComputedColumnPushDown when watermark is defined on a computed
>>> column. During the implementation, we find the method in
>>> SupportsWatermarkPushDown uses an out-of-date interface WatermarkProvider
>>> and the duplication of SupportsComputedColumnPushDown and
>>> SupportsProjectionPushDown. Therefore, we decide to propose a new
>>> interface of SupportsWatermarkPushDown to solve the problems we
>> mentioned.
>>>
>>>
>>> *Problems of SupportsComputedColumnPushDown and
>> SupportsWatermarkPushDown*Problems
>>> of SupportsWatermarkPushDown
>>>
>>> SupportsWatermarkPushDown uses an inner interface named
>> WatermarkProvider to
>>> register WatermarkGenerator into DynamicTableSource now. However, the
>>> community uses org.apache.flink.api.common.eventtime.WatermarkStrategy to
>>> create watermark generators in FLIP-126. WatermarkStrategy is a factory
>>> of TimestampAssigner and WatermarkGeneartor and FlinkKafkaConsumer uses
>>> the method assignTimestampsAndWatermarks(WatermarkStrategy) to generate
>>> Kafka-partition-aware watermarks. As for the origin WatermarkProvider, it
>>> is used to generate deprecated AssignerWithPeriodicWatermarks and
>>> PunctuatedWatermarkAssignerProvider. Therefore, we think it's not
>>> suitable to use the WatermarkProvider any more.
>>>
>>>
>>> Problems of SupportsComputedColumnPushDown
>>>
>>> There are two problems around when using SupportsComputedColumnPushDown
>>>   alone.
>>>
>>> First, planner will transform the computed column and query such as
>> select
>>> a+b to a LogicalProject. When it comes to the optimization phase, we have
>>> no means to distinguish whether the Rexnode in the projection is from
>>> computed columns or query. So SupportsComputedColumnPushDown in reality
>>> will push not only the computed column but also the calculation in the
>>> query.
>>>
>>> Second, when a plan matches the rule
>>> PushComputedColumnIntoTableSourceScanRule, we have to build a new
>> RowData to
>>> place all fields we require. However, both two rules
>>> PushComputedColumnIntoTableSourceScanRule and
>>> PushProjectIntoTableSourceScanRule will do the same work that prune the
>>> records that read from source. It seems that we have two duplicate rules
>> in
>>> planner. But I think we should use the rule
>>> PushProjectIntoTableSourceScanRule rather than
>>> PushComputedColumnIntoTableSourceScanRule if we don't support watermark
>>> push down. Compared to PushComputedColumnIntoTableSourceScanRule,
>>> PushProjectIntoTableSourceScanRule is much lighter and we can read pruned
>>> data from source rather than use a map function in flink.
>>>
>>> Therefore, we think it's not a good idea to use two interfaces rather
>> than
>>> one.
>>>
>>>
>>> *New Proposal*
>>>
>>> First of all, let us address some background when pushing watermarks into
>>> table source scan. There are two structures that we need to consider. We
>>> list two examples below for discussion.
>>>
>>> structure 1:
>>> LogicalWatermarkAssigner(rowtime=[c], watermark=[-($2, 5000:INTERVAL SECOND)])
>>> +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
>>>
>>> structure 2:
>>> LogicalWatermarkAssigner(rowtime=[d], watermark=[-($3, 5000:INTERVAL SECOND)])
>>> +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[+($2, 5000:INTERVAL SECOND)])
>>>    +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
>>>
>>> As we can see, structure 2 is much more complicated than structure 1. For
>>> structure 1, we can use the row from table scan to generate watermarks
>>> directly. But for structure 2, we need to calculate the rowtime
>> expression
>>> in LogicalProject and use the result of calculation to generate
>>> watermarks. Considering that WatermarkStrategy has the ability to extract
>>> timestamp from row, we have a proposal to push only WatermarkStrategy to
>>> scan.
>>>
>>>
>>> Push WatermarkStrategy to Scan
>>>
>>> In this interface, we will only push WatermarkStrategy to
>>> DynamicTableSource.
>>>
>>> public interface SupportsWatermarkPushDown {
>>>     void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy);
>>> }
>>>
>>> The advantage of the new api is that it's very simple for the developers
>>> of Connector. They only need to take WatermarkStrategy into consideration
>>> and don't need to deal with other infos such as ComputedColumnConverter
>>> in SupportsComputedColumnPushDown. But it also has one disadvantage that
>>> it needs to calculate the rowtime expression again in LogicalProjection
>>> because we don't build a new row in scan to store the calculated
>> timestamp.
>>> However, we can replace the calculation of rowtime in LogicalProjection
>>> with a reference to eliminate duplicate calculation, which will use the
>>> StreamRecord's getter to read the timestamp that is calculated before.
>> But
>>> this optimization still has one limitation that it relies on computed
>>> columns are not defined on other computed columns. For nested computed
>>> columns, we have no place to save the intermediate result.
>>>
>>> But we still have a problem that when we push an udf into the source, we
>>> need a context as powerful as FunctionContext to open the udf. But the
>>> current WatermarkGeneratorSupplier.Context only supports method
>>> getMetricGroup and misses methods getCachedFile, getJobParameter and
>>> getExternalResourceInfos, which means we can't convert the
>>> WatermarkGeneratorSupplier.Context to FunctionContext safely. Considering
>>> that the udf is only used to generate watermark, we suggest to throw
>>> UnsupportedException when invoking the methods exist in FunctionContext
>>> but don't exist in WatermarkGeneratorSupplier.Context. But we have to
>>> admit that there are risks in doing so because we have no promise that
>> the
>>> udf will not invoke these methods.
>>>
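[Editorial sketch of the suggested behaviour: a hypothetical adapter that exposes the narrow watermark context as a FunctionContext and throws for the methods it cannot serve. The two interfaces below are heavily trimmed stand-ins, not the real Flink classes.]

```java
import java.io.File;

// Trimmed stand-in for WatermarkGeneratorSupplier.Context (the real method
// returns a MetricGroup; a String suffices for illustration).
interface WatermarkContext {
    String getMetricGroup();
}

// Trimmed stand-in for Flink's FunctionContext.
interface FunctionContext {
    String getMetricGroup();
    File getCachedFile(String name);
    String getJobParameter(String key, String defaultValue);
}

// Adapter: forwards the one method both contexts share, and throws
// UnsupportedOperationException for the rest, as the proposal suggests.
class WatermarkFunctionContext implements FunctionContext {
    private final WatermarkContext wrapped;

    WatermarkFunctionContext(WatermarkContext wrapped) {
        this.wrapped = wrapped;
    }

    @Override
    public String getMetricGroup() {
        return wrapped.getMetricGroup(); // the only call we can forward
    }

    @Override
    public File getCachedFile(String name) {
        throw new UnsupportedOperationException(
            "getCachedFile is not available when a UDF runs in watermark generation");
    }

    @Override
    public String getJobParameter(String key, String defaultValue) {
        throw new UnsupportedOperationException(
            "getJobParameter is not available when a UDF runs in watermark generation");
    }
}
```

A UDF opened with such a context works as long as it only touches metrics; any call to the missing methods fails fast with a clear message instead of misbehaving silently.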
>>>
>>> *Summary*
>>>
>>> We have described the problems and the solution in detail. In
>>> conclusion, I think the new interface avoids the problems mentioned
>>> before: it has a clear definition, as its name suggests, and has
>>> nothing in common with SupportsProjectionPushDown. As for its
>>> disadvantage, I think it is acceptable to calculate the rowtime column
>>> twice, and we also plan to improve its efficiency as a follow-up if it
>>> causes performance problems.
>>>
>>>
>>>


Re: Merge SupportsComputedColumnPushDown and SupportsWatermarkPushDown

Posted by Jark Wu <im...@gmail.com>.
Thanks to Shengkai for summarizing the problems with the FLIP-95 interfaces
and the proposed solutions.

I think the new proposal, i.e. only pushing the "WatermarkStrategy" is much
cleaner and easier to develop than before.
So I'm +1 to the proposal.

Best,
Jark


Re: Merge SupportsComputedColumnPushDown and SupportsWatermarkPushDown

Posted by Shengkai Fang <fs...@gmail.com>.
Hi all. It seems the formatting was broken, so I have added a Google doc in
link [1]. This discussion is about the interfaces in FLIP-95: New Table
Source And Table Sink, and proposes merging the two interfaces
SupportsWatermarkPushDown and SupportsComputedColumnPushDown.

I am looking forward to any opinions and suggestions from the community.

Regards,
Shengkai

[1]
https://docs.google.com/document/d/1sIT8lFZ_MeNIh_GLE3hi7Y4pgzN90Ahw_LoBFni-GT4/edit#
