Posted to dev@flink.apache.org by Piotr Nowojski <pi...@ververica.com> on 2019/12/04 09:24:46 UTC

Re: [DISCUSS] Add N-Ary Stream Operator

Hi,

First and foremost, I would like to nominate myself for the Golden Shovel award for digging out this topic:



Secondly, I would like to come back to this particular idea of implementing an N-Ary Stream Operator. This time the motivation doesn’t come from Side Inputs, but from the need to efficiently support multi-way joins in SQL without extra network exchanges. I’ve reviewed the design doc proposed by Aljoscha; I quite like it, and I think we could start from that.

Specifically, the end goal is to allow, for example, Blink to:

I. Implement A* multi broadcast join - to have a single operator chain, where the probe table (source) is read locally (inside the task that is actually doing the join) and then joined with multiple other broadcasted tables.
II. Another example might be when we have 2 or more sources, pre-partitioned on the same key. In that case we should also be able to perform all of the table reading and the join inside a single Task.

In order to achieve that, I would propose the following plan:

1. Implement the N-Ary Stream Operator as proposed in the design doc below, but with added support for input selection [1].
  - initially it can be just exposed via the `StreamTransformation`, without direct access from the `DataStream API`

2. Allow it to be chained with sources (implemented using the FLIP-27 SourceReader [2])

3. Think about whether we need to support more complex chaining. Even without this point, the motivating examples (I and II) could be implemented if all of the joins/filters/mappings are compiled/composed into a single N-Ary Stream Operator (which could be chained with some other single input operators at the tail). We could also think about supporting chaining of a tree of, for example, TwoInputStreamOperators inside a single Task. However, I’m leaving this as a follow-up, since in that case it’s not so easy to handle the `InputSelection` of multiple operators inside the tree.
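
To make step 1 a bit more concrete, here is a minimal, self-contained Java sketch of an N-ary operator that drives its own input selection for the multi broadcast join from example I. It is only an illustration under stated assumptions, not a proposed implementation: `NaryOperatorSketch`, `MultiBroadcastJoinSketch` and `NaryDriverSketch` are hypothetical names, the bitmask returned by `nextSelection()` is only loosely modelled on the idea behind `InputSelectable` [1], records are simplified to (key, value) string pairs, and input 0 is assumed to be the probe side.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical N-ary operator contract for illustration; NOT Flink's actual API. */
interface NaryOperatorSketch {
    /** Bitmask of inputs that may be read next (bit i set = input i is selected). */
    long nextSelection();

    /** Handles one record (key, value) from the given input; results go to out. */
    void processElement(int inputIndex, String[] record, List<String> out);

    /** Called once when the given input has no more records. */
    void endInput(int inputIndex);
}

/** Inner join of a probe input (index 0) with N-1 broadcast build inputs on record[0]. */
final class MultiBroadcastJoinSketch implements NaryOperatorSketch {
    private final int numInputs; // assumed to be between 2 and 63
    private final List<Map<String, String>> buildTables = new ArrayList<>();
    private long finishedInputs = 0L;

    MultiBroadcastJoinSketch(int numInputs) {
        this.numInputs = numInputs;
        for (int i = 0; i < numInputs; i++) {
            buildTables.add(new HashMap<>()); // slot 0 stays unused (probe side)
        }
    }

    @Override
    public long nextSelection() {
        long allBuildInputs = ((1L << numInputs) - 1) & ~1L;
        long pendingBuilds = allBuildInputs & ~finishedInputs;
        // Read every build input to completion first; only then select the probe.
        return pendingBuilds != 0 ? pendingBuilds : (1L & ~finishedInputs);
    }

    @Override
    public void processElement(int inputIndex, String[] record, List<String> out) {
        if (inputIndex != 0) {
            buildTables.get(inputIndex).put(record[0], record[1]);
            return;
        }
        StringBuilder joined = new StringBuilder(record[0]).append(':').append(record[1]);
        for (int i = 1; i < numInputs; i++) {
            String match = buildTables.get(i).get(record[0]);
            if (match == null) {
                return; // inner join: drop probe rows without a match in every table
            }
            joined.append(',').append(match);
        }
        out.add(joined.toString());
    }

    @Override
    public void endInput(int inputIndex) {
        finishedInputs |= 1L << inputIndex;
    }
}

/** Toy stand-in for the task's input loop; it only reads inputs the operator selects. */
final class NaryDriverSketch {
    /** Assumes inputs.size() equals the operator's number of inputs. */
    static List<String> run(NaryOperatorSketch op, List<Deque<String[]>> inputs) {
        List<String> out = new ArrayList<>();
        long selection;
        while ((selection = op.nextSelection()) != 0) {
            for (int i = 0; i < inputs.size(); i++) {
                if ((selection & (1L << i)) == 0) {
                    continue; // input i is not selected right now
                }
                String[] record = inputs.get(i).poll();
                if (record == null) {
                    op.endInput(i);
                } else {
                    op.processElement(i, record, out);
                }
            }
        }
        return out;
    }
}
```

In this sketch, calling `NaryDriverSketch.run(new MultiBroadcastJoinSketch(3), inputs)` with one probe queue and two build queues reads both build sides fully before touching the probe side, all inside one operator and without any exchange between the joins. A real runtime would of course read from network and source inputs rather than in-memory queues.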

Piotrek

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/streaming/api/operators/InputSelectable.html
[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>> On 21. Apr 2016, at 17:09, Aljoscha Krettek <al...@apache.org> wrote:
>> 
>> Hi,
>> yes, I see operators of this style as very much an internal thing. You are probably talking about use cases for OneInputOperator and TwoInputOperator where users have a very specific need and require access to the low-level details such as watermarks, state and timers to get stuff done. Maybe we could have a wrapper for these so that users can still use them, but internally we wrap them in an N-Ary Operator.
>> 
>> Cheers,
>> Aljoscha
>> 
>> On Thu, 21 Apr 2016 at 16:31 Gyula Fóra <gy...@apache.org> wrote:
>> Hey,
>> 
>> Some initial feedback from my side:
>> 
>> I think this is a very important problem to deal with, as a lot of applications
>> depend on it. I like the proposed runtime model and that is probably a
>> good way to handle this task; it is very clear what is happening.
>> 
>> My main concern is how to handle this from the API and UDFs. What you
>> proposed seems like a very internal thing from the API perspective and I
>> would be against exposing it in the way you wrote in your example. We
>> should make every effort to streamline this with the functional style
>> operators in some way (so in that sense the way broadcast sets are handled
>> is pretty nice). Maybe we could extend ds.connect() to many streams.
>> 
>> But in any case this is an awesome initiative :)
>> 
>> Cheers,
>> Gyula
>> 
>> 
>> Aljoscha Krettek <al...@apache.org> wrote (on Thu, 21 Apr 2016, 15:56):
>> 
>>> Hi Team,
>>> I'm currently thinking about how we can bring the broadcast set/broadcast
>>> input feature from the DataSet API to the DataStream API. I think this
>>> would be a valuable addition since it would enable use cases that join
>>> streams with constant (or slowly changing) side information.
>>> 
>>> For this purpose, I think that we need to change the way we handle stream
>>> operators. The new model would have one unified operator that handles all
>>> cases and allows adding inputs after the operator was constructed, thus
>>> allowing the specification of broadcast inputs.
>>> 
>>> I wrote up this preliminary document detailing the reason why we need such
>>> a new operator for broadcast inputs and also what the API of such an
>>> operator would be. It also quickly touches on the required changes of
>>> existing per-operation stream operations such as StreamMap:
>>> 
>>> 
>>> https://docs.google.com/document/d/1ZFzL_0xGuUEnBsFyEiHwWcmCcjhd9ArWsmhrgnt05RI/edit?usp=sharing
>>> 
>>> Please have a look if you're interested. Feedback/insights are very
>>> welcome. :-)
>>> 
>>> Cheers,
>>> Aljoscha
>>> 
> 


Re: [DISCUSS] Add N-Ary Stream Operator

Posted by Piotr Nowojski <pi...@ververica.com>.
Hi,

Good question. I think not, at least not in the first version, unless someone can convince us that it is better to do this immediately.

Piotrek

> On 14 Jan 2020, at 04:49, Yun Tang <my...@live.com> wrote:
> 
> Hi 
> 
> I noticed that the previous design doc [1] also discussed introducing a new KeyedStreamOperatorNG. Is that a prerequisite for introducing the N-ary stream operator?
> 
> 
> [1] https://docs.google.com/document/d/1ZFzL_0xGuUEnBsFyEiHwWcmCcjhd9ArWsmhrgnt05RI
> 
> Best
> Yun Tang
> From: Piotr Nowojski <piotr@ververica.com>
> Sent: Thursday, January 9, 2020 23:27
> To: dev <dev@flink.apache.org>
> Subject: Re: [DISCUSS] Add N-Ary Stream Operator
>  
> Hi,
> 
> I have started a vote on this topic [1], please cast your +1 or -1 there :)
> 
> I have also assigned the number FLIP-92 to this design doc.
> 
> Piotrek
> 
> [1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-92-Add-N-Ary-Stream-Operator-in-Flink-td36539.html
> 
> > On 10 Dec 2019, at 07:10, Jingsong Li <jingsonglee0@gmail.com> wrote:
> > 
> > Hi Piotr,
> > 
> > Sorry for the misunderstanding: chaining does work with multiple outputs
> > right now. What I meant is that it is also a very important feature, and it
> > should work with N-ary selectable input operators.
> > We all think that providing an N-ary selectable input operator is a very
> > important thing: it makes TwoInputOperator chaining possible in the upper
> > layer, and it makes things simpler.
> > 
> > Looking forward to it very much.
> > 
> > Best,
> > Jingsong Lee
> > 
> > On Thu, Dec 5, 2019 at 6:01 PM Piotr Nowojski <piotr@ververica.com> wrote:
> > 
> >> Hi,
> >> 
> >> Thanks for the clarifications Jingsong. Indeed, if chaining doesn’t work
> >> with multiple outputs right now (doesn’t it?), that’s also a good future
> >> story.
> >> 
> >> Re Kurt:
> >> I think this pattern could be easily handled if those two joins are
> >> implemented as a single 3 input operator that is internally composed of
> >> those three operators (HashJoin1, HashAgg and HashJoin2).
> >> 1. You can set the initial InputSelection to Build1 and Build2.
> >> 2. When Build1 receives `endOfInput`, InputSelection switches to Probe1
> >> and Build2.
> >> 3. When Probe1 receives `endOfInput`, you do not forward the `endOfInput`
> >> to the internal `HashAgg` operator.
> >> 4. When Build2 finally receives `endOfInput`, you can finally forward the
> >> `endOfInput` to the internal `HashAgg`.
> >> 
> >> Exactly for reasons like that, I wanted to at least postpone handling
> >> tree-like operator chains in Flink. Logic like that is difficult to
> >> express generically, since it requires knowledge about the operators’
> >> behaviour. When it is hardcoded for a specific project (Blink in this
> >> case) and encapsulated behind an N-ary selectable input operator, it’s very
> >> easy for the runtime to handle. Sure, this comes at the expense of a bit more
> >> complexity in forcing the user to compose operators; that’s why I’m not
> >> saying that we do not want to handle this at some point in the future, but
> >> at least not in the first version.
> >> 
> >> Piotrek
> >> 
> >>> On 5 Dec 2019, at 10:11, Jingsong Li <jingsonglee0@gmail.com> wrote:
> >>> 
> >>> Kurt mentioned a very interesting thing.
> >>> 
> >>> If we want better performance by reading the inputs simultaneously, then
> >>> for this pattern we need to control not only the read order of the inputs,
> >>> but also when endInput is propagated.
> >>> In this case, HashAggregate can only call its real endInput after the
> >>> input of build2 is finished, so the endInput of an operator is not necessarily
> >>> determined by its own input alone, but also by other associated inputs.
> >>> I think we have the ability to do this in the n-input operator.
> >>> 
> >>> Note that these behaviors should be determined at compile time.
> >>> 
> >>> Best,
> >>> Jingsong Lee
> >>> 
> >>> On Thu, Dec 5, 2019 at 4:42 PM Kurt Young <ykt836@gmail.com> wrote:
> >>> 
> >>>> While implementing the n-ary input operator in table, please keep
> >>>> this pattern in mind:
> >>>> 
> >>>> Build1 ---+
> >>>>           |
> >>>>           +---> HashJoin1 ---> HashAgg ---+
> >>>>           |                               |
> >>>> Probe1 ---+                               +---> HashJoin2
> >>>>                                           |
> >>>>                                 Build2 ---+
> >>>> 
> >>>> It's quite interesting that all of `Build1`, `Build2` and `Probe1` can
> >>>> be read simultaneously. But we need to hold back `HashAgg`'s output
> >>>> until `Build2` has finished. I don't have a clear solution for now, but
> >>>> it's a common pattern we will face.
> >>>> 
> >>>> Best,
> >>>> Kurt
> >>>> 
> >>>> 
> >>>> On Thu, Dec 5, 2019 at 4:37 PM Jingsong Li <jingsonglee0@gmail.com> wrote:
> >>>> 
> >>>>> Hi Piotr,
> >>>>> 
> >>>>>> a) two input operator X -> one input operator Y -> one input operator Z (ALLOWED)
> >>>>>> b) n input operator X -> one input operator Y -> one input operator Z (ALLOWED)
> >>>>>> c) two input operator X -> one input operator Y -> two input operator Z (NOT ALLOWED as a single chain)
> >>>>> 
> >>>>> Not allowing c) sounds good to me. I understand that it is very difficult
> >>>>> to provide general, high-performance support for chaining arbitrary
> >>>>> input-selectable two input operators.
> >>>>> It is not necessary for the table layer either. b) already excites us.
> >>>>> 
> >>>>> Actually, we already support n-output chains too:
> >>>>> d) one/two/n op X -> one op Y -> one op A1 -> one op B1 -> one op C1
> >>>>>                               -> one op A2 -> one op B2 -> one op C2
> >>>>> d) is a very useful feature too.
> >>>>> 
> >>>>>> Do you mean that those Table API/SQL use cases (HashJoin/SortMergeJoin)
> >>>>>> could be easily handled by a single N-Ary Stream Operator, so this would be
> >>>>>> covered by steps 1. and 2. from my plan from my previous e-mail? That would
> >>>>>> be real nice (avoiding the input selection chaining).
> >>>>> 
> >>>>> Yes, because in the table layer the typical scenarios currently only have
> >>>>> a static reading order. (We don't consider MergeJoin here, because it's too
> >>>>> complex to optimize, and not worth optimizing at present.)
> >>>>> For example, take the current TwoInputOperators HashJoin and NestedLoopJoin.
> >>>>> They all have a static reading order: we must read the build input before we
> >>>>> can read the probe input.
> >>>>> So after we analyze the chain, we put all the operators that can be chained
> >>>>> into an N input operator. We can then analyze the static order required by
> >>>>> this operator and divide the reading order into several levels:
> >>>>> - first level: input4, input5, input1
> >>>>> - second level: input2, input6
> >>>>> - third level: input1, input7
> >>>>> Note that this analysis happens at compile time on the client.
> >>>>> At runtime, we just need to read in a fixed order.
> >>>>> 
> >>>>> Best,
> >>>>> Jingsong Lee
> >>>>> 
> >>>>> On Wed, Dec 4, 2019 at 10:15 PM Piotr Nowojski <piotr@ververica.com> wrote:
> >>>>> 
> >>>>>> Hi Jingsong,
> >>>>>> 
> >>>>>> Thanks for the feedback :)
> >>>>>> 
> >>>>>> Could you clarify a little bit what you mean by your desired use cases?
> >>>>>> 
> >>>>>>> There are a large number of jobs (in production environments) whose
> >>>>>>> TwoInputOperators could be chained. We used to just watch the last
> >>>>>>> ten tasks transmit data through disk and network, when this could have
> >>>>>>> been done in one task.
> >>>>>>> In terms of performance, if we can chain them, the average improvement is
> >>>>>>> 30%+, and it reaches an order of magnitude in extreme cases.
> >>>>>> 
> >>>>>> As I mentioned at the end, I would like to avoid/postpone chaining of
> >>>>>> multiple/two input operators one after another because of the complexity of
> >>>>>> input selection. For the first version I would like to aim only to allow
> >>>>>> chaining single input operators with something (a 2 or N input operator
> >>>>>> must always be the head of the chain). For example, chains:
> >>>>>> 
> >>>>>> a) two input operator X -> one input operator Y -> one input operator Z (ALLOWED)
> >>>>>> b) n input operator X -> one input operator Y -> one input operator Z (ALLOWED)
> >>>>>> c) two input operator X -> one input operator Y -> two input operator Z (NOT ALLOWED as a single chain)
> >>>>>> 
> >>>>>> The example above sounds to me like c)
> >>>>>> 
> >>>>>> I think as a follow-up, we could allow c) by extending chaining with a simple
> >>>>>> rule: there can only be a single input selectable operator in the chain
> >>>>>> (again, it’s the chaining of multiple input selectable operators that’s
> >>>>>> causing some problems).
> >>>>>> 
> >>>>>>> The table layer has many special characteristics, which give us the chance
> >>>>>>> to optimize it, but which also make it hard for the underlying layer to
> >>>>>>> provide an abstract mechanism to implement it. For example:
> >>>>>>> - HashJoin must read all the data on one side (build side) and then read
> >>>>>>> the other side (probe side).
> >>>>>>> - HashJoin only emits data while reading the probe side.
> >>>>>>> - SortMergeJoin reads its inputs in no fixed order, but if a SortMergeJoin is
> >>>>>>> chained with another MergeJoin (sort attribute re-use), that makes things
> >>>>>>> complicated.
> >>>>>>> - HashAggregate/Sort only emit data in endInput.
> >>>>>>> 
> >>>>>>> Providing an N-Ary stream operator makes everything possible. The upper
> >>>>>>> layer can do anything. These things can be specific optimizations, which
> >>>>>>> are much more natural there than in the lower layer.
> >>>>>> 
> >>>>>> Do you mean that those Table API/SQL use cases (HashJoin/SortMergeJoin)
> >>>>>> could be easily handled by a single N-Ary Stream Operator, so this would be
> >>>>>> covered by steps 1. and 2. from my plan from my previous e-mail? That would
> >>>>>> be real nice (avoiding the input selection chaining).
> >>>>>> 
> >>>>>> Piotrek
> >>>>>> 
> >>>>>>> On 4 Dec 2019, at 14:29, Jingsong Li <jingsonglee0@gmail.com> wrote:
> >>>>>>> 
> >>>>>>> Hi Piotr,
> >>>>>>> 
> >>>>>>> Huge +1 for N-Ary Stream Operator.
> >>>>>>> And I love this Golden Shovel award very much!
> >>>>>>> 
> >>>>>>> There are a large number of jobs (in production environments) whose
> >>>>>>> TwoInputOperators could be chained. We used to just watch the last
> >>>>>>> ten tasks transmit data through disk and network, when this could have
> >>>>>>> been done in one task.
> >>>>>>> In terms of performance, if we can chain them, the average improvement is
> >>>>>>> 30%+, and it reaches an order of magnitude in extreme cases.
> >>>>>>> 
> >>>>>>> The table layer has many special characteristics, which give us the chance
> >>>>>>> to optimize it, but which also make it hard for the underlying layer to
> >>>>>>> provide an abstract mechanism to implement it. For example:
> >>>>>>> - HashJoin must read all the data on one side (build side) and then read
> >>>>>>> the other side (probe side).
> >>>>>>> - HashJoin only emits data while reading the probe side.
> >>>>>>> - SortMergeJoin reads its inputs in no fixed order, but if a SortMergeJoin is
> >>>>>>> chained with another MergeJoin (sort attribute re-use), that makes things
> >>>>>>> complicated.
> >>>>>>> - HashAggregate/Sort only emit data in endInput.
> >>>>>>> 
> >>>>>>> Providing an N-Ary stream operator makes everything possible. The upper
> >>>>>>> layer can do anything. These things can be specific optimizations, which
> >>>>>>> are much more natural there than in the lower layer.
> >>>>>>> 
> >>>>>>> In addition to the two optimizations you mentioned, it also gives more room
> >>>>>>> to eliminate virtual function calls, because with this approach the table
> >>>>>>> layer has to consider the operator chain.
> >>>>>>> And in the future, we can optimize a whole N-Ary stream operator into a
> >>>>>>> JIT-friendly operator. Without virtual function calls, the JIT can play to
> >>>>>>> its real strength.
> >>>>>>> 
> >>>>>>> Best,
> >>>>>>> Jingsong Lee
> >>>>>>> 
> >>>>>>> --
> >>>>>>> Best, Jingsong Lee
> >>>>>> 
> >>>>>> 
> >>>>> 
> >>>>> --
> >>>>> Best, Jingsong Lee
> >>>>> 
> >>>> 
> >>> 
> >>> 
> >>> --
> >>> Best, Jingsong Lee
> >> 
> >> 
> > 
> > -- 
> > Best, Jingsong Lee
> 


Re: [DISCUSS] Add N-Ary Stream Operator

Posted by Yun Tang <my...@live.com>.
Hi

I noticed that the previous design doc [1] also discussed introducing a new KeyedStreamOperatorNG. Is that a prerequisite for introducing the N-ary stream operator?


[1] https://docs.google.com/document/d/1ZFzL_0xGuUEnBsFyEiHwWcmCcjhd9ArWsmhrgnt05RI

Best
Yun Tang
________________________________
From: Piotr Nowojski <pi...@ververica.com>
Sent: Thursday, January 9, 2020 23:27
To: dev <de...@flink.apache.org>
Subject: Re: [DISCUSS] Add N-Ary Stream Operator

Hi,

I have started a vote on this topic [1], please cast your +1 or -1 there :)

I have also assigned the number FLIP-92 to this design doc.

Piotrek

[1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-92-Add-N-Ary-Stream-Operator-in-Flink-td36539.html

> On 10 Dec 2019, at 07:10, Jingsong Li <ji...@gmail.com> wrote:
>
> Hi Piotr,
>
> Sorry for the misunderstanding: chaining does work with multiple outputs
> right now. What I meant is that it is also a very important feature, and it
> should work with N-ary selectable input operators.
> We all think that providing an N-ary selectable input operator is a very
> important thing: it makes TwoInputOperator chaining possible in the upper
> layer, and it makes things simpler.
>
> Looking forward to it very much.
>
> Best,
> Jingsong Lee
>
> On Thu, Dec 5, 2019 at 6:01 PM Piotr Nowojski <pi...@ververica.com> wrote:
>
>> Hi,
>>
>> Thanks for the clarifications Jingsong. Indeed, if chaining doesn’t work
>> with multiple outputs right now (doesn’t it?), that’s also a good future
>> story.
>>
>> Re Kurt:
>> I think this pattern could be easily handled if those two joins are
>> implemented as a single 3 input operator that is internally composed of
>> those three operators (HashJoin1, HashAgg and HashJoin2).
>> 1. You can set the initial InputSelection to Build1 and Build2.
>> 2. When Build1 receives `endOfInput`, InputSelection switches to Probe1
>> and Build2.
>> 3. When Probe1 receives `endOfInput`, you do not forward the `endOfInput`
>> to the internal `HashAgg` operator.
>> 4. When Build2 finally receives `endOfInput`, you can finally forward the
>> `endOfInput` to the internal `HashAgg`.
>>
>> Exactly for reasons like that, I wanted to at least postpone handling
>> tree-like operator chains in Flink. Logic like that is difficult to
>> express generically, since it requires knowledge about the operators’
>> behaviour. When it is hardcoded for a specific project (Blink in this
>> case) and encapsulated behind an N-ary selectable input operator, it’s very
>> easy for the runtime to handle. Sure, this comes at the expense of a bit more
>> complexity in forcing the user to compose operators; that’s why I’m not
>> saying that we do not want to handle this at some point in the future, but
>> at least not in the first version.
>>
>> Piotrek
>>
>>> On 5 Dec 2019, at 10:11, Jingsong Li <ji...@gmail.com> wrote:
>>>
>>> Kurt mentioned a very interesting thing.
>>>
>>> If we want better performance by reading the inputs simultaneously, then
>>> for this pattern we need to control not only the read order of the inputs,
>>> but also when endInput is propagated.
>>> In this case, HashAggregate can only call its real endInput after the
>>> input of build2 is finished, so the endInput of an operator is not necessarily
>>> determined by its own input alone, but also by other associated inputs.
>>> I think we have the ability to do this in the n-input operator.
>>>
>>> Note that these behaviors should be determined at compile time.
>>>
>>> Best,
>>> Jingsong Lee
>>>
>>> On Thu, Dec 5, 2019 at 4:42 PM Kurt Young <yk...@gmail.com> wrote:
>>>
>>>> While implementing the n-ary input operator in table, please keep
>>>> this pattern in mind:
>>>>
>>>> Build1 ---+
>>>>           |
>>>>           +---> HashJoin1 ---> HashAgg ---+
>>>>           |                               |
>>>> Probe1 ---+                               +---> HashJoin2
>>>>                                           |
>>>>                                 Build2 ---+
>>>>
>>>> It's quite interesting that all of `Build1`, `Build2` and `Probe1` can
>>>> be read simultaneously. But we need to hold back `HashAgg`'s output
>>>> until `Build2` has finished. I don't have a clear solution for now, but
>>>> it's a common pattern we will face.
>>>>
>>>> Best,
>>>> Kurt
>>>>
>>>>
>>>> On Thu, Dec 5, 2019 at 4:37 PM Jingsong Li <ji...@gmail.com>
>> wrote:
>>>>
>>>>> Hi Piotr,
>>>>>
>>>>>> a) two input operator X -> one input operator Y -> one input operator
>> Z
>>>>> (ALLOWED)
>>>>>> b) n input operator X -> one input operator Y -> one input operator Z
>>>>> (ALLOWED)
>>>>>> c) two input operator X -> one input operator Y -> two input operator
>> Z
>>>>> (NOT ALLOWED as a single chain)
>>>>>
>>>>> NOT ALLOWED to c) sounds good to me. I understand that it is very
>>>> difficult
>>>>> to propose a general support for any input selectable two input
>> operators
>>>>> chain with high performance.
>>>>> And it is not necessary for table layer too. b) has already excited us.
>>>>>
>>>>> Actually, we have supported n output chain too:
>>>>> d) one/two/n op X -> one op Y -> one op A1 -> one op B1 -> one op C1
>>>>>                                                -> one op A2 -> one op
>>>> B2
>>>>> -> one op C2
>>>>> d) is a very useful feature too.
>>>>>
>>>>>> Do you mean that those Table API/SQL use cases (HashJoin/SortMergeJoin)
>>>>>> could be easily handled by a single N-Ary Stream Operator, so this would be
>>>>>> covered by steps 1. and 2. from my plan from my previous e-mail? That would
>>>>>> be real nice (avoiding the input selection chaining).
>>>>>
>>>>> Yes, because in the table layer, the typical scenarios currently only have
>>>>> a static order. (We don't consider MergeJoin here, because it's too complex
>>>>> to optimize and not worth optimizing at present.)
>>>>> For example, take the current TwoInputOperators: HashJoin and NestedLoopJoin.
>>>>> They both have a static reading order. We must read the build input before we
>>>>> can read the probe input.
>>>>> So after we analyze the chain, we put all the operators that can be chained
>>>>> into an N-input operator. We can analyze the static order required by this
>>>>> operator and divide the reading order into several levels:
>>>>> - first level: input4, input5, input1
>>>>> - second level: input2, input6
>>>>> - third level: input1, input7
>>>>> Note that this analysis happens at compile time on the client.
>>>>> At runtime, we just need to read in a fixed order.
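The level-based reading order described above could be replayed at runtime roughly as in the following Java sketch. Everything in it is an assumption made for illustration: `LeveledInputReader`, its method names, and the input ids are hypothetical and are not Flink or Blink APIs. The sketch only assumes that the levels were fixed at compile time and that the runtime moves to the next level once every input of the current level has ended.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper: replays a reading order that was fixed at compile time. */
final class LeveledInputReader {
    private final List<Set<Integer>> levels = new ArrayList<>();
    private final Set<Integer> finished = new HashSet<>();
    private int current = 0;

    LeveledInputReader(List<List<Integer>> levelsInOrder) {
        for (List<Integer> level : levelsInOrder) {
            levels.add(new LinkedHashSet<>(level));
        }
        advance(); // skip empty leading levels, if any
    }

    /** Inputs of the current level that have not ended yet. */
    Set<Integer> readableInputs() {
        Set<Integer> readable = new LinkedHashSet<>();
        if (current < levels.size()) {
            for (int id : levels.get(current)) {
                if (!finished.contains(id)) {
                    readable.add(id);
                }
            }
        }
        return readable;
    }

    /** Marks an input as ended and moves on once the whole level has ended. */
    void endInput(int inputId) {
        finished.add(inputId);
        advance();
    }

    boolean isDone() {
        return current >= levels.size();
    }

    private void advance() {
        while (current < levels.size() && finished.containsAll(levels.get(current))) {
            current++;
        }
    }
}
```

With levels `[[1, 2], [3]]`, inputs 1 and 2 are readable first, and input 3 only becomes readable after both of them have ended.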
>>>>>
>>>>> Best,
>>>>> Jingsong Lee
>>>>>
>>>>> On Wed, Dec 4, 2019 at 10:15 PM Piotr Nowojski <pi...@ververica.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Jingsong,
>>>>>>
>>>>>> Thanks for the feedback :)
>>>>>>
>>>>>> Could you clarify a little bit what you mean by your desired use cases?
>>>>>>
>>>>>>> There are a large number of jobs (in production environments) whose
>>>>>>> TwoInputOperators could be chained. We could only watch the last
>>>>>>> ten tasks transmit data through disk and network, although that work
>>>>>>> could have been done in one task.
>>>>>>> For performance, if we can chain them, the average improvement is 30%+,
>>>>>>> and it reaches an order of magnitude in extreme cases.
>>>>>>
>>>>>> As I mentioned at the end, I would like to avoid/postpone chaining
>>>>>> multiple/two input operators one after another because of the complexity
>>>>>> of input selection. For the first version I would like to aim only at
>>>>>> chaining single input operators with something (a 2 or N input operator
>>>>>> must always be the head of the chain). For example, chains:
>>>>>>
>>>>>> a) two input operator X -> one input operator Y -> one input operator Z (ALLOWED)
>>>>>> b) n input operator X -> one input operator Y -> one input operator Z (ALLOWED)
>>>>>> c) two input operator X -> one input operator Y -> two input operator Z (NOT ALLOWED as a single chain)
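The first-version rule above (only the head of a chain may have more than one input) can be expressed as a small check. This is a minimal Java sketch under stated assumptions: `ChainRules` and `isChainable` are hypothetical names, and a chain is modelled simply as the list of input counts of its operators, from head to tail. It is not how Flink's chaining logic is implemented.

```java
import java.util.List;

/** Hypothetical check for the "multi-input operator only at the head" rule. */
final class ChainRules {
    private ChainRules() {}

    /**
     * @param inputCounts number of inputs of each operator, head of the chain first
     * @return true if every operator after the head has exactly one input
     */
    static boolean isChainable(List<Integer> inputCounts) {
        if (inputCounts.isEmpty()) {
            return false; // an empty chain is not a chain
        }
        for (int i = 1; i < inputCounts.size(); i++) {
            if (inputCounts.get(i) != 1) {
                return false; // a multi-input operator in the middle or at the tail
            }
        }
        return true;
    }
}
```

Applied to the three examples, a) is `[2, 1, 1]` and b) is for instance `[5, 1, 1]`, both accepted, while c) is `[2, 1, 2]` and is rejected.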
>>>>>>
>>>>>> The example above sounds to me like c)
>>>>>>
>>>>>> I think as a follow up, we could allow c) by extending chaining with a
>>>>>> simple rule: there can only be a single input selectable operator in the
>>>>>> chain (again, it’s the chaining of multiple input selectable operators
>>>>>> that’s causing some problems).
>>>>>>
>>>>>>> The table layer has many special features, which give us the chance to
>>>>>>> optimize it, but also make it hard for the underlying layer to provide
>>>>>>> an abstract mechanism to implement it. For example:
>>>>>>> - HashJoin must read all the data on one side (build side) and then read
>>>>>>> the other side (probe side).
>>>>>>> - HashJoin only emits data while reading the probe side.
>>>>>>> - SortMergeJoin reads its inputs in arbitrary order, but if a SortMergeJoin
>>>>>>> is chained with another MergeJoin (sort attribute re-use), that makes
>>>>>>> things complicated.
>>>>>>> - HashAggregate/Sort only emit data in endInput.
>>>>>>>
>>>>>>> Providing an N-Ary stream operator makes everything possible. The upper
>>>>>>> layer can do anything. These things can be specific optimizations, which
>>>>>>> is much more natural there than in the lower layer.
>>>>>>
>>>>>> Do you mean that those Table API/SQL use cases (HashJoin/SortMergeJoin)
>>>>>> could be easily handled by a single N-Ary Stream Operator, so this would be
>>>>>> covered by steps 1. and 2. from my plan from my previous e-mail? That would
>>>>>> be real nice (avoiding the input selection chaining).
>>>>>>
>>>>>> Piotrek
>>>>>>
>>>>>>> On 4 Dec 2019, at 14:29, Jingsong Li <ji...@gmail.com> wrote:
>>>>>>>
>>>>>>> Hi Piotr,
>>>>>>>
>>>>>>> Huge +1 for N-Ary Stream Operator.
>>>>>>> And I love this Golden Shovel award very much!
>>>>>>>
>>>>>>> There are a large number of jobs (in production environments) whose
>>>>>>> TwoInputOperators could be chained. We could only watch the last
>>>>>>> ten tasks transmit data through disk and network, although that work
>>>>>>> could have been done in one task.
>>>>>>> For performance, if we can chain them, the average improvement is 30%+,
>>>>>>> and it reaches an order of magnitude in extreme cases.
>>>>>>>
>>>>>>> The table layer has many special features, which give us the chance to
>>>>>>> optimize it, but also make it hard for the underlying layer to provide
>>>>>>> an abstract mechanism to implement it. For example:
>>>>>>> - HashJoin must read all the data on one side (build side) and then read
>>>>>>> the other side (probe side).
>>>>>>> - HashJoin only emits data while reading the probe side.
>>>>>>> - SortMergeJoin reads its inputs in arbitrary order, but if a SortMergeJoin
>>>>>>> is chained with another MergeJoin (sort attribute re-use), that makes
>>>>>>> things complicated.
>>>>>>> - HashAggregate/Sort only emit data in endInput.
>>>>>>>
>>>>>>> Providing an N-Ary stream operator makes everything possible. The upper
>>>>>>> layer can do anything. These things can be specific optimizations, which
>>>>>>> is much more natural there than in the lower layer.
>>>>>>>
>>>>>>> In addition to the two optimizations you mentioned, it also gives more
>>>>>>> room to eliminate virtual function calls:
>>>>>>> following this approach, the table layer has to consider the operator
>>>>>>> chain. And in the future, we can optimize a whole N-Ary stream operator
>>>>>>> into a JIT-friendly operator. Without virtual function calls, the JIT can
>>>>>>> play to its real strength.
>>>>>>>
>>>>>>> Best,
>>>>>>> Jingsong Lee
>>>>>>>
>>>>>>> On Wed, Dec 4, 2019 at 5:24 PM Piotr Nowojski <piotr@ververica.com> wrote:
>>>>>>> Hi,
>>>>>>>
>>>>>>> First and foremost I would like to nominate myself for the Golden Shovel
>>>>>>> award for digging out this topic:
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Secondly, I would like to discuss coming back to this particular idea of
>>>>>>> implementing an N-Ary Stream Operator. This time the motivation doesn’t
>>>>>>> come from the Side Inputs, but from the need to efficiently support multi
>>>>>>> joins in SQL without extra network exchanges. I’ve reviewed the design doc
>>>>>>> proposed by Aljoscha, I quite like it and I think we could start from that.
>>>>>>>
>>>>>>> Specifically, the end goal is to allow, for example, Blink to:
>>>>>>>
>>>>>>> I. Implement an A* multi broadcast join: a single operator chain, where
>>>>>>> the probe table (source) is read locally (inside the task that is
>>>>>>> actually doing the join) and then joined with multiple other broadcasted
>>>>>>> tables.
>>>>>>> II. Another example might be when we have 2 or more sources,
>>>>>>> pre-partitioned on the same key. In that case we should also be able to
>>>>>>> perform all of the table reading and the join inside a single Task.
>>>>>>>
>>>>>>> In order to achieve that, I would propose the following plan:
>>>>>>>
>>>>>>> 1. Implement the N-Ary Stream Operator as proposed in the design doc
>>>>>>> below, however with added support for input selection [1].
>>>>>>> - initially it can be just exposed via the `StreamTransformation`,
>>>>>>> without direct access from the `DataStream API`
>>>>>>>
>>>>>>> 2. Allow it to be chained with sources (implemented using the FLIP-27
>>>>>>> SourceReader [2])
>>>>>>>
>>>>>>> 3. Think about whether we need to support more complex chaining. Without
>>>>>>> this point, the motivating examples (I and II) could be implemented if all
>>>>>>> of the joins/filtering/mappings are compiled/composed into a single N-Ary
>>>>>>> Stream Operator (which could be chained with some other single input
>>>>>>> operators at the tail). We could also think about supporting chaining a
>>>>>>> tree of, for example, TwoInputStreamOperators inside a single Task. However
>>>>>>> I’m leaving this as a follow up, since in that case it’s not so easy to
>>>>>>> handle the `InputSelection` of multiple operators inside the tree.
>>>>>>>
>>>>>>> Piotrek
>>>>>>>
>>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/streaming/api/operators/InputSelectable.html
>>>>>>>
>>>>>>> [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>>>>>>>
>>>>>>>>> On 21. Apr 2016, at 17:09, Aljoscha Krettek <aljoscha@apache.org> wrote:
>>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>> yes, I see operators of this style as very much an internal thing. You
>>>>>>>>> are probably talking about use cases for OneInputOperator and
>>>>>>>>> TwoInputOperator where users have a very specific need and require access
>>>>>>>>> to the low-level details such as watermarks, state and timers to get stuff
>>>>>>>>> done. Maybe we could have a wrapper for these so that users can still use
>>>>>>>>> them but internally we wrap them in an N-Ary Operator.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Aljoscha
>>>>>>>>>
>>>>>>>>> On Thu, 21 Apr 2016 at 16:31 Gyula Fóra <gyfora@apache.org> wrote:
>>>>>>>>> Hey,
>>>>>>>>>
>>>>>>>>> Some initial feedback from my side:
>>>>>>>>>
>>>>>>>>> I think this is a very important problem to deal with, as a lot of
>>>>>>>>> applications depend on it. I like the proposed runtime model and that is
>>>>>>>>> probably a good way to handle this task; it is very clear what is happening.
>>>>>>>>>
>>>>>>>>> My main concern is how to handle this from the API and UDFs. What you
>>>>>>>>> proposed seems like a very internal thing from the API perspective and I
>>>>>>>>> would be against exposing it in the way you wrote in your example. We
>>>>>>>>> should make every effort to streamline this with the functional style
>>>>>>>>> operators in some way (so in that sense the way broadcast sets are handled
>>>>>>>>> is pretty nice). Maybe we could extend ds.connect() to many streams.
>>>>>>>>>
>>>>>>>>> But in any case this is an awesome initiative :)
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Gyula
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Aljoscha Krettek <aljoscha@apache.org> wrote (on Thu, 21 Apr 2016, 15:56):
>>>>>>>>>
>>>>>>>>>> Hi Team,
>>>>>>>>>> I'm currently thinking about how we can bring the broadcast set/broadcast
>>>>>>>>>> input feature from the DataSet API to the DataStream API. I think this
>>>>>>>>>> would be a valuable addition since it would enable use cases that join
>>>>>>>>>> streams with constant (or slowly changing) side information.
>>>>>>>>>>
>>>>>>>>>> For this purpose, I think that we need to change the way we handle stream
>>>>>>>>>> operators. The new model would have one unified operator that handles all
>>>>>>>>>> cases and allows adding inputs after the operator was constructed, thus
>>>>>>>>>> allowing the specification of broadcast inputs.
>>>>>>>>>>
>>>>>>>>>> I wrote up this preliminary document detailing the reason why we need such
>>>>>>>>>> a new operator for broadcast inputs and also what the API of such an
>>>>>>>>>> operator would be. It also quickly touches on the required changes of
>>>>>>>>>> existing per-operation stream operations such as StreamMap:
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> https://docs.google.com/document/d/1ZFzL_0xGuUEnBsFyEiHwWcmCcjhd9ArWsmhrgnt05RI/edit?usp=sharing
>>>>>>>>>>
>>>>>>>>>> Please have a look if you're interested. Feedback/insights are very
>>>>>>>>>> welcome. :-)
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Aljoscha
>>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Best, Jingsong Lee
>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Best, Jingsong Lee
>>>>>
>>>>
>>>
>>>
>>> --
>>> Best, Jingsong Lee
>>
>>
>
> --
> Best, Jingsong Lee


Re: [DISCUSS] Add N-Ary Stream Operator

Posted by Piotr Nowojski <pi...@ververica.com>.
Hi,

I have started a vote on this topic [1], please cast your +1 or -1 there :)

I have also assigned the number FLIP-92 to this design doc.

Piotrek

[1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-92-Add-N-Ary-Stream-Operator-in-Flink-td36539.html

> On 10 Dec 2019, at 07:10, Jingsong Li <ji...@gmail.com> wrote:
> 
> Hi Piotr,
> 
> Sorry for the misunderstanding: chaining does work with multiple outputs
> right now. What I meant is that it is also a very important feature, and it
> should work with N-ary selectable input operators.
> We all think that providing an N-ary selectable input operator is very
> important: it makes TwoInputOperator chaining possible in the upper
> layer, and it makes things simpler.
> 
> Looking forward to it very much.
> 
> Best,
> Jingsong Lee
> 
> On Thu, Dec 5, 2019 at 6:01 PM Piotr Nowojski <pi...@ververica.com> wrote:
> 
>> Hi,
>> 
>> Thanks for the clarifications, Jingsong. Indeed, if chaining doesn’t work
>> with multiple outputs right now (doesn’t it?), that’s also a good future
>> story.
>> 
>> Re Kurt:
>> I think this pattern could be easily handled if those two joins are
>> implemented as a single 3 input operator that is internally composed of
>> those three operators.
>> 1. You can set the initial InputSelection to Build1 and Build2.
>> 2. When Build1 receives `endOfInput`, InputSelection switches to Probe1
>> and Build2.
>> 3. When Probe1 receives `endOfInput`, you do not forward the `endOfInput`
>> to the internal `HashAgg` operator.
>> 4. When Build2 finally receives `endOfInput`, you can finally forward the
>> `endOfInput` to the internal `HashAgg`.
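The four steps above can be sketched as a small state machine. This is only an illustration in plain Java: `ThreeInputJoinSketch`, its `Input` enum and its methods are hypothetical names, it models the input selection as a set instead of using Flink's `InputSelection` class, and it does no actual joining or aggregating. It assumes that `HashAgg` may see `endOfInput` only once both Probe1 and Build2 have ended.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical model of the composed 3-input operator; not a Flink API. */
final class ThreeInputJoinSketch {
    enum Input { BUILD1, PROBE1, BUILD2 }

    // Step 1: start by reading both build sides.
    private final EnumSet<Input> selection = EnumSet.of(Input.BUILD1, Input.BUILD2);
    private final EnumSet<Input> ended = EnumSet.noneOf(Input.class);
    private boolean hashAggEndInputForwarded = false;

    /** Inputs the runtime may currently read from. */
    Set<Input> currentSelection() {
        return EnumSet.copyOf(selection);
    }

    /** Called when the given input has delivered its last record. */
    void endInput(Input input) {
        if (!selection.contains(input)) {
            throw new IllegalStateException(input + " is not currently selected");
        }
        selection.remove(input);
        ended.add(input);
        if (input == Input.BUILD1) {
            // Step 2: HashJoin1's hash table is complete, so start probing.
            selection.add(Input.PROBE1);
        }
        // Steps 3 and 4: the end of Probe1 alone is not forwarded; HashAgg
        // is told about endOfInput only once Build2 has ended as well.
        if (ended.contains(Input.PROBE1) && ended.contains(Input.BUILD2)) {
            hashAggEndInputForwarded = true;
        }
    }

    boolean isHashAggEndInputForwarded() {
        return hashAggEndInputForwarded;
    }
}
```

Ending the inputs in the order Build1, Probe1, Build2 moves the selection from {Build1, Build2} to {Probe1, Build2} to {Build2}, and the internal `HashAgg` is only notified after the last call.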
>> 
>> Exactly for reasons like that, I wanted to at least postpone handling
>> tree-like operator chains in Flink. Logic like that is difficult to
>> express generically, since it requires knowledge about the operators’
>> behaviour. When it is hardcoded for the specific project (Blink in this
>> case) and encapsulated behind an N-ary selectable input operator, it’s very
>> easy for the runtime to handle. Sure, this comes at the expense of a bit
>> more complexity in forcing the user to compose operators. That’s why I’m not
>> saying that we do not want to handle this at some point in the future, but
>> at least not in the first version.
>> 
>> Piotrek
>> 
>>> On 5 Dec 2019, at 10:11, Jingsong Li <ji...@gmail.com> wrote:
>>> 
>>> Kurt mentioned a very interesting thing,
>>> 
>>> If we want better performance by reading simultaneously, then for this
>>> pattern we need to control not only the read order of inputs, but also
>>> the outputs of endInput.
>>> In this case, HashAggregate can only call its real endInput after the
>>> input of build2 is finished, so the endInput of an operator is not
>>> necessarily determined only by its own input, but also by other
>>> associated inputs.
>>> I think we have the ability to do this in the n-input operator.
>>> 
>>> Note that these behaviors should be determined at compile time.
>>> 
>>> Best,
>>> Jingsong Lee
>>> 


Re: [DISCUSS] Add N-Ary Stream Operator

Posted by Jingsong Li <ji...@gmail.com>.
Hi Piotr,

Sorry for the misunderstanding: chaining does work with multiple outputs
right now. What I meant is that it is also a very important feature, and it
should work with N-ary selectable input operators.
We all think that providing an N-ary selectable input operator is very
important: it makes TwoInputOperator chaining possible in the upper
layer, and it makes things simpler.

Looking forward to it very much.

Best,
Jingsong Lee

On Thu, Dec 5, 2019 at 6:01 PM Piotr Nowojski <pi...@ververica.com> wrote:

> Hi,
>
> Thanks for the clarifications, Jingsong. Indeed, if chaining doesn’t work
> with multiple outputs right now (doesn’t it?), that’s also a good future
> story.
>
> Re Kurt:
> I think this pattern could be easily handled if those two joins are
> implemented as a single 3 input operator that is internally composed of
> those three operators.
> 1. You can set the initial InputSelection to Build1 and Build2.
> 2. When Build1 receives `endOfInput`, InputSelection switches to Probe1
> and Build2.
> 3. When Probe1 receives `endOfInput`, you do not forward the `endOfInput`
> to the internal `HashAgg` operator.
> 4. When Build2 finally receives `endOfInput`, you can finally forward the
> `endOfInput` to the internal `HashAgg`.
>
> Exactly for reasons like that, I wanted to at least postpone handling
> tree-like operator chains in Flink. Logic like that is difficult to
> express generically, since it requires knowledge about the operators’
> behaviour. When it is hardcoded for a specific project (Blink in this
> case) and encapsulated behind an N-ary selectable input operator, it’s very
> easy for the runtime to handle. Sure, this comes at the expense of a bit
> more complexity in forcing the user to compose operators. That’s why I’m
> not saying that we do not want to handle this at some point in the future,
> just not in the first version.
>
> Piotrek

-- 
Best, Jingsong Lee

Re: [DISCUSS] Add N-Ary Stream Operator

Posted by Piotr Nowojski <pi...@ververica.com>.
Hi,

Thanks for the clarifications, Jingsong. Indeed, if chaining doesn’t work with multiple outputs right now (doesn’t it?), that’s also a good future story.

Re Kurt:
I think this pattern could be handled easily if those two joins are implemented as a single 3-input operator that is internally composed of those three operators.
1. You can set the initial InputSelection to Build1 and Build2.
2. When Build1 receives `endOfInput`, InputSelection switches to Probe1 and Build2.
3. When Probe1 receives `endOfInput`, you do not forward the `endOfInput` to the internal `HashAgg` operator.
4. When Build2 finally receives `endOfInput`, you can finally forward the `endOfInput` to the internal `HashAgg`.
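
To make the four steps above concrete, here is a rough sketch of the bookkeeping such a 3-input operator might do. This is only an illustration under my own assumptions: `Input`, `ThreeInputJoinSketch`, `selection()` and `hashAggEnded()` are hypothetical names and not Flink's actual `InputSelectable` interface, and the internal operators are reduced to a single flag. It also assumes that `HashAgg` may be ended as soon as both Probe1 and Build2 have ended, whichever of them finishes last.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical sketch: none of these names are Flink API.
enum Input { BUILD1, PROBE1, BUILD2 }

final class ThreeInputJoinSketch {
    private final Set<Input> finished = EnumSet.noneOf(Input.class);
    private boolean hashAggEnded = false;

    /** Inputs the runtime may read from next (the role InputSelection plays). */
    Set<Input> selection() {
        Set<Input> readable = EnumSet.noneOf(Input.class);
        if (!finished.contains(Input.BUILD1)) {
            readable.add(Input.BUILD1);   // step 1: build side of the first join
        } else if (!finished.contains(Input.PROBE1)) {
            readable.add(Input.PROBE1);   // step 2: probe only after Build1 ended
        }
        if (!finished.contains(Input.BUILD2)) {
            readable.add(Input.BUILD2);   // Build2 is readable from the start
        }
        return readable;
    }

    /** Called when one of the three external inputs reaches its end. */
    void endOfInput(Input input) {
        finished.add(input);
        // Steps 3 and 4: HashAgg is ended only once Probe1 AND Build2 have ended.
        if (!hashAggEnded
                && finished.contains(Input.PROBE1)
                && finished.contains(Input.BUILD2)) {
            hashAggEnded = true;          // a real operator would flush HashAgg here
        }
    }

    boolean hashAggEnded() {
        return hashAggEnded;
    }
}
```

The point of the sketch is that the whole decision fits into one small class that knows the shape of the plan, which is much simpler than teaching the runtime to derive the same thing for arbitrary operator trees.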

Exactly for reasons like that, I wanted to at least postpone handling tree-like operator chains in Flink. Logic like that is difficult to express generically, since it requires knowledge about the operators’ behaviour. When it is hardcoded for a specific project (Blink in this case) and encapsulated behind an N-ary selectable input operator, it’s very easy for the runtime to handle. Sure, this comes at the expense of a bit more complexity in forcing the user to compose operators. That’s why I’m not saying that we do not want to handle this at some point in the future, just not in the first version.

Piotrek

> On 5 Dec 2019, at 10:11, Jingsong Li <ji...@gmail.com> wrote:
> 
> Kurt mentioned a very interesting thing,
> 
> If we want to better performance to read simultaneously, To this pattern:
> We need to control not only the read order of inputs, but also the outputs
> of endInput.
> In this case, HashAggregate can only call its real endInput after the input
> of build2 is finished, so the endInput of an operator is not necessarily
> determined by its input, but also by other associated inputs.
> I think we have the ability to do this in the n-input operator.
> 
> Note that these behaviors should be determined at compile time.
> 
> Best,
> Jingsong Lee
> 
> On Thu, Dec 5, 2019 at 4:42 PM Kurt Young <yk...@gmail.com> wrote:
> 
>> During implementing n-ary input operator in table, please keep
>> this pattern in mind:
>> 
>> Build1 ---+
>>           |
>>           +---> HashJoin1 ---> HashAgg ---+
>>           |                               |
>> Probe1 ---+                               +---> HashJoin2
>>                                           |
>>                                 Build2 ---+
>> 
>> It's quite interesting that both `Build1`, `Build2` and `Probe1` can
>> be read simultaneously. But we need to control `HashAgg`'s output
>> before `Build2` finished. I don't have a clear solution for now, but
>> it's a common pattern we will face.
>> 
>> Best,
>> Kurt
>> 
>> 
>> On Thu, Dec 5, 2019 at 4:37 PM Jingsong Li <ji...@gmail.com> wrote:
>> 
>>> Hi Piotr,
>>> 
>>>> a) two input operator X -> one input operator Y -> one input operator Z
>>> (ALLOWED)
>>>> b) n input operator X -> one input operator Y -> one input operator Z
>>> (ALLOWED)
>>>> c) two input operator X -> one input operator Y -> two input operator Z
>>> (NOT ALLOWED as a single chain)
>>> 
>>> NOT ALLOWED to c) sounds good to me. I understand that it is very
>> difficult
>>> to propose a general support for any input selectable two input operators
>>> chain with high performance.
>>> And it is not necessary for table layer too. b) has already excited us.
>>> 
>>> Actually, we have supported n output chain too:
>>> d) one/two/n op X -> one op Y -> one op A1 -> one op B1 -> one op C1
>>>                                                 -> one op A2 -> one op
>> B2
>>> -> one op C2
>>> d) is a very useful feature too.
>>> 
>>>> Do you mean that those Table API/SQL use cases (HashJoin/SortMergeJoin)
>>> could be easily handled by a single N-Ary Stream Operator, so this would
>> be
>>> covered by steps 1. and 2. from my plan from my previous e-mail? That
>> would
>>> be real nice (avoiding the input selection chaining).
>>> 
>>> Yes, because in the table layer, the typical scenarios currently only
>> have
>>> static order. (We don't consider MergeJoin here, because it's too complex
>>> to be optimized, and not deserved to be optimized at present.).
>>> For example, the current TwoInputOperators: HashJoin and NestedLoopJoin.
>>> They are all static reading order. We must read the build input before we
>>> can read the probe input.
>>> So after we analyze chain, we put all the operators that can chain into
>> a N
>>> input operator, We can analyze the static order required by this
>> operator,
>>> and divide the reading order into several levels:
>>> - first level: input4, input5, input1
>>> - second level: input2, input6
>>> - third level: input1, input7
>>> Note that these analyses are at the compile time of the client.
>>> At runtime, we just need to read in a fixed order.
>>> 
>>> Best,
>>> Jingsong Lee
>>> 
>>> On Wed, Dec 4, 2019 at 10:15 PM Piotr Nowojski <pi...@ververica.com>
>>> wrote:
>>> 
>>>> Hi Jingsong,
>>>> 
>>>> Thanks for the feedback :)
>>>> 
>>>> Could you clarify a little bit what do you mean by your wished use
>> cases?
>>>> 
>>>>> There are a large number jobs (in production environment) that their
>>>>> TwoInputOperators that can be chained. We used to only watch the last
>>>>> ten tasks transmit data through disk and network, which could have
>> been
>>>>> done in one task.
>>>>> For performance, if we can chain them, the average is 30%+, and there
>>>>> is an order of magnitude in extreme cases.
>>>> 
>>>> As I mentioned at the end, I would like to avoid/postpone chaining of
>>>> multiple/two input operators one after another because of the
>> complexity
>>> of
>>>> input selection. For the first version I would like to aim only to
>> allow
>>>> chaining the single input operators with something (2 or N input must
>> be
>>>> always head of the chain) . For example chains:
>>>> 
>>>> a) two input operator X -> one input operator Y -> one input operator Z
>>>> (ALLOWED)
>>>> b) n input operator X -> one input operator Y -> one input operator Z
>>>> (ALLOWED)
>>>> c) two input operator X -> one input operator Y -> two input operator Z
>>>> (NOT ALLOWED as a single chain)
>>>> 
>>>> The example above sounds to me like c)
>>>> 
>>>> I think as a follow up, we could allow c), by extend chaining to a
>> simple
>>>> rule: there can only be a single input selectable operator in the chain
>>>> (again, it’s the chaining of multiple input selectable operators that’s
>>>> causing some problems).
>>>> 
>>>>> The table layer has many special features. which give us the chance
>> to
>>>> optimize
>>>>> it, but also results that it is hard to let underlying layer to
>>> provide
>>>> an abstract
>>>>> mechanism to implement it. For example:
>>>>> - HashJoin must read all the data on one side(build side) and then
>> read
>>>> the
>>>>> other side (probe side).
>>>>> - HashJoin only emit data when read probe side.
>>>>> - SortMergeJoin read random, but if we have SortMergeJoin chain
>> another
>>>>> MergeJoin(Sort attribute re-use), that make things complicated.
>>>>> - HashAggregate/Sort only emit data in endInput.
>>>>> 
>>>>> Provide an N-Ary stream operator to make everything possible. The
>> upper
>>>>> layer can do anything. These things can be specific optimization,
>>> which
>>>> is much
>>>>> more natural than the lower layer.
>>>> 
>>>> Do you mean that those Table API/SQL use cases (HashJoin/SortMergeJoin)
>>>> could be easily handled by a single N-Ary Stream Operator, so this
>> would
>>> be
>>>> covered by steps 1. and 2. from my plan from my previous e-mail? That
>>> would
>>>> be real nice (avoiding the input selection chaining).
>>>> 
>>>> Piotrek
>>>> 
>>>>> On 4 Dec 2019, at 14:29, Jingsong Li <ji...@gmail.com> wrote:
>>>>> 
>>>>> Hi Piotr,
>>>>> 
>>>>> Huge +1 for N-Ary Stream Operator.
>>>>> And I love this Golden Shovel award very much!
>>>>> 
>>>>> There are a large number jobs (in production environment) that their
>>>>> TwoInputOperators that can be chained. We used to only watch the last
>>>>> ten tasks transmit data through disk and network, which could have
>> been
>>>>> done in one task.
>>>>> For performance, if we can chain them, the average is 30%+, and there
>>>>> is an order of magnitude in extreme cases.
>>>>> 
>>>>> The table layer has many special features. which give us the chance
>> to
>>>> optimize
>>>>> it, but also results that it is hard to let underlying layer to
>>> provide
>>>> an abstract
>>>>> mechanism to implement it. For example:
>>>>> - HashJoin must read all the data on one side(build side) and then
>> read
>>>> the
>>>>> other side (probe side).
>>>>> - HashJoin only emit data when read probe side.
>>>>> - SortMergeJoin read random, but if we have SortMergeJoin chain
>> another
>>>>> MergeJoin(Sort attribute re-use), that make things complicated.
>>>>> - HashAggregate/Sort only emit data in endInput.
>>>>> 
>>>>> Provide an N-Ary stream operator to make everything possible. The
>> upper
>>>>> layer can do anything. These things can be specific optimization,
>>> which
>>>> is much
>>>>> more natural than the lower layer.
>>>>> 
>>>>> In addition to the two optimizations you mentioned, it also gives
>> more
>>>> space to
>>>>> eliminate virtual function calls:
>>>>> Because following this way, the table layer has to consider the
>>> operator
>>>> chain.
>>>>> And in the future, we can optimize a whole N-Ary stream operator to a
>>>>> JIT-friendly operator. Without virtual function calls, JIT can play
>>> its
>>>> real strength.
>>>>> 
>>>>> Best,
>>>>> Jingsong Lee
>>>>> 
>>>>> On Wed, Dec 4, 2019 at 5:24 PM Piotr Nowojski <piotr@ververica.com
>>>> <ma...@ververica.com>> wrote:
>>>>> Hi,
>>>>> 
>>>>> First and foremost I would like to nominate myself to the Golden
>> Shovel
>>>> award for digging out this topic:
>>>>> 
>>>>> 
>>>>> 
>>>>> Secondly, I would like to discuss coming back to this particular idea
>>> of
>>>> implementing N-Ary Stream Operator. This time motivation doesn’t come
>>> from
>>>> the Side Inputs, but to efficiently support multi joins in SQL, without
>>>> extra network exchanges. I’ve reviewed the design doc proposed by
>>> Aljoscha,
>>>> I quite like it and I think we could start from that.
>>>>> 
>>>>> Specifically the end-goal is to allow for example Blink, to:
>>>>> 
>>>>> I. Implement A* multi broadcast join - to have a single operator
>> chain,
>>>> where probe table (source) is read locally (inside the task that’s is
>>>> actually doing the join), then joined with multiple other broadcasted
>>>> tables.
>>>>> II. Another example might be when we have 2 or more sources,
>>>> pre-partitioned on the same key. In that case we should also be able to
>>>> perform all of the table reading and the join inside a single Task.
>>>>> 
>>>>> In order to achieve that, I would propose the following plan:
>>>>> 
>>>>> 1. Implement N-Ary Stream Operator as proposed in the design doc
>> below,
>>>> however with added support for the input selection [1].
>>>>>  - initially it can be just exposed via the `StreamTransformation`,
>>>> without direct access from the `DataStream API`
>>>>> 
>>>>> 2. Allow it to be chained with sources (implemented using the FLIP-27
>>>> SourceReader [2])
>>>>> 
>>>>> 3. Think about whether we need to support more complex chaining.
>>> Without
>>>> this point, motivating examples (I and II) could be implemented if all
>> of
>>>> the joins/filtering/mappings are compiled/composed into a single N-Ary
>>>> Stream Operator (which could be chained with some other single input
>>>> operators at the tail). We could also think about supporting of
>> chaining
>>> a
>>>> tree of for example TwoInputStreamOperators inside a single Task.
>> However
>>>> I’m leaving this as a follow up, since in that case, it’s not so easy
>> to
>>>> handle the `InputSelection` of multiple operators inside the tree.
>>>>> 
>>>>> Piotrek
>>>>> 
>>>>> [1]
>>>> 
>>> 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/streaming/api/operators/InputSelectable.html
>>>> <
>>>> 
>>> 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/streaming/api/operators/InputSelectable.html
>>>>> 
>>>>> [2]
>>>> 
>>> 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>>>> <
>>>> 
>>> 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27:+Refactor+Source+Interface
>>>>> 
>>>>>>> On 21. Apr 2016, at 17:09, Aljoscha Krettek <aljoscha@apache.org
>>>> <ma...@apache.org>> wrote:
>>>>>>> 
>>>>>>> Hi,
>>>>>>> yes, I see operators of this style as very much an internal thing.
>>> You
>>>> are probably talking about use cases for OneInputOperator and
>>>> TwoInputOperator where users have a very specific need and require
>> access
>>>> the the low-level details such as watermarks, state and timers to get
>>> stuff
>>>> done. Maybe we could have a wrapper for these so that users can still
>> use
>>>> them but internally we wrap them in an N-Ary Operator.
>>>>>>> 
>>>>>>> Cheers,
>>>>>>> Aljoscha
>>>>>>> 
>>>>>>> On Thu, 21 Apr 2016 at 16:31 Gyula Fóra <gyfora@apache.org
>> <mailto:
>>>> gyfora@apache.org>> wrote:
>>>>>>> Hey,
>>>>>>> 
>>>>>>> Some initial feedback from side:
>>>>>>> 
>>>>>>> I think this a very important problem to deal with as a lot of
>>>> applications
>>>>>>> depend on it. I like the proposed runtime model and that is
>> probably
>>>> the
>>>>>>> good way to handle this task, it is very clean what is happening.
>>>>>>> 
>>>>>>> My main concern is how to handle this from the API and UDFs. What
>> you
>>>>>>> proposed seems like a very internal thing from the API perspective
>>> and
>>>> I
>>>>>>> would be against exposing it in the way you wrote in your example.
>> We
>>>>>>> should make all effort to streamline this with the functional style
>>>>>>> operators in some way. (so in that sense the way broadcastsets are
>>>> handled
>>>>>>> is pretty nice) Maybe we could extend ds.connect() to many streams
>>>>>>> 
>>>>>>> But in any case this is awesome initiative :)
>>>>>>> 
>>>>>>> Cheers,
>>>>>>> Gyula
>>>>>>> 
>>>>>>> 
>>>>>>> Aljoscha Krettek <aljoscha@apache.org <mailto:aljoscha@apache.org
>>>> 
>>>> ezt írta (időpont: 2016. ápr. 21.,
>>>>>>> Cs, 15:56):
>>>>>>> 
>>>>>>>> Hi Team,
>>>>>>>> I'm currently thinking about how we can bring the broadcast
>>>> set/broadcast
>>>>>>>> input feature form the DataSet API to the DataStream API. I think
>>> this
>>>>>>>> would be a valuable addition since it would enable use cases that
>>> join
>>>>>>>> streams with constant (or slowly changing) side information.
>>>>>>>> 
>>>>>>>> For this purpose, I think that we need to change the way we handle
>>>> stream
>>>>>>>> operators. The new model would have one unified operator that
>>> handles
>>>> all
>>>>>>>> cases and allows to add inputs after the operator was constructed,
>>>> thus
>>>>>>>> allowing the specification of broadcast inputs.
>>>>>>>> 
>>>>>>>> I wrote up this preliminary document detailing the reason why we
>>> need
>>>> such
>>>>>>>> a new operator for broadcast inputs and also what the API of such
>> an
>>>>>>>> operator would be. It also quickly touches on the required changes
>>> of
>>>>>>>> existing per-operation stream operations such as StreamMap:
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>> 
>>> 
>> https://docs.google.com/document/d/1ZFzL_0xGuUEnBsFyEiHwWcmCcjhd9ArWsmhrgnt05RI/edit?usp=sharing
>>>> <
>>>> 
>>> 
>> https://docs.google.com/document/d/1ZFzL_0xGuUEnBsFyEiHwWcmCcjhd9ArWsmhrgnt05RI/edit?usp=sharing
>>>>> 
>>>>>>>> 
>>>>>>>> Please have a look if you're interested. Feedback/insights are
>> very
>>>>>>>> welcome. :-)
>>>>>>>> 
>>>>>>>> Cheers,
>>>>>>>> Aljoscha
>>>>>>>> 
>>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> --
>>>>> Best, Jingsong Lee
>>>> 
>>>> 
>>> 
>>> --
>>> Best, Jingsong Lee
>>> 
>> 
> 
> 
> -- 
> Best, Jingsong Lee


Re: [DISCUSS] Add N-Ary Stream Operator

Posted by Jingsong Li <ji...@gmail.com>.
Kurt mentioned a very interesting thing.

If we want better performance by reading the inputs simultaneously, then
for this pattern we need to control not only the read order of the inputs,
but also the output that operators emit in endInput.
In this case, HashAggregate can only call its real endInput after the
Build2 input is finished, so the endInput of an operator is determined not
only by its own input, but also by other associated inputs.
I think we have the ability to do this in the n-input operator.

Note that these behaviors should be determined at compile time.
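
To make the endInput point concrete, here is a minimal sketch in Python. It is only an illustration and does not use any real Flink API: `EndInputGate`, `input_finished` and the input names are hypothetical, and HashAgg's own upstream is modelled simply as "Probe1". The set of inputs an operator has to wait for is fixed up front, which mirrors the idea that it is determined at compile time.

```python
class EndInputGate:
    """Defers an operator's real endInput until all required inputs are done.

    Hypothetical helper for illustration only; not part of Flink.
    """

    def __init__(self, required_inputs, on_end_input):
        # Inputs that must all be finished before endInput may fire.
        # Fixed at construction, i.e. decided before the job runs.
        self._pending = set(required_inputs)
        self._on_end_input = on_end_input
        self._fired = False

    def input_finished(self, name):
        """Record a finished input; fire endInput once nothing is pending."""
        self._pending.discard(name)
        if not self._pending and not self._fired:
            self._fired = True
            self._on_end_input()


def demo():
    events = []
    # HashAgg's own upstream is modelled as "Probe1", but its output must
    # also wait for "Build2", so both are listed as required.
    gate = EndInputGate(
        required_inputs=["Probe1", "Build2"],
        on_end_input=lambda: events.append("HashAgg.endInput"),
    )
    gate.input_finished("Probe1")
    events.append("after Probe1")
    gate.input_finished("Build2")
    events.append("after Build2")
    return events
```

In this sketch, finishing Probe1 alone does not trigger HashAgg's endInput; it fires only once Build2 has also finished.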

Best,
Jingsong Lee

On Thu, Dec 5, 2019 at 4:42 PM Kurt Young <yk...@gmail.com> wrote:

> While implementing the n-ary input operator in the table layer, please
> keep this pattern in mind:
>
> Build1 ---+
>           |
>           +---> HashJoin1 ---> HashAgg ---+
>           |                               |
> Probe1 ---+                               +---> HashJoin2
>                                           |
>                                 Build2 ---+
>
> It's quite interesting that all of `Build1`, `Build2` and `Probe1` can
> be read simultaneously. But we need to hold back `HashAgg`'s output
> until `Build2` has finished. I don't have a clear solution for now, but
> it's a common pattern we will face.
>
> Best,
> Kurt
>
>
> On Thu, Dec 5, 2019 at 4:37 PM Jingsong Li <ji...@gmail.com> wrote:
>
> > Hi Piotr,
> >
> > > a) two input operator X -> one input operator Y -> one input operator Z
> > (ALLOWED)
> > > b) n input operator X -> one input operator Y -> one input operator Z
> > (ALLOWED)
> > > c) two input operator X -> one input operator Y -> two input operator Z
> > (NOT ALLOWED as a single chain)
> >
> > NOT ALLOWED to c) sounds good to me. I understand that it is very
> difficult
> > to propose a general support for any input selectable two input operators
> > chain with high performance.
> > And it is not necessary for table layer too. b) has already excited us.
> >
> > Actually, we have supported n output chain too:
> > d) one/two/n op X -> one op Y -> one op A1 -> one op B1 -> one op C1
> >                                                  -> one op A2 -> one op
> B2
> > -> one op C2
> > d) is a very useful feature too.
> >
> > > Do you mean that those Table API/SQL use cases (HashJoin/SortMergeJoin)
> > could be easily handled by a single N-Ary Stream Operator, so this would
> be
> > covered by steps 1. and 2. from my plan from my previous e-mail? That
> would
> > be real nice (avoiding the input selection chaining).
> >
> > Yes, because in the table layer, the typical scenarios currently only
> have
> > static order. (We don't consider MergeJoin here, because it's too complex
> > to be optimized, and not deserved to be optimized at present.).
> > For example, the current TwoInputOperators: HashJoin and NestedLoopJoin.
> > They are all static reading order. We must read the build input before we
> > can read the probe input.
> > So after we analyze chain, we put all the operators that can chain into
> a N
> > input operator, We can analyze the static order required by this
> operator,
> > and divide the reading order into several levels:
> > - first level: input4, input5, input1
> > - second level: input2, input6
> > - third level: input1, input7
> > Note that these analyses are at the compile time of the client.
> > At runtime, we just need to read in a fixed order.
> >
> > Best,
> > Jingsong Lee
> >
> > On Wed, Dec 4, 2019 at 10:15 PM Piotr Nowojski <pi...@ververica.com>
> > wrote:
> >
> > > Hi Jingsong,
> > >
> > > Thanks for the feedback :)
> > >
> > > Could you clarify a little bit what do you mean by your wished use
> cases?
> > >
> > > > There are a large number jobs (in production environment) that their
> > > > TwoInputOperators that can be chained. We used to only watch the last
> > > > ten tasks transmit data through disk and network, which could have
> been
> > > >  done in one task.
> > > > For performance, if we can chain them, the average is 30%+, and there
> > > >  is an order of magnitude in extreme cases.
> > >
> > > As I mentioned at the end, I would like to avoid/post pone chaining of
> > > multiple/two input operators one after another because of the
> complexity
> > of
> > > input selection. For the first version I would like to aim only to
> allow
> > > chaining the single input operators with something (2 or N input must
> be
> > > always head of the chain) . For example chains:
> > >
> > > a) two input operator X -> one input operator Y -> one input operator Z
> > > (ALLOWED)
> > > b) n input operator X -> one input operator Y -> one input operator Z
> > > (ALLOWED)
> > > c) two input operator X -> one input operator Y -> two input operator Z
> > > (NOT ALLOWED as a single chain)
> > >
> > > The example above sounds to me like c)
> > >
> > > I think as a follow up, we could allow c), by extend chaining to a
> simple
> > > rule: there can only be a single input selectable operator in the chain
> > > (again, it’s the chaining of multiple input selectable operators that’s
> > > causing some problems).
> > >
> > > > The table layer has many special features. which give us the chance
> to
> > > optimize
> > > >  it, but also results that it is hard to let underlying layer to
> > provide
> > > an abstract
> > > > mechanism to implement it. For example:
> > > > - HashJoin must read all the data on one side(build side) and then
> read
> > > the
> > > > other side (probe side).
> > > > - HashJoin only emit data when read probe side.
> > > > - SortMergeJoin read random, but if we have SortMergeJoin chain
> another
> > > >  MergeJoin(Sort attribute re-use), that make things complicated.
> > > > - HashAggregate/Sort only emit data in endInput.
> > > >
> > > > Provide an N-Ary stream operator to make everything possible. The
> upper
> > > >  layer can do anything. These things can be specific optimization,
> > which
> > > is much
> > > >  more natural than the lower layer.
> > >
> > > Do you mean that those Table API/SQL use cases (HashJoin/SortMergeJoin)
> > > could be easily handled by a single N-Ary Stream Operator, so this
> would
> > be
> > > covered by steps 1. and 2. from my plan from my previous e-mail? That
> > would
> > > be real nice (avoiding the input selection chaining).
> > >
> > > Piotrek
> > >
> > > > On 4 Dec 2019, at 14:29, Jingsong Li <ji...@gmail.com> wrote:
> > > >
> > > > Hi Piotr,
> > > >
> > > > Huge +1 for N-Ary Stream Operator.
> > > > And I love this Golden Shovel award very much!
> > > >
> > > > There are a large number jobs (in production environment) that their
> > > > TwoInputOperators that can be chained. We used to only watch the last
> > > > ten tasks transmit data through disk and network, which could have
> been
> > > >  done in one task.
> > > > For performance, if we can chain them, the average is 30%+, and there
> > > >  is an order of magnitude in extreme cases.
> > > >
> > > > The table layer has many special features. which give us the chance
> to
> > > optimize
> > > >  it, but also results that it is hard to let underlying layer to
> > provide
> > > an abstract
> > > > mechanism to implement it. For example:
> > > > - HashJoin must read all the data on one side(build side) and then
> read
> > > the
> > > > other side (probe side).
> > > > - HashJoin only emit data when read probe side.
> > > > - SortMergeJoin read random, but if we have SortMergeJoin chain
> another
> > > >  MergeJoin(Sort attribute re-use), that make things complicated.
> > > > - HashAggregate/Sort only emit data in endInput.
> > > >
> > > > Provide an N-Ary stream operator to make everything possible. The
> upper
> > > >  layer can do anything. These things can be specific optimization,
> > which
> > > is much
> > > >  more natural than the lower layer.
> > > >
> > > > In addition to the two optimizations you mentioned, it also gives
> more
> > > space to
> > > >  eliminate virtual function calls:
> > > > Because following this way, the table layer has to consider the
> > operator
> > > chain.
> > > > And in the future, we can optimize a whole N-Ary stream operator to a
> > > >  JIT-friendly operator. Without virtual function calls, JIT can play
> > its
> > > real strength.
> > > >
> > > > Best,
> > > > Jingsong Lee
> > > >
> > > > On Wed, Dec 4, 2019 at 5:24 PM Piotr Nowojski <piotr@ververica.com
> > > <ma...@ververica.com>> wrote:
> > > > Hi,
> > > >
> > > > First and foremost I would like to nominate myself to the Golden
> Shovel
> > > award for digging out this topic:
> > > >
> > > >
> > > >
> > > > Secondly, I would like to discuss coming back to this particular idea
> > of
> > > implementing N-Ary Stream Operator. This time motivation doesn’t come
> > from
> > > the Side Inputs, but to efficiently support multi joins in SQL, without
> > > extra network exchanges. I’ve reviewed the design doc proposed by
> > Aljoscha,
> > > I quite like it and I think we could start from that.
> > > >
> > > > Specifically the end-goal is to allow for example Blink, to:
> > > >
> > > > I. Implement A* multi broadcast join - to have a single operator
> chain,
> > > where probe table (source) is read locally (inside the task that’s is
> > > actually doing the join), then joined with multiple other broadcasted
> > > tables.
> > > > II. Another example might be when we have 2 or more sources,
> > > pre-partitioned on the same key. In that case we should also be able to
> > > perform all of the table reading and the join inside a single Task.
> > > >
> > > > In order to achieve that, I would propose the following plan:
> > > >
> > > > 1. Implement N-Ary Stream Operator as proposed in the design doc
> below,
> > > however with added support for the input selection [1].
> > > >   - initially it can be just exposed via the `StreamTransformation`,
> > > without direct access from the `DataStream API`
> > > >
> > > > 2. Allow it to be chained with sources (implemented using the FLIP-27
> > > SourceReader [2])
> > > >
> > > > 3. Think about whether we need to support more complex chaining.
> > Without
> > > this point, motivating examples (I and II) could be implemented if all
> of
> > > the joins/filtering/mappings are compiled/composed into a single N-Ary
> > > Stream Operator (which could be chained with some other single input
> > > operators at the tail). We could also think about supporting of
> chaining
> > a
> > > tree of for example TwoInputStreamOperators inside a single Task.
> However
> > > I’m leaving this as a follow up, since in that case, it’s not so easy
> to
> > > handle the `InputSelection` of multiple operators inside the tree.
> > > >
> > > > Piotrek
> > > >
> > > > [1]
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/streaming/api/operators/InputSelectable.html
> > > <
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/streaming/api/operators/InputSelectable.html
> > > >
> > > > [2]
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
> > > <
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27:+Refactor+Source+Interface
> > > >
> > > >>> On 21. Apr 2016, at 17:09, Aljoscha Krettek <aljoscha@apache.org
> > > <ma...@apache.org>> wrote:
> > > >>>
> > > >>> Hi,
> > > >>> yes, I see operators of this style as very much an internal thing.
> > You
> > > are probably talking about use cases for OneInputOperator and
> > > TwoInputOperator where users have a very specific need and require
> access
> > > the the low-level details such as watermarks, state and timers to get
> > stuff
> > > done. Maybe we could have a wrapper for these so that users can still
> use
> > > them but internally we wrap them in an N-Ary Operator.
> > > >>>
> > > >>> Cheers,
> > > >>> Aljoscha
> > > >>>
> > > >>> On Thu, 21 Apr 2016 at 16:31 Gyula Fóra <gyfora@apache.org
> <mailto:
> > > gyfora@apache.org>> wrote:
> > > >>> Hey,
> > > >>>
> > > >>> Some initial feedback from side:
> > > >>>
> > > >>> I think this a very important problem to deal with as a lot of
> > > applications
> > > >>> depend on it. I like the proposed runtime model and that is
> probably
> > > the
> > > >>> good way to handle this task, it is very clean what is happening.
> > > >>>
> > > >>> My main concern is how to handle this from the API and UDFs. What
> you
> > > >>> proposed seems like a very internal thing from the API perspective
> > and
> > > I
> > > >>> would be against exposing it in the way you wrote in your example.
> We
> > > >>> should make all effort to streamline this with the functional style
> > > >>> operators in some way. (so in that sense the way broadcastsets are
> > > handled
> > > >>> is pretty nice) Maybe we could extend ds.connect() to many streams
> > > >>>
> > > >>> But in any case this is awesome initiative :)
> > > >>>
> > > >>> Cheers,
> > > >>> Gyula
> > > >>>
> > > >>>
> > > >>> Aljoscha Krettek <aljoscha@apache.org <mailto:aljoscha@apache.org>>
> > > >>> wrote (on Thu, 21 Apr 2016, 15:56):
> > > >>>
> > > >>>> Hi Team,
> > > >>>> I'm currently thinking about how we can bring the broadcast
> > > set/broadcast
> > > >>>> input feature form the DataSet API to the DataStream API. I think
> > this
> > > >>>> would be a valuable addition since it would enable use cases that
> > join
> > > >>>> streams with constant (or slowly changing) side information.
> > > >>>>
> > > >>>> For this purpose, I think that we need to change the way we handle
> > > stream
> > > >>>> operators. The new model would have one unified operator that
> > handles
> > > all
> > > >>>> cases and allows to add inputs after the operator was constructed,
> > > thus
> > > >>>> allowing the specification of broadcast inputs.
> > > >>>>
> > > >>>> I wrote up this preliminary document detailing the reason why we
> > need
> > > such
> > > >>>> a new operator for broadcast inputs and also what the API of such
> an
> > > >>>> operator would be. It also quickly touches on the required changes
> > of
> > > >>>> existing per-operation stream operations such as StreamMap:
> > > >>>>
> > > >>>>
> > > >>>>
> > >
> >
> https://docs.google.com/document/d/1ZFzL_0xGuUEnBsFyEiHwWcmCcjhd9ArWsmhrgnt05RI/edit?usp=sharing
> > > <
> > >
> >
> https://docs.google.com/document/d/1ZFzL_0xGuUEnBsFyEiHwWcmCcjhd9ArWsmhrgnt05RI/edit?usp=sharing
> > > >
> > > >>>>
> > > >>>> Please have a look if you're interested. Feedback/insights are
> very
> > > >>>> welcome. :-)
> > > >>>>
> > > >>>> Cheers,
> > > >>>> Aljoscha
> > > >>>>
> > > >>
> > > >
> > > >
> > > >
> > > > --
> > > > Best, Jingsong Lee
> > >
> > >
> >
> > --
> > Best, Jingsong Lee
> >
>


-- 
Best, Jingsong Lee

Re: [DISCUSS] Add N-Ary Stream Operator

Posted by Kurt Young <yk...@gmail.com>.
While implementing the n-ary input operator in the table layer, please
keep this pattern in mind:

Build1 ---+
          |
          +---> HashJoin1 ---> HashAgg ---+
          |                               |
Probe1 ---+                               +---> HashJoin2
                                          |
                                Build2 ---+

It's quite interesting that all of `Build1`, `Build2` and `Probe1` can
be read simultaneously. But we need to hold back `HashAgg`'s output
until `Build2` has finished. I don't have a clear solution for now, but
it's a common pattern we will face.

Best,
Kurt


On Thu, Dec 5, 2019 at 4:37 PM Jingsong Li <ji...@gmail.com> wrote:

> Hi Piotr,
>
> > a) two input operator X -> one input operator Y -> one input operator Z
> (ALLOWED)
> > b) n input operator X -> one input operator Y -> one input operator Z
> (ALLOWED)
> > c) two input operator X -> one input operator Y -> two input operator Z
> (NOT ALLOWED as a single chain)
>
> NOT ALLOWED to c) sounds good to me. I understand that it is very difficult
> to propose a general support for any input selectable two input operators
> chain with high performance.
> And it is not necessary for table layer too. b) has already excited us.
>
> Actually, we have supported n output chain too:
> d) one/two/n op X -> one op Y -> one op A1 -> one op B1 -> one op C1
>                                                  -> one op A2 -> one op B2
> -> one op C2
> d) is a very useful feature too.
>
> > Do you mean that those Table API/SQL use cases (HashJoin/SortMergeJoin)
> could be easily handled by a single N-Ary Stream Operator, so this would be
> covered by steps 1. and 2. from my plan from my previous e-mail? That would
> be real nice (avoiding the input selection chaining).
>
> Yes, because in the table layer, the typical scenarios currently only have
> static order. (We don't consider MergeJoin here, because it's too complex
> to be optimized, and not deserved to be optimized at present.).
> For example, the current TwoInputOperators: HashJoin and NestedLoopJoin.
> They are all static reading order. We must read the build input before we
> can read the probe input.
> So after we analyze chain, we put all the operators that can chain into a N
> input operator, We can analyze the static order required by this operator,
> and divide the reading order into several levels:
> - first level: input4, input5, input1
> - second level: input2, input6
> - third level: input1, input7
> Note that these analyses are at the compile time of the client.
> At runtime, we just need to read in a fixed order.
>
> Best,
> Jingsong Lee
>
> On Wed, Dec 4, 2019 at 10:15 PM Piotr Nowojski <pi...@ververica.com>
> wrote:
>
> > Hi Jingsong,
> >
> > Thanks for the feedback :)
> >
> > Could you clarify a little bit what do you mean by your wished use cases?
> >
> > > There are a large number jobs (in production environment) that their
> > > TwoInputOperators that can be chained. We used to only watch the last
> > > ten tasks transmit data through disk and network, which could have been
> > >  done in one task.
> > > For performance, if we can chain them, the average is 30%+, and there
> > >  is an order of magnitude in extreme cases.
> >
> > As I mentioned at the end, I would like to avoid/post pone chaining of
> > multiple/two input operators one after another because of the complexity
> of
> > input selection. For the first version I would like to aim only to allow
> > chaining the single input operators with something (2 or N input must be
> > always head of the chain) . For example chains:
> >
> > a) two input operator X -> one input operator Y -> one input operator Z
> > (ALLOWED)
> > b) n input operator X -> one input operator Y -> one input operator Z
> > (ALLOWED)
> > c) two input operator X -> one input operator Y -> two input operator Z
> > (NOT ALLOWED as a single chain)
> >
> > The example above sounds to me like c)
> >
> > I think as a follow up, we could allow c), by extend chaining to a simple
> > rule: there can only be a single input selectable operator in the chain
> > (again, it’s the chaining of multiple input selectable operators that’s
> > causing some problems).
> >
> > > The table layer has many special features. which give us the chance to
> > optimize
> > >  it, but also results that it is hard to let underlying layer to
> provide
> > an abstract
> > > mechanism to implement it. For example:
> > > - HashJoin must read all the data on one side(build side) and then read
> > the
> > > other side (probe side).
> > > - HashJoin only emit data when read probe side.
> > > - SortMergeJoin read random, but if we have SortMergeJoin chain another
> > >  MergeJoin(Sort attribute re-use), that make things complicated.
> > > - HashAggregate/Sort only emit data in endInput.
> > >
> > > Provide an N-Ary stream operator to make everything possible. The upper
> > >  layer can do anything. These things can be specific optimization,
> which
> > is much
> > >  more natural than the lower layer.
> >
> > Do you mean that those Table API/SQL use cases (HashJoin/SortMergeJoin)
> > could be easily handled by a single N-Ary Stream Operator, so this would
> be
> > covered by steps 1. and 2. from my plan from my previous e-mail? That
> would
> > be real nice (avoiding the input selection chaining).
> >
> > Piotrek
> >
> > > On 4 Dec 2019, at 14:29, Jingsong Li <ji...@gmail.com> wrote:
> > >
> > > Hi Piotr,
> > >
> > > Huge +1 for N-Ary Stream Operator.
> > > And I love this Golden Shovel award very much!
> > >
> > > There are a large number jobs (in production environment) that their
> > > TwoInputOperators that can be chained. We used to only watch the last
> > > ten tasks transmit data through disk and network, which could have been
> > >  done in one task.
> > > For performance, if we can chain them, the average is 30%+, and there
> > >  is an order of magnitude in extreme cases.
> > >
> > > The table layer has many special features. which give us the chance to
> > optimize
> > >  it, but also results that it is hard to let underlying layer to
> provide
> > an abstract
> > > mechanism to implement it. For example:
> > > - HashJoin must read all the data on one side(build side) and then read
> > the
> > > other side (probe side).
> > > - HashJoin only emit data when read probe side.
> > > - SortMergeJoin read random, but if we have SortMergeJoin chain another
> > >  MergeJoin(Sort attribute re-use), that make things complicated.
> > > - HashAggregate/Sort only emit data in endInput.
> > >
> > > Provide an N-Ary stream operator to make everything possible. The upper
> > >  layer can do anything. These things can be specific optimization,
> which
> > is much
> > >  more natural than the lower layer.
> > >
> > > In addition to the two optimizations you mentioned, it also gives more
> > space to
> > >  eliminate virtual function calls:
> > > Because following this way, the table layer has to consider the
> operator
> > chain.
> > > And in the future, we can optimize a whole N-Ary stream operator to a
> > >  JIT-friendly operator. Without virtual function calls, JIT can play
> its
> > real strength.
> > >
> > > Best,
> > > Jingsong Lee
> > >
> > > On Wed, Dec 4, 2019 at 5:24 PM Piotr Nowojski <piotr@ververica.com
> > <ma...@ververica.com>> wrote:
> > > Hi,
> > >
> > > First and foremost I would like to nominate myself to the Golden Shovel
> > award for digging out this topic:
> > >
> > >
> > >
> > > Secondly, I would like to discuss coming back to this particular idea
> of
> > implementing N-Ary Stream Operator. This time motivation doesn’t come
> from
> > the Side Inputs, but to efficiently support multi joins in SQL, without
> > extra network exchanges. I’ve reviewed the design doc proposed by
> Aljoscha,
> > I quite like it and I think we could start from that.
> > >
> > > Specifically the end-goal is to allow for example Blink, to:
> > >
> > > I. Implement A* multi broadcast join - to have a single operator chain,
> > where probe table (source) is read locally (inside the task that’s is
> > actually doing the join), then joined with multiple other broadcasted
> > tables.
> > > II. Another example might be when we have 2 or more sources,
> > pre-partitioned on the same key. In that case we should also be able to
> > perform all of the table reading and the join inside a single Task.
> > >
> > > In order to achieve that, I would propose the following plan:
> > >
> > > 1. Implement N-Ary Stream Operator as proposed in the design doc below,
> > however with added support for the input selection [1].
> > >   - initially it can be just exposed via the `StreamTransformation`,
> > without direct access from the `DataStream API`
> > >
> > > 2. Allow it to be chained with sources (implemented using the FLIP-27
> > SourceReader [2])
> > >
> > > 3. Think about whether we need to support more complex chaining.
> Without
> > this point, motivating examples (I and II) could be implemented if all of
> > the joins/filtering/mappings are compiled/composed into a single N-Ary
> > Stream Operator (which could be chained with some other single input
> > operators at the tail). We could also think about supporting of chaining
> a
> > tree of for example TwoInputStreamOperators inside a single Task. However
> > I’m leaving this as a follow up, since in that case, it’s not so easy to
> > handle the `InputSelection` of multiple operators inside the tree.
> > >
> > > Piotrek
> > >
> > > [1]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/streaming/api/operators/InputSelectable.html
> > <
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/streaming/api/operators/InputSelectable.html
> > >
> > > [2]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
> > <
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27:+Refactor+Source+Interface
> > >
> > >>> On 21. Apr 2016, at 17:09, Aljoscha Krettek <aljoscha@apache.org
> > <ma...@apache.org>> wrote:
> > >>>
> > >>> Hi,
> > >>> yes, I see operators of this style as very much an internal thing.
> You
> > are probably talking about use cases for OneInputOperator and
> > TwoInputOperator where users have a very specific need and require access
> > the the low-level details such as watermarks, state and timers to get
> stuff
> > done. Maybe we could have a wrapper for these so that users can still use
> > them but internally we wrap them in an N-Ary Operator.
> > >>>
> > >>> Cheers,
> > >>> Aljoscha
> > >>>
> > >>> On Thu, 21 Apr 2016 at 16:31 Gyula Fóra <gyfora@apache.org <mailto:
> > gyfora@apache.org>> wrote:
> > >>> Hey,
> > >>>
> > >>> Some initial feedback from side:
> > >>>
> > >>> I think this a very important problem to deal with as a lot of
> > applications
> > >>> depend on it. I like the proposed runtime model and that is probably
> > the
> > >>> good way to handle this task, it is very clean what is happening.
> > >>>
> > >>> My main concern is how to handle this from the API and UDFs. What you
> > >>> proposed seems like a very internal thing from the API perspective
> and
> > I
> > >>> would be against exposing it in the way you wrote in your example. We
> > >>> should make all effort to streamline this with the functional style
> > >>> operators in some way. (so in that sense the way broadcastsets are
> > handled
> > >>> is pretty nice) Maybe we could extend ds.connect() to many streams
> > >>>
> > >>> But in any case this is awesome initiative :)
> > >>>
> > >>> Cheers,
> > >>> Gyula
> > >>>
> > >>>
> > >>> Aljoscha Krettek <aljoscha@apache.org <ma...@apache.org>>
> > >>> wrote (on Thu, 21 Apr 2016, 15:56):
> > >>>
> > >>>> Hi Team,
> > >>>> I'm currently thinking about how we can bring the broadcast
> > set/broadcast
> > >>>> input feature form the DataSet API to the DataStream API. I think
> this
> > >>>> would be a valuable addition since it would enable use cases that
> join
> > >>>> streams with constant (or slowly changing) side information.
> > >>>>
> > >>>> For this purpose, I think that we need to change the way we handle
> > stream
> > >>>> operators. The new model would have one unified operator that
> handles
> > all
> > >>>> cases and allows to add inputs after the operator was constructed,
> > thus
> > >>>> allowing the specification of broadcast inputs.
> > >>>>
> > >>>> I wrote up this preliminary document detailing the reason why we
> need
> > such
> > >>>> a new operator for broadcast inputs and also what the API of such an
> > >>>> operator would be. It also quickly touches on the required changes
> of
> > >>>> existing per-operation stream operations such as StreamMap:
> > >>>>
> > >>>>
> > >>>>
> >
> https://docs.google.com/document/d/1ZFzL_0xGuUEnBsFyEiHwWcmCcjhd9ArWsmhrgnt05RI/edit?usp=sharing
> > <
> >
> https://docs.google.com/document/d/1ZFzL_0xGuUEnBsFyEiHwWcmCcjhd9ArWsmhrgnt05RI/edit?usp=sharing
> > >
> > >>>>
> > >>>> Please have a look if you're interested. Feedback/insights are very
> > >>>> welcome. :-)
> > >>>>
> > >>>> Cheers,
> > >>>> Aljoscha
> > >>>>
> > >>
> > >
> > >
> > >
> > > --
> > > Best, Jingsong Lee
> >
> >
>
> --
> Best, Jingsong Lee
>

Re: [DISCUSS] Add N-Ary Stream Operator

Posted by Jingsong Li <ji...@gmail.com>.
Hi Piotr,

> a) two input operator X -> one input operator Y -> one input operator Z
> (ALLOWED)
> b) n input operator X -> one input operator Y -> one input operator Z
> (ALLOWED)
> c) two input operator X -> one input operator Y -> two input operator Z
> (NOT ALLOWED as a single chain)

Not allowing c) sounds good to me. I understand that it is very difficult
to provide general, high-performance support for chaining arbitrary
input-selectable two-input operators.
It is not necessary for the table layer either; b) is already exciting
for us.
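
As a small illustration of the rule behind a), b) and c), here is a Python sketch. It is not Flink code: `is_allowed_chain` is a hypothetical name, and a chain is modelled only as the list of input counts of its operators, head first.

```python
def is_allowed_chain(input_counts):
    """Return True if only the head operator of the chain has multiple inputs.

    `input_counts` lists the number of inputs of each operator, head first.
    Hypothetical helper for illustration only.
    """
    if not input_counts:
        return False
    # Every operator after the head must be a one-input operator.
    return all(n == 1 for n in input_counts[1:])


chain_a = [2, 1, 1]  # two input X -> one input Y -> one input Z
chain_b = [5, 1, 1]  # n input X (here n = 5) -> one input Y -> one input Z
chain_c = [2, 1, 2]  # two input X -> one input Y -> two input Z
```

With these definitions, `is_allowed_chain` accepts `chain_a` and `chain_b` and rejects `chain_c`, matching the ALLOWED / NOT ALLOWED labels quoted above.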

Actually, we already support n-output chains too:
d) one/two/n op X -> one op Y -> one op A1 -> one op B1 -> one op C1
                              -> one op A2 -> one op B2 -> one op C2
d) is a very useful feature too.

> Do you mean that those Table API/SQL use cases (HashJoin/SortMergeJoin)
> could be easily handled by a single N-Ary Stream Operator, so this would be
> covered by steps 1. and 2. from my plan from my previous e-mail? That would
> be real nice (avoiding the input selection chaining).

Yes, because in the table layer, the typical scenarios currently only have
a static reading order. (We don't consider MergeJoin here, because it is
too complex to optimize and not worth optimizing at present.)
For example, the current TwoInputOperators, HashJoin and NestedLoopJoin,
both have a static reading order: we must read the build input before we
can read the probe input.
So after we analyze the chain, we put all the operators that can be chained
into an N-input operator. We can then analyze the static order required by
this operator and divide the reading order into several levels:
- first level: input4, input5, input1
- second level: input2, input6
- third level: input1, input7
Note that this analysis happens at compile time on the client.
At runtime, we just need to read in a fixed order.
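
A rough sketch of how such levels could be derived, written in Python purely for illustration. The function `reading_levels`, the `must_finish_before` mapping and the input names in the example are all hypothetical; they are not the levels listed above and not part of any Flink API. The only assumption is that each input knows which other inputs must be fully read before it, for example a probe side after its build side.

```python
def reading_levels(inputs, must_finish_before):
    """Group inputs into levels; a level is read after all earlier levels.

    `must_finish_before` maps an input to the inputs that must be fully
    read before it. Hypothetical helper for illustration only.
    """
    level_of = {}

    def level(name, visiting=()):
        if name in level_of:
            return level_of[name]
        if name in visiting:
            raise ValueError(f"cyclic reading order at {name}")
        deps = must_finish_before.get(name, [])
        # An input sits one level after its latest prerequisite.
        result = 1 + max(
            (level(d, visiting + (name,)) for d in deps), default=0
        )
        level_of[name] = result
        return result

    levels = {}
    for name in inputs:
        levels.setdefault(level(name), []).append(name)
    return [levels[k] for k in sorted(levels)]


# Example: probe1 may only be read once build1 is finished; build2 has no
# prerequisite, so it shares the first level with build1.
example = reading_levels(
    ["build1", "probe1", "build2"],
    {"probe1": ["build1"]},
)
```

The result would be computed once on the client; at runtime the operator would only walk the levels in order and read all inputs of the current level.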

Best,
Jingsong Lee


-- 
Best, Jingsong Lee

Re: [DISCUSS] Add N-Ary Stream Operator

Posted by Piotr Nowojski <pi...@ververica.com>.
Hi Jingsong,

Thanks for the feedback :)

Could you clarify a little what you mean by your desired use cases?

> Many of our production jobs contain TwoInputOperators that could be
> chained. Until now we could only watch the last ten tasks of such a job
> transmit data through disk and network, even though this work could have
> been done in one task.
> In terms of performance, chaining them gives an average improvement of
> 30%+, and an order of magnitude in extreme cases.

As I mentioned at the end, I would like to avoid/postpone chaining multiple/two-input operators one after another because of the complexity of input selection. For the first version I would like to aim only at allowing single-input operators to be chained behind something else (the 2- or N-input operator must always be the head of the chain). For example, for these chains:

a) two input operator X -> one input operator Y -> one input operator Z  (ALLOWED)
b) n input operator X -> one input operator Y -> one input operator Z  (ALLOWED)
c) two input operator X -> one input operator Y -> two input operator Z  (NOT ALLOWED as a single chain)

The example above sounds to me like c).

I think as a follow-up we could allow c) by extending chaining with a simple rule: there can be only a single input-selectable operator in the chain (again, it’s the chaining of multiple input-selectable operators that causes the problems).
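
To make the two rules concrete, here is a minimal sketch in plain Java. It does not use any Flink classes: `ChainRules`, `Op`, `allowedInFirstVersion` and `allowedInFollowUp` are hypothetical names made up for this illustration, and I am assuming that an operator can be described by just its input count and whether it is input-selectable.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the chaining rules discussed above; these are not Flink classes.
final class ChainRules {

    /** Minimal stand-in for an operator: its input count and whether it selects its inputs. */
    static final class Op {
        final String name;
        final int inputs;
        final boolean inputSelectable;

        Op(String name, int inputs, boolean inputSelectable) {
            this.name = name;
            this.inputs = inputs;
            this.inputSelectable = inputSelectable;
        }
    }

    /** First version: a two- or N-input operator may only be the head of the chain. */
    static boolean allowedInFirstVersion(List<Op> chain) {
        if (chain.isEmpty()) {
            return false;
        }
        for (int i = 1; i < chain.size(); i++) {
            if (chain.get(i).inputs > 1) {
                return false;
            }
        }
        return true;
    }

    /** Possible follow-up: at most one input-selectable operator anywhere in the chain. */
    static boolean allowedInFollowUp(List<Op> chain) {
        if (chain.isEmpty()) {
            return false;
        }
        int selectable = 0;
        for (Op op : chain) {
            if (op.inputSelectable) {
                selectable++;
            }
        }
        return selectable <= 1;
    }

    public static void main(String[] args) {
        // Chain a): two-input X -> one-input Y -> one-input Z
        List<Op> a = Arrays.asList(
                new Op("X", 2, true), new Op("Y", 1, false), new Op("Z", 1, false));
        // Chain c): two-input X -> one-input Y -> two-input Z, both input-selectable
        List<Op> c = Arrays.asList(
                new Op("X", 2, true), new Op("Y", 1, false), new Op("Z", 2, true));

        System.out.println("a) first version: " + allowedInFirstVersion(a));
        System.out.println("c) first version: " + allowedInFirstVersion(c));
        System.out.println("c) follow-up:     " + allowedInFollowUp(c));
    }
}
```

Under this model, c) only passes the follow-up rule if at most one of X and Z is input-selectable.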

> The table layer has many special features, which give us the chance to
> optimize it, but which also make it hard for the underlying layer to
> provide an abstract mechanism for implementing them. For example:
> - HashJoin must read all the data on one side (build side) and only then
>   read the other side (probe side).
> - HashJoin only emits data while reading the probe side.
> - SortMergeJoin reads its inputs in no fixed order, but chaining a
>   SortMergeJoin to another MergeJoin (to re-use the sort attribute) makes
>   things complicated.
> - HashAggregate/Sort only emit data in endInput.
>
> Providing an N-Ary stream operator makes all of this possible: the upper
> layer can do anything. These can be specific optimizations, which fit much
> more naturally in the upper layer than in the lower layer.

Do you mean that those Table API/SQL use cases (HashJoin/SortMergeJoin) could easily be handled by a single N-Ary Stream Operator, so that they would be covered by steps 1 and 2 of the plan from my previous e-mail? That would be really nice (it avoids the input selection chaining).

Piotrek


Re: [DISCUSS] Add N-Ary Stream Operator

Posted by Jingsong Li <ji...@gmail.com>.
Hi Piotr,

Huge +1 for N-Ary Stream Operator.
And I love this Golden Shovel award very much!

Many of our production jobs contain TwoInputOperators that could be
chained. Until now we could only watch the last ten tasks of such a job
transmit data through disk and network, even though this work could have
been done in one task.
In terms of performance, chaining them gives an average improvement of
30%+, and an order of magnitude in extreme cases.

The table layer has many special features, which give us the chance to
optimize it, but which also make it hard for the underlying layer to
provide an abstract mechanism for implementing them. For example:
- HashJoin must read all the data on one side (build side) and only then
  read the other side (probe side).
- HashJoin only emits data while reading the probe side.
- SortMergeJoin reads its inputs in no fixed order, but chaining a
  SortMergeJoin to another MergeJoin (to re-use the sort attribute) makes
  things complicated.
- HashAggregate/Sort only emit data in endInput.

Providing an N-Ary stream operator makes all of this possible: the upper
layer can do anything. These can be specific optimizations, which fit much
more naturally in the upper layer than in the lower layer.
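
As an illustration of the HashJoin points above, here is a minimal sketch in plain Java of an operator that selects the build side until it ends and only emits while reading the probe side. It is not Flink code: `BuildThenProbeJoin`, `Input`, `nextSelection`, `endBuild` and the other names are hypothetical, and the integer keys and string values are assumptions made to keep the example small.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not Flink's API: a hash join that drives its own input selection.
final class BuildThenProbeJoin {

    enum Input { BUILD, PROBE }

    private final Map<Integer, List<String>> buildTable = new HashMap<>();
    private final List<String> output = new ArrayList<>();
    private boolean buildFinished = false;

    /** The input this operator wants to read next: BUILD until it ends, then PROBE. */
    Input nextSelection() {
        return buildFinished ? Input.PROBE : Input.BUILD;
    }

    /** Build-side records are only stored; nothing is emitted here. */
    void processBuild(int key, String value) {
        if (buildFinished) {
            throw new IllegalStateException("build side already ended");
        }
        buildTable.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    /** Called once when the build input has no more data. */
    void endBuild() {
        buildFinished = true;
    }

    /** Probe-side records are joined against the hash table; this is the only place that emits. */
    void processProbe(int key, String value) {
        if (!buildFinished) {
            throw new IllegalStateException("probe record before build side ended");
        }
        for (String buildValue : buildTable.getOrDefault(key, Collections.<String>emptyList())) {
            output.add(buildValue + "|" + value);
        }
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        BuildThenProbeJoin join = new BuildThenProbeJoin();
        join.processBuild(1, "a");
        join.processBuild(1, "b");
        join.endBuild();
        join.processProbe(1, "x");
        System.out.println(join.output());
    }
}
```

The point of the sketch is that the reading order is decided inside the operator, which is what the table layer needs from the lower layer.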

In addition to the two optimizations you mentioned, it also gives more
room to eliminate virtual function calls, because with this approach the
table layer has to consider the operator chain itself.
In the future, we can turn a whole N-Ary stream operator into a
JIT-friendly operator. Without virtual function calls, the JIT can show
its real strength.
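
A rough sketch of what I mean, in plain Java. The names (`FusedChainSketch`, `Stage`, `chained`, `fused`) are hypothetical and this is not how Flink or Blink generate code; it only contrasts a chain where every record hops through an interface call per operator with a single fused loop that computes the same result. The JIT may already inline some of the interface calls in the chained version, so treat this as an illustration of the shape, not a benchmark.

```java
// Hypothetical illustration, not Flink code: chained stages versus one fused operator.
final class FusedChainSketch {

    /** One-input stage; each hop through this interface is a virtual call. */
    interface Stage {
        void process(long value);
    }

    /** Forwards only even values. */
    static final class EvenFilter implements Stage {
        private final Stage next;
        EvenFilter(Stage next) { this.next = next; }
        @Override public void process(long value) {
            if (value % 2 == 0) {
                next.process(value);
            }
        }
    }

    /** Forwards the doubled value. */
    static final class Doubler implements Stage {
        private final Stage next;
        Doubler(Stage next) { this.next = next; }
        @Override public void process(long value) {
            next.process(value * 2);
        }
    }

    /** Terminal stage that accumulates a sum. */
    static final class Sum implements Stage {
        long total;
        @Override public void process(long value) {
            total += value;
        }
    }

    /** filter -> double -> sum, wired as three separate stages. */
    static long chained(long[] input) {
        Sum sum = new Sum();
        Stage head = new EvenFilter(new Doubler(sum));
        for (long v : input) {
            head.process(v);
        }
        return sum.total;
    }

    /** The same logic written as one loop with no calls between stages. */
    static long fused(long[] input) {
        long total = 0;
        for (long v : input) {
            if (v % 2 == 0) {
                total += v * 2;
            }
        }
        return total;
    }

    public static void main(String[] args) {
        long[] input = {1, 2, 3, 4};
        System.out.println(chained(input) == fused(input));
    }
}
```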

Best,
Jingsong Lee

On Wed, Dec 4, 2019 at 5:24 PM Piotr Nowojski <pi...@ververica.com> wrote:

> Hi,
>
> First and foremost I would like to nominate myself to the Golden Shovel
> award for digging out this topic:
>
>
> Secondly, I would like to discuss coming back to this particular idea of
> implementing N-Ary Stream Operator. This time motivation doesn’t come from
> the Side Inputs, but to efficiently support multi joins in SQL, without
> extra network exchanges. I’ve reviewed the design doc proposed by Aljoscha,
> I quite like it and I think we could start from that.
>
> Specifically the end-goal is to allow for example Blink, to:
>
> I. Implement A* multi broadcast join - to have a single operator chain,
> where probe table (source) is read locally (inside the task that is
> actually doing the join), then joined with multiple other broadcasted
> tables.
> II. Another example might be when we have 2 or more sources,
> pre-partitioned on the same key. In that case we should also be able to
> perform all of the table reading and the join inside a single Task.
>
> In order to achieve that, I would propose the following plan:
>
> 1. Implement N-Ary Stream Operator as proposed in the design doc below,
> however with added support for the input selection [1].
>   - initially it can be just exposed via the `StreamTransformation`,
> without direct access from the `DataStream API`
>
> 2. Allow it to be chained with sources (implemented using the FLIP-27
> SourceReader [2])
>
> 3. Think about whether we need to support more complex chaining. Without
> this point, motivating examples (I and II) could be implemented if all of
> the joins/filtering/mappings are compiled/composed into a single N-Ary
> Stream Operator (which could be chained with some other single input
> operators at the tail). We could also think about supporting of chaining a
> tree of for example TwoInputStreamOperators inside a single Task. However
> I’m leaving this as a follow up, since in that case, it’s not so easy to
> handle the `InputSelection` of multiple operators inside the tree.
>
> Piotrek
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/streaming/api/operators/InputSelectable.html
> [2]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-27:+Refactor+Source+Interface>
>
> On 21. Apr 2016, at 17:09, Aljoscha Krettek <al...@apache.org> wrote:
>
> Hi,
> yes, I see operators of this style as very much an internal thing. You are
> probably talking about use cases for OneInputOperator and TwoInputOperator
> where users have a very specific need and require access to the low-level
> details such as watermarks, state and timers to get stuff done. Maybe we
> could have a wrapper for these so that users can still use them but
> internally we wrap them in an N-Ary Operator.
>
> Cheers,
> Aljoscha
>
> On Thu, 21 Apr 2016 at 16:31 Gyula Fóra <gy...@apache.org> wrote:
> Hey,
>
> Some initial feedback from my side:
>
> I think this is a very important problem to deal with as a lot of applications
> depend on it. I like the proposed runtime model and that is probably the
> good way to handle this task, it is very clean what is happening.
>
> My main concern is how to handle this from the API and UDFs. What you
> proposed seems like a very internal thing from the API perspective and I
> would be against exposing it in the way you wrote in your example. We
> should make all effort to streamline this with the functional style
> operators in some way. (so in that sense the way broadcastsets are handled
> is pretty nice) Maybe we could extend ds.connect() to many streams
>
> But in any case this is awesome initiative :)
>
> Cheers,
> Gyula
>
>
> Aljoscha Krettek <al...@apache.org> ezt írta (időpont: 2016. ápr. 21.,
> Cs, 15:56):
>
> Hi Team,
> I'm currently thinking about how we can bring the broadcast set/broadcast
> input feature from the DataSet API to the DataStream API. I think this
> would be a valuable addition since it would enable use cases that join
> streams with constant (or slowly changing) side information.
>
> For this purpose, I think that we need to change the way we handle stream
> operators. The new model would have one unified operator that handles all
> cases and allows to add inputs after the operator was constructed, thus
> allowing the specification of broadcast inputs.
>
> I wrote up this preliminary document detailing the reason why we need such
> a new operator for broadcast inputs and also what the API of such an
> operator would be. It also quickly touches on the required changes of
> existing per-operation stream operations such as StreamMap:
>
>
>
> https://docs.google.com/document/d/1ZFzL_0xGuUEnBsFyEiHwWcmCcjhd9ArWsmhrgnt05RI/edit?usp=sharing
>
> Please have a look if you're interested. Feedback/insights are very
> welcome. :-)
>
> Cheers,
> Aljoscha
>
>
>
>

-- 
Best, Jingsong Lee