You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by jincheng sun <su...@gmail.com> on 2019/06/26 08:36:59 UTC

[DISCUSS]Support Upsert mode for Streaming Non-window FlatAggregate

Hi all,

With the continuous efforts from the community, we already supported
`flatAggregate`[1] on TableAPI in retract mode. I think It's better to add
upsert mode for  `flatAggregate`.

The result table of streaming non-window `flatAggregate` is a table
contains updates. We can, of course, use a RetractStreamTableSink[2] to
emit the table, but we can get better performance in upsert mode.  However,
due to the lack of keys, we can’t use an UpsertStreamTableSink to emit the
table. We don’t have this problem for a normal aggregate as it emits a
single row for each group, so the unique keys are exactly the same with the
group keys. While for a `flatAggregate`, its pretty difference that due to
emits multi rows(a “sub-table”) for a single group. To solve this problem,
we need to find a way to define keys on flatAggregate, so that we can also
use upsert sink to emit the result table after flatAggregate.

So, Aljoscha, Hequn and I prepared a design document for how to define the
update keys for  `flatAggregate` in upsert mode.  The detail can be found
here:

https://docs.google.com/document/d/183qHo8PDG-xserDi_AbGP6YX9aPY0rVr80p3O3Gyz6U/edit?usp=sharing

I appreciate it if you can have look at the document and any comments are
welcome!


Best,

Jincheng


[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97552739

[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/sourceSinks.html#defining-a-streamtablesource

Re: [DISCUSS]Support Upsert mode for Streaming Non-window FlatAggregate

Posted by Jark Wu <im...@gmail.com>.
Thanks Jincheng,

I think approach 3 there is no ambiguity in the semantics and can better
explain a deterministic result, although it is not so easy to use.
So I'm +1 to approach 3.

Best,
Jark

On Fri, 5 Jul 2019 at 18:13, jincheng sun <su...@gmail.com> wrote:

> Hi,
>
> @Kurt @Jark thanks again for your comments.
>
> @Kurt
> Separating key and non-key for UDTAGG can definitely provide more
> information for the system, however, it will also add more burden to our
> users and bring some code reuse problems. BTW, approach 3 can also be used
> to separate UDTAGG into keyed or non-keyed as we can check whether the key
> list is empty. So from this point of view, we can use approach 3 to solve
> your problem.
>
> @Jark
> It's great that the TopN in Blink can decide the key automatically. But,
> I'd like to point out another case that the keys cannot be decided by the
> system, i.e., can only be decided by the user. For example, for the TopN,
> let's say top1 for better understanding. Support the Top1 outputs three
> columns(rankid, value, seller_name), and the user wants to upsert the
> result either with key of rankid or with the key of rankid+seller_name.
> 1. With the key of rankid: In this case, the user just wants to get the
> top 1 record.
> 2. With the key of rankid+seller_name: In this case, the user wants to get
> all seller_names that have ever belong to top1. This can not be solved by
> the approach 3 if using only one function. However, it is very easy to
> implement with the withKey approach.
>
> Even though, I have thought more clearly about these things and find more
> interesting things that I want to share with you all. For the TopN example
> which I listed above, it may also lead to a problem in which batch and
> streaming are not unified.
>
> To make it worse, the upsert sink is not supported in batch and we even
> don't have any clear implementation plan about how to support upsert on the
> batch, the unification problem for `withKeys` approach becomes hang in
> doubt.
>
> So, to avoid the unification problem, I think we can also use the approach
> 3. It is more rigorous although less flexible compared to the `withKeys`
> approach.
>
> Meanwhile, I will think more about the unification problem later. Maybe
> new ideas about it may come through. :)
>
> Best,
> Jincheng
>
> Jark Wu <im...@gmail.com> 于2019年7月5日周五 上午10:48写道:
>
>> Hi Hequn,
>>
>> > If the TopN table aggregate function
>> > outputs three columns(rankid, time, value), either rankid or
>> rankid+time could be
>> > used as the key. Which one to be chosen is more likely to be decided by
>> the user
>> > according to his business.
>> In this case, the TopN table aggregate function should return two sets of
>> unique key, one is "rankid", another is "rankid, time".
>> This will be more align with current TopN node in blink planner and let
>> optimizer to decide which key based on the downstream information (column
>> selection, sink's primary key).
>>
>>
>> Best,
>> Jark
>>
>> On Fri, 5 Jul 2019 at 00:05, Hequn Cheng <ch...@gmail.com> wrote:
>>
>>> Hi Kurt and Jark,
>>>
>>> Thanks a lot for your great inputs!
>>>
>>> The keys of the query may not strongly be related to the UDTAGG.
>>> It may also be related to the corresponding scenarios that a user wants
>>> to achieve.
>>>
>>> For example, take TopN again as an example. If the TopN table aggregate
>>> function
>>> outputs three columns(rankid, time, value), either rankid or rankid+time
>>> could be
>>> used as the key. Which one to be chosen is more likely to be decided by
>>> the user
>>> according to his business.
>>>
>>> Best, Hequn
>>>
>>> On Thu, Jul 4, 2019 at 8:11 PM Jark Wu <im...@gmail.com> wrote:
>>>
>>>> Hi jingcheng,
>>>>
>>>> I agree with Kurt's point. As you said "the user must know the keys of
>>>> the output of UDTAGG clearly".
>>>> If I understand correctly, the key information is strongly relative to
>>>> the UDTAGG implementation.
>>>> Users may call `flatAggregate` on a UDTAGG instance with different keys
>>>> which may result in a wrong result.
>>>> So I think it would be better to couple key information with UDTAGG
>>>> interface (i.e. "Approach 3" in your design doc).
>>>>
>>>> Regards,
>>>> Jark
>>>>
>>>> On Thu, 4 Jul 2019 at 18:06, Kurt Young <yk...@gmail.com> wrote:
>>>>
>>>>> Hi Jincheng,
>>>>>
>>>>> Thanks for the clarification. I think you just pointed out my concern
>>>>> by
>>>>> yourself:
>>>>>
>>>>> > When a user uses a User-defined table aggregate function (UDTAGG), he
>>>>> must understand the behavior of the UDTAGG, including the return type
>>>>> and
>>>>> the characteristics of the returned data. such as the key fields.
>>>>>
>>>>> This indicates that the UDTAGG is somehow be classified to different
>>>>> types,
>>>>> one will no key, one with key information. So the developer of the
>>>>> UDTAGG
>>>>> should choose which type of this function should be. In this case,
>>>>> my question would be, why don't we have explicit information about keys
>>>>> such as we split UDTAGG to keyed UDTAGG and non-keyed UDTAGG. So the
>>>>> user
>>>>> and the framework will have a better understanding of
>>>>> this UDTAGG. `withKeys` solution is letting user to choose the key and
>>>>> it
>>>>> seems it will only work correctly only if the user choose the *right*
>>>>> key
>>>>> this UDTAGG has.
>>>>>
>>>>> Let me know if this makes sense to you.
>>>>>
>>>>> Best,
>>>>> Kurt
>>>>>
>>>>>
>>>>> On Thu, Jul 4, 2019 at 4:32 PM jincheng sun <su...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> > Hi All,
>>>>> >
>>>>> > @Kurt Young <yk...@gmail.com> one user-defined table aggregate
>>>>> function
>>>>> > can be used in both with(out) keys case, and we do not introduce any
>>>>> other
>>>>> > aggregations. just like the explanation from @Hequn.
>>>>> >
>>>>> > @Hequn Cheng <ch...@gmail.com> thanks for your explanation!
>>>>> >
>>>>> > One thing should be mentioned here:
>>>>> >
>>>>> > When a user uses a User-defined table aggregate function (UDTAGG),
>>>>> he must
>>>>> > understand the behavior of the UDTAGG, including the return type and
>>>>> the
>>>>> > characteristics of the returned data. such as the key fields.  So
>>>>> although
>>>>> > using `withKeys` approach it is not rigorous enough(we do not need)
>>>>> but
>>>>> > intuitive enough, considering that if `flatAggregate` is followed by
>>>>> an
>>>>> > `upsertSink`, then the user must know the keys of the output of
>>>>> UDTAGG
>>>>> > clearly, otherwise the keys of `upsertSink` cannot be defined. So I
>>>>> still
>>>>> > prefer the `withKeys` solution by now.
>>>>> >
>>>>> > Looking forward to any feedback from all of you!
>>>>> >
>>>>> > Best,
>>>>> > Jincheng
>>>>> >
>>>>> >
>>>>> >
>>>>> > Hequn Cheng <ch...@gmail.com> 于2019年7月1日周一 下午5:35写道:
>>>>> >
>>>>> >> Hi Kurt,
>>>>> >>
>>>>> >> Thanks for your questions. Here are my thoughts.
>>>>> >>
>>>>> >> > if I want to write such kind function, should I make sure that
>>>>> this
>>>>> >> function is used with some keys?
>>>>> >> The key information may not be used. We can also use RetractSink to
>>>>> emit
>>>>> >> the table directly.
>>>>> >>
>>>>> >> >  If I need a use case to calculate topn without key, should I
>>>>> write
>>>>> >> another function or I can reuse previous one.
>>>>> >> For the TopN example, you can reuse the previous function if you
>>>>> don't
>>>>> >> care
>>>>> >> about the key information.
>>>>> >>
>>>>> >> So, the key information is only an indicator(or a description), not
>>>>> an
>>>>> >> operator, as Jincheng mentioned above.
>>>>> >> We do not need to change the function logic and it will not add any
>>>>> other
>>>>> >> aggregations.
>>>>> >>
>>>>> >> BTW, we have three approaches in the document. Approach 1 defines
>>>>> keys on
>>>>> >> API level as we think it's more common to define keys on Table.
>>>>> >> While approach 3 defines keys in the TableAggregateFunction which
>>>>> is more
>>>>> >> precise but it is not very clear for Table users. So, we should
>>>>> take all
>>>>> >> these into consideration, and make the decision in this discussion
>>>>> thread.
>>>>> >>
>>>>> >> You can take a look at the document and welcome any suggestions or
>>>>> other
>>>>> >> better solutions.
>>>>> >>
>>>>> >> Best, Hequn
>>>>> >>
>>>>> >>
>>>>> >> On Mon, Jul 1, 2019 at 12:13 PM Kurt Young <yk...@gmail.com>
>>>>> wrote:
>>>>> >>
>>>>> >> > Hi Jincheng,
>>>>> >> >
>>>>> >> > Thanks for the clarification. Take 'TopN' for example, if I want
>>>>> to
>>>>> >> write
>>>>> >> > such kind function,
>>>>> >> > should I make sure that this function is used with some keys? If
>>>>> I need
>>>>> >> a
>>>>> >> > use case to calculate
>>>>> >> > topn without key, should I write another function or I can reuse
>>>>> >> previous
>>>>> >> > one.
>>>>> >> >
>>>>> >> > I'm not sure about the idea of this does not involve semantic
>>>>> changes.
>>>>> >> To
>>>>> >> > me, it sounds like
>>>>> >> > we are doing another nested aggregation inside the table
>>>>> >> > which TableAggregateFunction emits.
>>>>> >> >
>>>>> >> > Maybe I'm not familiar with this function enough, hope you can
>>>>> help me
>>>>> >> to
>>>>> >> > understand.
>>>>> >> >
>>>>> >> > Best,
>>>>> >> > Kurt
>>>>> >> >
>>>>> >> >
>>>>> >> > On Mon, Jul 1, 2019 at 11:59 AM jincheng sun <
>>>>> sunjincheng121@gmail.com>
>>>>> >> > wrote:
>>>>> >> >
>>>>> >> > > Hi Kurt,
>>>>> >> > >
>>>>> >> > > Thanks for your questions, I am glad to share my thoughts here:
>>>>> >> > >
>>>>> >> > > My question is, will that effect the logic
>>>>> ofTableAggregateFunction
>>>>> >> user
>>>>> >> > > > wrote? Should the user know that there will a key and make
>>>>> some
>>>>> >> changes
>>>>> >> > > to
>>>>> >> > > > this function?
>>>>> >> > >
>>>>> >> > >
>>>>> >> > > No, the keys information depends on the implementation of the
>>>>> >> > > TableAggregateFunction.
>>>>> >> > > For example, for a `topN` user defined TableAggregateFunction,
>>>>> we can
>>>>> >> > only
>>>>> >> > > use the `keys` if the `topN` contains `rankid` in the output.
>>>>> You can
>>>>> >> > > treat the
>>>>> >> > > `keys` like an indicator.
>>>>> >> > >
>>>>> >> > > If not, how will framework deal with the output of user's
>>>>> >> > > > TableAggregateFunction.  if user output multiple rows with
>>>>> the same
>>>>> >> > key,
>>>>> >> > > > should be latter one replace previous ones?
>>>>> >> > >
>>>>> >> > >
>>>>> >> > > If a TableAggregateFunction outputs multiple rows with the same
>>>>> key,
>>>>> >> the
>>>>> >> > > latter one should replace the previous one, either with upsert
>>>>> mode or
>>>>> >> > > retract mode. i.e., Whether the user defines the Key or not,
>>>>> the Flink
>>>>> >> > > framework should ensure the correctness of the semantics.
>>>>> >> > >
>>>>> >> > > At present, the problem we are discussing does not involve
>>>>> semantic
>>>>> >> > > changes. The definition of keys is to support non-window
>>>>> >> flatAggregate on
>>>>> >> > > upsert mode. (The upsert mode is already supported in the flink
>>>>> >> > framework.
>>>>> >> > > The current discussion only needs to inform the framework that
>>>>> the
>>>>> >> keys
>>>>> >> > > information, which is the `withKeys` API we discussing.)
>>>>> >> > >
>>>>> >> > > Welcome any other feedbacks :)
>>>>> >> > >
>>>>> >> > > Best,
>>>>> >> > > Jincheng
>>>>> >> > >
>>>>> >> > > Kurt Young <yk...@gmail.com> 于2019年7月1日周一 上午9:23写道:
>>>>> >> > >
>>>>> >> > > > Hi,
>>>>> >> > > >
>>>>> >> > > > I have a question about the key information of
>>>>> >> TableAggregateFunction.
>>>>> >> > > > IIUC, you need to define
>>>>> >> > > > something like primary key or unique key in the result table
>>>>> of
>>>>> >> > > > TableAggregateFunction, and also
>>>>> >> > > > need a way to let user configure this through the API. My
>>>>> question
>>>>> >> is,
>>>>> >> > > will
>>>>> >> > > > that effect the logic of
>>>>> >> > > > TableAggregateFunction user wrote? Should the user know that
>>>>> there
>>>>> >> > will a
>>>>> >> > > > key and make some changes
>>>>> >> > > > to this function?
>>>>> >> > > >
>>>>> >> > > > If so, what's the semantic the user should learned. If not,
>>>>> how will
>>>>> >> > > > framework deal with the output of user's
>>>>> >> > > > TableAggregateFunction. For example, if user output multiple
>>>>> rows
>>>>> >> with
>>>>> >> > > the
>>>>> >> > > > same key, should be latter one
>>>>> >> > > > replace previous ones?
>>>>> >> > > >
>>>>> >> > > > Best,
>>>>> >> > > > Kurt
>>>>> >> > > >
>>>>> >> > > >
>>>>> >> > > > On Mon, Jul 1, 2019 at 7:19 AM jincheng sun <
>>>>> >> sunjincheng121@gmail.com>
>>>>> >> > > > wrote:
>>>>> >> > > >
>>>>> >> > > > > Hi hequn, Thanks for the reply! I think `withKeys` solution
>>>>> is our
>>>>> >> > > better
>>>>> >> > > > > choice!
>>>>> >> > > > >
>>>>> >> > > > >
>>>>> >> > > > > Hequn Cheng <ch...@gmail.com> 于2019年6月26日周三 下午5:11写道:
>>>>> >> > > > >
>>>>> >> > > > > > Hi Jincheng,
>>>>> >> > > > > >
>>>>> >> > > > > > Thanks for raising the discussion!
>>>>> >> > > > > > The key information is very important for query
>>>>> optimizations.
>>>>> >> It
>>>>> >> > > would
>>>>> >> > > > > be
>>>>> >> > > > > > nice if we can use upsert mode to achieve better
>>>>> performance.
>>>>> >> > > > > >
>>>>> >> > > > > > +1 for the `withKeys` proposal. :)
>>>>> >> > > > > >
>>>>> >> > > > > > Best, Hequn
>>>>> >> > > > > >
>>>>> >> > > > > >
>>>>> >> > > > > > On Wed, Jun 26, 2019 at 4:37 PM jincheng sun <
>>>>> >> > > sunjincheng121@gmail.com
>>>>> >> > > > >
>>>>> >> > > > > > wrote:
>>>>> >> > > > > >
>>>>> >> > > > > > > Hi all,
>>>>> >> > > > > > >
>>>>> >> > > > > > > With the continuous efforts from the community, we
>>>>> already
>>>>> >> > > supported
>>>>> >> > > > > > > `flatAggregate`[1] on TableAPI in retract mode. I think
>>>>> It's
>>>>> >> > better
>>>>> >> > > > to
>>>>> >> > > > > > add
>>>>> >> > > > > > > upsert mode for  `flatAggregate`.
>>>>> >> > > > > > >
>>>>> >> > > > > > > The result table of streaming non-window
>>>>> `flatAggregate` is a
>>>>> >> > table
>>>>> >> > > > > > > contains updates. We can, of course, use a
>>>>> >> > > RetractStreamTableSink[2]
>>>>> >> > > > to
>>>>> >> > > > > > > emit the table, but we can get better performance in
>>>>> upsert
>>>>> >> mode.
>>>>> >> > > > > > However,
>>>>> >> > > > > > > due to the lack of keys, we can’t use an
>>>>> >> UpsertStreamTableSink to
>>>>> >> > > > emit
>>>>> >> > > > > > the
>>>>> >> > > > > > > table. We don’t have this problem for a normal
>>>>> aggregate as it
>>>>> >> > > emits
>>>>> >> > > > a
>>>>> >> > > > > > > single row for each group, so the unique keys are
>>>>> exactly the
>>>>> >> > same
>>>>> >> > > > with
>>>>> >> > > > > > the
>>>>> >> > > > > > > group keys. While for a `flatAggregate`, its pretty
>>>>> difference
>>>>> >> > that
>>>>> >> > > > due
>>>>> >> > > > > > to
>>>>> >> > > > > > > emits multi rows(a “sub-table”) for a single group. To
>>>>> solve
>>>>> >> this
>>>>> >> > > > > > problem,
>>>>> >> > > > > > > we need to find a way to define keys on flatAggregate,
>>>>> so
>>>>> >> that we
>>>>> >> > > can
>>>>> >> > > > > > also
>>>>> >> > > > > > > use upsert sink to emit the result table after
>>>>> flatAggregate.
>>>>> >> > > > > > >
>>>>> >> > > > > > > So, Aljoscha, Hequn and I prepared a design document
>>>>> for how
>>>>> >> to
>>>>> >> > > > define
>>>>> >> > > > > > the
>>>>> >> > > > > > > update keys for  `flatAggregate` in upsert mode.  The
>>>>> detail
>>>>> >> can
>>>>> >> > be
>>>>> >> > > > > found
>>>>> >> > > > > > > here:
>>>>> >> > > > > > >
>>>>> >> > > > > > >
>>>>> >> > > > > > >
>>>>> >> > > > > >
>>>>> >> > > > >
>>>>> >> > > >
>>>>> >> > >
>>>>> >> >
>>>>> >>
>>>>> https://docs.google.com/document/d/183qHo8PDG-xserDi_AbGP6YX9aPY0rVr80p3O3Gyz6U/edit?usp=sharing
>>>>> >> > > > > > >
>>>>> >> > > > > > > I appreciate it if you can have look at the document
>>>>> and any
>>>>> >> > > comments
>>>>> >> > > > > are
>>>>> >> > > > > > > welcome!
>>>>> >> > > > > > >
>>>>> >> > > > > > >
>>>>> >> > > > > > > Best,
>>>>> >> > > > > > >
>>>>> >> > > > > > > Jincheng
>>>>> >> > > > > > >
>>>>> >> > > > > > >
>>>>> >> > > > > > > [1]
>>>>> >> > > > > > >
>>>>> >> > > > > >
>>>>> >> > > > >
>>>>> >> > > >
>>>>> >> > >
>>>>> >> >
>>>>> >>
>>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97552739
>>>>> >> > > > > > >
>>>>> >> > > > > > > [2]
>>>>> >> > > > > > >
>>>>> >> > > > > > >
>>>>> >> > > > > >
>>>>> >> > > > >
>>>>> >> > > >
>>>>> >> > >
>>>>> >> >
>>>>> >>
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/sourceSinks.html#defining-a-streamtablesource
>>>>> >> > > > > > >
>>>>> >> > > > > >
>>>>> >> > > > >
>>>>> >> > > >
>>>>> >> > >
>>>>> >> >
>>>>> >>
>>>>> >
>>>>>
>>>>

Re: [DISCUSS]Support Upsert mode for Streaming Non-window FlatAggregate

Posted by jincheng sun <su...@gmail.com>.
Hi,

@Kurt @Jark thanks again for your comments.

@Kurt
Separating key and non-key for UDTAGG can definitely provide more
information for the system, however, it will also add more burden to our
users and bring some code reuse problems. BTW, approach 3 can also be used
to separate UDTAGG into keyed or non-keyed as we can check whether the key
list is empty. So from this point of view, we can use approach 3 to solve
your problem.

@Jark
It's great that the TopN in Blink can decide the key automatically. But,
I'd like to point out another case that the keys cannot be decided by the
system, i.e., can only be decided by the user. For example, for the TopN,
let's say top1 for better understanding. Support the Top1 outputs three
columns(rankid, value, seller_name), and the user wants to upsert the
result either with key of rankid or with the key of rankid+seller_name.
1. With the key of rankid: In this case, the user just wants to get the top
1 record.
2. With the key of rankid+seller_name: In this case, the user wants to get
all seller_names that have ever belong to top1. This can not be solved by
the approach 3 if using only one function. However, it is very easy to
implement with the withKey approach.

Even though, I have thought more clearly about these things and find more
interesting things that I want to share with you all. For the TopN example
which I listed above, it may also lead to a problem in which batch and
streaming are not unified.

To make it worse, the upsert sink is not supported in batch and we even
don't have any clear implementation plan about how to support upsert on the
batch, the unification problem for `withKeys` approach becomes hang in
doubt.

So, to avoid the unification problem, I think we can also use the approach
3. It is more rigorous although less flexible compared to the `withKeys`
approach.

Meanwhile, I will think more about the unification problem later. Maybe new
ideas about it may come through. :)

Best,
Jincheng

Jark Wu <im...@gmail.com> 于2019年7月5日周五 上午10:48写道:

> Hi Hequn,
>
> > If the TopN table aggregate function
> > outputs three columns(rankid, time, value), either rankid or rankid+time
> could be
> > used as the key. Which one to be chosen is more likely to be decided by
> the user
> > according to his business.
> In this case, the TopN table aggregate function should return two sets of
> unique key, one is "rankid", another is "rankid, time".
> This will be more align with current TopN node in blink planner and let
> optimizer to decide which key based on the downstream information (column
> selection, sink's primary key).
>
>
> Best,
> Jark
>
> On Fri, 5 Jul 2019 at 00:05, Hequn Cheng <ch...@gmail.com> wrote:
>
>> Hi Kurt and Jark,
>>
>> Thanks a lot for your great inputs!
>>
>> The keys of the query may not strongly be related to the UDTAGG.
>> It may also be related to the corresponding scenarios that a user wants
>> to achieve.
>>
>> For example, take TopN again as an example. If the TopN table aggregate
>> function
>> outputs three columns(rankid, time, value), either rankid or rankid+time
>> could be
>> used as the key. Which one to be chosen is more likely to be decided by
>> the user
>> according to his business.
>>
>> Best, Hequn
>>
>> On Thu, Jul 4, 2019 at 8:11 PM Jark Wu <im...@gmail.com> wrote:
>>
>>> Hi jingcheng,
>>>
>>> I agree with Kurt's point. As you said "the user must know the keys of
>>> the output of UDTAGG clearly".
>>> If I understand correctly, the key information is strongly relative to
>>> the UDTAGG implementation.
>>> Users may call `flatAggregate` on a UDTAGG instance with different keys
>>> which may result in a wrong result.
>>> So I think it would be better to couple key information with UDTAGG
>>> interface (i.e. "Approach 3" in your design doc).
>>>
>>> Regards,
>>> Jark
>>>
>>> On Thu, 4 Jul 2019 at 18:06, Kurt Young <yk...@gmail.com> wrote:
>>>
>>>> Hi Jincheng,
>>>>
>>>> Thanks for the clarification. I think you just pointed out my concern by
>>>> yourself:
>>>>
>>>> > When a user uses a User-defined table aggregate function (UDTAGG), he
>>>> must understand the behavior of the UDTAGG, including the return type
>>>> and
>>>> the characteristics of the returned data. such as the key fields.
>>>>
>>>> This indicates that the UDTAGG is somehow be classified to different
>>>> types,
>>>> one will no key, one with key information. So the developer of the
>>>> UDTAGG
>>>> should choose which type of this function should be. In this case,
>>>> my question would be, why don't we have explicit information about keys
>>>> such as we split UDTAGG to keyed UDTAGG and non-keyed UDTAGG. So the
>>>> user
>>>> and the framework will have a better understanding of
>>>> this UDTAGG. `withKeys` solution is letting user to choose the key and
>>>> it
>>>> seems it will only work correctly only if the user choose the *right*
>>>> key
>>>> this UDTAGG has.
>>>>
>>>> Let me know if this makes sense to you.
>>>>
>>>> Best,
>>>> Kurt
>>>>
>>>>
>>>> On Thu, Jul 4, 2019 at 4:32 PM jincheng sun <su...@gmail.com>
>>>> wrote:
>>>>
>>>> > Hi All,
>>>> >
>>>> > @Kurt Young <yk...@gmail.com> one user-defined table aggregate
>>>> function
>>>> > can be used in both with(out) keys case, and we do not introduce any
>>>> other
>>>> > aggregations. just like the explanation from @Hequn.
>>>> >
>>>> > @Hequn Cheng <ch...@gmail.com> thanks for your explanation!
>>>> >
>>>> > One thing should be mentioned here:
>>>> >
>>>> > When a user uses a User-defined table aggregate function (UDTAGG), he
>>>> must
>>>> > understand the behavior of the UDTAGG, including the return type and
>>>> the
>>>> > characteristics of the returned data. such as the key fields.  So
>>>> although
>>>> > using `withKeys` approach it is not rigorous enough(we do not need)
>>>> but
>>>> > intuitive enough, considering that if `flatAggregate` is followed by
>>>> an
>>>> > `upsertSink`, then the user must know the keys of the output of UDTAGG
>>>> > clearly, otherwise the keys of `upsertSink` cannot be defined. So I
>>>> still
>>>> > prefer the `withKeys` solution by now.
>>>> >
>>>> > Looking forward to any feedback from all of you!
>>>> >
>>>> > Best,
>>>> > Jincheng
>>>> >
>>>> >
>>>> >
>>>> > Hequn Cheng <ch...@gmail.com> 于2019年7月1日周一 下午5:35写道:
>>>> >
>>>> >> Hi Kurt,
>>>> >>
>>>> >> Thanks for your questions. Here are my thoughts.
>>>> >>
>>>> >> > if I want to write such kind function, should I make sure that this
>>>> >> function is used with some keys?
>>>> >> The key information may not be used. We can also use RetractSink to
>>>> emit
>>>> >> the table directly.
>>>> >>
>>>> >> >  If I need a use case to calculate topn without key, should I write
>>>> >> another function or I can reuse previous one.
>>>> >> For the TopN example, you can reuse the previous function if you
>>>> don't
>>>> >> care
>>>> >> about the key information.
>>>> >>
>>>> >> So, the key information is only an indicator(or a description), not
>>>> an
>>>> >> operator, as Jincheng mentioned above.
>>>> >> We do not need to change the function logic and it will not add any
>>>> other
>>>> >> aggregations.
>>>> >>
>>>> >> BTW, we have three approaches in the document. Approach 1 defines
>>>> keys on
>>>> >> API level as we think it's more common to define keys on Table.
>>>> >> While approach 3 defines keys in the TableAggregateFunction which is
>>>> more
>>>> >> precise but it is not very clear for Table users. So, we should take
>>>> all
>>>> >> these into consideration, and make the decision in this discussion
>>>> thread.
>>>> >>
>>>> >> You can take a look at the document and welcome any suggestions or
>>>> other
>>>> >> better solutions.
>>>> >>
>>>> >> Best, Hequn
>>>> >>
>>>> >>
>>>> >> On Mon, Jul 1, 2019 at 12:13 PM Kurt Young <yk...@gmail.com> wrote:
>>>> >>
>>>> >> > Hi Jincheng,
>>>> >> >
>>>> >> > Thanks for the clarification. Take 'TopN' for example, if I want to
>>>> >> write
>>>> >> > such kind function,
>>>> >> > should I make sure that this function is used with some keys? If I
>>>> need
>>>> >> a
>>>> >> > use case to calculate
>>>> >> > topn without key, should I write another function or I can reuse
>>>> >> previous
>>>> >> > one.
>>>> >> >
>>>> >> > I'm not sure about the idea of this does not involve semantic
>>>> changes.
>>>> >> To
>>>> >> > me, it sounds like
>>>> >> > we are doing another nested aggregation inside the table
>>>> >> > which TableAggregateFunction emits.
>>>> >> >
>>>> >> > Maybe I'm not familiar with this function enough, hope you can
>>>> help me
>>>> >> to
>>>> >> > understand.
>>>> >> >
>>>> >> > Best,
>>>> >> > Kurt
>>>> >> >
>>>> >> >
>>>> >> > On Mon, Jul 1, 2019 at 11:59 AM jincheng sun <
>>>> sunjincheng121@gmail.com>
>>>> >> > wrote:
>>>> >> >
>>>> >> > > Hi Kurt,
>>>> >> > >
>>>> >> > > Thanks for your questions, I am glad to share my thoughts here:
>>>> >> > >
>>>> >> > > My question is, will that effect the logic
>>>> ofTableAggregateFunction
>>>> >> user
>>>> >> > > > wrote? Should the user know that there will a key and make some
>>>> >> changes
>>>> >> > > to
>>>> >> > > > this function?
>>>> >> > >
>>>> >> > >
>>>> >> > > No, the keys information depends on the implementation of the
>>>> >> > > TableAggregateFunction.
>>>> >> > > For example, for a `topN` user defined TableAggregateFunction,
>>>> we can
>>>> >> > only
>>>> >> > > use the `keys` if the `topN` contains `rankid` in the output.
>>>> You can
>>>> >> > > treat the
>>>> >> > > `keys` like an indicator.
>>>> >> > >
>>>> >> > > If not, how will framework deal with the output of user's
>>>> >> > > > TableAggregateFunction.  if user output multiple rows with the
>>>> same
>>>> >> > key,
>>>> >> > > > should be latter one replace previous ones?
>>>> >> > >
>>>> >> > >
>>>> >> > > If a TableAggregateFunction outputs multiple rows with the same
>>>> key,
>>>> >> the
>>>> >> > > latter one should replace the previous one, either with upsert
>>>> mode or
>>>> >> > > retract mode. i.e., Whether the user defines the Key or not, the
>>>> Flink
>>>> >> > > framework should ensure the correctness of the semantics.
>>>> >> > >
>>>> >> > > At present, the problem we are discussing does not involve
>>>> semantic
>>>> >> > > changes. The definition of keys is to support non-window
>>>> >> flatAggregate on
>>>> >> > > upsert mode. (The upsert mode is already supported in the flink
>>>> >> > framework.
>>>> >> > > The current discussion only needs to inform the framework that
>>>> the
>>>> >> keys
>>>> >> > > information, which is the `withKeys` API we discussing.)
>>>> >> > >
>>>> >> > > Welcome any other feedbacks :)
>>>> >> > >
>>>> >> > > Best,
>>>> >> > > Jincheng
>>>> >> > >
>>>> >> > > Kurt Young <yk...@gmail.com> 于2019年7月1日周一 上午9:23写道:
>>>> >> > >
>>>> >> > > > Hi,
>>>> >> > > >
>>>> >> > > > I have a question about the key information of
>>>> >> TableAggregateFunction.
>>>> >> > > > IIUC, you need to define
>>>> >> > > > something like primary key or unique key in the result table of
>>>> >> > > > TableAggregateFunction, and also
>>>> >> > > > need a way to let user configure this through the API. My
>>>> question
>>>> >> is,
>>>> >> > > will
>>>> >> > > > that effect the logic of
>>>> >> > > > TableAggregateFunction user wrote? Should the user know that
>>>> there
>>>> >> > will a
>>>> >> > > > key and make some changes
>>>> >> > > > to this function?
>>>> >> > > >
>>>> >> > > > If so, what's the semantic the user should learned. If not,
>>>> how will
>>>> >> > > > framework deal with the output of user's
>>>> >> > > > TableAggregateFunction. For example, if user output multiple
>>>> rows
>>>> >> with
>>>> >> > > the
>>>> >> > > > same key, should be latter one
>>>> >> > > > replace previous ones?
>>>> >> > > >
>>>> >> > > > Best,
>>>> >> > > > Kurt
>>>> >> > > >
>>>> >> > > >
>>>> >> > > > On Mon, Jul 1, 2019 at 7:19 AM jincheng sun <
>>>> >> sunjincheng121@gmail.com>
>>>> >> > > > wrote:
>>>> >> > > >
>>>> >> > > > > Hi hequn, Thanks for the reply! I think `withKeys` solution
>>>> is our
>>>> >> > > better
>>>> >> > > > > choice!
>>>> >> > > > >
>>>> >> > > > >
>>>> >> > > > > Hequn Cheng <ch...@gmail.com> 于2019年6月26日周三 下午5:11写道:
>>>> >> > > > >
>>>> >> > > > > > Hi Jincheng,
>>>> >> > > > > >
>>>> >> > > > > > Thanks for raising the discussion!
>>>> >> > > > > > The key information is very important for query
>>>> optimizations.
>>>> >> It
>>>> >> > > would
>>>> >> > > > > be
>>>> >> > > > > > nice if we can use upsert mode to achieve better
>>>> performance.
>>>> >> > > > > >
>>>> >> > > > > > +1 for the `withKeys` proposal. :)
>>>> >> > > > > >
>>>> >> > > > > > Best, Hequn
>>>> >> > > > > >
>>>> >> > > > > >
>>>> >> > > > > > On Wed, Jun 26, 2019 at 4:37 PM jincheng sun <
>>>> >> > > sunjincheng121@gmail.com
>>>> >> > > > >
>>>> >> > > > > > wrote:
>>>> >> > > > > >
>>>> >> > > > > > > Hi all,
>>>> >> > > > > > >
>>>> >> > > > > > > With the continuous efforts from the community, we
>>>> already
>>>> >> > > supported
>>>> >> > > > > > > `flatAggregate`[1] on TableAPI in retract mode. I think
>>>> It's
>>>> >> > better
>>>> >> > > > to
>>>> >> > > > > > add
>>>> >> > > > > > > upsert mode for  `flatAggregate`.
>>>> >> > > > > > >
>>>> >> > > > > > > The result table of streaming non-window `flatAggregate`
>>>> is a
>>>> >> > table
>>>> >> > > > > > > contains updates. We can, of course, use a
>>>> >> > > RetractStreamTableSink[2]
>>>> >> > > > to
>>>> >> > > > > > > emit the table, but we can get better performance in
>>>> upsert
>>>> >> mode.
>>>> >> > > > > > However,
>>>> >> > > > > > > due to the lack of keys, we can’t use an
>>>> >> UpsertStreamTableSink to
>>>> >> > > > emit
>>>> >> > > > > > the
>>>> >> > > > > > > table. We don’t have this problem for a normal aggregate
>>>> as it
>>>> >> > > emits
>>>> >> > > > a
>>>> >> > > > > > > single row for each group, so the unique keys are
>>>> exactly the
>>>> >> > same
>>>> >> > > > with
>>>> >> > > > > > the
>>>> >> > > > > > > group keys. While for a `flatAggregate`, its pretty
>>>> difference
>>>> >> > that
>>>> >> > > > due
>>>> >> > > > > > to
>>>> >> > > > > > > emits multi rows(a “sub-table”) for a single group. To
>>>> solve
>>>> >> this
>>>> >> > > > > > problem,
>>>> >> > > > > > > we need to find a way to define keys on flatAggregate, so
>>>> >> that we
>>>> >> > > can
>>>> >> > > > > > also
>>>> >> > > > > > > use upsert sink to emit the result table after
>>>> flatAggregate.
>>>> >> > > > > > >
>>>> >> > > > > > > So, Aljoscha, Hequn and I prepared a design document for
>>>> how
>>>> >> to
>>>> >> > > > define
>>>> >> > > > > > the
>>>> >> > > > > > > update keys for  `flatAggregate` in upsert mode.  The
>>>> detail
>>>> >> can
>>>> >> > be
>>>> >> > > > > found
>>>> >> > > > > > > here:
>>>> >> > > > > > >
>>>> >> > > > > > >
>>>> >> > > > > > >
>>>> >> > > > > >
>>>> >> > > > >
>>>> >> > > >
>>>> >> > >
>>>> >> >
>>>> >>
>>>> https://docs.google.com/document/d/183qHo8PDG-xserDi_AbGP6YX9aPY0rVr80p3O3Gyz6U/edit?usp=sharing
>>>> >> > > > > > >
>>>> >> > > > > > > I appreciate it if you can have look at the document and
>>>> any
>>>> >> > > comments
>>>> >> > > > > are
>>>> >> > > > > > > welcome!
>>>> >> > > > > > >
>>>> >> > > > > > >
>>>> >> > > > > > > Best,
>>>> >> > > > > > >
>>>> >> > > > > > > Jincheng
>>>> >> > > > > > >
>>>> >> > > > > > >
>>>> >> > > > > > > [1]
>>>> >> > > > > > >
>>>> >> > > > > >
>>>> >> > > > >
>>>> >> > > >
>>>> >> > >
>>>> >> >
>>>> >>
>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97552739
>>>> >> > > > > > >
>>>> >> > > > > > > [2]
>>>> >> > > > > > >
>>>> >> > > > > > >
>>>> >> > > > > >
>>>> >> > > > >
>>>> >> > > >
>>>> >> > >
>>>> >> >
>>>> >>
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/sourceSinks.html#defining-a-streamtablesource
>>>> >> > > > > > >
>>>> >> > > > > >
>>>> >> > > > >
>>>> >> > > >
>>>> >> > >
>>>> >> >
>>>> >>
>>>> >
>>>>
>>>

Re: [DISCUSS]Support Upsert mode for Streaming Non-window FlatAggregate

Posted by Jark Wu <im...@gmail.com>.
Hi Hequn,

> If the TopN table aggregate function
> outputs three columns(rankid, time, value), either rankid or rankid+time
could be
> used as the key. Which one to be chosen is more likely to be decided by
the user
> according to his business.
In this case, the TopN table aggregate function should return two sets of
unique key, one is "rankid", another is "rankid, time".
This will be more align with current TopN node in blink planner and let
optimizer to decide which key based on the downstream information (column
selection, sink's primary key).


Best,
Jark

On Fri, 5 Jul 2019 at 00:05, Hequn Cheng <ch...@gmail.com> wrote:

> Hi Kurt and Jark,
>
> Thanks a lot for your great inputs!
>
> The keys of the query may not strongly be related to the UDTAGG.
> It may also be related to the corresponding scenarios that a user wants to
> achieve.
>
> For example, take TopN again as an example. If the TopN table aggregate
> function
> outputs three columns(rankid, time, value), either rankid or rankid+time
> could be
> used as the key. Which one to be chosen is more likely to be decided by
> the user
> according to his business.
>
> Best, Hequn
>
> On Thu, Jul 4, 2019 at 8:11 PM Jark Wu <im...@gmail.com> wrote:
>
>> Hi jingcheng,
>>
>> I agree with Kurt's point. As you said "the user must know the keys of
>> the output of UDTAGG clearly".
>> If I understand correctly, the key information is strongly relative to
>> the UDTAGG implementation.
>> Users may call `flatAggregate` on a UDTAGG instance with different keys
>> which may result in a wrong result.
>> So I think it would be better to couple key information with UDTAGG
>> interface (i.e. "Approach 3" in your design doc).
>>
>> Regards,
>> Jark
>>
>> On Thu, 4 Jul 2019 at 18:06, Kurt Young <yk...@gmail.com> wrote:
>>
>>> Hi Jincheng,
>>>
>>> Thanks for the clarification. I think you just pointed out my concern by
>>> yourself:
>>>
>>> > When a user uses a User-defined table aggregate function (UDTAGG), he
>>> must understand the behavior of the UDTAGG, including the return type and
>>> the characteristics of the returned data. such as the key fields.
>>>
>>> This indicates that the UDTAGG is somehow be classified to different
>>> types,
>>> one will no key, one with key information. So the developer of the UDTAGG
>>> should choose which type of this function should be. In this case,
>>> my question would be, why don't we have explicit information about keys
>>> such as we split UDTAGG to keyed UDTAGG and non-keyed UDTAGG. So the user
>>> and the framework will have a better understanding of
>>> this UDTAGG. `withKeys` solution is letting user to choose the key and it
>>> seems it will only work correctly only if the user choose the *right* key
>>> this UDTAGG has.
>>>
>>> Let me know if this makes sense to you.
>>>
>>> Best,
>>> Kurt
>>>
>>>
>>> On Thu, Jul 4, 2019 at 4:32 PM jincheng sun <su...@gmail.com>
>>> wrote:
>>>
>>> > Hi All,
>>> >
>>> > @Kurt Young <yk...@gmail.com> one user-defined table aggregate
>>> function
>>> > can be used in both with(out) keys case, and we do not introduce any
>>> other
>>> > aggregations. just like the explanation from @Hequn.
>>> >
>>> > @Hequn Cheng <ch...@gmail.com> thanks for your explanation!
>>> >
>>> > One thing should be mentioned here:
>>> >
>>> > When a user uses a User-defined table aggregate function (UDTAGG), he
>>> must
>>> > understand the behavior of the UDTAGG, including the return type and
>>> the
>>> > characteristics of the returned data. such as the key fields.  So
>>> although
>>> > using `withKeys` approach it is not rigorous enough(we do not need) but
>>> > intuitive enough, considering that if `flatAggregate` is followed by an
>>> > `upsertSink`, then the user must know the keys of the output of UDTAGG
>>> > clearly, otherwise the keys of `upsertSink` cannot be defined. So I
>>> still
>>> > prefer the `withKeys` solution by now.
>>> >
>>> > Looking forward to any feedback from all of you!
>>> >
>>> > Best,
>>> > Jincheng
>>> >
>>> >
>>> >
>>> > Hequn Cheng <ch...@gmail.com> 于2019年7月1日周一 下午5:35写道:
>>> >
>>> >> Hi Kurt,
>>> >>
>>> >> Thanks for your questions. Here are my thoughts.
>>> >>
>>> >> > if I want to write such kind function, should I make sure that this
>>> >> function is used with some keys?
>>> >> The key information may not be used. We can also use RetractSink to
>>> emit
>>> >> the table directly.
>>> >>
>>> >> >  If I need a use case to calculate topn without key, should I write
>>> >> another function or I can reuse previous one.
>>> >> For the TopN example, you can reuse the previous function if you don't
>>> >> care
>>> >> about the key information.
>>> >>
>>> >> So, the key information is only an indicator(or a description), not an
>>> >> operator, as Jincheng mentioned above.
>>> >> We do not need to change the function logic and it will not add any
>>> other
>>> >> aggregations.
>>> >>
>>> >> BTW, we have three approaches in the document. Approach 1 defines
>>> keys on
>>> >> API level as we think it's more common to define keys on Table.
>>> >> While approach 3 defines keys in the TableAggregateFunction which is
>>> more
>>> >> precise but it is not very clear for Table users. So, we should take
>>> all
>>> >> these into consideration, and make the decision in this discussion
>>> thread.
>>> >>
>>> >> You can take a look at the document and welcome any suggestions or
>>> other
>>> >> better solutions.
>>> >>
>>> >> Best, Hequn
>>> >>
>>> >>
>>> >> On Mon, Jul 1, 2019 at 12:13 PM Kurt Young <yk...@gmail.com> wrote:
>>> >>
>>> >> > Hi Jincheng,
>>> >> >
>>> >> > Thanks for the clarification. Take 'TopN' for example, if I want to
>>> >> write
>>> >> > such kind function,
>>> >> > should I make sure that this function is used with some keys? If I
>>> need
>>> >> a
>>> >> > use case to calculate
>>> >> > topn without key, should I write another function or I can reuse
>>> >> previous
>>> >> > one.
>>> >> >
>>> >> > I'm not sure about the idea of this does not involve semantic
>>> changes.
>>> >> To
>>> >> > me, it sounds like
>>> >> > we are doing another nested aggregation inside the table
>>> >> > which TableAggregateFunction emits.
>>> >> >
>>> >> > Maybe I'm not familiar with this function enough, hope you can help
>>> me
>>> >> to
>>> >> > understand.
>>> >> >
>>> >> > Best,
>>> >> > Kurt
>>> >> >
>>> >> >
>>> >> > On Mon, Jul 1, 2019 at 11:59 AM jincheng sun <
>>> sunjincheng121@gmail.com>
>>> >> > wrote:
>>> >> >
>>> >> > > Hi Kurt,
>>> >> > >
>>> >> > > Thanks for your questions, I am glad to share my thoughts here:
>>> >> > >
>>> >> > > My question is, will that effect the logic
>>> ofTableAggregateFunction
>>> >> user
>>> >> > > > wrote? Should the user know that there will a key and make some
>>> >> changes
>>> >> > > to
>>> >> > > > this function?
>>> >> > >
>>> >> > >
>>> >> > > No, the keys information depends on the implementation of the
>>> >> > > TableAggregateFunction.
>>> >> > > For example, for a `topN` user defined TableAggregateFunction, we
>>> can
>>> >> > only
>>> >> > > use the `keys` if the `topN` contains `rankid` in the output. You
>>> can
>>> >> > > treat the
>>> >> > > `keys` like an indicator.
>>> >> > >
>>> >> > > If not, how will framework deal with the output of user's
>>> >> > > > TableAggregateFunction.  if user output multiple rows with the
>>> same
>>> >> > key,
>>> >> > > > should be latter one replace previous ones?
>>> >> > >
>>> >> > >
>>> >> > > If a TableAggregateFunction outputs multiple rows with the same
>>> key,
>>> >> the
>>> >> > > latter one should replace the previous one, either with upsert
>>> mode or
>>> >> > > retract mode. i.e., Whether the user defines the Key or not, the
>>> Flink
>>> >> > > framework should ensure the correctness of the semantics.
>>> >> > >
>>> >> > > At present, the problem we are discussing does not involve
>>> semantic
>>> >> > > changes. The definition of keys is to support non-window
>>> >> flatAggregate on
>>> >> > > upsert mode. (The upsert mode is already supported in the flink
>>> >> > framework.
>>> >> > > The current discussion only needs to inform the framework that the
>>> >> keys
>>> >> > > information, which is the `withKeys` API we discussing.)
>>> >> > >
>>> >> > > Welcome any other feedbacks :)
>>> >> > >
>>> >> > > Best,
>>> >> > > Jincheng
>>> >> > >
>>> >> > > Kurt Young <yk...@gmail.com> 于2019年7月1日周一 上午9:23写道:
>>> >> > >
>>> >> > > > Hi,
>>> >> > > >
>>> >> > > > I have a question about the key information of
>>> >> TableAggregateFunction.
>>> >> > > > IIUC, you need to define
>>> >> > > > something like primary key or unique key in the result table of
>>> >> > > > TableAggregateFunction, and also
>>> >> > > > need a way to let user configure this through the API. My
>>> question
>>> >> is,
>>> >> > > will
>>> >> > > > that effect the logic of
>>> >> > > > TableAggregateFunction user wrote? Should the user know that
>>> there
>>> >> > will a
>>> >> > > > key and make some changes
>>> >> > > > to this function?
>>> >> > > >
>>> >> > > > If so, what's the semantic the user should learned. If not, how
>>> will
>>> >> > > > framework deal with the output of user's
>>> >> > > > TableAggregateFunction. For example, if user output multiple
>>> rows
>>> >> with
>>> >> > > the
>>> >> > > > same key, should be latter one
>>> >> > > > replace previous ones?
>>> >> > > >
>>> >> > > > Best,
>>> >> > > > Kurt
>>> >> > > >
>>> >> > > >
>>> >> > > > On Mon, Jul 1, 2019 at 7:19 AM jincheng sun <
>>> >> sunjincheng121@gmail.com>
>>> >> > > > wrote:
>>> >> > > >
>>> >> > > > > Hi hequn, Thanks for the reply! I think `withKeys` solution
>>> is our
>>> >> > > better
>>> >> > > > > choice!
>>> >> > > > >
>>> >> > > > >
>>> >> > > > > Hequn Cheng <ch...@gmail.com> 于2019年6月26日周三 下午5:11写道:
>>> >> > > > >
>>> >> > > > > > Hi Jincheng,
>>> >> > > > > >
>>> >> > > > > > Thanks for raising the discussion!
>>> >> > > > > > The key information is very important for query
>>> optimizations.
>>> >> It
>>> >> > > would
>>> >> > > > > be
>>> >> > > > > > nice if we can use upsert mode to achieve better
>>> performance.
>>> >> > > > > >
>>> >> > > > > > +1 for the `withKeys` proposal. :)
>>> >> > > > > >
>>> >> > > > > > Best, Hequn
>>> >> > > > > >
>>> >> > > > > >
>>> >> > > > > > On Wed, Jun 26, 2019 at 4:37 PM jincheng sun <
>>> >> > > sunjincheng121@gmail.com
>>> >> > > > >
>>> >> > > > > > wrote:
>>> >> > > > > >
>>> >> > > > > > > Hi all,
>>> >> > > > > > >
>>> >> > > > > > > With the continuous efforts from the community, we already
>>> >> > > supported
>>> >> > > > > > > `flatAggregate`[1] on TableAPI in retract mode. I think
>>> It's
>>> >> > better
>>> >> > > > to
>>> >> > > > > > add
>>> >> > > > > > > upsert mode for  `flatAggregate`.
>>> >> > > > > > >
>>> >> > > > > > > The result table of streaming non-window `flatAggregate`
>>> is a
>>> >> > table
>>> >> > > > > > > contains updates. We can, of course, use a
>>> >> > > RetractStreamTableSink[2]
>>> >> > > > to
>>> >> > > > > > > emit the table, but we can get better performance in
>>> upsert
>>> >> mode.
>>> >> > > > > > However,
>>> >> > > > > > > due to the lack of keys, we can’t use an
>>> >> UpsertStreamTableSink to
>>> >> > > > emit
>>> >> > > > > > the
>>> >> > > > > > > table. We don’t have this problem for a normal aggregate
>>> as it
>>> >> > > emits
>>> >> > > > a
>>> >> > > > > > > single row for each group, so the unique keys are exactly
>>> the
>>> >> > same
>>> >> > > > with
>>> >> > > > > > the
>>> >> > > > > > > group keys. While for a `flatAggregate`, its pretty
>>> difference
>>> >> > that
>>> >> > > > due
>>> >> > > > > > to
>>> >> > > > > > > emits multi rows(a “sub-table”) for a single group. To
>>> solve
>>> >> this
>>> >> > > > > > problem,
>>> >> > > > > > > we need to find a way to define keys on flatAggregate, so
>>> >> that we
>>> >> > > can
>>> >> > > > > > also
>>> >> > > > > > > use upsert sink to emit the result table after
>>> flatAggregate.
>>> >> > > > > > >
>>> >> > > > > > > So, Aljoscha, Hequn and I prepared a design document for
>>> how
>>> >> to
>>> >> > > > define
>>> >> > > > > > the
>>> >> > > > > > > update keys for  `flatAggregate` in upsert mode.  The
>>> detail
>>> >> can
>>> >> > be
>>> >> > > > > found
>>> >> > > > > > > here:
>>> >> > > > > > >
>>> >> > > > > > >
>>> >> > > > > > >
>>> >> > > > > >
>>> >> > > > >
>>> >> > > >
>>> >> > >
>>> >> >
>>> >>
>>> https://docs.google.com/document/d/183qHo8PDG-xserDi_AbGP6YX9aPY0rVr80p3O3Gyz6U/edit?usp=sharing
>>> >> > > > > > >
>>> >> > > > > > > I appreciate it if you can have look at the document and
>>> any
>>> >> > > comments
>>> >> > > > > are
>>> >> > > > > > > welcome!
>>> >> > > > > > >
>>> >> > > > > > >
>>> >> > > > > > > Best,
>>> >> > > > > > >
>>> >> > > > > > > Jincheng
>>> >> > > > > > >
>>> >> > > > > > >
>>> >> > > > > > > [1]
>>> >> > > > > > >
>>> >> > > > > >
>>> >> > > > >
>>> >> > > >
>>> >> > >
>>> >> >
>>> >>
>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97552739
>>> >> > > > > > >
>>> >> > > > > > > [2]
>>> >> > > > > > >
>>> >> > > > > > >
>>> >> > > > > >
>>> >> > > > >
>>> >> > > >
>>> >> > >
>>> >> >
>>> >>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/sourceSinks.html#defining-a-streamtablesource
>>> >> > > > > > >
>>> >> > > > > >
>>> >> > > > >
>>> >> > > >
>>> >> > >
>>> >> >
>>> >>
>>> >
>>>
>>

Re: [DISCUSS]Support Upsert mode for Streaming Non-window FlatAggregate

Posted by Hequn Cheng <ch...@gmail.com>.
Hi Kurt and Jark,

Thanks a lot for your great inputs!

The keys of the query may not strongly be related to the UDTAGG.
It may also be related to the corresponding scenarios that a user wants to
achieve.

For example, take TopN again as an example. If the TopN table aggregate
function
outputs three columns(rankid, time, value), either rankid or rankid+time
could be
used as the key. Which one to be chosen is more likely to be decided by the
user
according to his business.

Best, Hequn

On Thu, Jul 4, 2019 at 8:11 PM Jark Wu <im...@gmail.com> wrote:

> Hi jingcheng,
>
> I agree with Kurt's point. As you said "the user must know the keys of the
> output of UDTAGG clearly".
> If I understand correctly, the key information is strongly relative to the
> UDTAGG implementation.
> Users may call `flatAggregate` on a UDTAGG instance with different keys
> which may result in a wrong result.
> So I think it would be better to couple key information with UDTAGG
> interface (i.e. "Approach 3" in your design doc).
>
> Regards,
> Jark
>
> On Thu, 4 Jul 2019 at 18:06, Kurt Young <yk...@gmail.com> wrote:
>
>> Hi Jincheng,
>>
>> Thanks for the clarification. I think you just pointed out my concern by
>> yourself:
>>
>> > When a user uses a User-defined table aggregate function (UDTAGG), he
>> must understand the behavior of the UDTAGG, including the return type and
>> the characteristics of the returned data. such as the key fields.
>>
>> This indicates that the UDTAGG is somehow be classified to different
>> types,
>> one will no key, one with key information. So the developer of the UDTAGG
>> should choose which type of this function should be. In this case,
>> my question would be, why don't we have explicit information about keys
>> such as we split UDTAGG to keyed UDTAGG and non-keyed UDTAGG. So the user
>> and the framework will have a better understanding of
>> this UDTAGG. `withKeys` solution is letting user to choose the key and it
>> seems it will only work correctly only if the user choose the *right* key
>> this UDTAGG has.
>>
>> Let me know if this makes sense to you.
>>
>> Best,
>> Kurt
>>
>>
>> On Thu, Jul 4, 2019 at 4:32 PM jincheng sun <su...@gmail.com>
>> wrote:
>>
>> > Hi All,
>> >
>> > @Kurt Young <yk...@gmail.com> one user-defined table aggregate
>> function
>> > can be used in both with(out) keys case, and we do not introduce any
>> other
>> > aggregations. just like the explanation from @Hequn.
>> >
>> > @Hequn Cheng <ch...@gmail.com> thanks for your explanation!
>> >
>> > One thing should be mentioned here:
>> >
>> > When a user uses a User-defined table aggregate function (UDTAGG), he
>> must
>> > understand the behavior of the UDTAGG, including the return type and the
>> > characteristics of the returned data. such as the key fields.  So
>> although
>> > using `withKeys` approach it is not rigorous enough(we do not need) but
>> > intuitive enough, considering that if `flatAggregate` is followed by an
>> > `upsertSink`, then the user must know the keys of the output of UDTAGG
>> > clearly, otherwise the keys of `upsertSink` cannot be defined. So I
>> still
>> > prefer the `withKeys` solution by now.
>> >
>> > Looking forward to any feedback from all of you!
>> >
>> > Best,
>> > Jincheng
>> >
>> >
>> >
>> > Hequn Cheng <ch...@gmail.com> 于2019年7月1日周一 下午5:35写道:
>> >
>> >> Hi Kurt,
>> >>
>> >> Thanks for your questions. Here are my thoughts.
>> >>
>> >> > if I want to write such kind function, should I make sure that this
>> >> function is used with some keys?
>> >> The key information may not be used. We can also use RetractSink to
>> emit
>> >> the table directly.
>> >>
>> >> >  If I need a use case to calculate topn without key, should I write
>> >> another function or I can reuse previous one.
>> >> For the TopN example, you can reuse the previous function if you don't
>> >> care
>> >> about the key information.
>> >>
>> >> So, the key information is only an indicator(or a description), not an
>> >> operator, as Jincheng mentioned above.
>> >> We do not need to change the function logic and it will not add any
>> other
>> >> aggregations.
>> >>
>> >> BTW, we have three approaches in the document. Approach 1 defines keys
>> on
>> >> API level as we think it's more common to define keys on Table.
>> >> While approach 3 defines keys in the TableAggregateFunction which is
>> more
>> >> precise but it is not very clear for Table users. So, we should take
>> all
>> >> these into consideration, and make the decision in this discussion
>> thread.
>> >>
>> >> You can take a look at the document and welcome any suggestions or
>> other
>> >> better solutions.
>> >>
>> >> Best, Hequn
>> >>
>> >>
>> >> On Mon, Jul 1, 2019 at 12:13 PM Kurt Young <yk...@gmail.com> wrote:
>> >>
>> >> > Hi Jincheng,
>> >> >
>> >> > Thanks for the clarification. Take 'TopN' for example, if I want to
>> >> write
>> >> > such kind function,
>> >> > should I make sure that this function is used with some keys? If I
>> need
>> >> a
>> >> > use case to calculate
>> >> > topn without key, should I write another function or I can reuse
>> >> previous
>> >> > one.
>> >> >
>> >> > I'm not sure about the idea of this does not involve semantic
>> changes.
>> >> To
>> >> > me, it sounds like
>> >> > we are doing another nested aggregation inside the table
>> >> > which TableAggregateFunction emits.
>> >> >
>> >> > Maybe I'm not familiar with this function enough, hope you can help
>> me
>> >> to
>> >> > understand.
>> >> >
>> >> > Best,
>> >> > Kurt
>> >> >
>> >> >
>> >> > On Mon, Jul 1, 2019 at 11:59 AM jincheng sun <
>> sunjincheng121@gmail.com>
>> >> > wrote:
>> >> >
>> >> > > Hi Kurt,
>> >> > >
>> >> > > Thanks for your questions, I am glad to share my thoughts here:
>> >> > >
>> >> > > My question is, will that effect the logic ofTableAggregateFunction
>> >> user
>> >> > > > wrote? Should the user know that there will a key and make some
>> >> changes
>> >> > > to
>> >> > > > this function?
>> >> > >
>> >> > >
>> >> > > No, the keys information depends on the implementation of the
>> >> > > TableAggregateFunction.
>> >> > > For example, for a `topN` user defined TableAggregateFunction, we
>> can
>> >> > only
>> >> > > use the `keys` if the `topN` contains `rankid` in the output. You
>> can
>> >> > > treat the
>> >> > > `keys` like an indicator.
>> >> > >
>> >> > > If not, how will framework deal with the output of user's
>> >> > > > TableAggregateFunction.  if user output multiple rows with the
>> same
>> >> > key,
>> >> > > > should be latter one replace previous ones?
>> >> > >
>> >> > >
>> >> > > If a TableAggregateFunction outputs multiple rows with the same
>> key,
>> >> the
>> >> > > latter one should replace the previous one, either with upsert
>> mode or
>> >> > > retract mode. i.e., Whether the user defines the Key or not, the
>> Flink
>> >> > > framework should ensure the correctness of the semantics.
>> >> > >
>> >> > > At present, the problem we are discussing does not involve semantic
>> >> > > changes. The definition of keys is to support non-window
>> >> flatAggregate on
>> >> > > upsert mode. (The upsert mode is already supported in the flink
>> >> > framework.
>> >> > > The current discussion only needs to inform the framework that the
>> >> keys
>> >> > > information, which is the `withKeys` API we discussing.)
>> >> > >
>> >> > > Welcome any other feedbacks :)
>> >> > >
>> >> > > Best,
>> >> > > Jincheng
>> >> > >
>> >> > > Kurt Young <yk...@gmail.com> 于2019年7月1日周一 上午9:23写道:
>> >> > >
>> >> > > > Hi,
>> >> > > >
>> >> > > > I have a question about the key information of
>> >> TableAggregateFunction.
>> >> > > > IIUC, you need to define
>> >> > > > something like primary key or unique key in the result table of
>> >> > > > TableAggregateFunction, and also
>> >> > > > need a way to let user configure this through the API. My
>> question
>> >> is,
>> >> > > will
>> >> > > > that effect the logic of
>> >> > > > TableAggregateFunction user wrote? Should the user know that
>> there
>> >> > will a
>> >> > > > key and make some changes
>> >> > > > to this function?
>> >> > > >
>> >> > > > If so, what's the semantic the user should learned. If not, how
>> will
>> >> > > > framework deal with the output of user's
>> >> > > > TableAggregateFunction. For example, if user output multiple rows
>> >> with
>> >> > > the
>> >> > > > same key, should be latter one
>> >> > > > replace previous ones?
>> >> > > >
>> >> > > > Best,
>> >> > > > Kurt
>> >> > > >
>> >> > > >
>> >> > > > On Mon, Jul 1, 2019 at 7:19 AM jincheng sun <
>> >> sunjincheng121@gmail.com>
>> >> > > > wrote:
>> >> > > >
>> >> > > > > Hi hequn, Thanks for the reply! I think `withKeys` solution is
>> our
>> >> > > better
>> >> > > > > choice!
>> >> > > > >
>> >> > > > >
>> >> > > > > Hequn Cheng <ch...@gmail.com> 于2019年6月26日周三 下午5:11写道:
>> >> > > > >
>> >> > > > > > Hi Jincheng,
>> >> > > > > >
>> >> > > > > > Thanks for raising the discussion!
>> >> > > > > > The key information is very important for query
>> optimizations.
>> >> It
>> >> > > would
>> >> > > > > be
>> >> > > > > > nice if we can use upsert mode to achieve better performance.
>> >> > > > > >
>> >> > > > > > +1 for the `withKeys` proposal. :)
>> >> > > > > >
>> >> > > > > > Best, Hequn
>> >> > > > > >
>> >> > > > > >
>> >> > > > > > On Wed, Jun 26, 2019 at 4:37 PM jincheng sun <
>> >> > > sunjincheng121@gmail.com
>> >> > > > >
>> >> > > > > > wrote:
>> >> > > > > >
>> >> > > > > > > Hi all,
>> >> > > > > > >
>> >> > > > > > > With the continuous efforts from the community, we already
>> >> > > supported
>> >> > > > > > > `flatAggregate`[1] on TableAPI in retract mode. I think
>> It's
>> >> > better
>> >> > > > to
>> >> > > > > > add
>> >> > > > > > > upsert mode for  `flatAggregate`.
>> >> > > > > > >
>> >> > > > > > > The result table of streaming non-window `flatAggregate`
>> is a
>> >> > table
>> >> > > > > > > contains updates. We can, of course, use a
>> >> > > RetractStreamTableSink[2]
>> >> > > > to
>> >> > > > > > > emit the table, but we can get better performance in upsert
>> >> mode.
>> >> > > > > > However,
>> >> > > > > > > due to the lack of keys, we can’t use an
>> >> UpsertStreamTableSink to
>> >> > > > emit
>> >> > > > > > the
>> >> > > > > > > table. We don’t have this problem for a normal aggregate
>> as it
>> >> > > emits
>> >> > > > a
>> >> > > > > > > single row for each group, so the unique keys are exactly
>> the
>> >> > same
>> >> > > > with
>> >> > > > > > the
>> >> > > > > > > group keys. While for a `flatAggregate`, its pretty
>> difference
>> >> > that
>> >> > > > due
>> >> > > > > > to
>> >> > > > > > > emits multi rows(a “sub-table”) for a single group. To
>> solve
>> >> this
>> >> > > > > > problem,
>> >> > > > > > > we need to find a way to define keys on flatAggregate, so
>> >> that we
>> >> > > can
>> >> > > > > > also
>> >> > > > > > > use upsert sink to emit the result table after
>> flatAggregate.
>> >> > > > > > >
>> >> > > > > > > So, Aljoscha, Hequn and I prepared a design document for
>> how
>> >> to
>> >> > > > define
>> >> > > > > > the
>> >> > > > > > > update keys for  `flatAggregate` in upsert mode.  The
>> detail
>> >> can
>> >> > be
>> >> > > > > found
>> >> > > > > > > here:
>> >> > > > > > >
>> >> > > > > > >
>> >> > > > > > >
>> >> > > > > >
>> >> > > > >
>> >> > > >
>> >> > >
>> >> >
>> >>
>> https://docs.google.com/document/d/183qHo8PDG-xserDi_AbGP6YX9aPY0rVr80p3O3Gyz6U/edit?usp=sharing
>> >> > > > > > >
>> >> > > > > > > I appreciate it if you can have look at the document and
>> any
>> >> > > comments
>> >> > > > > are
>> >> > > > > > > welcome!
>> >> > > > > > >
>> >> > > > > > >
>> >> > > > > > > Best,
>> >> > > > > > >
>> >> > > > > > > Jincheng
>> >> > > > > > >
>> >> > > > > > >
>> >> > > > > > > [1]
>> >> > > > > > >
>> >> > > > > >
>> >> > > > >
>> >> > > >
>> >> > >
>> >> >
>> >>
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97552739
>> >> > > > > > >
>> >> > > > > > > [2]
>> >> > > > > > >
>> >> > > > > > >
>> >> > > > > >
>> >> > > > >
>> >> > > >
>> >> > >
>> >> >
>> >>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/sourceSinks.html#defining-a-streamtablesource
>> >> > > > > > >
>> >> > > > > >
>> >> > > > >
>> >> > > >
>> >> > >
>> >> >
>> >>
>> >
>>
>

Re: [DISCUSS]Support Upsert mode for Streaming Non-window FlatAggregate

Posted by Jark Wu <im...@gmail.com>.
Hi jingcheng,

I agree with Kurt's point. As you said "the user must know the keys of the
output of UDTAGG clearly".
If I understand correctly, the key information is strongly relative to the
UDTAGG implementation.
Users may call `flatAggregate` on a UDTAGG instance with different keys
which may result in a wrong result.
So I think it would be better to couple key information with UDTAGG
interface (i.e. "Approach 3" in your design doc).

Regards,
Jark

On Thu, 4 Jul 2019 at 18:06, Kurt Young <yk...@gmail.com> wrote:

> Hi Jincheng,
>
> Thanks for the clarification. I think you just pointed out my concern by
> yourself:
>
> > When a user uses a User-defined table aggregate function (UDTAGG), he
> must understand the behavior of the UDTAGG, including the return type and
> the characteristics of the returned data. such as the key fields.
>
> This indicates that the UDTAGG is somehow be classified to different types,
> one will no key, one with key information. So the developer of the UDTAGG
> should choose which type of this function should be. In this case,
> my question would be, why don't we have explicit information about keys
> such as we split UDTAGG to keyed UDTAGG and non-keyed UDTAGG. So the user
> and the framework will have a better understanding of
> this UDTAGG. `withKeys` solution is letting user to choose the key and it
> seems it will only work correctly only if the user choose the *right* key
> this UDTAGG has.
>
> Let me know if this makes sense to you.
>
> Best,
> Kurt
>
>
> On Thu, Jul 4, 2019 at 4:32 PM jincheng sun <su...@gmail.com>
> wrote:
>
> > Hi All,
> >
> > @Kurt Young <yk...@gmail.com> one user-defined table aggregate function
> > can be used in both with(out) keys case, and we do not introduce any
> other
> > aggregations. just like the explanation from @Hequn.
> >
> > @Hequn Cheng <ch...@gmail.com> thanks for your explanation!
> >
> > One thing should be mentioned here:
> >
> > When a user uses a User-defined table aggregate function (UDTAGG), he
> must
> > understand the behavior of the UDTAGG, including the return type and the
> > characteristics of the returned data. such as the key fields.  So
> although
> > using `withKeys` approach it is not rigorous enough(we do not need) but
> > intuitive enough, considering that if `flatAggregate` is followed by an
> > `upsertSink`, then the user must know the keys of the output of UDTAGG
> > clearly, otherwise the keys of `upsertSink` cannot be defined. So I still
> > prefer the `withKeys` solution by now.
> >
> > Looking forward to any feedback from all of you!
> >
> > Best,
> > Jincheng
> >
> >
> >
> > Hequn Cheng <ch...@gmail.com> 于2019年7月1日周一 下午5:35写道:
> >
> >> Hi Kurt,
> >>
> >> Thanks for your questions. Here are my thoughts.
> >>
> >> > if I want to write such kind function, should I make sure that this
> >> function is used with some keys?
> >> The key information may not be used. We can also use RetractSink to emit
> >> the table directly.
> >>
> >> >  If I need a use case to calculate topn without key, should I write
> >> another function or I can reuse previous one.
> >> For the TopN example, you can reuse the previous function if you don't
> >> care
> >> about the key information.
> >>
> >> So, the key information is only an indicator(or a description), not an
> >> operator, as Jincheng mentioned above.
> >> We do not need to change the function logic and it will not add any
> other
> >> aggregations.
> >>
> >> BTW, we have three approaches in the document. Approach 1 defines keys
> on
> >> API level as we think it's more common to define keys on Table.
> >> While approach 3 defines keys in the TableAggregateFunction which is
> more
> >> precise but it is not very clear for Table users. So, we should take all
> >> these into consideration, and make the decision in this discussion
> thread.
> >>
> >> You can take a look at the document and welcome any suggestions or other
> >> better solutions.
> >>
> >> Best, Hequn
> >>
> >>
> >> On Mon, Jul 1, 2019 at 12:13 PM Kurt Young <yk...@gmail.com> wrote:
> >>
> >> > Hi Jincheng,
> >> >
> >> > Thanks for the clarification. Take 'TopN' for example, if I want to
> >> write
> >> > such kind function,
> >> > should I make sure that this function is used with some keys? If I
> need
> >> a
> >> > use case to calculate
> >> > topn without key, should I write another function or I can reuse
> >> previous
> >> > one.
> >> >
> >> > I'm not sure about the idea of this does not involve semantic changes.
> >> To
> >> > me, it sounds like
> >> > we are doing another nested aggregation inside the table
> >> > which TableAggregateFunction emits.
> >> >
> >> > Maybe I'm not familiar with this function enough, hope you can help me
> >> to
> >> > understand.
> >> >
> >> > Best,
> >> > Kurt
> >> >
> >> >
> >> > On Mon, Jul 1, 2019 at 11:59 AM jincheng sun <
> sunjincheng121@gmail.com>
> >> > wrote:
> >> >
> >> > > Hi Kurt,
> >> > >
> >> > > Thanks for your questions, I am glad to share my thoughts here:
> >> > >
> >> > > My question is, will that effect the logic ofTableAggregateFunction
> >> user
> >> > > > wrote? Should the user know that there will a key and make some
> >> changes
> >> > > to
> >> > > > this function?
> >> > >
> >> > >
> >> > > No, the keys information depends on the implementation of the
> >> > > TableAggregateFunction.
> >> > > For example, for a `topN` user defined TableAggregateFunction, we
> can
> >> > only
> >> > > use the `keys` if the `topN` contains `rankid` in the output. You
> can
> >> > > treat the
> >> > > `keys` like an indicator.
> >> > >
> >> > > If not, how will framework deal with the output of user's
> >> > > > TableAggregateFunction.  if user output multiple rows with the
> same
> >> > key,
> >> > > > should be latter one replace previous ones?
> >> > >
> >> > >
> >> > > If a TableAggregateFunction outputs multiple rows with the same key,
> >> the
> >> > > latter one should replace the previous one, either with upsert mode
> or
> >> > > retract mode. i.e., Whether the user defines the Key or not, the
> Flink
> >> > > framework should ensure the correctness of the semantics.
> >> > >
> >> > > At present, the problem we are discussing does not involve semantic
> >> > > changes. The definition of keys is to support non-window
> >> flatAggregate on
> >> > > upsert mode. (The upsert mode is already supported in the flink
> >> > framework.
> >> > > The current discussion only needs to inform the framework that the
> >> keys
> >> > > information, which is the `withKeys` API we discussing.)
> >> > >
> >> > > Welcome any other feedbacks :)
> >> > >
> >> > > Best,
> >> > > Jincheng
> >> > >
> >> > > Kurt Young <yk...@gmail.com> 于2019年7月1日周一 上午9:23写道:
> >> > >
> >> > > > Hi,
> >> > > >
> >> > > > I have a question about the key information of
> >> TableAggregateFunction.
> >> > > > IIUC, you need to define
> >> > > > something like primary key or unique key in the result table of
> >> > > > TableAggregateFunction, and also
> >> > > > need a way to let user configure this through the API. My question
> >> is,
> >> > > will
> >> > > > that effect the logic of
> >> > > > TableAggregateFunction user wrote? Should the user know that there
> >> > will a
> >> > > > key and make some changes
> >> > > > to this function?
> >> > > >
> >> > > > If so, what's the semantic the user should learned. If not, how
> will
> >> > > > framework deal with the output of user's
> >> > > > TableAggregateFunction. For example, if user output multiple rows
> >> with
> >> > > the
> >> > > > same key, should be latter one
> >> > > > replace previous ones?
> >> > > >
> >> > > > Best,
> >> > > > Kurt
> >> > > >
> >> > > >
> >> > > > On Mon, Jul 1, 2019 at 7:19 AM jincheng sun <
> >> sunjincheng121@gmail.com>
> >> > > > wrote:
> >> > > >
> >> > > > > Hi hequn, Thanks for the reply! I think `withKeys` solution is
> our
> >> > > better
> >> > > > > choice!
> >> > > > >
> >> > > > >
> >> > > > > Hequn Cheng <ch...@gmail.com> 于2019年6月26日周三 下午5:11写道:
> >> > > > >
> >> > > > > > Hi Jincheng,
> >> > > > > >
> >> > > > > > Thanks for raising the discussion!
> >> > > > > > The key information is very important for query optimizations.
> >> It
> >> > > would
> >> > > > > be
> >> > > > > > nice if we can use upsert mode to achieve better performance.
> >> > > > > >
> >> > > > > > +1 for the `withKeys` proposal. :)
> >> > > > > >
> >> > > > > > Best, Hequn
> >> > > > > >
> >> > > > > >
> >> > > > > > On Wed, Jun 26, 2019 at 4:37 PM jincheng sun <
> >> > > sunjincheng121@gmail.com
> >> > > > >
> >> > > > > > wrote:
> >> > > > > >
> >> > > > > > > Hi all,
> >> > > > > > >
> >> > > > > > > With the continuous efforts from the community, we already
> >> > > supported
> >> > > > > > > `flatAggregate`[1] on TableAPI in retract mode. I think It's
> >> > better
> >> > > > to
> >> > > > > > add
> >> > > > > > > upsert mode for  `flatAggregate`.
> >> > > > > > >
> >> > > > > > > The result table of streaming non-window `flatAggregate` is
> a
> >> > table
> >> > > > > > > contains updates. We can, of course, use a
> >> > > RetractStreamTableSink[2]
> >> > > > to
> >> > > > > > > emit the table, but we can get better performance in upsert
> >> mode.
> >> > > > > > However,
> >> > > > > > > due to the lack of keys, we can’t use an
> >> UpsertStreamTableSink to
> >> > > > emit
> >> > > > > > the
> >> > > > > > > table. We don’t have this problem for a normal aggregate as
> it
> >> > > emits
> >> > > > a
> >> > > > > > > single row for each group, so the unique keys are exactly
> the
> >> > same
> >> > > > with
> >> > > > > > the
> >> > > > > > > group keys. While for a `flatAggregate`, its pretty
> difference
> >> > that
> >> > > > due
> >> > > > > > to
> >> > > > > > > emits multi rows(a “sub-table”) for a single group. To solve
> >> this
> >> > > > > > problem,
> >> > > > > > > we need to find a way to define keys on flatAggregate, so
> >> that we
> >> > > can
> >> > > > > > also
> >> > > > > > > use upsert sink to emit the result table after
> flatAggregate.
> >> > > > > > >
> >> > > > > > > So, Aljoscha, Hequn and I prepared a design document for how
> >> to
> >> > > > define
> >> > > > > > the
> >> > > > > > > update keys for  `flatAggregate` in upsert mode.  The detail
> >> can
> >> > be
> >> > > > > found
> >> > > > > > > here:
> >> > > > > > >
> >> > > > > > >
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> https://docs.google.com/document/d/183qHo8PDG-xserDi_AbGP6YX9aPY0rVr80p3O3Gyz6U/edit?usp=sharing
> >> > > > > > >
> >> > > > > > > I appreciate it if you can have look at the document and any
> >> > > comments
> >> > > > > are
> >> > > > > > > welcome!
> >> > > > > > >
> >> > > > > > >
> >> > > > > > > Best,
> >> > > > > > >
> >> > > > > > > Jincheng
> >> > > > > > >
> >> > > > > > >
> >> > > > > > > [1]
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97552739
> >> > > > > > >
> >> > > > > > > [2]
> >> > > > > > >
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/sourceSinks.html#defining-a-streamtablesource
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> >
>

Re: [DISCUSS]Support Upsert mode for Streaming Non-window FlatAggregate

Posted by Kurt Young <yk...@gmail.com>.
Hi Jincheng,

Thanks for the clarification. I think you just pointed out my concern by
yourself:

> When a user uses a User-defined table aggregate function (UDTAGG), he
must understand the behavior of the UDTAGG, including the return type and
the characteristics of the returned data. such as the key fields.

This indicates that the UDTAGG is somehow be classified to different types,
one will no key, one with key information. So the developer of the UDTAGG
should choose which type of this function should be. In this case,
my question would be, why don't we have explicit information about keys
such as we split UDTAGG to keyed UDTAGG and non-keyed UDTAGG. So the user
and the framework will have a better understanding of
this UDTAGG. `withKeys` solution is letting user to choose the key and it
seems it will only work correctly only if the user choose the *right* key
this UDTAGG has.

Let me know if this makes sense to you.

Best,
Kurt


On Thu, Jul 4, 2019 at 4:32 PM jincheng sun <su...@gmail.com>
wrote:

> Hi All,
>
> @Kurt Young <yk...@gmail.com> one user-defined table aggregate function
> can be used in both with(out) keys case, and we do not introduce any other
> aggregations. just like the explanation from @Hequn.
>
> @Hequn Cheng <ch...@gmail.com> thanks for your explanation!
>
> One thing should be mentioned here:
>
> When a user uses a User-defined table aggregate function (UDTAGG), he must
> understand the behavior of the UDTAGG, including the return type and the
> characteristics of the returned data. such as the key fields.  So although
> using `withKeys` approach it is not rigorous enough(we do not need) but
> intuitive enough, considering that if `flatAggregate` is followed by an
> `upsertSink`, then the user must know the keys of the output of UDTAGG
> clearly, otherwise the keys of `upsertSink` cannot be defined. So I still
> prefer the `withKeys` solution by now.
>
> Looking forward to any feedback from all of you!
>
> Best,
> Jincheng
>
>
>
> Hequn Cheng <ch...@gmail.com> 于2019年7月1日周一 下午5:35写道:
>
>> Hi Kurt,
>>
>> Thanks for your questions. Here are my thoughts.
>>
>> > if I want to write such kind function, should I make sure that this
>> function is used with some keys?
>> The key information may not be used. We can also use RetractSink to emit
>> the table directly.
>>
>> >  If I need a use case to calculate topn without key, should I write
>> another function or I can reuse previous one.
>> For the TopN example, you can reuse the previous function if you don't
>> care
>> about the key information.
>>
>> So, the key information is only an indicator(or a description), not an
>> operator, as Jincheng mentioned above.
>> We do not need to change the function logic and it will not add any other
>> aggregations.
>>
>> BTW, we have three approaches in the document. Approach 1 defines keys on
>> API level as we think it's more common to define keys on Table.
>> While approach 3 defines keys in the TableAggregateFunction which is more
>> precise but it is not very clear for Table users. So, we should take all
>> these into consideration, and make the decision in this discussion thread.
>>
>> You can take a look at the document and welcome any suggestions or other
>> better solutions.
>>
>> Best, Hequn
>>
>>
>> On Mon, Jul 1, 2019 at 12:13 PM Kurt Young <yk...@gmail.com> wrote:
>>
>> > Hi Jincheng,
>> >
>> > Thanks for the clarification. Take 'TopN' for example, if I want to
>> write
>> > such kind function,
>> > should I make sure that this function is used with some keys? If I need
>> a
>> > use case to calculate
>> > topn without key, should I write another function or I can reuse
>> previous
>> > one.
>> >
>> > I'm not sure about the idea of this does not involve semantic changes.
>> To
>> > me, it sounds like
>> > we are doing another nested aggregation inside the table
>> > which TableAggregateFunction emits.
>> >
>> > Maybe I'm not familiar with this function enough, hope you can help me
>> to
>> > understand.
>> >
>> > Best,
>> > Kurt
>> >
>> >
>> > On Mon, Jul 1, 2019 at 11:59 AM jincheng sun <su...@gmail.com>
>> > wrote:
>> >
>> > > Hi Kurt,
>> > >
>> > > Thanks for your questions, I am glad to share my thoughts here:
>> > >
>> > > My question is, will that effect the logic ofTableAggregateFunction
>> user
>> > > > wrote? Should the user know that there will a key and make some
>> changes
>> > > to
>> > > > this function?
>> > >
>> > >
>> > > No, the keys information depends on the implementation of the
>> > > TableAggregateFunction.
>> > > For example, for a `topN` user defined TableAggregateFunction, we can
>> > only
>> > > use the `keys` if the `topN` contains `rankid` in the output. You can
>> > > treat the
>> > > `keys` like an indicator.
>> > >
>> > > If not, how will framework deal with the output of user's
>> > > > TableAggregateFunction.  if user output multiple rows with the same
>> > key,
>> > > > should be latter one replace previous ones?
>> > >
>> > >
>> > > If a TableAggregateFunction outputs multiple rows with the same key,
>> the
>> > > latter one should replace the previous one, either with upsert mode or
>> > > retract mode. i.e., Whether the user defines the Key or not, the Flink
>> > > framework should ensure the correctness of the semantics.
>> > >
>> > > At present, the problem we are discussing does not involve semantic
>> > > changes. The definition of keys is to support non-window
>> flatAggregate on
>> > > upsert mode. (The upsert mode is already supported in the flink
>> > framework.
>> > > The current discussion only needs to inform the framework that the
>> keys
>> > > information, which is the `withKeys` API we discussing.)
>> > >
>> > > Welcome any other feedbacks :)
>> > >
>> > > Best,
>> > > Jincheng
>> > >
>> > > Kurt Young <yk...@gmail.com> 于2019年7月1日周一 上午9:23写道:
>> > >
>> > > > Hi,
>> > > >
>> > > > I have a question about the key information of
>> TableAggregateFunction.
>> > > > IIUC, you need to define
>> > > > something like primary key or unique key in the result table of
>> > > > TableAggregateFunction, and also
>> > > > need a way to let user configure this through the API. My question
>> is,
>> > > will
>> > > > that effect the logic of
>> > > > TableAggregateFunction user wrote? Should the user know that there
>> > will a
>> > > > key and make some changes
>> > > > to this function?
>> > > >
>> > > > If so, what's the semantic the user should learned. If not, how will
>> > > > framework deal with the output of user's
>> > > > TableAggregateFunction. For example, if user output multiple rows
>> with
>> > > the
>> > > > same key, should be latter one
>> > > > replace previous ones?
>> > > >
>> > > > Best,
>> > > > Kurt
>> > > >
>> > > >
>> > > > On Mon, Jul 1, 2019 at 7:19 AM jincheng sun <
>> sunjincheng121@gmail.com>
>> > > > wrote:
>> > > >
>> > > > > Hi hequn, Thanks for the reply! I think `withKeys` solution is our
>> > > better
>> > > > > choice!
>> > > > >
>> > > > >
>> > > > > Hequn Cheng <ch...@gmail.com> 于2019年6月26日周三 下午5:11写道:
>> > > > >
>> > > > > > Hi Jincheng,
>> > > > > >
>> > > > > > Thanks for raising the discussion!
>> > > > > > The key information is very important for query optimizations.
>> It
>> > > would
>> > > > > be
>> > > > > > nice if we can use upsert mode to achieve better performance.
>> > > > > >
>> > > > > > +1 for the `withKeys` proposal. :)
>> > > > > >
>> > > > > > Best, Hequn
>> > > > > >
>> > > > > >
>> > > > > > On Wed, Jun 26, 2019 at 4:37 PM jincheng sun <
>> > > sunjincheng121@gmail.com
>> > > > >
>> > > > > > wrote:
>> > > > > >
>> > > > > > > Hi all,
>> > > > > > >
>> > > > > > > With the continuous efforts from the community, we already
>> > > supported
>> > > > > > > `flatAggregate`[1] on TableAPI in retract mode. I think It's
>> > better
>> > > > to
>> > > > > > add
>> > > > > > > upsert mode for  `flatAggregate`.
>> > > > > > >
>> > > > > > > The result table of streaming non-window `flatAggregate` is a
>> > table
>> > > > > > > contains updates. We can, of course, use a
>> > > RetractStreamTableSink[2]
>> > > > to
>> > > > > > > emit the table, but we can get better performance in upsert
>> mode.
>> > > > > > However,
>> > > > > > > due to the lack of keys, we can’t use an
>> UpsertStreamTableSink to
>> > > > emit
>> > > > > > the
>> > > > > > > table. We don’t have this problem for a normal aggregate as it
>> > > emits
>> > > > a
>> > > > > > > single row for each group, so the unique keys are exactly the
>> > same
>> > > > with
>> > > > > > the
>> > > > > > > group keys. While for a `flatAggregate`, its pretty difference
>> > that
>> > > > due
>> > > > > > to
>> > > > > > > emits multi rows(a “sub-table”) for a single group. To solve
>> this
>> > > > > > problem,
>> > > > > > > we need to find a way to define keys on flatAggregate, so
>> that we
>> > > can
>> > > > > > also
>> > > > > > > use upsert sink to emit the result table after flatAggregate.
>> > > > > > >
>> > > > > > > So, Aljoscha, Hequn and I prepared a design document for how
>> to
>> > > > define
>> > > > > > the
>> > > > > > > update keys for  `flatAggregate` in upsert mode.  The detail
>> can
>> > be
>> > > > > found
>> > > > > > > here:
>> > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> https://docs.google.com/document/d/183qHo8PDG-xserDi_AbGP6YX9aPY0rVr80p3O3Gyz6U/edit?usp=sharing
>> > > > > > >
>> > > > > > > I appreciate it if you can have look at the document and any
>> > > comments
>> > > > > are
>> > > > > > > welcome!
>> > > > > > >
>> > > > > > >
>> > > > > > > Best,
>> > > > > > >
>> > > > > > > Jincheng
>> > > > > > >
>> > > > > > >
>> > > > > > > [1]
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97552739
>> > > > > > >
>> > > > > > > [2]
>> > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/sourceSinks.html#defining-a-streamtablesource
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>

Re: [DISCUSS]Support Upsert mode for Streaming Non-window FlatAggregate

Posted by jincheng sun <su...@gmail.com>.
Hi All,

@Kurt Young <yk...@gmail.com> one user-defined table aggregate function
can be used in both with(out) keys case, and we do not introduce any other
aggregations. just like the explanation from @Hequn.

@Hequn Cheng <ch...@gmail.com> thanks for your explanation!

One thing should be mentioned here:

When a user uses a User-defined table aggregate function (UDTAGG), he must
understand the behavior of the UDTAGG, including the return type and the
characteristics of the returned data. such as the key fields.  So although
using `withKeys` approach it is not rigorous enough(we do not need) but
intuitive enough, considering that if `flatAggregate` is followed by an
`upsertSink`, then the user must know the keys of the output of UDTAGG
clearly, otherwise the keys of `upsertSink` cannot be defined. So I still
prefer the `withKeys` solution by now.

Looking forward to any feedback from all of you!

Best,
Jincheng



Hequn Cheng <ch...@gmail.com> 于2019年7月1日周一 下午5:35写道:

> Hi Kurt,
>
> Thanks for your questions. Here are my thoughts.
>
> > if I want to write such kind function, should I make sure that this
> function is used with some keys?
> The key information may not be used. We can also use RetractSink to emit
> the table directly.
>
> >  If I need a use case to calculate topn without key, should I write
> another function or I can reuse previous one.
> For the TopN example, you can reuse the previous function if you don't care
> about the key information.
>
> So, the key information is only an indicator(or a description), not an
> operator, as Jincheng mentioned above.
> We do not need to change the function logic and it will not add any other
> aggregations.
>
> BTW, we have three approaches in the document. Approach 1 defines keys on
> API level as we think it's more common to define keys on Table.
> While approach 3 defines keys in the TableAggregateFunction which is more
> precise but it is not very clear for Table users. So, we should take all
> these into consideration, and make the decision in this discussion thread.
>
> You can take a look at the document and welcome any suggestions or other
> better solutions.
>
> Best, Hequn
>
>
> On Mon, Jul 1, 2019 at 12:13 PM Kurt Young <yk...@gmail.com> wrote:
>
> > Hi Jincheng,
> >
> > Thanks for the clarification. Take 'TopN' for example, if I want to write
> > such kind function,
> > should I make sure that this function is used with some keys? If I need a
> > use case to calculate
> > topn without key, should I write another function or I can reuse previous
> > one.
> >
> > I'm not sure about the idea of this does not involve semantic changes. To
> > me, it sounds like
> > we are doing another nested aggregation inside the table
> > which TableAggregateFunction emits.
> >
> > Maybe I'm not familiar with this function enough, hope you can help me to
> > understand.
> >
> > Best,
> > Kurt
> >
> >
> > On Mon, Jul 1, 2019 at 11:59 AM jincheng sun <su...@gmail.com>
> > wrote:
> >
> > > Hi Kurt,
> > >
> > > Thanks for your questions, I am glad to share my thoughts here:
> > >
> > > My question is, will that effect the logic ofTableAggregateFunction
> user
> > > > wrote? Should the user know that there will a key and make some
> changes
> > > to
> > > > this function?
> > >
> > >
> > > No, the keys information depends on the implementation of the
> > > TableAggregateFunction.
> > > For example, for a `topN` user defined TableAggregateFunction, we can
> > only
> > > use the `keys` if the `topN` contains `rankid` in the output. You can
> > > treat the
> > > `keys` like an indicator.
> > >
> > > If not, how will framework deal with the output of user's
> > > > TableAggregateFunction.  if user output multiple rows with the same
> > key,
> > > > should be latter one replace previous ones?
> > >
> > >
> > > If a TableAggregateFunction outputs multiple rows with the same key,
> the
> > > latter one should replace the previous one, either with upsert mode or
> > > retract mode. i.e., Whether the user defines the Key or not, the Flink
> > > framework should ensure the correctness of the semantics.
> > >
> > > At present, the problem we are discussing does not involve semantic
> > > changes. The definition of keys is to support non-window flatAggregate
> on
> > > upsert mode. (The upsert mode is already supported in the flink
> > framework.
> > > The current discussion only needs to inform the framework that the keys
> > > information, which is the `withKeys` API we discussing.)
> > >
> > > Welcome any other feedbacks :)
> > >
> > > Best,
> > > Jincheng
> > >
> > > Kurt Young <yk...@gmail.com> 于2019年7月1日周一 上午9:23写道:
> > >
> > > > Hi,
> > > >
> > > > I have a question about the key information of
> TableAggregateFunction.
> > > > IIUC, you need to define
> > > > something like primary key or unique key in the result table of
> > > > TableAggregateFunction, and also
> > > > need a way to let user configure this through the API. My question
> is,
> > > will
> > > > that effect the logic of
> > > > TableAggregateFunction user wrote? Should the user know that there
> > will a
> > > > key and make some changes
> > > > to this function?
> > > >
> > > > If so, what's the semantic the user should learned. If not, how will
> > > > framework deal with the output of user's
> > > > TableAggregateFunction. For example, if user output multiple rows
> with
> > > the
> > > > same key, should be latter one
> > > > replace previous ones?
> > > >
> > > > Best,
> > > > Kurt
> > > >
> > > >
> > > > On Mon, Jul 1, 2019 at 7:19 AM jincheng sun <
> sunjincheng121@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi hequn, Thanks for the reply! I think `withKeys` solution is our
> > > better
> > > > > choice!
> > > > >
> > > > >
> > > > > Hequn Cheng <ch...@gmail.com> 于2019年6月26日周三 下午5:11写道:
> > > > >
> > > > > > Hi Jincheng,
> > > > > >
> > > > > > Thanks for raising the discussion!
> > > > > > The key information is very important for query optimizations. It
> > > would
> > > > > be
> > > > > > nice if we can use upsert mode to achieve better performance.
> > > > > >
> > > > > > +1 for the `withKeys` proposal. :)
> > > > > >
> > > > > > Best, Hequn
> > > > > >
> > > > > >
> > > > > > On Wed, Jun 26, 2019 at 4:37 PM jincheng sun <
> > > sunjincheng121@gmail.com
> > > > >
> > > > > > wrote:
> > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > > With the continuous efforts from the community, we already
> > > supported
> > > > > > > `flatAggregate`[1] on TableAPI in retract mode. I think It's
> > better
> > > > to
> > > > > > add
> > > > > > > upsert mode for  `flatAggregate`.
> > > > > > >
> > > > > > > The result table of streaming non-window `flatAggregate` is a
> > table
> > > > > > > contains updates. We can, of course, use a
> > > RetractStreamTableSink[2]
> > > > to
> > > > > > > emit the table, but we can get better performance in upsert
> mode.
> > > > > > However,
> > > > > > > due to the lack of keys, we can’t use an UpsertStreamTableSink
> to
> > > > emit
> > > > > > the
> > > > > > > table. We don’t have this problem for a normal aggregate as it
> > > emits
> > > > a
> > > > > > > single row for each group, so the unique keys are exactly the
> > same
> > > > with
> > > > > > the
> > > > > > > group keys. While for a `flatAggregate`, its pretty difference
> > that
> > > > due
> > > > > > to
> > > > > > > emits multi rows(a “sub-table”) for a single group. To solve
> this
> > > > > > problem,
> > > > > > > we need to find a way to define keys on flatAggregate, so that
> we
> > > can
> > > > > > also
> > > > > > > use upsert sink to emit the result table after flatAggregate.
> > > > > > >
> > > > > > > So, Aljoscha, Hequn and I prepared a design document for how to
> > > > define
> > > > > > the
> > > > > > > update keys for  `flatAggregate` in upsert mode.  The detail
> can
> > be
> > > > > found
> > > > > > > here:
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://docs.google.com/document/d/183qHo8PDG-xserDi_AbGP6YX9aPY0rVr80p3O3Gyz6U/edit?usp=sharing
> > > > > > >
> > > > > > > I appreciate it if you can have look at the document and any
> > > comments
> > > > > are
> > > > > > > welcome!
> > > > > > >
> > > > > > >
> > > > > > > Best,
> > > > > > >
> > > > > > > Jincheng
> > > > > > >
> > > > > > >
> > > > > > > [1]
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97552739
> > > > > > >
> > > > > > > [2]
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/sourceSinks.html#defining-a-streamtablesource
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS]Support Upsert mode for Streaming Non-window FlatAggregate

Posted by Hequn Cheng <ch...@gmail.com>.
Hi Kurt,

Thanks for your questions. Here are my thoughts.

> if I want to write such kind function, should I make sure that this
function is used with some keys?
The key information may not be used. We can also use RetractSink to emit
the table directly.

>  If I need a use case to calculate topn without key, should I write
another function or I can reuse previous one.
For the TopN example, you can reuse the previous function if you don't care
about the key information.

So, the key information is only an indicator(or a description), not an
operator, as Jincheng mentioned above.
We do not need to change the function logic and it will not add any other
aggregations.

BTW, we have three approaches in the document. Approach 1 defines keys on
API level as we think it's more common to define keys on Table.
While approach 3 defines keys in the TableAggregateFunction which is more
precise but it is not very clear for Table users. So, we should take all
these into consideration, and make the decision in this discussion thread.

You can take a look at the document and welcome any suggestions or other
better solutions.

Best, Hequn


On Mon, Jul 1, 2019 at 12:13 PM Kurt Young <yk...@gmail.com> wrote:

> Hi Jincheng,
>
> Thanks for the clarification. Take 'TopN' for example, if I want to write
> such kind function,
> should I make sure that this function is used with some keys? If I need a
> use case to calculate
> topn without key, should I write another function or I can reuse previous
> one.
>
> I'm not sure about the idea of this does not involve semantic changes. To
> me, it sounds like
> we are doing another nested aggregation inside the table
> which TableAggregateFunction emits.
>
> Maybe I'm not familiar with this function enough, hope you can help me to
> understand.
>
> Best,
> Kurt
>
>
> On Mon, Jul 1, 2019 at 11:59 AM jincheng sun <su...@gmail.com>
> wrote:
>
> > Hi Kurt,
> >
> > Thanks for your questions, I am glad to share my thoughts here:
> >
> > My question is, will that effect the logic ofTableAggregateFunction user
> > > wrote? Should the user know that there will a key and make some changes
> > to
> > > this function?
> >
> >
> > No, the keys information depends on the implementation of the
> > TableAggregateFunction.
> > For example, for a `topN` user defined TableAggregateFunction, we can
> only
> > use the `keys` if the `topN` contains `rankid` in the output. You can
> > treat the
> > `keys` like an indicator.
> >
> > If not, how will framework deal with the output of user's
> > > TableAggregateFunction.  if user output multiple rows with the same
> key,
> > > should be latter one replace previous ones?
> >
> >
> > If a TableAggregateFunction outputs multiple rows with the same key, the
> > latter one should replace the previous one, either with upsert mode or
> > retract mode. i.e., Whether the user defines the Key or not, the Flink
> > framework should ensure the correctness of the semantics.
> >
> > At present, the problem we are discussing does not involve semantic
> > changes. The definition of keys is to support non-window flatAggregate on
> > upsert mode. (The upsert mode is already supported in the flink
> framework.
> > The current discussion only needs to inform the framework that the keys
> > information, which is the `withKeys` API we discussing.)
> >
> > Welcome any other feedbacks :)
> >
> > Best,
> > Jincheng
> >
> > Kurt Young <yk...@gmail.com> 于2019年7月1日周一 上午9:23写道:
> >
> > > Hi,
> > >
> > > I have a question about the key information of TableAggregateFunction.
> > > IIUC, you need to define
> > > something like primary key or unique key in the result table of
> > > TableAggregateFunction, and also
> > > need a way to let user configure this through the API. My question is,
> > will
> > > that effect the logic of
> > > TableAggregateFunction user wrote? Should the user know that there
> will a
> > > key and make some changes
> > > to this function?
> > >
> > > If so, what's the semantic the user should learned. If not, how will
> > > framework deal with the output of user's
> > > TableAggregateFunction. For example, if user output multiple rows with
> > the
> > > same key, should be latter one
> > > replace previous ones?
> > >
> > > Best,
> > > Kurt
> > >
> > >
> > > On Mon, Jul 1, 2019 at 7:19 AM jincheng sun <su...@gmail.com>
> > > wrote:
> > >
> > > > Hi hequn, Thanks for the reply! I think `withKeys` solution is our
> > better
> > > > choice!
> > > >
> > > >
> > > > Hequn Cheng <ch...@gmail.com> 于2019年6月26日周三 下午5:11写道:
> > > >
> > > > > Hi Jincheng,
> > > > >
> > > > > Thanks for raising the discussion!
> > > > > The key information is very important for query optimizations. It
> > would
> > > > be
> > > > > nice if we can use upsert mode to achieve better performance.
> > > > >
> > > > > +1 for the `withKeys` proposal. :)
> > > > >
> > > > > Best, Hequn
> > > > >
> > > > >
> > > > > On Wed, Jun 26, 2019 at 4:37 PM jincheng sun <
> > sunjincheng121@gmail.com
> > > >
> > > > > wrote:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > With the continuous efforts from the community, we already
> > supported
> > > > > > `flatAggregate`[1] on TableAPI in retract mode. I think It's
> better
> > > to
> > > > > add
> > > > > > upsert mode for  `flatAggregate`.
> > > > > >
> > > > > > The result table of streaming non-window `flatAggregate` is a
> table
> > > > > > contains updates. We can, of course, use a
> > RetractStreamTableSink[2]
> > > to
> > > > > > emit the table, but we can get better performance in upsert mode.
> > > > > However,
> > > > > > due to the lack of keys, we can’t use an UpsertStreamTableSink to
> > > emit
> > > > > the
> > > > > > table. We don’t have this problem for a normal aggregate as it
> > emits
> > > a
> > > > > > single row for each group, so the unique keys are exactly the
> same
> > > with
> > > > > the
> > > > > > group keys. While for a `flatAggregate`, its pretty difference
> that
> > > due
> > > > > to
> > > > > > emits multi rows(a “sub-table”) for a single group. To solve this
> > > > > problem,
> > > > > > we need to find a way to define keys on flatAggregate, so that we
> > can
> > > > > also
> > > > > > use upsert sink to emit the result table after flatAggregate.
> > > > > >
> > > > > > So, Aljoscha, Hequn and I prepared a design document for how to
> > > define
> > > > > the
> > > > > > update keys for  `flatAggregate` in upsert mode.  The detail can
> be
> > > > found
> > > > > > here:
> > > > > >
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://docs.google.com/document/d/183qHo8PDG-xserDi_AbGP6YX9aPY0rVr80p3O3Gyz6U/edit?usp=sharing
> > > > > >
> > > > > > I appreciate it if you can have look at the document and any
> > comments
> > > > are
> > > > > > welcome!
> > > > > >
> > > > > >
> > > > > > Best,
> > > > > >
> > > > > > Jincheng
> > > > > >
> > > > > >
> > > > > > [1]
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97552739
> > > > > >
> > > > > > [2]
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/sourceSinks.html#defining-a-streamtablesource
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS]Support Upsert mode for Streaming Non-window FlatAggregate

Posted by Kurt Young <yk...@gmail.com>.
Hi Jincheng,

Thanks for the clarification. Take 'TopN' for example, if I want to write
such kind function,
should I make sure that this function is used with some keys? If I need a
use case to calculate
topn without key, should I write another function or I can reuse previous
one.

I'm not sure about the idea of this does not involve semantic changes. To
me, it sounds like
we are doing another nested aggregation inside the table
which TableAggregateFunction emits.

Maybe I'm not familiar with this function enough, hope you can help me to
understand.

Best,
Kurt


On Mon, Jul 1, 2019 at 11:59 AM jincheng sun <su...@gmail.com>
wrote:

> Hi Kurt,
>
> Thanks for your questions, I am glad to share my thoughts here:
>
> My question is, will that effect the logic ofTableAggregateFunction user
> > wrote? Should the user know that there will a key and make some changes
> to
> > this function?
>
>
> No, the keys information depends on the implementation of the
> TableAggregateFunction.
> For example, for a `topN` user defined TableAggregateFunction, we can only
> use the `keys` if the `topN` contains `rankid` in the output. You can
> treat the
> `keys` like an indicator.
>
> If not, how will framework deal with the output of user's
> > TableAggregateFunction.  if user output multiple rows with the same key,
> > should be latter one replace previous ones?
>
>
> If a TableAggregateFunction outputs multiple rows with the same key, the
> latter one should replace the previous one, either with upsert mode or
> retract mode. i.e., Whether the user defines the Key or not, the Flink
> framework should ensure the correctness of the semantics.
>
> At present, the problem we are discussing does not involve semantic
> changes. The definition of keys is to support non-window flatAggregate on
> upsert mode. (The upsert mode is already supported in the flink framework.
> The current discussion only needs to inform the framework that the keys
> information, which is the `withKeys` API we discussing.)
>
> Welcome any other feedbacks :)
>
> Best,
> Jincheng
>
> Kurt Young <yk...@gmail.com> 于2019年7月1日周一 上午9:23写道:
>
> > Hi,
> >
> > I have a question about the key information of TableAggregateFunction.
> > IIUC, you need to define
> > something like primary key or unique key in the result table of
> > TableAggregateFunction, and also
> > need a way to let user configure this through the API. My question is,
> will
> > that effect the logic of
> > TableAggregateFunction user wrote? Should the user know that there will a
> > key and make some changes
> > to this function?
> >
> > If so, what's the semantic the user should learned. If not, how will
> > framework deal with the output of user's
> > TableAggregateFunction. For example, if user output multiple rows with
> the
> > same key, should be latter one
> > replace previous ones?
> >
> > Best,
> > Kurt
> >
> >
> > On Mon, Jul 1, 2019 at 7:19 AM jincheng sun <su...@gmail.com>
> > wrote:
> >
> > > Hi hequn, Thanks for the reply! I think `withKeys` solution is our
> better
> > > choice!
> > >
> > >
> > > Hequn Cheng <ch...@gmail.com> 于2019年6月26日周三 下午5:11写道:
> > >
> > > > Hi Jincheng,
> > > >
> > > > Thanks for raising the discussion!
> > > > The key information is very important for query optimizations. It
> would
> > > be
> > > > nice if we can use upsert mode to achieve better performance.
> > > >
> > > > +1 for the `withKeys` proposal. :)
> > > >
> > > > Best, Hequn
> > > >
> > > >
> > > > On Wed, Jun 26, 2019 at 4:37 PM jincheng sun <
> sunjincheng121@gmail.com
> > >
> > > > wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > With the continuous efforts from the community, we already
> supported
> > > > > `flatAggregate`[1] on TableAPI in retract mode. I think It's better
> > to
> > > > add
> > > > > upsert mode for  `flatAggregate`.
> > > > >
> > > > > The result table of streaming non-window `flatAggregate` is a table
> > > > > contains updates. We can, of course, use a
> RetractStreamTableSink[2]
> > to
> > > > > emit the table, but we can get better performance in upsert mode.
> > > > However,
> > > > > due to the lack of keys, we can’t use an UpsertStreamTableSink to
> > emit
> > > > the
> > > > > table. We don’t have this problem for a normal aggregate as it
> emits
> > a
> > > > > single row for each group, so the unique keys are exactly the same
> > with
> > > > the
> > > > > group keys. While for a `flatAggregate`, its pretty difference that
> > due
> > > > to
> > > > > emits multi rows(a “sub-table”) for a single group. To solve this
> > > > problem,
> > > > > we need to find a way to define keys on flatAggregate, so that we
> can
> > > > also
> > > > > use upsert sink to emit the result table after flatAggregate.
> > > > >
> > > > > So, Aljoscha, Hequn and I prepared a design document for how to
> > define
> > > > the
> > > > > update keys for  `flatAggregate` in upsert mode.  The detail can be
> > > found
> > > > > here:
> > > > >
> > > > >
> > > > >
> > > >
> > >
> >
> https://docs.google.com/document/d/183qHo8PDG-xserDi_AbGP6YX9aPY0rVr80p3O3Gyz6U/edit?usp=sharing
> > > > >
> > > > > I appreciate it if you can have look at the document and any
> comments
> > > are
> > > > > welcome!
> > > > >
> > > > >
> > > > > Best,
> > > > >
> > > > > Jincheng
> > > > >
> > > > >
> > > > > [1]
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97552739
> > > > >
> > > > > [2]
> > > > >
> > > > >
> > > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/sourceSinks.html#defining-a-streamtablesource
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS]Support Upsert mode for Streaming Non-window FlatAggregate

Posted by jincheng sun <su...@gmail.com>.
Hi Kurt,

Thanks for your questions, I am glad to share my thoughts here:

My question is, will that effect the logic ofTableAggregateFunction user
> wrote? Should the user know that there will a key and make some changes to
> this function?


No, the keys information depends on the implementation of the
TableAggregateFunction.
For example, for a `topN` user defined TableAggregateFunction, we can only
use the `keys` if the `topN` contains `rankid` in the output. You can treat the
`keys` like an indicator.

If not, how will framework deal with the output of user's
> TableAggregateFunction.  if user output multiple rows with the same key,
> should be latter one replace previous ones?


If a TableAggregateFunction outputs multiple rows with the same key, the
latter one should replace the previous one, either with upsert mode or
retract mode. i.e., Whether the user defines the Key or not, the Flink
framework should ensure the correctness of the semantics.

At present, the problem we are discussing does not involve semantic
changes. The definition of keys is to support non-window flatAggregate on
upsert mode. (The upsert mode is already supported in the flink framework.
The current discussion only needs to inform the framework that the keys
information, which is the `withKeys` API we discussing.)

Welcome any other feedbacks :)

Best,
Jincheng

Kurt Young <yk...@gmail.com> 于2019年7月1日周一 上午9:23写道:

> Hi,
>
> I have a question about the key information of TableAggregateFunction.
> IIUC, you need to define
> something like primary key or unique key in the result table of
> TableAggregateFunction, and also
> need a way to let user configure this through the API. My question is, will
> that effect the logic of
> TableAggregateFunction user wrote? Should the user know that there will a
> key and make some changes
> to this function?
>
> If so, what's the semantic the user should learned. If not, how will
> framework deal with the output of user's
> TableAggregateFunction. For example, if user output multiple rows with the
> same key, should be latter one
> replace previous ones?
>
> Best,
> Kurt
>
>
> On Mon, Jul 1, 2019 at 7:19 AM jincheng sun <su...@gmail.com>
> wrote:
>
> > Hi hequn, Thanks for the reply! I think `withKeys` solution is our better
> > choice!
> >
> >
> > Hequn Cheng <ch...@gmail.com> 于2019年6月26日周三 下午5:11写道:
> >
> > > Hi Jincheng,
> > >
> > > Thanks for raising the discussion!
> > > The key information is very important for query optimizations. It would
> > be
> > > nice if we can use upsert mode to achieve better performance.
> > >
> > > +1 for the `withKeys` proposal. :)
> > >
> > > Best, Hequn
> > >
> > >
> > > On Wed, Jun 26, 2019 at 4:37 PM jincheng sun <sunjincheng121@gmail.com
> >
> > > wrote:
> > >
> > > > Hi all,
> > > >
> > > > With the continuous efforts from the community, we already supported
> > > > `flatAggregate`[1] on TableAPI in retract mode. I think It's better
> to
> > > add
> > > > upsert mode for  `flatAggregate`.
> > > >
> > > > The result table of streaming non-window `flatAggregate` is a table
> > > > contains updates. We can, of course, use a RetractStreamTableSink[2]
> to
> > > > emit the table, but we can get better performance in upsert mode.
> > > However,
> > > > due to the lack of keys, we can’t use an UpsertStreamTableSink to
> emit
> > > the
> > > > table. We don’t have this problem for a normal aggregate as it emits
> a
> > > > single row for each group, so the unique keys are exactly the same
> with
> > > the
> > > > group keys. While for a `flatAggregate`, its pretty difference that
> due
> > > to
> > > > emits multi rows(a “sub-table”) for a single group. To solve this
> > > problem,
> > > > we need to find a way to define keys on flatAggregate, so that we can
> > > also
> > > > use upsert sink to emit the result table after flatAggregate.
> > > >
> > > > So, Aljoscha, Hequn and I prepared a design document for how to
> define
> > > the
> > > > update keys for  `flatAggregate` in upsert mode.  The detail can be
> > found
> > > > here:
> > > >
> > > >
> > > >
> > >
> >
> https://docs.google.com/document/d/183qHo8PDG-xserDi_AbGP6YX9aPY0rVr80p3O3Gyz6U/edit?usp=sharing
> > > >
> > > > I appreciate it if you can have look at the document and any comments
> > are
> > > > welcome!
> > > >
> > > >
> > > > Best,
> > > >
> > > > Jincheng
> > > >
> > > >
> > > > [1]
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97552739
> > > >
> > > > [2]
> > > >
> > > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/sourceSinks.html#defining-a-streamtablesource
> > > >
> > >
> >
>

Re: [DISCUSS]Support Upsert mode for Streaming Non-window FlatAggregate

Posted by Kurt Young <yk...@gmail.com>.
Hi,

I have a question about the key information of TableAggregateFunction.
IIUC, you need to define
something like primary key or unique key in the result table of
TableAggregateFunction, and also
need a way to let user configure this through the API. My question is, will
that effect the logic of
TableAggregateFunction user wrote? Should the user know that there will a
key and make some changes
to this function?

If so, what's the semantic the user should learned. If not, how will
framework deal with the output of user's
TableAggregateFunction. For example, if user output multiple rows with the
same key, should be latter one
replace previous ones?

Best,
Kurt


On Mon, Jul 1, 2019 at 7:19 AM jincheng sun <su...@gmail.com>
wrote:

> Hi hequn, Thanks for the reply! I think `withKeys` solution is our better
> choice!
>
>
> Hequn Cheng <ch...@gmail.com> 于2019年6月26日周三 下午5:11写道:
>
> > Hi Jincheng,
> >
> > Thanks for raising the discussion!
> > The key information is very important for query optimizations. It would
> be
> > nice if we can use upsert mode to achieve better performance.
> >
> > +1 for the `withKeys` proposal. :)
> >
> > Best, Hequn
> >
> >
> > On Wed, Jun 26, 2019 at 4:37 PM jincheng sun <su...@gmail.com>
> > wrote:
> >
> > > Hi all,
> > >
> > > With the continuous efforts from the community, we already supported
> > > `flatAggregate`[1] on TableAPI in retract mode. I think It's better to
> > add
> > > upsert mode for  `flatAggregate`.
> > >
> > > The result table of streaming non-window `flatAggregate` is a table
> > > contains updates. We can, of course, use a RetractStreamTableSink[2] to
> > > emit the table, but we can get better performance in upsert mode.
> > However,
> > > due to the lack of keys, we can’t use an UpsertStreamTableSink to emit
> > the
> > > table. We don’t have this problem for a normal aggregate as it emits a
> > > single row for each group, so the unique keys are exactly the same with
> > the
> > > group keys. While for a `flatAggregate`, its pretty difference that due
> > to
> > > emits multi rows(a “sub-table”) for a single group. To solve this
> > problem,
> > > we need to find a way to define keys on flatAggregate, so that we can
> > also
> > > use upsert sink to emit the result table after flatAggregate.
> > >
> > > So, Aljoscha, Hequn and I prepared a design document for how to define
> > the
> > > update keys for  `flatAggregate` in upsert mode.  The detail can be
> found
> > > here:
> > >
> > >
> > >
> >
> https://docs.google.com/document/d/183qHo8PDG-xserDi_AbGP6YX9aPY0rVr80p3O3Gyz6U/edit?usp=sharing
> > >
> > > I appreciate it if you can have look at the document and any comments
> are
> > > welcome!
> > >
> > >
> > > Best,
> > >
> > > Jincheng
> > >
> > >
> > > [1]
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97552739
> > >
> > > [2]
> > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/sourceSinks.html#defining-a-streamtablesource
> > >
> >
>

Re: [DISCUSS]Support Upsert mode for Streaming Non-window FlatAggregate

Posted by jincheng sun <su...@gmail.com>.
Hi hequn, Thanks for the reply! I think `withKeys` solution is our better
choice!


Hequn Cheng <ch...@gmail.com> 于2019年6月26日周三 下午5:11写道:

> Hi Jincheng,
>
> Thanks for raising the discussion!
> The key information is very important for query optimizations. It would be
> nice if we can use upsert mode to achieve better performance.
>
> +1 for the `withKeys` proposal. :)
>
> Best, Hequn
>
>
> On Wed, Jun 26, 2019 at 4:37 PM jincheng sun <su...@gmail.com>
> wrote:
>
> > Hi all,
> >
> > With the continuous efforts from the community, we already supported
> > `flatAggregate`[1] on TableAPI in retract mode. I think It's better to
> add
> > upsert mode for  `flatAggregate`.
> >
> > The result table of streaming non-window `flatAggregate` is a table
> > contains updates. We can, of course, use a RetractStreamTableSink[2] to
> > emit the table, but we can get better performance in upsert mode.
> However,
> > due to the lack of keys, we can’t use an UpsertStreamTableSink to emit
> the
> > table. We don’t have this problem for a normal aggregate as it emits a
> > single row for each group, so the unique keys are exactly the same with
> the
> > group keys. While for a `flatAggregate`, its pretty difference that due
> to
> > emits multi rows(a “sub-table”) for a single group. To solve this
> problem,
> > we need to find a way to define keys on flatAggregate, so that we can
> also
> > use upsert sink to emit the result table after flatAggregate.
> >
> > So, Aljoscha, Hequn and I prepared a design document for how to define
> the
> > update keys for  `flatAggregate` in upsert mode.  The detail can be found
> > here:
> >
> >
> >
> https://docs.google.com/document/d/183qHo8PDG-xserDi_AbGP6YX9aPY0rVr80p3O3Gyz6U/edit?usp=sharing
> >
> > I appreciate it if you can have look at the document and any comments are
> > welcome!
> >
> >
> > Best,
> >
> > Jincheng
> >
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97552739
> >
> > [2]
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/sourceSinks.html#defining-a-streamtablesource
> >
>

Re: [DISCUSS]Support Upsert mode for Streaming Non-window FlatAggregate

Posted by Hequn Cheng <ch...@gmail.com>.
Hi Jincheng,

Thanks for raising the discussion!
The key information is very important for query optimizations. It would be
nice if we can use upsert mode to achieve better performance.

+1 for the `withKeys` proposal. :)

Best, Hequn


On Wed, Jun 26, 2019 at 4:37 PM jincheng sun <su...@gmail.com>
wrote:

> Hi all,
>
> With the continuous efforts from the community, we already supported
> `flatAggregate`[1] on TableAPI in retract mode. I think It's better to add
> upsert mode for  `flatAggregate`.
>
> The result table of streaming non-window `flatAggregate` is a table
> contains updates. We can, of course, use a RetractStreamTableSink[2] to
> emit the table, but we can get better performance in upsert mode.  However,
> due to the lack of keys, we can’t use an UpsertStreamTableSink to emit the
> table. We don’t have this problem for a normal aggregate as it emits a
> single row for each group, so the unique keys are exactly the same with the
> group keys. While for a `flatAggregate`, its pretty difference that due to
> emits multi rows(a “sub-table”) for a single group. To solve this problem,
> we need to find a way to define keys on flatAggregate, so that we can also
> use upsert sink to emit the result table after flatAggregate.
>
> So, Aljoscha, Hequn and I prepared a design document for how to define the
> update keys for  `flatAggregate` in upsert mode.  The detail can be found
> here:
>
>
> https://docs.google.com/document/d/183qHo8PDG-xserDi_AbGP6YX9aPY0rVr80p3O3Gyz6U/edit?usp=sharing
>
> I appreciate it if you can have look at the document and any comments are
> welcome!
>
>
> Best,
>
> Jincheng
>
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97552739
>
> [2]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/sourceSinks.html#defining-a-streamtablesource
>