Posted to user@flink.apache.org by Martin Eden <ma...@gmail.com> on 2017/09/01 05:17:56 UTC

Re: dynamically partitioned stream

This might be a way forward but since side inputs are not there I will try
and key the control stream by the keys in the first co flat map.
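
A minimal sketch of that plan, in plain Python rather than the Flink API. It
assumes the two-stage idea Tony described: each control message is fanned out
into one subscription per argument key (so it can be keyed the same way as the
data stream), and each data record is then duplicated once per lambda that
needs it. The names `fan_out_control` and `TaggingStage`, and the tuple
layouts, are hypothetical.

```python
from collections import defaultdict


def fan_out_control(lambda_id, argument_keys):
    """Split one control message into (argument_key, lambda_id) pairs.

    Each pair can be keyed by argument_key, so it reaches the same
    partition as the data records carrying that key.
    """
    return [(key, lambda_id) for key in argument_keys]


class TaggingStage:
    """Stand-in for the first co-flat-map, keyed by data key (assumption)."""

    def __init__(self):
        # data key -> ids of the lambdas that take this key as an argument
        self.subscriptions = defaultdict(list)

    def on_control(self, key, lambda_id):
        if lambda_id not in self.subscriptions[key]:
            self.subscriptions[key].append(lambda_id)

    def on_data(self, key, value):
        # Emit one tagged copy per subscribed lambda; a second stage keyed
        # by lambda_id could then collect all arguments in one place.
        return [(lambda_id, key, value)
                for lambda_id in self.subscriptions.get(key, [])]
```

In this sketch a record whose key has no subscribed lambda yet produces no
output, so values that arrive before their lambda are lost.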

I'll see how it goes.

Thanks guys,
M

On Thu, Aug 31, 2017 at 5:16 PM, Tony Wei <to...@gmail.com> wrote:

> Hi Martin,
>
> Yes, that is exactly what I thought.
> But the first step also needs to be fulfilled by SideInput. I'm not sure
> how to achieve this in the current release.
>
> Best,
> Tony Wei
>
> On Thu, Aug 31, 2017 at 11:32 PM, Martin Eden <ma...@gmail.com> wrote:
>
>> Hi Aljoscha, Tony,
>>
>> Aljoscha:
>> Yes it's the first option you mentioned.
>> Yes, the stream has multiple values in flight for A, B, C. f1 needs to be
>> applied each time a new value for either A, B or C comes in. So we need to
>> use state to cache the latest values. So using the example data stream in
>> my first msg the emitted stream should be:
>>
>> 1. Data Stream:
>> KEY VALUE TIME
>> .
>> .
>> .
>> C      V6        6
>> B      V6        6
>> A      V5        5
>> A      V4        4
>> C      V3        3
>> A      V3        3
>> B      V3        3
>> B      V2        2
>> A      V1        1
>>
>> 2. Control Stream:
>> Lambda  ArgumentKeys TIME
>> .
>> .
>> .
>> f2            [A, C]                 4
>> f1            [A, B, C]            1
>>
>> 3. Expected emitted stream:
>> TIME    VALUE
>> .
>> .
>> .
>> 6          f1(V5, V6, V3)
>>             f1(V5, V6, V6)
>>             f2(V5, V6)
>> 5          f1(V5, V3, V3)
>>             f2(V5, V3)
>> 4          f1(V4, V3, V3)
>>             f2(V4, V3)
>> 3          f1(V3, V3, V3)
>> 2          -
>> 1          -
>>
>> So essentially as soon as the argument list fills up then we apply the
>> function/lambda at each new arriving message in the data stream for either
>> argument key.
>>
>> Tony:
>> Yes we need to group by and pass to the lambda.
>> Ok, so what you are proposing might work. So your solution assumes that
>> we have to connect with the control stream twice? Once for the tagging and
>> another time re-connect-ing the control stream with the tagged stream for
>> the actual application of the function/lambda?
>>
>> Thanks,
>> Alex
>>
>>
>>
>> On Thu, Aug 31, 2017 at 2:57 PM, Aljoscha Krettek <al...@apache.org>
>> wrote:
>>
>>> Hi Martin,
>>>
>>> In your original example, what does this syntax mean exactly:
>>>
>>> f1            [A, B, C]            1
>>>
>>> Does it mean that f1 needs one A, one B and one C from the main stream?
>>> If yes, which ones, because there are multiple As and Bs and so on. Or does
>>> it mean that f1 can apply to an A or a B or a C? If it's the first, then I
>>> think it's quite hard to find a partitioning such that both f1, f2, and all
>>> A, B, and C go to the same machine.
>>>
>>> Best,
>>> Aljoscha
>>>
>>> On 31. Aug 2017, at 15:53, Tony Wei <to...@gmail.com> wrote:
>>>
>>> Hi Martin,
>>>
>>> So the problem is that you want to group those arguments in Data Stream
>>> and pass them to the lambda function from Control Stream at the same time.
>>> Am I right?
>>>
>>> If right, then you could give each lambda function an id as well. Use
>>> these ids to tag those arguments to which they belong.
>>> After that, keyBy function could be used to group those arguments
>>> belonging to the same lambda function. Joining this stream with Control
>>> Stream by function id could make arguments and function be in the same
>>> instance.
>>>
>>> What do you think? Could this solution solve your problem?
>>>
>>> Best,
>>> Tony Wei
>>>
>>> 2017-08-31 20:43 GMT+08:00 Martin Eden <ma...@gmail.com>:
>>>
>>>> Thanks for your reply Tony,
>>>>
>>>> Yes we are in the latter case, where the functions/lambdas come in the
>>>> control stream. Think of them as strings containing the logic of the
>>>> function. The values for each of the arguments to the function come from
>>>> the data stream. That is why we need to co-locate the data stream messages
>>>> for the corresponding keys with the control message that has the function
>>>> to be applied.
>>>>
>>>> We have a way of interpreting the logic described in the string and
>>>> executing it on the incoming values from the data stream. This is kicked
>>>> off from within the Flink runtime (synchronous to a flatMap of the
>>>> RichCoFlatMapFunction) but is not using Flink predefined operators or
>>>> functions.
>>>>
>>>> So yeah I see your point about mapping the arguments but the problem is
>>>> not really that, the problem is making sure that the values in the data
>>>> stream are in the same instance of the task/keyed managed state as the
>>>> actual control stream message. Once they are we can pass them in.
>>>>
>>>> Any other thoughts?
>>>>
>>>> M
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Thu, Aug 31, 2017 at 12:06 PM, Tony Wei <to...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Martin,
>>>>>
>>>>> About problem 2. How were those lambda functions created? Pre-defined
>>>>> functions / operators or automatically generated based on the message from
>>>>> Control Stream?
>>>>>
>>>>> For the former, you could give each function one id and use flatMap
>>>>> to duplicate data with multiple ids. Then, you could use a filter function
>>>>> and send them to the corresponding operators.
>>>>>
>>>>> For the general case like the latter, because you have broadcast the
>>>>> messages to all tasks, each sub-task can always build a mapping table from
>>>>> argument keys to lambda functions and use the map to process the
>>>>> data. But I was wondering if it is possible to generate a completely new
>>>>> function at runtime.
>>>>>
>>>>> Best,
>>>>> Tony Wei
>>>>>
>>>>> 2017-08-31 18:33 GMT+08:00 Martin Eden <ma...@gmail.com>:
>>>>>
>>>>>> Thanks for your reply Tony.
>>>>>>
>>>>>> So there are actually 2 problems to solve:
>>>>>>
>>>>>> 1. All control stream msgs need to be broadcasted to all tasks.
>>>>>>
>>>>>> 2. The data stream messages with the same keys as those specified in
>>>>>> the control message need to go to the same task as well, so that all the
>>>>>> values required for the lambda (i.e. functions f1, f2 ...) are there.
>>>>>>
>>>>>> In my understanding side inputs (which are actually not available in
>>>>>> the current release) would address problem 1.
>>>>>>
>>>>>> To address problem 1 I also tried
>>>>>> dataStream.keyBy(key).connect(controlStream.broadcast).flatMap(new RichCoFlatMapFunction)
>>>>>> but I get a runtime exception telling me I still need to do a keyBy before the
>>>>>> flatMap. So are the upcoming side inputs the only way to broadcast a
>>>>>> control stream to all tasks of a coFlatMap? Or is there another way?
>>>>>>
>>>>>> As for problem 2, I am still pending a reply. Would appreciate if
>>>>>> anyone has some suggestions.
>>>>>>
>>>>>> Thanks,
>>>>>> M
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Aug 31, 2017 at 9:59 AM, Tony Wei <to...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Martin,
>>>>>>>
>>>>>>> Let me understand your question first.
>>>>>>> You have two streams, a Data Stream and a Control Stream, and you want to
>>>>>>> select data in the Data Stream based on the key set obtained from the
>>>>>>> Control Stream.
>>>>>>>
>>>>>>> If I am not misunderstanding your question, I think SideInput is
>>>>>>> what you want.
>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API#FLIP-17SideInputsforDataStreamAPI-StoringSide-InputData
>>>>>>> It lets you define one stream as a SideInput that can be assigned
>>>>>>> to the other stream; the data in the SideInput stream will then be broadcast.
>>>>>>>
>>>>>>> So far, I have no idea if there is any solution to solve this
>>>>>>> without SideInput.
>>>>>>>
>>>>>>> Best,
>>>>>>> Tony Wei
>>>>>>>
>>>>>>> 2017-08-31 16:10 GMT+08:00 Martin Eden <ma...@gmail.com>:
>>>>>>>
>>>>>>>> Hi all,
>>>>>>>>
>>>>>>>> I am trying to implement the following using Flink:
>>>>>>>>
>>>>>>>> I have 2 input message streams:
>>>>>>>>
>>>>>>>> 1. Data Stream:
>>>>>>>> KEY VALUE TIME
>>>>>>>> .
>>>>>>>> .
>>>>>>>> .
>>>>>>>> C      V6        6
>>>>>>>> B      V6        6
>>>>>>>> A      V5        5
>>>>>>>> A      V4        4
>>>>>>>> C      V3        3
>>>>>>>> A      V3        3
>>>>>>>> B      V3        3
>>>>>>>> B      V2        2
>>>>>>>> A      V1        1
>>>>>>>>
>>>>>>>> 2. Control Stream:
>>>>>>>> Lambda  ArgumentKeys TIME
>>>>>>>> .
>>>>>>>> .
>>>>>>>> .
>>>>>>>> f2            [A, C]                 4
>>>>>>>> f1            [A, B, C]            1
>>>>>>>>
>>>>>>>> I want to apply the lambdas coming in the control stream to the
>>>>>>>> selection of keys that are coming in the data stream.
>>>>>>>>
>>>>>>>> Since we have 2 streams I naturally thought of connecting them
>>>>>>>> using .connect. For this I need to key both of them by some criterion.
>>>>>>>> And here lies the problem: how can I make sure the messages with keys A, B, C
>>>>>>>> specified in the control stream end up in the same task as the
>>>>>>>> control message (f1, [A, B, C]) itself? Basically I don't know what to key
>>>>>>>> by to achieve this.
>>>>>>>>
>>>>>>>> I suspect a custom partitioner is required that partitions the data
>>>>>>>> stream based on the messages in the control stream? Is this even possible?
>>>>>>>>
>>>>>>>> Any suggestions welcomed!
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> M
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>>
>>

Re: dynamically partitioned stream

Posted by Tony Wei <to...@gmail.com>.
Hi Martin,

For the first question, as far as I know, Flink guarantees that the order
of records coming from the same sub-task is preserved for the consumer.
If A, B and C come from different sub-tasks, the result might be reordered
as you fear. After all, you can't make all sub-tasks process at the same
speed or synchronize them.

For the second question, I think it is hard to remedy, because sorting
elements in a continuous stream without any constraint is impossible.
Using a window operator to buffer elements for a while and then sorting them
amounts to adding such a constraint.
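
To illustrate the kind of constraint meant here, the following is a small
sketch in plain Python (not Flink code): elements are buffered and only
released in timestamp order once a watermark promises that nothing older will
arrive. The class name `SortingBuffer` and the watermark convention are
assumptions made for illustration.

```python
import heapq


class SortingBuffer:
    """Buffers (timestamp, payload) pairs and releases them in order."""

    def __init__(self):
        self._heap = []
        self._seq = 0  # tie-breaker so equal timestamps keep arrival order

    def add(self, timestamp, payload):
        heapq.heappush(self._heap, (timestamp, self._seq, payload))
        self._seq += 1

    def advance_watermark(self, watermark):
        """Release every buffered element with timestamp <= watermark."""
        released = []
        while self._heap and self._heap[0][0] <= watermark:
            timestamp, _, payload = heapq.heappop(self._heap)
            released.append((timestamp, payload))
        return released
```

The price is latency: nothing is emitted until the watermark passes, and an
element that arrives after the watermark has already passed its timestamp can
no longer be placed in order.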

These are what I know so far. Maybe there are some mistakes. I think
Aljoscha can explain more clearly than me.

Best,
Tony Wei.


2017-09-07 18:00 GMT+08:00 Martin Eden <ma...@gmail.com>:

> Hi Tony,
>
> Ah I see. Yes you are right. What I was saying in my last message is that
> I relaxed that requirement after realising that it works how you just
> described it (and Aljoscha previously) and global state is not really
> feasible/possible.
>
> Here is a re-worked example. Please let me know if it makes sense?
> Basically f2 is only emitted at time 7 when we have the first values for A
> and C emitted after time 4 when f2 itself was emitted.
>
> 1. Data Stream:
> KEY VALUE TIME
> .
> .
> .
> C      V6        7
> B      V6        7
> A      V5        6
> A      V4        5
> *C      V3        3*
> *A      V3        3*
> *B      V3        3*
> B      V2        2
> A      V1        1
>
> 2. Control Stream:
> Lambda  ArgumentKeys TIME
> .
> .
> .
> f2            [A, C]                 4
> f1            [A, B, C]            0
>
> 3. Expected emitted stream:
> TIME    VALUE
> .
> .
> .
> 7          f1(V5, V6, V3)
>             f1(V5, V6, V6)
>             *f2(V5, V6)*
> 6          f1(V5, V3, V3)
> 5          f1(V4, V3, V3)
> 4          -
> *3          f1(V3, V3, V3) - or f1(V1,V2,V3) if C,V3 arrives before A V3
> or B V3*
> 2          -
> 1          -
> 0          -
>
> I guess to wrap up this discussion there is only one more thing to clarify
> about this. There is some non-determinism as to what results are emitted. I
> have highlighted this above (the lines marked with asterisks).
>
> Basically the example assumes the order of events perceived in the
> operators is as it is in the incoming data stream and is maintained throughout
> the Flink DAG, regardless of keyBys, flatMaps, connect etc.
>
> However if the order of the events is changed by Flink after the source
> receives and before it reaches the first flatMap1 in the first connected
> operator (i.e. the 3 events for time 3 are re-ordered) then the lambda
> emits different combinations.
>
> Questions:
>
> 1. Is what I am saying correct? Can Flink change the order of events in an
> input stream as they arrive at the input of flatMap in a connected stream?
> Maybe because of the keyBy?
>
> 2. Can this be remedied by using a different time dimension like event
> time or ingest time? Would that work for simple connected streams? Does
> it need to be somehow combined with a windowed operator?
>
> To clarify, in my mind, in the example above the TIME column was really
> both event time and ingest time for the data stream and control stream (in
> the same order as written) and processing time for the emitted output
> stream.
>
> So the data stream was really this:
>
> KEY VALUE EV_TIME INGEST_TIME PROCESSING_TIME
> .
> .
> .
> C      V3        3              33                     43
> A      V3        2              32                     42
> B      V3        1              31                     41
>
> Thanks,
> M
>
>
> On Thu, Sep 7, 2017 at 9:24 AM, Tony Wei <to...@gmail.com> wrote:
>
>> Hi Martin,
>>
>> What I was talking about is how to store the arguments' state, in the
>> example you used to explain your use case to Aljoscha:
>>
>> 4          f1(V4, V3, V3)
>>             f2(V4, V3)
>> 3          f1(V3, V3, V3)
>> 2          -
>> 1          -
>>
>> You showed that when lambda f2 came, it would emit f2(V4, V3)
>> immediately. However, the second argument (C, V3, 3) came before f2. You
>> couldn't know how to route it at that time.
>>
>> If you didn't store this data's state, then f2(V4, V3) won't happen and
>> the problem is easy.
>> Otherwise, you have to route all data and all lambdas to the same node to
>> guarantee that no lambda loses any of its arguments' state.
>>
>> Best,
>> Tony Wei
>>
>> 2017-09-07 14:31 GMT+08:00 Martin Eden <ma...@gmail.com>:
>>
>>> Hi Tony,
>>>
>>> Yes exactly I am assuming the lambda emits a value only after it has
>>> been published to the control topic (t1) and at least 1 value arrives in
>>> the data topic for each of its arguments. This will happen at a time t2 >
>>> t1. So yes, there is uncertainty with regards to when t2 will happen.
>>> Ideally t2 - t1 ~ 0 but for our use case it is fine. Is this the
>>> correctness that you are talking about? Do I have the right picture of what
>>> happens?
>>>
>>> Thanks
>>> M
>>>
>>> On Thu, Sep 7, 2017 at 3:11 AM, Tony Wei <to...@gmail.com> wrote:
>>>
>>>> Hi Martin,
>>>>
>>>> The performance is an issue, but in your case, yes, it might not be a
>>>> problem if X << N.
>>>>
>>>> However, the other problem is where data should go in the beginning if
>>>> no lambda has been received yet. This problem is not about
>>>> performance but about correctness. If you want to keep the value
>>>> state for an incoming lambda you should broadcast it to all nodes, because
>>>> you would never know where the next lambda that requires this data would be
>>>> routed to. Of course, you can send this data to a pre-defined node and
>>>> route the lambda to this node, but then all data ends up on the same
>>>> node so that every lambda can get all the data it requires. That is not a
>>>> good solution because it does not scale.
>>>>
>>>> My original idea was to store the state of a piece of data only after
>>>> you receive at least one lambda that requires it, so that the data has a
>>>> destination node to go to. Is this assumption acceptable in your case?
>>>> What do you think?
>>>>
>>>> Best,
>>>> Tony Wei
>>>>
>>>> 2017-09-06 22:41 GMT+08:00 Martin Eden <ma...@gmail.com>:
>>>>
>>>>> Hi Aljoscha, Tony,
>>>>>
>>>>> We actually do not need all the keys to be on all nodes where lambdas
>>>>> are. We just need the keys that represent the data for the lambda arguments
>>>>> to be routed to the same node as the lambda, whichever one it might be.
>>>>>
>>>>> Essentially in the solution we emit the data multiple times and by
>>>>> doing that we roughly multiply the input rate by the average number of
>>>>> lambdas a key is a part of (X). In terms of memory this is O(X * N) where N
>>>>> is the number of keys in the data. N is the large bit. If X ~ N then we
>>>>> have O (N^2) complexity for the Flink state. And in that case yes I see
>>>>> your point about performance Aljoscha. But if X << N, as is our case, then
>>>>> we have O(N) which should be manageable by Flink's distributed state
>>>>> mechanism right? Do you see any gotchas in this new light? Are my
>>>>> assumptions correct?
>>>>>
>>>>> Thanks,
>>>>> M
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Sat, Sep 2, 2017 at 3:38 AM, Tony Wei <to...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Martin, Aljoscha
>>>>>>
>>>>>> I think Aljoscha is right. My original thought was to keep the state
>>>>>> only after a lambda function arrives.
>>>>>>
>>>>>> Using Aljoscha's scenario as an example, initially all data will be
>>>>>> discarded because there are no lambdas. When lambdas f1 [D, E] and
>>>>>> f2 [A, C] come, A, C begin to be routed to machine "0" and D, E begin to
>>>>>> be routed to machine "1". Then, when we get a new lambda f3 [C, D],
>>>>>> we can duplicate C, D and route these copies to machine "2".
>>>>>>
>>>>>> However, after reading your example again, I found what you want is a
>>>>>> whole picture for all variables' state in a global view, so that no matter
>>>>>> what time a new lambda comes it can always get its variables' state
>>>>>> immediately. In that case, I have the same opinion as Aljoscha.
>>>>>>
>>>>>> Best,
>>>>>> Tony Wei
>>>>>>
>>>>>> 2017-09-01 23:59 GMT+08:00 Aljoscha Krettek <al...@apache.org>:
>>>>>>
>>>>>>> Hi Martin,
>>>>>>>
>>>>>>> I think with those requirements this is very hard (or maybe
>>>>>>> impossible) to do efficiently in a distributed setting. It might be that
>>>>>>> I'm misunderstanding things but let's look at an example. Assume that
>>>>>>> initially, we don't have any lambdas, so data can be sent to any machine
>>>>>>> because it doesn't matter where they go. Now, we get a new lambda f2 [A,
>>>>>>> C]. Say this gets routed to machine "0", now this means that messages with
>>>>>>> key A and C also need to be routed to machine "0". Now, we get a new lambda
>>>>>>> f1 [D, E], say this gets routed to machine "2", meaning that messages with
>>>>>>> key D and E are also routed to machine "2".
>>>>>>>
>>>>>>> Then, we get a new lambda f3 [C, D]. Do we now re-route all previous
>>>>>>> lambdas and inputs to different machines? They all have to go to the same
>>>>>>> machine, but which one? I'm currently thinking that there would need to be
>>>>>>> some component that does the routing, but this has to be global, so it's
>>>>>>> hard to do in a distributed setting.
>>>>>>>
>>>>>>> What do you think?
>>>>>>>
>>>>>>> Best,
>>>>>>> Aljoscha

Re: dynamically partitioned stream

Posted by Martin Eden <ma...@gmail.com>.
Hi Tony,

Ah I see. Yes you are right. What I was saying in my last message is that I
relaxed that requirement after realising that it works how you just
described it (and Aljoscha previously) and global state is not really
feasible/possible.

Here is a re-worked example. Please let me know if it makes sense?
Basically f2 is only emitted at time 7 when we have the first values for A
and C emitted after time 4 when f2 itself was emitted.

1. Data Stream:
KEY VALUE TIME
.
.
.
C      V6        7
B      V6        7
A      V5        6
A      V4        5
*C      V3        3*
*A      V3        3*
*B      V3        3*
B      V2        2
A      V1        1

2. Control Stream:
Lambda  ArgumentKeys TIME
.
.
.
f2            [A, C]                 4
f1            [A, B, C]            0

3. Expected emitted stream:
TIME    VALUE
.
.
.
7          f1(V5, V6, V3)
            f1(V5, V6, V6)
            *f2(V5, V6)*
6          f1(V5, V3, V3)
5          f1(V4, V3, V3)
4          -
*3          f1(V3, V3, V3) - or f1(V1,V2,V3) if C,V3 arrives before A V3 or
B V3*
2          -
1          -
0          -
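
The re-worked example can be replayed with a small simulation. This is a
sketch in plain Python, not Flink code. It assumes each lambda caches only
values that arrive after its own control message, and that a lambda fires on
every new argument value once all of its arguments have a cached value. The
function `simulate` and the event tuple layout are made up for illustration.

```python
def simulate(events):
    """Replay a merged event sequence and return the emitted calls.

    events: list of ("control", lambda_id, [argument keys]) or
            ("data", key, value) tuples, in arrival order.
    """
    lambdas = {}  # lambda_id -> (argument keys, {key: latest value})
    emitted = []
    for event in events:
        if event[0] == "control":
            _, lambda_id, arg_keys = event
            lambdas[lambda_id] = (list(arg_keys), {})
            continue
        _, key, value = event
        for lambda_id, (arg_keys, latest) in lambdas.items():
            if key not in arg_keys:
                continue
            latest[key] = value  # only values seen after registration are cached
            if len(latest) == len(arg_keys):
                args = ", ".join(latest[k] for k in arg_keys)
                emitted.append(f"{lambda_id}({args})")
    return emitted


# The streams above, merged by TIME (time 3 in the order B, A, C).
EVENTS = [
    ("control", "f1", ["A", "B", "C"]),  # time 0
    ("data", "A", "V1"),                 # time 1
    ("data", "B", "V2"),                 # time 2
    ("data", "B", "V3"),                 # time 3
    ("data", "A", "V3"),                 # time 3
    ("data", "C", "V3"),                 # time 3
    ("control", "f2", ["A", "C"]),       # time 4
    ("data", "A", "V4"),                 # time 5
    ("data", "A", "V5"),                 # time 6
    ("data", "B", "V6"),                 # time 7
    ("data", "C", "V6"),                 # time 7
]
```

Replaying `EVENTS` in this order gives the six calls listed in the table
above, with f2(V5, V6) emitted last.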

I guess to wrap up this discussion there is only one more thing to clarify
about this. There is some non-determinism as to what results are emitted. I
have highlighted this above (the lines marked with asterisks).

Basically the example assumes the order of events perceived in the
operators is as it is in the incoming data stream and is maintained throughout
the Flink DAG, regardless of keyBys, flatMaps, connect etc.

However if the order of the events is changed by Flink after the source
receives and before it reaches the first flatMap1 in the first connected
operator (i.e. the 3 events for time 3 are re-ordered) then the lambda
emits different combinations.
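
To get a feel for how much the arrival order of the three time-3 events
matters, one can enumerate every permutation. Again this is plain Python and
not Flink code; ARGS, prefix, time3 and emissions are illustrative names, and
only f1 is modelled.

```python
from itertools import permutations

ARGS = ["A", "B", "C"]                           # argument keys of f1
prefix = [("A", "V1"), ("B", "V2")]              # events at times 1 and 2
time3 = [("B", "V3"), ("A", "V3"), ("C", "V3")]  # three events sharing time 3


def emissions(events):
    """Return every f1 argument tuple emitted while replaying the events."""
    latest, out = {}, []
    for key, value in events:
        latest[key] = value
        if all(k in latest for k in ARGS):
            out.append(tuple(latest[k] for k in ARGS))
    return out


results = {order: emissions(prefix + list(order)) for order in permutations(time3)}
first_emissions = {r[0] for r in results.values()}
last_emissions = {r[-1] for r in results.values()}

for order, emitted in results.items():
    print([k for k, _ in order], "->", emitted)
```

Under these assumptions the first emitted tuple depends on the order, while
the last one is always the same, so the difference shows up only in the
intermediate results.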

Questions:

1. Is what I am saying correct? Can Flink change the order of events in an
input stream as they arrive to the input of flatMap in a connected stream?
Maybe because of the key by?

2. Can this be remedied by using a different time dimension like event time
or ingest time? Would that work for simple connected streams? Does it
need to be somehow combined with a windowed operator?

To clarify, in my mind, in the example above the TIME column was really
both event time and ingest time for the data stream and control stream (in
the same order as written) and processing time for the emitted output
stream.

So the data stream was really this:

KEY VALUE EV_TIME INGEST_TIME PROCESSING_TIME
.
.
.
C      V3        3              33                     43
A      V3        2              32                     42
B      V3        1              31                     41
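
One possible direction for question 2, sketched in plain Python and not
taken from the Flink API: buffer incoming events and release them in
event-time order only once a watermark says no earlier event is expected.
EventTimeSorter and its methods are hypothetical names used for
illustration.

```python
import heapq


class EventTimeSorter:
    """Buffer events and release them in event-time order up to a watermark."""

    def __init__(self):
        self._heap = []
        self._seq = 0  # tie-breaker: equal timestamps keep their arrival order

    def add(self, ev_time, key, value):
        heapq.heappush(self._heap, (ev_time, self._seq, key, value))
        self._seq += 1

    def advance_watermark(self, watermark):
        """Return all buffered events with ev_time <= watermark, sorted."""
        ready = []
        while self._heap and self._heap[0][0] <= watermark:
            ev_time, _, key, value = heapq.heappop(self._heap)
            ready.append((ev_time, key, value))
        return ready


sorter = EventTimeSorter()
# The three events from the table, arriving out of order.
for ev_time, key, value in [(3, "C", "V3"), (1, "B", "V3"), (2, "A", "V3")]:
    sorter.add(ev_time, key, value)

early = sorter.advance_watermark(1)
rest = sorter.advance_watermark(3)
print(early, rest)
```

This only helps when the event timestamps actually differ, as in the
EV_TIME column above. Events with identical event times still come out in
arrival order, and the buffering adds latency up to the watermark delay.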

Thanks,
M


On Thu, Sep 7, 2017 at 9:24 AM, Tony Wei <to...@gmail.com> wrote:

> Hi Martin,
>
> What I was talking about is how to store the arguments' state, using the
> example in which you explained your use case to Aljoscha.
>
> 4          f1(V4, V3, V3)
>             f2(V4, V3)
> 3          f1(V3, V3, V3)
> 2          -
> 1          -
>
> You showed that when lambda f2 came, it would emit f2(V4, V3) immediately.
> However, the second argument (C, V3, 3) came before f2. You couldn't know
> how to route it at that time.
>
> If you didn't store this data's state, then f2(V4, V3) wouldn't happen and
> the problem is easy.
> Otherwise, you would have to route all data and all lambdas to the same
> node to guarantee that no lambda loses any of its arguments' state.
>
> Best,
> Tony Wei
>
> 2017-09-07 14:31 GMT+08:00 Martin Eden <ma...@gmail.com>:
>
>> Hi Tony,
>>
>> Yes exactly I am assuming the lambda emits a value only after it has been
>> published to the control topic (t1) and at least 1 value arrives in the
>> data topic for each of its arguments. This will happen at a time t2 > t1.
>> So yes, there is uncertainty with regards to when t2 will happen. Ideally
>> t2 - t1 ~ 0 but for our use case it is fine. Is this the correctness that
>> you are talking about? Do I have the right picture of what happens?
>>
>> Thanks
>> M
>>
>> On Thu, Sep 7, 2017 at 3:11 AM, Tony Wei <to...@gmail.com> wrote:
>>
>>> Hi Martin,
>>>
>>> The performance is an issue, but in your case, yes, it might not be a
>>> problem if X << N.
>>>
>>> However, the other problem is where data should go in the beginning if
>>> there is no lambda been received. This problem doesn't associate with
>>> performance, but instead with correctness. If you want to keep the value
>>> state for the incoming lambda you should broadcast it to all nodes, because
>>> you would never know where the next lambda that requires this data would be
>>> routed to. Of course, you can send this data to a pre-defined node and
>>> route the lambda to this node, but this will lead to all data in the same
>>> node to let all lambda can get all required data. It is not a good solution
>>> because of a lack of scalability.
>>>
>>> In my origin thought, it is based on only storing state of data after
>>> you receive at least one lambda that requires it, so that data has its
>>> destination node to go. Can this assumption be acceptable in your case?
>>> What do you think?
>>>
>>> Best,
>>> Tony Wei
>>>
>>> 2017-09-06 22:41 GMT+08:00 Martin Eden <ma...@gmail.com>:
>>>
>>>> Hi Aljoscha, Tony,
>>>>
>>>> We actually do not need all the keys to be on all nodes where lambdas
>>>> are. We just need the keys that represent the data for the lambda arguments
>>>> to be routed to the same node as the lambda, whichever one it might be.
>>>>
>>>> Essentially in the solution we emit the data multiple times and by
>>>> doing that we roughly multiply the input rate by the average number of
>>>> lambdas a key is a part of (X). In terms of memory this is O(X * N) where N
>>>> is the number of keys in the data. N is the large bit. If X ~ N then we
>>>> have O (N^2) complexity for the Flink state. And in that case yes I see
>>>> your point about performance Aljoscha. But if X << N, as is our case, then
>>>> we have O(N) which should be manageable by Flink's distributed state
>>>> mechanism right? Do you see any gotchas in this new light? Are my
>>>> assumptions correct?
>>>>
>>>> Thanks,
>>>> M
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Sat, Sep 2, 2017 at 3:38 AM, Tony Wei <to...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Martin, Aljoscha
>>>>>
>>>>> I think Aljoscha is right. My origin thought was to keep the state
>>>>> only after a lambda function coming.
>>>>>
>>>>> Use Aljoscha's scenario as example, initially, all data will be
>>>>> discarded because there is no any lambdas. When lambda f1 [D, E] and
>>>>> f2 [A, C] comes, A, C begin to be routed to machine "0" and D, E begin to
>>>>> be routed to machine "1". Then, when we get a new lambda f3 [C, D],
>>>>> we can duplicate C, D and route these copies to machine "2".
>>>>>
>>>>> However, after reading your example again, I found what you want is a
>>>>> whole picture for all variables' state in a global view, so that no matter
>>>>> what time a new lambda comes it can always get its variables' state
>>>>> immediately. In that case, I have the same opinion as Aljoscha.
>>>>>
>>>>> Best,
>>>>> Tony Wei
>>>>>
>>>>> 2017-09-01 23:59 GMT+08:00 Aljoscha Krettek <al...@apache.org>:
>>>>>
>>>>>> Hi Martin,
>>>>>>
>>>>>> I think with those requirements this is very hard (or maybe
>>>>>> impossible) to do efficiently in a distributed setting. It might be that
>>>>>> I'm misunderstanding things but let's look at an example. Assume that
>>>>>> initially, we don't have any lambdas, so data can be sent to any machine
>>>>>> because it doesn't matter where they go. Now, we get a new lambda f2 [A,
>>>>>> C]. Say this gets routed to machine "0", now this means that messages with
>>>>>> key A and C also need to be routed to machine "0". Now, we get a new lambda
>>>>>> f1 [D, E], say this gets routed to machine "2", meaning that messages with
>>>>>> key D and E are also routed to machine "2".
>>>>>>
>>>>>> Then, we get a new lambda f3 [C, D]. Do we now re-route all previous
>>>>>> lambdas and inputs to different machines? They all have to go to the same
>>>>>> machine, but which one? I'm currently thinking that there would need to be
>>>>>> some component that does the routing, but this has to be global, so it's
>>>>>> hard to do in a distributed setting.
>>>>>>
>>>>>> What do you think?
>>>>>>
>>>>>> Best,
>>>>>> Aljoscha
>>>>>>
>>>>>> On 1. Sep 2017, at 07:17, Martin Eden <ma...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>> This might be a way forward but since side inputs are not there I
>>>>>> will try and key the control stream by the keys in the first co flat map.
>>>>>>
>>>>>> I'll see how it goes.
>>>>>>
>>>>>> Thanks guys,
>>>>>> M
>>>>>>
>>>>>> On Thu, Aug 31, 2017 at 5:16 PM, Tony Wei <to...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Martin,
>>>>>>>
>>>>>>> Yes, that is exactly what I thought.
>>>>>>> But the first step also needs to be fulfilled  by SideInput. I'm not
>>>>>>> sure how to achieve this in the current release.
>>>>>>>
>>>>>>> Best,
>>>>>>> Tony Wei
>>>>>>>
>>>>>>> On Thu, Aug 31, 2017 at 11:32 PM, Martin Eden <ma...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Aljoscha, Tony,
>>>>>>>>
>>>>>>>> Aljoscha:
>>>>>>>> Yes it's the first option you mentioned.
>>>>>>>> Yes, the stream has multiple values in flight for A, B, C. f1 needs
>>>>>>>> to be applied each time a new value for either A, B or C comes in. So we
>>>>>>>> need to use state to cache the latest values. So using the example data
>>>>>>>> stream in my first msg the emitted stream should be:
>>>>>>>>
>>>>>>>> 1. Data Stream:
>>>>>>>> KEY VALUE TIME
>>>>>>>> .
>>>>>>>> .
>>>>>>>> .
>>>>>>>> C      V6        6
>>>>>>>> B      V6        6
>>>>>>>> A      V5        5
>>>>>>>> A      V4        4
>>>>>>>> C      V3        3
>>>>>>>> A      V3        3
>>>>>>>> B      V3        3
>>>>>>>> B      V2        2
>>>>>>>> A      V1        1
>>>>>>>>
>>>>>>>> 2. Control Stream:
>>>>>>>> Lambda  ArgumentKeys TIME
>>>>>>>> .
>>>>>>>> .
>>>>>>>> .
>>>>>>>> f2            [A, C]                 4
>>>>>>>> f1            [A, B, C]            1
>>>>>>>>
>>>>>>>> 3. Expected emitted stream:
>>>>>>>> TIME    VALUE
>>>>>>>> .
>>>>>>>> .
>>>>>>>> .
>>>>>>>> 6          f1(V5, V6, V3)
>>>>>>>>             f1(V5, V6, V6)
>>>>>>>>             f2(V5, V6)
>>>>>>>> 5          f1(V5, V3, V3)
>>>>>>>>             f2(V5, V3)
>>>>>>>> 4          f1(V4, V3, V3)
>>>>>>>>             f2(V4, V3)
>>>>>>>> 3          f1(V3, V3, V3)
>>>>>>>> 2          -
>>>>>>>> 1          -
>>>>>>>>
>>>>>>>> So essentially as soon as the argument list fills up then we apply
>>>>>>>> the function/lambda at each new arriving message in the data stream for
>>>>>>>> either argument key.
>>>>>>>>
>>>>>>>> Tony:
>>>>>>>> Yes we need to group by and pass to the lambda.
>>>>>>>> Ok, so what you are proposing might work. So your solution assumes
>>>>>>>> that we have to connect with the control stream twice? Once for the tagging
>>>>>>>> and another time re-connect-ing the control stream with the tagged stream
>>>>>>>> for the actual application of the function/lambda?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Alex
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Aug 31, 2017 at 2:57 PM, Aljoscha Krettek <
>>>>>>>> aljoscha@apache.org> wrote:
>>>>>>>>
>>>>>>>>> Hi Martin,
>>>>>>>>>
>>>>>>>>> In your original example, what does this syntax mean exactly:
>>>>>>>>>
>>>>>>>>> f1            [A, B, C]            1
>>>>>>>>>
>>>>>>>>> Does it mean that f1 needs one A, one B and one C from the main
>>>>>>>>> stream? If yes, which ones, because there are multiple As and Bs and so on.
>>>>>>>>> Or does it mean that f1 can apply to an A or a B or a C? If it's the first,
>>>>>>>>> then I think it's quite hard to find a partitioning such that both f1, f2,
>>>>>>>>> and all A, B, and C go to the same machine.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Aljoscha
>>>>>>>>>
>>>>>>>>> On 31. Aug 2017, at 15:53, Tony Wei <to...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> Hi Martin,
>>>>>>>>>
>>>>>>>>> So the problem is that you want to group those arguments in Data
>>>>>>>>> Stream and pass them to the lambda function from Control Stream at the same
>>>>>>>>> time. Am I right?
>>>>>>>>>
>>>>>>>>> If right, then you could give each lambda function an id as well.
>>>>>>>>> Use these ids to tag those arguments to which they belong.
>>>>>>>>> After that, keyBy function could be used to group those arguments
>>>>>>>>> belonging to the same lambda function. Joining this stream with Control
>>>>>>>>> Stream by function id could make arguments and function be in the same
>>>>>>>>> instance.
>>>>>>>>>
>>>>>>>>> What do you think? Could this solution solve your problem?
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Tony Wei
>>>>>>>>>
>>>>>>>>> 2017-08-31 20:43 GMT+08:00 Martin Eden <ma...@gmail.com>:
>>>>>>>>>
>>>>>>>>>> Thanks for your reply Tony,
>>>>>>>>>>
>>>>>>>>>> Yes we are in the latter case, where the functions/lambdas come
>>>>>>>>>> in the control stream. Think of them as strings containing the logic of the
>>>>>>>>>> function. The values for each of the arguments to the function come from
>>>>>>>>>> the data stream. That is why we need to co-locate the data stream messages
>>>>>>>>>> for the corresponding keys with the control message that has the function
>>>>>>>>>> to be applied.
>>>>>>>>>>
>>>>>>>>>> We have a way of interpreting the logic described in the string
>>>>>>>>>> and executing it on the incoming values from the data stream. This is
>>>>>>>>>> kicked off from within the Flink runtime (synchronous to a flatMap of the
>>>>>>>>>> RichCoFlatMapFunction) but is not using Flink predefined
>>>>>>>>>> operators or functions.
>>>>>>>>>>
>>>>>>>>>> So yeah I see your point about mapping the arguments but the
>>>>>>>>>> problem is not really that, the problem is making sure that the values in
>>>>>>>>>> the control stream are in the same instance of the task/ keyed managed
>>>>>>>>>> state as the actual control stream message. Once they are we can pass
>>>>>>>>>> them in.
>>>>>>>>>>
>>>>>>>>>> Any other thoughts?
>>>>>>>>>>
>>>>>>>>>> M
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Aug 31, 2017 at 12:06 PM, Tony Wei <
>>>>>>>>>> tony19920430@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Martin,
>>>>>>>>>>>
>>>>>>>>>>> About problem 2. How were those lambda functions created?
>>>>>>>>>>> Pre-defined functions / operators or automatically generated based on the
>>>>>>>>>>> message from Control Stream?
>>>>>>>>>>>
>>>>>>>>>>> For the former, you could give each function one id and use
>>>>>>>>>>> flatMap to duplicate data with multiple ids. Then, you could use filter
>>>>>>>>>>> function and send them to the corresponding operators.
>>>>>>>>>>>
>>>>>>>>>>> For the general case like the latter, because you had
>>>>>>>>>>> broadcasted the messages to all tasks, it could always build a mapping
>>>>>>>>>>> table from argument keys to lambda functions in each sub-task and use the
>>>>>>>>>>> map to process the data. But I was wondering if it is possible to generate
>>>>>>>>>>> a completely new function in the runtime.
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Tony Wei
>>>>>>>>>>>
>>>>>>>>>>> 2017-08-31 18:33 GMT+08:00 Martin Eden <ma...@gmail.com>:
>>>>>>>>>>>
>>>>>>>>>>>> Thanks for your reply Tony.
>>>>>>>>>>>>
>>>>>>>>>>>> So there are actually 2 problems to solve:
>>>>>>>>>>>>
>>>>>>>>>>>> 1. All control stream msgs need to be broadcasted to all tasks.
>>>>>>>>>>>>
>>>>>>>>>>>> 2. The data stream messages with the same keys as those
>>>>>>>>>>>> specified in the control message need to go to the same task as well, so
>>>>>>>>>>>> that all the values required for the lambda (i.e. functions f1, f2 ...) are
>>>>>>>>>>>> there.
>>>>>>>>>>>>
>>>>>>>>>>>> In my understanding side inputs (which are actually not
>>>>>>>>>>>> available in the current release) would address problem 1.
>>>>>>>>>>>>
>>>>>>>>>>>> To address problem 1 I also tried dataStream.keyBy(key).connect(
>>>>>>>>>>>> controlStream.broadcast).flatMap(new RichCoFlatMapFunction) but
>>>>>>>>>>>> I get a runtime exception telling me I still need to do a keyBy before the
>>>>>>>>>>>> flatMap. So are the upcoming side inputs the only way to broadcast a
>>>>>>>>>>>> control stream to all tasks of a coFlatMap? Or is there another way?
>>>>>>>>>>>>
>>>>>>>>>>>> As for problem 2, I am still pending a reply. Would appreciate
>>>>>>>>>>>> if anyone has some suggestions.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> M
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Aug 31, 2017 at 9:59 AM, Tony Wei <
>>>>>>>>>>>> tony19920430@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Martin,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Let me understand your question first.
>>>>>>>>>>>>> You have two Stream: Data Stream and Control Stream and you
>>>>>>>>>>>>> want to select data in Data Stream based on the key set got from Control
>>>>>>>>>>>>> Stream.
>>>>>>>>>>>>>
>>>>>>>>>>>>> If I were not misunderstanding your question, I think
>>>>>>>>>>>>> SideInput is what you want.
>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API#FLIP-17SideInputsforDataStreamAPI-StoringSide-InputData
>>>>>>>>>>>>> It lets you to define one stream as a SideInput and can be
>>>>>>>>>>>>> assigned to the other stream, then the data in SideInput stream will be
>>>>>>>>>>>>> broadcasted.
>>>>>>>>>>>>>
>>>>>>>>>>>>> So far, I have no idea if there is any solution to solve this
>>>>>>>>>>>>> without SideInput.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Tony Wei
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2017-08-31 16:10 GMT+08:00 Martin Eden <
>>>>>>>>>>>>> martineden131@gmail.com>:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I am trying to implement the following using Flink:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I have 2 input message streams:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 1. Data Stream:
>>>>>>>>>>>>>> KEY VALUE TIME
>>>>>>>>>>>>>> .
>>>>>>>>>>>>>> .
>>>>>>>>>>>>>> .
>>>>>>>>>>>>>> C      V6        6
>>>>>>>>>>>>>> B      V6        6
>>>>>>>>>>>>>> A      V5        5
>>>>>>>>>>>>>> A      V4        4
>>>>>>>>>>>>>> C      V3        3
>>>>>>>>>>>>>> A      V3        3
>>>>>>>>>>>>>> B      V3        3
>>>>>>>>>>>>>> B      V2        2
>>>>>>>>>>>>>> A      V1        1
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2. Control Stream:
>>>>>>>>>>>>>> Lambda  ArgumentKeys TIME
>>>>>>>>>>>>>> .
>>>>>>>>>>>>>> .
>>>>>>>>>>>>>> .
>>>>>>>>>>>>>> f2            [A, C]                 4
>>>>>>>>>>>>>> f1            [A, B, C]            1
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I want to apply the lambdas coming in the control stream to
>>>>>>>>>>>>>> the selection of keys that are coming in the data stream.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Since we have 2 streams I naturally thought of connecting
>>>>>>>>>>>>>> them using .connect. For this I need to key both of them by a certain
>>>>>>>>>>>>>> criteria. And here lies the problem, how can I make sure the messages with
>>>>>>>>>>>>>> keys A,B,C specified in the control stream end up in the same task as well
>>>>>>>>>>>>>> as the control message (f1, [A, B, C]) itself. Basically I don't know how
>>>>>>>>>>>>>> to key by to achieve this.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I suspect a custom partitioner is required that partitions
>>>>>>>>>>>>>> the data stream based on the messages in the control stream? Is this even
>>>>>>>>>>>>>> possible?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Any suggestions welcomed!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> M

Re: dynamically partitioned stream

Posted by Tony Wei <to...@gmail.com>.
Hi Martin,

What I was talking about is how to store the arguments' state, using the
example in which you explained your use case to Aljoscha.

4          f1(V4, V3, V3)
            f2(V4, V3)
3          f1(V3, V3, V3)
2          -
1          -

You showed that when lambda f2 came, it would emit f2(V4, V3) immediately.
However, the second argument (C, V3, 3) came before f2. You couldn't know
how to route it at that time.

If you didn't store this data's state, then f2(V4, V3) wouldn't happen and
the problem is easy.
Otherwise, you would have to route all data and all lambdas to the same
node to guarantee that no lambda loses any of its arguments' state.
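
The first option (keep no state until some lambda asks for a key) can be
sketched as a routing table. This is plain Python and not Flink code;
Router, node_for and the parallelism of 4 are assumptions made for
illustration, and the byte-sum in node_for is just a deterministic stand-in
for a real hash partitioner.

```python
from collections import defaultdict


class Router:
    """Route data only to nodes that host a lambda subscribed to its key."""

    def __init__(self, parallelism):
        self.parallelism = parallelism
        self.subscribers = defaultdict(list)  # argument key -> lambda ids

    def node_for(self, lambda_id):
        # Deterministic stand-in for hashing the lambda id to a node index.
        return sum(lambda_id.encode()) % self.parallelism

    def register(self, lambda_id, arg_keys):
        """Record a new lambda and return the node it is assigned to."""
        for key in arg_keys:
            self.subscribers[key].append(lambda_id)
        return self.node_for(lambda_id)

    def route(self, key, value):
        """Return one (node, lambda_id, key, value) copy per subscribed lambda."""
        return [
            (self.node_for(lam), lam, key, value)
            for lam in self.subscribers.get(key, [])
        ]


router = Router(parallelism=4)
dropped = router.route("C", "V3")        # no lambda yet, so nothing is routed
router.register("f1", ["A", "B", "C"])
router.register("f2", ["A", "C"])
copies = router.route("A", "V4")         # one copy per lambda that uses A
print(dropped, copies)
```

A value that arrives before any lambda subscribes to its key is simply not
routed anywhere, which is exactly why f2(V4, V3) cannot be produced under
this scheme.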

Best,
Tony Wei

2017-09-07 14:31 GMT+08:00 Martin Eden <ma...@gmail.com>:

> Hi Tony,
>
> Yes exactly I am assuming the lambda emits a value only after it has been
> published to the control topic (t1) and at least 1 value arrives in the
> data topic for each of its arguments. This will happen at a time t2 > t1.
> So yes, there is uncertainty with regards to when t2 will happen. Ideally
> t2 - t1 ~ 0 but for our use case it is fine. Is this the correctness that
> you are talking about? Do I have the right picture of what happens?
>
> Thanks
> M
>
> On Thu, Sep 7, 2017 at 3:11 AM, Tony Wei <to...@gmail.com> wrote:
>
>> Hi Martin,
>>
>> The performance is an issue, but in your case, yes, it might not be a
>> problem if X << N.
>>
>> However, the other problem is where data should go in the beginning if
>> there is no lambda been received. This problem doesn't associate with
>> performance, but instead with correctness. If you want to keep the value
>> state for the incoming lambda you should broadcast it to all nodes, because
>> you would never know where the next lambda that requires this data would be
>> routed to. Of course, you can send this data to a pre-defined node and
>> route the lambda to this node, but this will lead to all data in the same
>> node to let all lambda can get all required data. It is not a good solution
>> because of a lack of scalability.
>>
>> In my origin thought, it is based on only storing state of data after you
>> receive at least one lambda that requires it, so that data has its
>> destination node to go. Can this assumption be acceptable in your case?
>> What do you think?
>>
>> Best,
>> Tony Wei
>>
>> 2017-09-06 22:41 GMT+08:00 Martin Eden <ma...@gmail.com>:
>>
>>> Hi Aljoscha, Tony,
>>>
>>> We actually do not need all the keys to be on all nodes where lambdas
>>> are. We just need the keys that represent the data for the lambda arguments
>>> to be routed to the same node as the lambda, whichever one it might be.
>>>
>>> Essentially in the solution we emit the data multiple times and by doing
>>> that we roughly multiply the input rate by the average number of lambdas a
>>> key is a part of (X). In terms of memory this is O(X * N) where N is the
>>> number of keys in the data. N is the large bit. If X ~ N then we have O
>>> (N^2) complexity for the Flink state. And in that case yes I see your point
>>> about performance Aljoscha. But if X << N, as is our case, then we have
>>> O(N) which should be manageable by Flink's distributed state mechanism
>>> right? Do you see any gotchas in this new light? Are my assumptions correct?
>>>
>>> Thanks,
>>> M
>>>
>>>
>>>
>>>
>>>
>>> On Sat, Sep 2, 2017 at 3:38 AM, Tony Wei <to...@gmail.com> wrote:
>>>
>>>> Hi Martin, Aljoscha
>>>>
>>>> I think Aljoscha is right. My origin thought was to keep the state
>>>> only after a lambda function coming.
>>>>
>>>> Use Aljoscha's scenario as example, initially, all data will be
>>>> discarded because there is no any lambdas. When lambda f1 [D, E] and
>>>> f2 [A, C] comes, A, C begin to be routed to machine "0" and D, E begin to
>>>> be routed to machine "1". Then, when we get a new lambda f3 [C, D], we
>>>> can duplicate C, D and route these copies to machine "2".
>>>>
>>>> However, after reading your example again, I found what you want is a
>>>> whole picture for all variables' state in a global view, so that no matter
>>>> what time a new lambda comes it can always get its variables' state
>>>> immediately. In that case, I have the same opinion as Aljoscha.
>>>>
>>>> Best,
>>>> Tony Wei
>>>>
>>>> 2017-09-01 23:59 GMT+08:00 Aljoscha Krettek <al...@apache.org>:
>>>>
>>>>> Hi Martin,
>>>>>
>>>>> I think with those requirements this is very hard (or maybe
>>>>> impossible) to do efficiently in a distributed setting. It might be that
>>>>> I'm misunderstanding things but let's look at an example. Assume that
>>>>> initially, we don't have any lambdas, so data can be sent to any machine
>>>>> because it doesn't matter where they go. Now, we get a new lambda f2 [A,
>>>>> C]. Say this gets routed to machine "0", now this means that messages with
>>>>> key A and C also need to be routed to machine "0". Now, we get a new lambda
>>>>> f1 [D, E], say this gets routed to machine "2", meaning that messages with
>>>>> key D and E are also routed to machine "2".
>>>>>
>>>>> Then, we get a new lambda f3 [C, D]. Do we now re-route all previous
>>>>> lambdas and inputs to different machines? They all have to go to the same
>>>>> machine, but which one? I'm currently thinking that there would need to be
>>>>> some component that does the routing, but this has to be global, so it's
>>>>> hard to do in a distributed setting.
>>>>>
>>>>> What do you think?
>>>>>
>>>>> Best,
>>>>> Aljoscha
>>>>>
>>>>> On 1. Sep 2017, at 07:17, Martin Eden <ma...@gmail.com> wrote:
>>>>>
>>>>> This might be a way forward but since side inputs are not there I will
>>>>> try and key the control stream by the keys in the first co flat map.
>>>>>
>>>>> I'll see how it goes.
>>>>>
>>>>> Thanks guys,
>>>>> M
>>>>>
>>>>> On Thu, Aug 31, 2017 at 5:16 PM, Tony Wei <to...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Martin,
>>>>>>
>>>>>> Yes, that is exactly what I thought.
>>>>>> But the first step also needs to be fulfilled  by SideInput. I'm not
>>>>>> sure how to achieve this in the current release.
>>>>>>
>>>>>> Best,
>>>>>> Tony Wei
>>>>>>
>>>>>> On Thu, Aug 31, 2017 at 11:32 PM, Martin Eden <ma...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Aljoscha, Tony,
>>>>>>>
>>>>>>> Aljoscha:
>>>>>>> Yes it's the first option you mentioned.
>>>>>>> Yes, the stream has multiple values in flight for A, B, C. f1 needs
>>>>>>> to be applied each time a new value for either A, B or C comes in. So we
>>>>>>> need to use state to cache the latest values. So using the example data
>>>>>>> stream in my first msg the emitted stream should be:
>>>>>>>
>>>>>>> 1. Data Stream:
>>>>>>> KEY VALUE TIME
>>>>>>> .
>>>>>>> .
>>>>>>> .
>>>>>>> C      V6        6
>>>>>>> B      V6        6
>>>>>>> A      V5        5
>>>>>>> A      V4        4
>>>>>>> C      V3        3
>>>>>>> A      V3        3
>>>>>>> B      V3        3
>>>>>>> B      V2        2
>>>>>>> A      V1        1
>>>>>>>
>>>>>>> 2. Control Stream:
>>>>>>> Lambda  ArgumentKeys TIME
>>>>>>> .
>>>>>>> .
>>>>>>> .
>>>>>>> f2            [A, C]                 4
>>>>>>> f1            [A, B, C]            1
>>>>>>>
>>>>>>> 3. Expected emitted stream:
>>>>>>> TIME    VALUE
>>>>>>> .
>>>>>>> .
>>>>>>> .
>>>>>>> 6          f1(V5, V6, V3)
>>>>>>>             f1(V5, V6, V6)
>>>>>>>             f2(V5, V6)
>>>>>>> 5          f1(V5, V3, V3)
>>>>>>>             f2(V5, V3)
>>>>>>> 4          f1(V4, V3, V3)
>>>>>>>             f2(V4, V3)
>>>>>>> 3          f1(V3, V3, V3)
>>>>>>> 2          -
>>>>>>> 1          -
>>>>>>>
>>>>>>> So essentially as soon as the argument list fills up then we apply
>>>>>>> the function/lambda at each new arriving message in the data stream for
>>>>>>> either argument key.
>>>>>>>
>>>>>>> Tony:
>>>>>>> Yes we need to group by and pass to the lambda.
>>>>>>> Ok, so what you are proposing might work. So your solution assumes
>>>>>>> that we have to connect with the control stream twice? Once for the tagging
>>>>>>> and another time re-connect-ing the control stream with the tagged stream
>>>>>>> for the actual application of the function/lambda?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Alex
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Aug 31, 2017 at 2:57 PM, Aljoscha Krettek <
>>>>>>> aljoscha@apache.org> wrote:
>>>>>>>
>>>>>>>> Hi Martin,
>>>>>>>>
>>>>>>>> In your original example, what does this syntax mean exactly:
>>>>>>>>
>>>>>>>> f1            [A, B, C]            1
>>>>>>>>
>>>>>>>> Does it mean that f1 needs one A, one B and one C from the main
>>>>>>>> stream? If yes, which ones, because there are multiple As and Bs and so on.
>>>>>>>> Or does it mean that f1 can apply to an A or a B or a C? If it's the first,
>>>>>>>> then I think it's quite hard to find a partitioning such that both f1, f2,
>>>>>>>> and all A, B, and C go to the same machine.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Aljoscha
>>>>>>>>
>>>>>>>> On 31. Aug 2017, at 15:53, Tony Wei <to...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> Hi Martin,
>>>>>>>>
>>>>>>>> So the problem is that you want to group those arguments in Data
>>>>>>>> Stream and pass them to the lambda function from Control Stream at the same
>>>>>>>> time. Am I right?
>>>>>>>>
>>>>>>>> If right, then you could give each lambda function an id as well.
>>>>>>>> Use these ids to tag those arguments to which they belong.
>>>>>>>> After that, keyBy function could be used to group those arguments
>>>>>>>> belonging to the same lambda function. Joining this stream with Control
>>>>>>>> Stream by function id could make arguments and function be in the same
>>>>>>>> instance.
>>>>>>>>
>>>>>>>> What do you think? Could this solution solve your problem?
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Tony Wei
>>>>>>>>
>>>>>>>> 2017-08-31 20:43 GMT+08:00 Martin Eden <ma...@gmail.com>:
>>>>>>>>
>>>>>>>>> Thanks for your reply Tony,
>>>>>>>>>
>>>>>>>>> Yes we are in the latter case, where the functions/lambdas come in
>>>>>>>>> the control stream. Think of them as strings containing the logic of the
>>>>>>>>> function. The values for each of the arguments to the function come from
>>>>>>>>> the data stream. That is why we need to co-locate the data stream messages
>>>>>>>>> for the corresponding keys with the control message that has the function
>>>>>>>>> to be applied.
>>>>>>>>>
>>>>>>>>> We have a way of interpreting the logic described in the string
>>>>>>>>> and executing it on the incoming values from the data stream. This is
>>>>>>>>> kicked off from within the Flink runtime (synchronous to a flatMap of the
>>>>>>>>> RichCoFlatMapFunction) but is not using Flink predefined
>>>>>>>>> operators or functions.
>>>>>>>>>
>>>>>>>>> So yeah I see your point about mapping the arguments but the
>>>>>>>>> problem is not really that, the problem is making sure that the values in
>>>>>>>>> the control stream are in the same instance of the task/ keyed managed
>>>>>>>>> state as the actual control stream message. Once they are we can pass
>>>>>>>>> them in.
>>>>>>>>>
>>>>>>>>> Any other thoughts?
>>>>>>>>>
>>>>>>>>> M
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Aug 31, 2017 at 12:06 PM, Tony Wei <tony19920430@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Martin,
>>>>>>>>>>
>>>>>>>>>> About problem 2. How were those lambda functions created?
>>>>>>>>>> Pre-defined functions / operators or automatically generated based on the
>>>>>>>>>> message from Control Stream?
>>>>>>>>>>
>>>>>>>>>> For the former, you could give each function one id and use
>>>>>>>>>> flatMap to duplicate data with multiple ids. Then, you could use filter
>>>>>>>>>> function and send them to the corresponding operators.
>>>>>>>>>>
>>>>>>>>>> For the general case like the latter, because you had broadcasted
>>>>>>>>>> the messages to all tasks, it could always build a mapping table from
>>>>>>>>>> argument keys to lambda functions in each sub-task and use the map to
>>>>>>>>>> process the data. But I was wondering if it is possible to generate a
>>>>>>>>>> completely new function in the runtime.
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Tony Wei
>>>>>>>>>>
>>>>>>>>>> 2017-08-31 18:33 GMT+08:00 Martin Eden <ma...@gmail.com>:
>>>>>>>>>>
>>>>>>>>>>> Thanks for your reply Tony.
>>>>>>>>>>>
>>>>>>>>>>> So there are actually 2 problems to solve:
>>>>>>>>>>>
>>>>>>>>>>> 1. All control stream msgs need to be broadcasted to all tasks.
>>>>>>>>>>>
>>>>>>>>>>> 2. The data stream messages with the same keys as those
>>>>>>>>>>> specified in the control message need to go to the same task as well, so
>>>>>>>>>>> that all the values required for the lambda (i.e. functions f1, f2 ...) are
>>>>>>>>>>> there.
>>>>>>>>>>>
>>>>>>>>>>> In my understanding side inputs (which are actually not
>>>>>>>>>>> available in the current release) would address problem 1.
>>>>>>>>>>>
>>>>>>>>>>> To address problem 1 I also tried dataStream.keyBy(key).connect(
>>>>>>>>>>> controlStream.broadcast).flatMap(new RichCoFlatMapFunction) but
>>>>>>>>>>> I get a runtime exception telling me I still need to do a keyBy before the
>>>>>>>>>>> flatMap. So are the upcoming side inputs the only way to broadcast a
>>>>>>>>>>> control stream to all tasks of a coFlatMap? Or is there another way?
>>>>>>>>>>>
>>>>>>>>>>> As for problem 2, I am still pending a reply. Would appreciate
>>>>>>>>>>> if anyone has some suggestions.
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> M
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Aug 31, 2017 at 9:59 AM, Tony Wei <
>>>>>>>>>>> tony19920430@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Martin,
>>>>>>>>>>>>
>>>>>>>>>>>> Let me understand your question first.
>>>>>>>>>>>> You have two Stream: Data Stream and Control Stream and you
>>>>>>>>>>>> want to select data in Data Stream based on the key set got from Control
>>>>>>>>>>>> Stream.
>>>>>>>>>>>>
>>>>>>>>>>>> If I were not misunderstanding your question, I think SideInput
>>>>>>>>>>>> is what you want.
>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API#FLIP-17SideInputsforDataStreamAPI-StoringSide-InputData
>>>>>>>>>>>> It lets you to define one stream as a SideInput and can be
>>>>>>>>>>>> assigned to the other stream, then the data in SideInput stream will be
>>>>>>>>>>>> broadcasted.
>>>>>>>>>>>>
>>>>>>>>>>>> So far, I have no idea if there is any solution to solve this
>>>>>>>>>>>> without SideInput.
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Tony Wei
>>>>>>>>>>>>
>>>>>>>>>>>> 2017-08-31 16:10 GMT+08:00 Martin Eden <martineden131@gmail.com
>>>>>>>>>>>> >:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I am trying to implement the following using Flink:
>>>>>>>>>>>>>
>>>>>>>>>>>>> I have 2 input message streams:
>>>>>>>>>>>>>
>>>>>>>>>>>>> 1. Data Stream:
>>>>>>>>>>>>> KEY VALUE TIME
>>>>>>>>>>>>> .
>>>>>>>>>>>>> .
>>>>>>>>>>>>> .
>>>>>>>>>>>>> C      V6        6
>>>>>>>>>>>>> B      V6        6
>>>>>>>>>>>>> A      V5        5
>>>>>>>>>>>>> A      V4        4
>>>>>>>>>>>>> C      V3        3
>>>>>>>>>>>>> A      V3        3
>>>>>>>>>>>>> B      V3        3
>>>>>>>>>>>>> B      V2        2
>>>>>>>>>>>>> A      V1        1
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2. Control Stream:
>>>>>>>>>>>>> Lambda  ArgumentKeys TIME
>>>>>>>>>>>>> .
>>>>>>>>>>>>> .
>>>>>>>>>>>>> .
>>>>>>>>>>>>> f2            [A, C]                 4
>>>>>>>>>>>>> f1            [A, B, C]            1
>>>>>>>>>>>>>
>>>>>>>>>>>>> I want to apply the lambdas coming in the control stream to
>>>>>>>>>>>>> the selection of keys that are coming in the data stream.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Since we have 2 streams I naturally thought of connecting them
>>>>>>>>>>>>> using .connect. For this I need to key both of them by a certain criteria.
>>>>>>>>>>>>> And here lies the problem, how can I make sure the messages with keys A,B,C
>>>>>>>>>>>>> specified in the control stream end up in the same task as well as the
>>>>>>>>>>>>> control message (f1, [A, B, C]) itself. Basically I don't know how to key
>>>>>>>>>>>>> by to achieve this.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I suspect a custom partitioner is required that partitions the
>>>>>>>>>>>>> data stream based on the messages in the control stream? Is this even
>>>>>>>>>>>>> possible?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Any suggestions welcomed!
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> M
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>

Re: dynamically partitioned stream

Posted by Martin Eden <ma...@gmail.com>.
Hi Tony,

Yes, exactly. I am assuming the lambda emits a value only after it has been
published to the control topic (t1) and at least 1 value has arrived in the
data topic for each of its arguments. This will happen at a time t2 > t1.
So yes, there is uncertainty with regard to when t2 will happen. Ideally
t2 - t1 ~ 0, but for our use case a delay is fine. Is this the correctness
that you are talking about? Do I have the right picture of what happens?
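
To make the timing concrete, here is a minimal plain-Python sketch of the
semantics I am assuming. It is not Flink code: LambdaState and replay are
made-up names, the lambdas are stand-ins that only format their arguments,
and I assume a control message is processed before data with the same
timestamp.

```python
from typing import Callable, Dict, List


class LambdaState:
    """Per-lambda cache: only values seen after registration are stored."""

    def __init__(self, arg_keys: List[str], fn: Callable[..., str]) -> None:
        self.arg_keys = arg_keys
        self.fn = fn
        self.latest: Dict[str, str] = {}

    def on_data(self, key: str, value: str) -> List[str]:
        if key not in self.arg_keys:
            return []
        self.latest[key] = value
        if len(self.latest) < len(self.arg_keys):
            return []  # argument list not yet full, nothing to emit
        return [self.fn(*(self.latest[k] for k in self.arg_keys))]


def replay(events) -> List[str]:
    """Feed control and data events in arrival order, collect emissions."""
    lambdas: Dict[str, LambdaState] = {}
    emitted: List[str] = []
    for event in events:
        if event[0] == "control":
            _, name, arg_keys, fn = event
            lambdas[name] = LambdaState(arg_keys, fn)
        else:
            _, key, value = event
            for state in lambdas.values():
                emitted.extend(state.on_data(key, value))
    return emitted


# The example streams from the first message, oldest event first.
EVENTS = [
    ("control", "f1", ["A", "B", "C"], lambda a, b, c: f"f1({a}, {b}, {c})"),
    ("data", "A", "V1"),
    ("data", "B", "V2"),
    ("data", "B", "V3"),
    ("data", "A", "V3"),
    ("data", "C", "V3"),
    ("control", "f2", ["A", "C"], lambda a, c: f"f2({a}, {c})"),
    ("data", "A", "V4"),
    ("data", "A", "V5"),
    ("data", "B", "V6"),
    ("data", "C", "V6"),
]

if __name__ == "__main__":
    for line in replay(EVENTS):
        print(line)
```

In this replay f2 is published at t1 = 4 but first emits at t2 = 6, because
it has to wait for a C value that arrives after it was registered.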

Thanks
M

On Thu, Sep 7, 2017 at 3:11 AM, Tony Wei <to...@gmail.com> wrote:

> Hi Martin,
>
> The performance is an issue, but in your case, yes, it might not be a
> problem if X << N.
>
> However, the other problem is where data should go at the beginning, when
> no lambda has been received yet. This problem is not about performance but
> about correctness. If you want to keep the value state for a future lambda,
> you have to broadcast the data to all nodes, because you can never know
> where the next lambda that requires this data will be routed to. Of course,
> you can send this data to a pre-defined node and route the lambda to this
> node, but then all data ends up on the same node so that every lambda can
> get all the data it requires. That is not a good solution because it does
> not scale.
>
> My original idea was to store the state of a key only after you have
> received at least one lambda that requires it, so that the data has a
> destination node to go to. Is this assumption acceptable in your case?
> What do you think?
>
> Best,
> Tony Wei
>
> 2017-09-06 22:41 GMT+08:00 Martin Eden <ma...@gmail.com>:
>
>> Hi Aljoscha, Tony,
>>
>> We actually do not need all the keys to be on all nodes where lambdas
>> are. We just need the keys that represent the data for the lambda arguments
>> to be routed to the same node as the lambda, whichever one it might be.
>>
>> Essentially in the solution we emit the data multiple times and by doing
>> that we roughly multiply the input rate by the average number of lambdas a
>> key is a part of (X). In terms of memory this is O(X * N) where N is the
>> number of keys in the data. N is the large bit. If X ~ N then we have
>> O(N^2) complexity for the Flink state. And in that case yes I see your point
>> about performance Aljoscha. But if X << N, as is our case, then we have
>> O(N) which should be manageable by Flink's distributed state mechanism
>> right? Do you see any gotchas in this new light? Are my assumptions correct?
>>
>> Thanks,
>> M
>>
>>
>>
>>
>>
>> On Sat, Sep 2, 2017 at 3:38 AM, Tony Wei <to...@gmail.com> wrote:
>>
>>> Hi Martin, Aljoscha
>>>
>>> I think Aljoscha is right. My origin thought was to keep the state only
>>> after a lambda function coming.
>>>
>>> Use Aljoscha's scenario as example, initially, all data will be
>>> discarded because there is no any lambdas. When lambda f1 [D, E] and f2
>>> [A, C] comes, A, C begin to be routed to machine "0" and D, E begin to be
>>> routed to machine "1". Then, when we get a new lambda f3 [C, D], we can
>>> duplicate C, D and route these copies to machine "2".
>>>
>>> However, after reading your example again, I found what you want is a
>>> whole picture for all variables' state in a global view, so that no matter
>>> what time a new lambda comes it can always get its variables' state
>>> immediately. In that case, I have the same opinion as Aljoscha.
>>>
>>> Best,
>>> Tony Wei
>>>
>>> 2017-09-01 23:59 GMT+08:00 Aljoscha Krettek <al...@apache.org>:
>>>
>>>> Hi Martin,
>>>>
>>>> I think with those requirements this is very hard (or maybe impossible)
>>>> to do efficiently in a distributed setting. It might be that I'm
>>>> misunderstanding things but let's look at an example. Assume that
>>>> initially, we don't have any lambdas, so data can be sent to any machine
>>>> because it doesn't matter where they go. Now, we get a new lambda f2 [A,
>>>> C]. Say this gets routed to machine "0", now this means that messages with
>>>> key A and C also need to be routed to machine "0". Now, we get a new lambda
>>>> f1 [D, E], say this gets routed to machine "2", meaning that messages with
>>>> key D and E are also routed to machine "2".
>>>>
>>>> Then, we get a new lambda f3 [C, D]. Do we now re-route all previous
>>>> lambdas and inputs to different machines? They all have to go to the same
>>>> machine, but which one? I'm currently thinking that there would need to be
>>>> some component that does the routing, but this has to be global, so it's
>>>> hard to do in a distributed setting.
>>>>
>>>> What do you think?
>>>>
>>>> Best,
>>>> Aljoscha
>>>>
>>>> On 1. Sep 2017, at 07:17, Martin Eden <ma...@gmail.com> wrote:
>>>>
>>>> This might be a way forward but since side inputs are not there I will
>>>> try and key the control stream by the keys in the first co flat map.
>>>>
>>>> I'll see how it goes.
>>>>
>>>> Thanks guys,
>>>> M
>>>>
>>>> On Thu, Aug 31, 2017 at 5:16 PM, Tony Wei <to...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Martin,
>>>>>
>>>>> Yes, that is exactly what I thought.
>>>>> But the first step also needs to be fulfilled  by SideInput. I'm not
>>>>> sure how to achieve this in the current release.
>>>>>
>>>>> Best,
>>>>> Tony Wei
>>>>>
>>>>> On Thu, Aug 31, 2017 at 11:32 PM, Martin Eden <ma...@gmail.com> wrote:
>>>>>
>>>>>> Hi Aljoscha, Tony,
>>>>>>
>>>>>> Aljoscha:
>>>>>> Yes it's the first option you mentioned.
>>>>>> Yes, the stream has multiple values in flight for A, B, C. f1 needs
>>>>>> to be applied each time a new value for either A, B or C comes in. So we
>>>>>> need to use state to cache the latest values. So using the example data
>>>>>> stream in my first msg the emitted stream should be:
>>>>>>
>>>>>> 1. Data Stream:
>>>>>> KEY VALUE TIME
>>>>>> .
>>>>>> .
>>>>>> .
>>>>>> C      V6        6
>>>>>> B      V6        6
>>>>>> A      V5        5
>>>>>> A      V4        4
>>>>>> C      V3        3
>>>>>> A      V3        3
>>>>>> B      V3        3
>>>>>> B      V2        2
>>>>>> A      V1        1
>>>>>>
>>>>>> 2. Control Stream:
>>>>>> Lambda  ArgumentKeys TIME
>>>>>> .
>>>>>> .
>>>>>> .
>>>>>> f2            [A, C]                 4
>>>>>> f1            [A, B, C]            1
>>>>>>
>>>>>> 3. Expected emitted stream:
>>>>>> TIME    VALUE
>>>>>> .
>>>>>> .
>>>>>> .
>>>>>> 6          f1(V5, V6, V3)
>>>>>>             f1(V5, V6, V6)
>>>>>>             f2(V5, V6)
>>>>>> 5          f1(V5, V3, V3)
>>>>>>             f2(V5, V3)
>>>>>> 4          f1(V4, V3, V3)
>>>>>>             f2(V4, V3)
>>>>>> 3          f1(V3, V3, V3)
>>>>>> 2          -
>>>>>> 1          -
>>>>>>
>>>>>> So essentially as soon as the argument list fills up then we apply
>>>>>> the function/lambda at each new arriving message in the data stream for
>>>>>> either argument key.
>>>>>>
>>>>>> Tony:
>>>>>> Yes we need to group by and pass to the lambda.
>>>>>> Ok, so what you are proposing might work. So your solution assumes
>>>>>> that we have to connect with the control stream twice? Once for the tagging
>>>>>> and another time re-connect-ing the control stream with the tagged stream
>>>>>> for the actual application of the function/lambda?
>>>>>>
>>>>>> Thanks,
>>>>>> Alex
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Aug 31, 2017 at 2:57 PM, Aljoscha Krettek <
>>>>>> aljoscha@apache.org> wrote:
>>>>>>
>>>>>>> Hi Martin,
>>>>>>>
>>>>>>> In your original example, what does this syntax mean exactly:
>>>>>>>
>>>>>>> f1            [A, B, C]            1
>>>>>>>
>>>>>>> Does it mean that f1 needs one A, one B and one C from the main
>>>>>>> stream? If yes, which ones, because there are multiple As and Bs and so on.
>>>>>>> Or does it mean that f1 can apply to an A or a B or a C? If it's the first,
>>>>>>> then I think it's quite hard to find a partitioning such that both f1, f2,
>>>>>>> and all A, B, and C go to the same machine.
>>>>>>>
>>>>>>> Best,
>>>>>>> Aljoscha
>>>>>>>
>>>>>>> On 31. Aug 2017, at 15:53, Tony Wei <to...@gmail.com> wrote:
>>>>>>>
>>>>>>> Hi Martin,
>>>>>>>
>>>>>>> So the problem is that you want to group those arguments in Data
>>>>>>> Stream and pass them to the lambda function from Control Stream at the same
>>>>>>> time. Am I right?
>>>>>>>
>>>>>>> If right, then you could give each lambda function an id as well.
>>>>>>> Use these ids to tag those arguments to which they belong.
>>>>>>> After that, keyBy function could be used to group those arguments
>>>>>>> belonging to the same lambda function. Joining this stream with Control
>>>>>>> Stream by function id could make arguments and function be in the same
>>>>>>> instance.
>>>>>>>
>>>>>>> What do you think? Could this solution solve your problem?
>>>>>>>
>>>>>>> Best,
>>>>>>> Tony Wei
>>>>>>>
>>>>>>> 2017-08-31 20:43 GMT+08:00 Martin Eden <ma...@gmail.com>:
>>>>>>>
>>>>>>>> Thanks for your reply Tony,
>>>>>>>>
>>>>>>>> Yes we are in the latter case, where the functions/lambdas come in
>>>>>>>> the control stream. Think of them as strings containing the logic of the
>>>>>>>> function. The values for each of the arguments to the function come from
>>>>>>>> the data stream. That is why we need to co-locate the data stream messages
>>>>>>>> for the corresponding keys with the control message that has the function
>>>>>>>> to be applied.
>>>>>>>>
>>>>>>>> We have a way of interpreting the logic described in the string and
>>>>>>>> executing it on the incoming values from the data stream. This is kicked
>>>>>>>> off from within the Flink runtime (synchronous to a flatMap of the
>>>>>>>> RichCoFlatMapFunction) but is not using Flink predefined operators
>>>>>>>> or functions.
>>>>>>>>
>>>>>>>> So yeah I see your point about mapping the arguments but the
>>>>>>>> problem is not really that, the problem is making sure that the values in
>>>>>>>> the control stream are in the same instance of the task/ keyed managed
>>>>>>>> state as the actual control stream message. Once they are we can pass
>>>>>>>> them in.
>>>>>>>>
>>>>>>>> Any other thoughts?
>>>>>>>>
>>>>>>>> M
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Aug 31, 2017 at 12:06 PM, Tony Wei <to...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Martin,
>>>>>>>>>
>>>>>>>>> About problem 2. How were those lambda functions created?
>>>>>>>>> Pre-defined functions / operators or automatically generated based on the
>>>>>>>>> message from Control Stream?
>>>>>>>>>
>>>>>>>>> For the former, you could give each function one id and use
>>>>>>>>> flatMap to duplicate data with multiple ids. Then, you could use filter
>>>>>>>>> function and send them to the corresponding operators.
>>>>>>>>>
>>>>>>>>> For the general case like the latter, because you had broadcasted
>>>>>>>>> the messages to all tasks, it could always build a mapping table from
>>>>>>>>> argument keys to lambda functions in each sub-task and use the map to
>>>>>>>>> process the data. But I was wondering if it is possible to generate a
>>>>>>>>> completely new function in the runtime.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Tony Wei
>>>>>>>>>
>>>>>>>>> 2017-08-31 18:33 GMT+08:00 Martin Eden <ma...@gmail.com>:
>>>>>>>>>
>>>>>>>>>> Thanks for your reply Tony.
>>>>>>>>>>
>>>>>>>>>> So there are actually 2 problems to solve:
>>>>>>>>>>
>>>>>>>>>> 1. All control stream msgs need to be broadcasted to all tasks.
>>>>>>>>>>
>>>>>>>>>> 2. The data stream messages with the same keys as those specified
>>>>>>>>>> in the control message need to go to the same task as well, so that all the
>>>>>>>>>> values required for the lambda (i.e. functions f1, f2 ...) are there.
>>>>>>>>>>
>>>>>>>>>> In my understanding side inputs (which are actually not available
>>>>>>>>>> in the current release) would address problem 1.
>>>>>>>>>>
>>>>>>>>>> To address problem 1 I also tried dataStream.keyBy(key).connect(
>>>>>>>>>> controlStream.broadcast).flatMap(new RichCoFlatMapFunction) but
>>>>>>>>>> I get a runtime exception telling me I still need to do a keyBy before the
>>>>>>>>>> flatMap. So are the upcoming side inputs the only way to broadcast a
>>>>>>>>>> control stream to all tasks of a coFlatMap? Or is there another way?
>>>>>>>>>>
>>>>>>>>>> As for problem 2, I am still pending a reply. Would appreciate if
>>>>>>>>>> anyone has some suggestions.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> M
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Aug 31, 2017 at 9:59 AM, Tony Wei <tony19920430@gmail.com
>>>>>>>>>> > wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Martin,
>>>>>>>>>>>
>>>>>>>>>>> Let me understand your question first.
>>>>>>>>>>> You have two Stream: Data Stream and Control Stream and you want
>>>>>>>>>>> to select data in Data Stream based on the key set got from Control Stream.
>>>>>>>>>>>
>>>>>>>>>>> If I were not misunderstanding your question, I think SideInput
>>>>>>>>>>> is what you want.
>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API#FLIP-17SideInputsforDataStreamAPI-StoringSide-InputData
>>>>>>>>>>> It lets you to define one stream as a SideInput and can be
>>>>>>>>>>> assigned to the other stream, then the data in SideInput stream will be
>>>>>>>>>>> broadcasted.
>>>>>>>>>>>
>>>>>>>>>>> So far, I have no idea if there is any solution to solve this
>>>>>>>>>>> without SideInput.
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Tony Wei
>>>>>>>>>>>
>>>>>>>>>>> 2017-08-31 16:10 GMT+08:00 Martin Eden <ma...@gmail.com>
>>>>>>>>>>> :
>>>>>>>>>>>
>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>
>>>>>>>>>>>> I am trying to implement the following using Flink:
>>>>>>>>>>>>
>>>>>>>>>>>> I have 2 input message streams:
>>>>>>>>>>>>
>>>>>>>>>>>> 1. Data Stream:
>>>>>>>>>>>> KEY VALUE TIME
>>>>>>>>>>>> .
>>>>>>>>>>>> .
>>>>>>>>>>>> .
>>>>>>>>>>>> C      V6        6
>>>>>>>>>>>> B      V6        6
>>>>>>>>>>>> A      V5        5
>>>>>>>>>>>> A      V4        4
>>>>>>>>>>>> C      V3        3
>>>>>>>>>>>> A      V3        3
>>>>>>>>>>>> B      V3        3
>>>>>>>>>>>> B      V2        2
>>>>>>>>>>>> A      V1        1
>>>>>>>>>>>>
>>>>>>>>>>>> 2. Control Stream:
>>>>>>>>>>>> Lambda  ArgumentKeys TIME
>>>>>>>>>>>> .
>>>>>>>>>>>> .
>>>>>>>>>>>> .
>>>>>>>>>>>> f2            [A, C]                 4
>>>>>>>>>>>> f1            [A, B, C]            1
>>>>>>>>>>>>
>>>>>>>>>>>> I want to apply the lambdas coming in the control stream to the
>>>>>>>>>>>> selection of keys that are coming in the data stream.
>>>>>>>>>>>>
>>>>>>>>>>>> Since we have 2 streams I naturally thought of connecting them
>>>>>>>>>>>> using .connect. For this I need to key both of them by a certain criteria.
>>>>>>>>>>>> And here lies the problem, how can I make sure the messages with keys A,B,C
>>>>>>>>>>>> specified in the control stream end up in the same task as well as the
>>>>>>>>>>>> control message (f1, [A, B, C]) itself. Basically I don't know how to key
>>>>>>>>>>>> by to achieve this.
>>>>>>>>>>>>
>>>>>>>>>>>> I suspect a custom partitioner is required that partitions the
>>>>>>>>>>>> data stream based on the messages in the control stream? Is this even
>>>>>>>>>>>> possible?
>>>>>>>>>>>>
>>>>>>>>>>>> Any suggestions welcomed!
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> M
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>
>>>>
>>>
>>
>

Re: dynamically partitioned stream

Posted by Tony Wei <to...@gmail.com>.
Hi Martin,

The performance is an issue, but in your case, yes, it might not be a
problem if X << N.

However, the other problem is where data should go at the beginning, when
no lambda has been received yet. This problem is not about performance but
about correctness. If you want to keep the value state for a future lambda,
you have to broadcast the data to all nodes, because you can never know
where the next lambda that requires this data will be routed to. Of course,
you can send this data to a pre-defined node and route the lambda to this
node, but then all data ends up on the same node so that every lambda can
get all the data it requires. That is not a good solution because it does
not scale.

My original idea was to store the state of a key only after you have
received at least one lambda that requires it, so that the data has a
destination node to go to. Is this assumption acceptable in your case?
What do you think?
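
As a rough illustration of that idea, here is a plain-Python sketch (not the
Flink API). Router, on_control and route are hypothetical names, and hashing
the lambda id with crc32 is only a stand-in for whatever partitioning a
keyBy on the lambda id would do. Data for a key is dropped until some lambda
requires it, and afterwards it is duplicated once per lambda that uses it.

```python
import zlib
from collections import defaultdict
from typing import Dict, List, Tuple


class Router:
    """Routes data only to nodes hosting a lambda that requires the key."""

    def __init__(self, num_nodes: int) -> None:
        self.num_nodes = num_nodes
        # argument key -> ids of the lambdas that take it as an argument
        self.key_to_lambdas: Dict[str, List[str]] = defaultdict(list)

    def node_for(self, lambda_id: str) -> int:
        # Deterministic placement of a lambda (assumption: simple hashing).
        return zlib.crc32(lambda_id.encode("utf-8")) % self.num_nodes

    def on_control(self, lambda_id: str, arg_keys: List[str]) -> None:
        for key in arg_keys:
            if lambda_id not in self.key_to_lambdas[key]:
                self.key_to_lambdas[key].append(lambda_id)

    def route(self, key: str, value: str) -> List[Tuple[int, str, str, str]]:
        # One copy per interested lambda; an empty list means "discard".
        return [
            (self.node_for(lambda_id), lambda_id, key, value)
            for lambda_id in self.key_to_lambdas.get(key, [])
        ]


if __name__ == "__main__":
    router = Router(num_nodes=3)
    print(router.route("C", "V1"))   # no lambda yet, so nothing is routed
    router.on_control("f1", ["D", "E"])
    router.on_control("f2", ["A", "C"])
    router.on_control("f3", ["C", "D"])
    print(router.route("C", "V3"))   # one copy for f2 and one for f3
```

The copies are keyed by lambda id, so all arguments of one lambda meet on
the same node, at the price of duplicating keys shared by several lambdas.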

Best,
Tony Wei

2017-09-06 22:41 GMT+08:00 Martin Eden <ma...@gmail.com>:

> Hi Aljoscha, Tony,
>
> We actually do not need all the keys to be on all nodes where lambdas are.
> We just need the keys that represent the data for the lambda arguments to
> be routed to the same node as the lambda, whichever one it might be.
>
> Essentially in the solution we emit the data multiple times and by doing
> that we roughly multiply the input rate by the average number of lambdas a
> key is a part of (X). In terms of memory this is O(X * N) where N is the
> number of keys in the data. N is the large bit. If X ~ N then we have
> O(N^2) complexity for the Flink state. And in that case yes I see your point
> about performance Aljoscha. But if X << N, as is our case, then we have
> O(N) which should be manageable by Flink's distributed state mechanism
> right? Do you see any gotchas in this new light? Are my assumptions correct?
>
> Thanks,
> M
>
>
>
>
>
> On Sat, Sep 2, 2017 at 3:38 AM, Tony Wei <to...@gmail.com> wrote:
>
>> Hi Martin, Aljoscha
>>
>> I think Aljoscha is right. My origin thought was to keep the state only
>> after a lambda function coming.
>>
>> Use Aljoscha's scenario as example, initially, all data will be
>> discarded because there is no any lambdas. When lambda f1 [D, E] and f2
>> [A, C] comes, A, C begin to be routed to machine "0" and D, E begin to be
>> routed to machine "1". Then, when we get a new lambda f3 [C, D], we can
>> duplicate C, D and route these copies to machine "2".
>>
>> However, after reading your example again, I found what you want is a
>> whole picture for all variables' state in a global view, so that no matter
>> what time a new lambda comes it can always get its variables' state
>> immediately. In that case, I have the same opinion as Aljoscha.
>>
>> Best,
>> Tony Wei
>>
>> 2017-09-01 23:59 GMT+08:00 Aljoscha Krettek <al...@apache.org>:
>>
>>> Hi Martin,
>>>
>>> I think with those requirements this is very hard (or maybe impossible)
>>> to do efficiently in a distributed setting. It might be that I'm
>>> misunderstanding things but let's look at an example. Assume that
>>> initially, we don't have any lambdas, so data can be sent to any machine
>>> because it doesn't matter where they go. Now, we get a new lambda f2 [A,
>>> C]. Say this gets routed to machine "0", now this means that messages with
>>> key A and C also need to be routed to machine "0". Now, we get a new lambda
>>> f1 [D, E], say this gets routed to machine "2", meaning that messages with
>>> key D and E are also routed to machine "2".
>>>
>>> Then, we get a new lambda f3 [C, D]. Do we now re-route all previous
>>> lambdas and inputs to different machines? They all have to go to the same
>>> machine, but which one? I'm currently thinking that there would need to be
>>> some component that does the routing, but this has to be global, so it's
>>> hard to do in a distributed setting.
>>>
>>> What do you think?
>>>
>>> Best,
>>> Aljoscha
>>>
>>> On 1. Sep 2017, at 07:17, Martin Eden <ma...@gmail.com> wrote:
>>>
>>> This might be a way forward but since side inputs are not there I will
>>> try and key the control stream by the keys in the first co flat map.
>>>
>>> I'll see how it goes.
>>>
>>> Thanks guys,
>>> M
>>>
>>> On Thu, Aug 31, 2017 at 5:16 PM, Tony Wei <to...@gmail.com>
>>> wrote:
>>>
>>>> Hi Martin,
>>>>
>>>> Yes, that is exactly what I thought.
>>>> But the first step also needs to be fulfilled  by SideInput. I'm not
>>>> sure how to achieve this in the current release.
>>>>
>>>> Best,
>>>> Tony Wei
>>>>
>>>> On Thu, Aug 31, 2017 at 11:32 PM, Martin Eden <ma...@gmail.com> wrote:
>>>>
>>>>> Hi Aljoscha, Tony,
>>>>>
>>>>> Aljoscha:
>>>>> Yes it's the first option you mentioned.
>>>>> Yes, the stream has multiple values in flight for A, B, C. f1 needs to
>>>>> be applied each time a new value for either A, B or C comes in. So we need
>>>>> to use state to cache the latest values. So using the example data stream
>>>>> in my first msg the emitted stream should be:
>>>>>
>>>>> 1. Data Stream:
>>>>> KEY VALUE TIME
>>>>> .
>>>>> .
>>>>> .
>>>>> C      V6        6
>>>>> B      V6        6
>>>>> A      V5        5
>>>>> A      V4        4
>>>>> C      V3        3
>>>>> A      V3        3
>>>>> B      V3        3
>>>>> B      V2        2
>>>>> A      V1        1
>>>>>
>>>>> 2. Control Stream:
>>>>> Lambda  ArgumentKeys TIME
>>>>> .
>>>>> .
>>>>> .
>>>>> f2            [A, C]                 4
>>>>> f1            [A, B, C]            1
>>>>>
>>>>> 3. Expected emitted stream:
>>>>> TIME    VALUE
>>>>> .
>>>>> .
>>>>> .
>>>>> 6          f1(V5, V6, V3)
>>>>>             f1(V5, V6, V6)
>>>>>             f2(V5, V6)
>>>>> 5          f1(V5, V3, V3)
>>>>>             f2(V5, V3)
>>>>> 4          f1(V4, V3, V3)
>>>>>             f2(V4, V3)
>>>>> 3          f1(V3, V3, V3)
>>>>> 2          -
>>>>> 1          -
>>>>>
>>>>> So essentially as soon as the argument list fills up then we apply the
>>>>> function/lambda at each new arriving message in the data stream for either
>>>>> argument key.
>>>>>
>>>>> Tony:
>>>>> Yes we need to group by and pass to the lambda.
>>>>> Ok, so what you are proposing might work. So your solution assumes
>>>>> that we have to connect with the control stream twice? Once for the tagging
>>>>> and another time re-connect-ing the control stream with the tagged stream
>>>>> for the actual application of the function/lambda?
>>>>>
>>>>> Thanks,
>>>>> Alex
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Aug 31, 2017 at 2:57 PM, Aljoscha Krettek <aljoscha@apache.org
>>>>> > wrote:
>>>>>
>>>>>> Hi Martin,
>>>>>>
>>>>>> In your original example, what does this syntax mean exactly:
>>>>>>
>>>>>> f1            [A, B, C]            1
>>>>>>
>>>>>> Does it mean that f1 needs one A, one B and one C from the main
>>>>>> stream? If yes, which ones, because there are multiple As and Bs and so on.
>>>>>> Or does it mean that f1 can apply to an A or a B or a C? If it's the first,
>>>>>> then I think it's quite hard to find a partitioning such that both f1, f2,
>>>>>> and all A, B, and C go to the same machine.
>>>>>>
>>>>>> Best,
>>>>>> Aljoscha
>>>>>>
>>>>>> On 31. Aug 2017, at 15:53, Tony Wei <to...@gmail.com> wrote:
>>>>>>
>>>>>> Hi Martin,
>>>>>>
>>>>>> So the problem is that you want to group those arguments in Data
>>>>>> Stream and pass them to the lambda function from Control Stream at the same
>>>>>> time. Am I right?
>>>>>>
>>>>>> If right, then you could give each lambda function an id as well. Use
>>>>>> these ids to tag those arguments to which they belong.
>>>>>> After that, keyBy function could be used to group those arguments
>>>>>> belonging to the same lambda function. Joining this stream with Control
>>>>>> Stream by function id could make arguments and function be in the same
>>>>>> instance.
>>>>>>
>>>>>> What do you think? Could this solution solve your problem?
>>>>>>
>>>>>> Best,
>>>>>> Tony Wei
>>>>>>
>>>>>> 2017-08-31 20:43 GMT+08:00 Martin Eden <ma...@gmail.com>:
>>>>>>
>>>>>>> Thanks for your reply Tony,
>>>>>>>
>>>>>>> Yes we are in the latter case, where the functions/lambdas come in
>>>>>>> the control stream. Think of them as strings containing the logic of the
>>>>>>> function. The values for each of the arguments to the function come from
>>>>>>> the data stream. That is why we need to co-locate the data stream messages
>>>>>>> for the corresponding keys with the control message that has the function
>>>>>>> to be applied.
>>>>>>>
>>>>>>> We have a way of interpreting the logic described in the string and
>>>>>>> executing it on the incoming values from the data stream. This is kicked
>>>>>>> off from within the Flink runtime (synchronous to a flatMap of the
>>>>>>> RichCoFlatMapFunction) but is not using Flink predefined operators
>>>>>>> or functions.
>>>>>>>
>>>>>>> So yeah I see your point about mapping the arguments but the problem
>>>>>>> is not really that, the problem is making sure that the values in the
>>>>>>> control stream are in the same instance of the task/ keyed managed state as
>>>>>>> the actual control stream message. Once they are we can pass them in.
>>>>>>>
>>>>>>> Any other thoughts?
>>>>>>>
>>>>>>> M
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Aug 31, 2017 at 12:06 PM, Tony Wei <to...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Martin,
>>>>>>>>
>>>>>>>> About problem 2. How were those lambda functions created?
>>>>>>>> Pre-defined functions / operators or automatically generated based on the
>>>>>>>> message from Control Stream?
>>>>>>>>
>>>>>>>> For the former, you could give each function one id and use
>>>>>>>> flatMap to duplicate data with multiple ids. Then, you could use filter
>>>>>>>> function and send them to the corresponding operators.
>>>>>>>>
>>>>>>>> For the general case like the latter, because you had broadcasted
>>>>>>>> the messages to all tasks, it could always build a mapping table from
>>>>>>>> argument keys to lambda functions in each sub-task and use the map to
>>>>>>>> process the data. But I was wondering if it is possible to generate a
>>>>>>>> completely new function in the runtime.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Tony Wei
>>>>>>>>
>>>>>>>> 2017-08-31 18:33 GMT+08:00 Martin Eden <ma...@gmail.com>:
>>>>>>>>
>>>>>>>>> Thanks for your reply Tony.
>>>>>>>>>
>>>>>>>>> So there are actually 2 problems to solve:
>>>>>>>>>
>>>>>>>>> 1. All control stream msgs need to be broadcasted to all tasks.
>>>>>>>>>
>>>>>>>>> 2. The data stream messages with the same keys as those specified
>>>>>>>>> in the control message need to go to the same task as well, so that all the
>>>>>>>>> values required for the lambda (i.e. functions f1, f2 ...) are there.
>>>>>>>>>
>>>>>>>>> In my understanding side inputs (which are actually not available
>>>>>>>>> in the current release) would address problem 1.
>>>>>>>>>
>>>>>>>>> To address problem 1 I also tried dataStream.keyBy(key).connect(
>>>>>>>>> controlStream.broadcast).flatMap(new RichCoFlatMapFunction) but I
>>>>>>>>> get a runtime exception telling me I still need to do a keyBy before the
>>>>>>>>> flatMap. So are the upcoming side inputs the only way to broadcast a
>>>>>>>>> control stream to all tasks of a coFlatMap? Or is there another way?
>>>>>>>>>
>>>>>>>>> As for problem 2, I am still pending a reply. Would appreciate if
>>>>>>>>> anyone has some suggestions.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> M
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Aug 31, 2017 at 9:59 AM, Tony Wei <to...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Martin,
>>>>>>>>>>
>>>>>>>>>> Let me understand your question first.
>>>>>>>>>> You have two Stream: Data Stream and Control Stream and you want
>>>>>>>>>> to select data in Data Stream based on the key set got from Control Stream.
>>>>>>>>>>
>>>>>>>>>> If I were not misunderstanding your question, I think SideInput
>>>>>>>>>> is what you want.
>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API#FLIP-17SideInputsforDataStreamAPI-StoringSide-InputData
>>>>>>>>>> It lets you to define one stream as a SideInput and can be
>>>>>>>>>> assigned to the other stream, then the data in SideInput stream will be
>>>>>>>>>> broadcasted.
>>>>>>>>>>
>>>>>>>>>> So far, I have no idea if there is any solution to solve this
>>>>>>>>>> without SideInput.
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Tony Wei
>>>>>>>>>>
>>>>>>>>>> 2017-08-31 16:10 GMT+08:00 Martin Eden <ma...@gmail.com>:
>>>>>>>>>>
>>>>>>>>>>> Hi all,
>>>>>>>>>>>
>>>>>>>>>>> I am trying to implement the following using Flink:
>>>>>>>>>>>
>>>>>>>>>>> I have 2 input message streams:
>>>>>>>>>>>
>>>>>>>>>>> 1. Data Stream:
>>>>>>>>>>> KEY VALUE TIME
>>>>>>>>>>> .
>>>>>>>>>>> .
>>>>>>>>>>> .
>>>>>>>>>>> C      V6        6
>>>>>>>>>>> B      V6        6
>>>>>>>>>>> A      V5        5
>>>>>>>>>>> A      V4        4
>>>>>>>>>>> C      V3        3
>>>>>>>>>>> A      V3        3
>>>>>>>>>>> B      V3        3
>>>>>>>>>>> B      V2        2
>>>>>>>>>>> A      V1        1
>>>>>>>>>>>
>>>>>>>>>>> 2. Control Stream:
>>>>>>>>>>> Lambda  ArgumentKeys TIME
>>>>>>>>>>> .
>>>>>>>>>>> .
>>>>>>>>>>> .
>>>>>>>>>>> f2            [A, C]                 4
>>>>>>>>>>> f1            [A, B, C]            1
>>>>>>>>>>>
>>>>>>>>>>> I want to apply the lambdas coming in the control stream to the
>>>>>>>>>>> selection of keys that are coming in the data stream.
>>>>>>>>>>>
>>>>>>>>>>> Since we have 2 streams I naturally thought of connecting them
>>>>>>>>>>> using .connect. For this I need to key both of them by a certain criteria.
>>>>>>>>>>> And here lies the problem, how can I make sure the messages with keys A,B,C
>>>>>>>>>>> specified in the control stream end up in the same task as well as the
>>>>>>>>>>> control message (f1, [A, B, C]) itself. Basically I don't know how to key
>>>>>>>>>>> by to achieve this.
>>>>>>>>>>>
>>>>>>>>>>> I suspect a custom partitioner is required that partitions the
>>>>>>>>>>> data stream based on the messages in the control stream? Is this even
>>>>>>>>>>> possible?
>>>>>>>>>>>
>>>>>>>>>>> Any suggestions welcomed!
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> M

Re: dynamically partitioned stream

Posted by Martin Eden <ma...@gmail.com>.
Hi Aljoscha, Tony,

We actually do not need all the keys to be on all nodes where lambdas are.
We just need the keys that represent the data for the lambda arguments to
be routed to the same node as the lambda, whichever one it might be.

Essentially, in the solution we emit the data multiple times, and by doing
that we roughly multiply the input rate by the average number of lambdas a
key is part of (X). In terms of memory this is O(X * N), where N is the
number of keys in the data. N is the large bit. If X ~ N then we have
O(N^2) complexity for the Flink state, and in that case, yes, I see your
point about performance, Aljoscha. But if X << N, as is our case, then we
have O(N), which should be manageable by Flink's distributed state
mechanism, right? Do you see any gotchas in this new light? Are my
assumptions correct?
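
To make the O(X * N) estimate concrete, here is a small sketch in plain
Python (not Flink API), using the two lambdas from the example earlier in
the thread. The names fan_out and state_entries are made up for
illustration, and it assumes one cached latest value per (lambda, argument
key) pair.

```python
def fan_out(record_key, lambdas):
    """Return the ids of the lambdas a record with this key is copied to."""
    return [lid for lid, args in lambdas.items() if record_key in args]


def state_entries(lambdas):
    """Count cached values: one per (lambda, argument key) pair."""
    return sum(len(set(args)) for args in lambdas.values())


# Control messages from the example: f1 [A, B, C] and f2 [A, C].
lambdas = {"f1": ["A", "B", "C"], "f2": ["A", "C"]}
keys = {"A", "B", "C"}  # N = 3 distinct data keys

# How many copies of each record are emitted downstream.
copies_per_key = {k: len(fan_out(k, lambdas)) for k in sorted(keys)}

# X: average number of lambdas a key is part of.
avg_x = sum(copies_per_key.values()) / len(keys)

print(copies_per_key)          # copies emitted per data key
print(avg_x * len(keys))       # X * N
print(state_entries(lambdas))  # cached values actually held
```

Here A and C are each copied twice and B once, so X * N equals the number
of cached values. State only grows towards N^2 if most keys are arguments
of most lambdas.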

Thanks,
M





On Sat, Sep 2, 2017 at 3:38 AM, Tony Wei <to...@gmail.com> wrote:

> Hi Martin, Aljoscha
>
> I think Aljoscha is right. My original thought was to keep the state only
> after a lambda function arrives.
>
> Using Aljoscha's scenario as an example: initially, all data will be
> discarded because there are no lambdas. When lambdas f1 [D, E] and f2 [A, C]
> arrive, A, C begin to be routed to machine "0" and D, E begin to be routed
> to machine "1". Then, when we get a new lambda f3 [C, D], we can
> duplicate C, D and route these copies to machine "2".
>
> However, after reading your example again, I see that what you want is a
> global view of the state of all variables, so that whenever a new lambda
> arrives it can always get its variables' state immediately. In that case,
> I have the same opinion as Aljoscha.
>
> Best,
> Tony Wei
>
> 2017-09-01 23:59 GMT+08:00 Aljoscha Krettek <al...@apache.org>:
>
>> Hi Martin,
>>
>> I think with those requirements this is very hard (or maybe impossible)
>> to do efficiently in a distributed setting. It might be that I'm
>> misunderstanding things but let's look at an example. Assume that
>> initially, we don't have any lambdas, so data can be sent to any machine
>> because it doesn't matter where they go. Now, we get a new lambda f2 [A,
>> C]. Say this gets routed to machine "0", now this means that messages with
>> key A and C also need to be routed to machine "0". Now, we get a new lambda
>> f1 [D, E], say this gets routed to machine "2", meaning that messages with
>> key D and E are also routed to machine "2".
>>
>> Then, we get a new lambda f3 [C, D]. Do we now re-route all previous
>> lambdas and inputs to different machines? They all have to go to the same
>> machine, but which one? I'm currently thinking that there would need to be
>> some component that does the routing, but this has to be global, so it's
>> hard to do in a distributed setting.
>>
>> What do you think?
>>
>> Best,
>> Aljoscha
>>
>> On 1. Sep 2017, at 07:17, Martin Eden <ma...@gmail.com> wrote:
>>
>> This might be a way forward but since side inputs are not there I will
>> try and key the control stream by the keys in the first co flat map.
>>
>> I'll see how it goes.
>>
>> Thanks guys,
>> M
>>
>> On Thu, Aug 31, 2017 at 5:16 PM, Tony Wei <to...@gmail.com> wrote:
>>
>>> Hi Martin,
>>>
>>> Yes, that is exactly what I thought.
>>> But the first step also needs to be fulfilled  by SideInput. I'm not
>>> sure how to achieve this in the current release.
>>>
>>> Best,
>>> Tony Wei
>>>
>>> On Thu, Aug 31, 2017 at 11:32 PM, Martin Eden <ma...@gmail.com> wrote:
>>>
>>>> Hi Aljoscha, Tony,
>>>>
>>>> Aljoscha:
>>>> Yes it's the first option you mentioned.
>>>> Yes, the stream has multiple values in flight for A, B, C. f1 needs to
>>>> be applied each time a new value for either A, B or C comes in. So we need
>>>> to use state to cache the latest values. So using the example data stream
>>>> in my first msg the emitted stream should be:
>>>>
>>>> 1. Data Stream:
>>>> KEY VALUE TIME
>>>> .
>>>> .
>>>> .
>>>> C      V6        6
>>>> B      V6        6
>>>> A      V5        5
>>>> A      V4        4
>>>> C      V3        3
>>>> A      V3        3
>>>> B      V3        3
>>>> B      V2        2
>>>> A      V1        1
>>>>
>>>> 2. Control Stream:
>>>> Lambda  ArgumentKeys TIME
>>>> .
>>>> .
>>>> .
>>>> f2            [A, C]                 4
>>>> f1            [A, B, C]            1
>>>>
>>>> 3. Expected emitted stream:
>>>> TIME    VALUE
>>>> .
>>>> .
>>>> .
>>>> 6          f1(V5, V6, V3)
>>>>             f1(V5, V6, V6)
>>>>             f2(V5, V6)
>>>> 5          f1(V5, V3, V3)
>>>>             f2(V5, V3)
>>>> 4          f1(V4, V3, V3)
>>>>             f2(V4, V3)
>>>> 3          f1(V3, V3, V3)
>>>> 2          -
>>>> 1          -
>>>>
>>>> So essentially as soon as the argument list fills up then we apply the
>>>> function/lambda at each new arriving message in the data stream for either
>>>> argument key.
>>>>
>>>> Tony:
>>>> Yes we need to group by and pass to the lambda.
>>>> Ok, so what you are proposing might work. So your solution assumes that
>>>> we have to connect with the control stream twice? Once for the tagging and
>>>> another time re-connect-ing the control stream with the tagged stream for
>>>> the actual application of the function/lambda?
>>>>
>>>> Thanks,
>>>> Alex
>>>>
>>>>
>>>>
>>>> On Thu, Aug 31, 2017 at 2:57 PM, Aljoscha Krettek <al...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi Martin,
>>>>>
>>>>> In your original example, what does this syntax mean exactly:
>>>>>
>>>>> f1            [A, B, C]            1
>>>>>
>>>>> Does it mean that f1 needs one A, one B and one C from the main
>>>>> stream? If yes, which ones, because there are multiple As and Bs and so on.
>>>>> Or does it mean that f1 can apply to an A or a B or a C? If it's the first,
>>>>> then I think it's quite hard to find a partitioning such that both f1, f2,
>>>>> and all A, B, and C go to the same machine.
>>>>>
>>>>> Best,
>>>>> Aljoscha
>>>>>
>>>>> On 31. Aug 2017, at 15:53, Tony Wei <to...@gmail.com> wrote:
>>>>>
>>>>> Hi Martin,
>>>>>
>>>>> So the problem is that you want to group those arguments in Data
>>>>> Stream and pass them to the lambda function from Control Stream at the same
>>>>> time. Am I right?
>>>>>
>>>>> If right, then you could give each lambda function an id as well. Use
>>>>> these ids to tag those arguments to which they belong.
>>>>> After that, keyBy function could be used to group those arguments
>>>>> belonging to the same lambda function. Joining this stream with Control
>>>>> Stream by function id could make arguments and function be in the same
>>>>> instance.
>>>>>
>>>>> What do you think? Could this solution solve your problem?
>>>>>
>>>>> Best,
>>>>> Tony Wei
>>>>>
>>>>> 2017-08-31 20:43 GMT+08:00 Martin Eden <ma...@gmail.com>:
>>>>>
>>>>>> Thanks for your reply Tony,
>>>>>>
>>>>>> Yes we are in the latter case, where the functions/lambdas come in
>>>>>> the control stream. Think of them as strings containing the logic of the
>>>>>> function. The values for each of the arguments to the function come from
>>>>>> the data stream. That is why we need to co-locate the data stream messages
>>>>>> for the corresponding keys with the control message that has the function
>>>>>> to be applied.
>>>>>>
>>>>>> We have a way of interpreting the logic described in the string and
>>>>>> executing it on the incoming values from the data stream. This is kicked
>>>>>> off from within the Flink runtime (synchronous to a flatMap of the
>>>>>> RichCoFlatMapFunction) but is not using Flink predefined operators
>>>>>> or functions.
>>>>>>
>>>>>> So yeah, I see your point about mapping the arguments, but the problem
>>>>>> is not really that. The problem is making sure that the values in the
>>>>>> data stream are in the same instance of the task / keyed managed state
>>>>>> as the actual control stream message. Once they are, we can pass them in.
>>>>>>
>>>>>> Any other thoughts?
>>>>>>
>>>>>> M
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Aug 31, 2017 at 12:06 PM, Tony Wei <to...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Martin,
>>>>>>>
>>>>>>> About problem 2. How were those lambda functions created?
>>>>>>> Pre-defined functions / operators or automatically generated based on the
>>>>>>> message from Control Stream?
>>>>>>>
>>>>>>> For the former, you could give each function one id and use flatMap
>>>>>>> to duplicate data with multiple ids. Then, you could use a filter function
>>>>>>> and send them to the corresponding operators.
>>>>>>>
>>>>>>> For the general case like the latter, because you had broadcasted
>>>>>>> the messages to all tasks, it could always build a mapping table from
>>>>>>> argument keys to lambda functions in each sub-task and use the map to
>>>>>>> process the data. But I was wondering if it is possible to generate a
>>>>>>> completely new function in the runtime.
>>>>>>>
>>>>>>> Best,
>>>>>>> Tony Wei
>>>>>>>
>>>>>>> 2017-08-31 18:33 GMT+08:00 Martin Eden <ma...@gmail.com>:
>>>>>>>
>>>>>>>> Thanks for your reply Tony.
>>>>>>>>
>>>>>>>> So there are actually 2 problems to solve:
>>>>>>>>
>>>>>>>> 1. All control stream msgs need to be broadcasted to all tasks.
>>>>>>>>
>>>>>>>> 2. The data stream messages with the same keys as those specified
>>>>>>>> in the control message need to go to the same task as well, so that all the
>>>>>>>> values required for the lambda (i.e. functions f1, f2 ...) are there.
>>>>>>>>
>>>>>>>> In my understanding side inputs (which are actually not available
>>>>>>>> in the current release) would address problem 1.
>>>>>>>>
>>>>>>>> To address problem 1 I also tried dataStream.keyBy(key).connect(
>>>>>>>> controlStream.broadcast).flatMap(new RichCoFlatMapFunction) but I
>>>>>>>> get a runtime exception telling me I still need to do a keyBy before the
>>>>>>>> flatMap. So are the upcoming side inputs the only way to broadcast a
>>>>>>>> control stream to all tasks of a coFlatMap? Or is there another way?
>>>>>>>>
>>>>>>>> As for problem 2, I am still pending a reply. Would appreciate if
>>>>>>>> anyone has some suggestions.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> M
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Aug 31, 2017 at 9:59 AM, Tony Wei <to...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Martin,
>>>>>>>>>
>>>>>>>>> Let me understand your question first.
>>>>>>>>> You have two Stream: Data Stream and Control Stream and you want
>>>>>>>>> to select data in Data Stream based on the key set got from Control Stream.
>>>>>>>>>
>>>>>>>>> If I am not misunderstanding your question, I think SideInput is
>>>>>>>>> what you want.
>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API#FLIP-17SideInputsforDataStreamAPI-StoringSide-InputData
>>>>>>>>> It lets you define one stream as a SideInput and assign it
>>>>>>>>> to the other stream; the data in the SideInput stream is then
>>>>>>>>> broadcast.
>>>>>>>>>
>>>>>>>>> So far, I have no idea if there is any solution to solve this
>>>>>>>>> without SideInput.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Tony Wei
>>>>>>>>>
>>>>>>>>> 2017-08-31 16:10 GMT+08:00 Martin Eden <ma...@gmail.com>:
>>>>>>>>>
>>>>>>>>>> Hi all,
>>>>>>>>>>
>>>>>>>>>> I am trying to implement the following using Flink:
>>>>>>>>>>
>>>>>>>>>> I have 2 input message streams:
>>>>>>>>>>
>>>>>>>>>> 1. Data Stream:
>>>>>>>>>> KEY VALUE TIME
>>>>>>>>>> .
>>>>>>>>>> .
>>>>>>>>>> .
>>>>>>>>>> C      V6        6
>>>>>>>>>> B      V6        6
>>>>>>>>>> A      V5        5
>>>>>>>>>> A      V4        4
>>>>>>>>>> C      V3        3
>>>>>>>>>> A      V3        3
>>>>>>>>>> B      V3        3
>>>>>>>>>> B      V2        2
>>>>>>>>>> A      V1        1
>>>>>>>>>>
>>>>>>>>>> 2. Control Stream:
>>>>>>>>>> Lambda  ArgumentKeys TIME
>>>>>>>>>> .
>>>>>>>>>> .
>>>>>>>>>> .
>>>>>>>>>> f2            [A, C]                 4
>>>>>>>>>> f1            [A, B, C]            1
>>>>>>>>>>
>>>>>>>>>> I want to apply the lambdas coming in the control stream to the
>>>>>>>>>> selection of keys that are coming in the data stream.
>>>>>>>>>>
>>>>>>>>>> Since we have 2 streams I naturally thought of connecting them
>>>>>>>>>> using .connect. For this I need to key both of them by a certain criteria.
>>>>>>>>>> And here lies the problem, how can I make sure the messages with keys A,B,C
>>>>>>>>>> specified in the control stream end up in the same task as well as the
>>>>>>>>>> control message (f1, [A, B, C]) itself. Basically I don't know how to key
>>>>>>>>>> by to achieve this.
>>>>>>>>>>
>>>>>>>>>> I suspect a custom partitioner is required that partitions the
>>>>>>>>>> data stream based on the messages in the control stream? Is this even
>>>>>>>>>> possible?
>>>>>>>>>>
>>>>>>>>>> Any suggestions welcomed!
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> M

Re: dynamically partitioned stream

Posted by Tony Wei <to...@gmail.com>.
Hi Martin, Aljoscha

I think Aljoscha is right. My original thought was to keep the state only
after a lambda function arrives.

Using Aljoscha's scenario as an example: initially, all data will be
discarded because there are no lambdas. When lambdas f1 [D, E] and f2 [A, C]
arrive, A, C begin to be routed to machine "0" and D, E begin to be routed
to machine "1". Then, when we get a new lambda f3 [C, D], we can duplicate
C, D and route these copies to machine "2".

However, after reading your example again, I see that what you want is a
global view of the state of all variables, so that whenever a new lambda
arrives it can always get its variables' state immediately. In that case,
I have the same opinion as Aljoscha.
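
The difference can be seen by replaying the example streams from earlier in
the thread under the "keep state only after the lambda arrives" scheme. This
is a rough simulation in plain Python, not Flink code. The function name run
and the event tuples are made up for illustration, and it assumes a control
message is processed before data records with the same timestamp.

```python
def run(events):
    """Replay a merged, time-ordered list of control and data events.

    Each lambda has its own cache of latest argument values, and that
    cache only starts filling once the lambda itself has arrived.
    """
    lambdas = {}  # lambda id -> ordered argument keys
    cache = {}    # lambda id -> {argument key: latest value}
    out = []
    for kind, time, name, payload in events:
        if kind == "control":
            lambdas[name] = payload
            cache[name] = {}
            continue
        # Data record: duplicate it once per lambda that uses this key.
        for lambda_id, arg_keys in lambdas.items():
            if name not in arg_keys:
                continue
            cache[lambda_id][name] = payload
            # Emit only once every argument has a cached value.
            if len(cache[lambda_id]) == len(arg_keys):
                args = ", ".join(cache[lambda_id][k] for k in arg_keys)
                out.append((time, f"{lambda_id}({args})"))
    return out


events = [
    ("control", 1, "f1", ["A", "B", "C"]),
    ("data", 1, "A", "V1"),
    ("data", 2, "B", "V2"),
    ("data", 3, "B", "V3"),
    ("data", 3, "A", "V3"),
    ("data", 3, "C", "V3"),
    ("control", 4, "f2", ["A", "C"]),
    ("data", 4, "A", "V4"),
    ("data", 5, "A", "V5"),
    ("data", 6, "B", "V6"),
    ("data", 6, "C", "V6"),
]

emitted = run(events)
for time, value in emitted:
    print(time, value)
```

Under this scheme f1 produces the same results as in Martin's expected
stream, but f2 first fires at time 6, because its cache has no value for C
until C V6 arrives. The expected f2(V4, V3) at time 4 and f2(V5, V3) at
time 5 need the value of C that was seen before f2 existed, which is the
global view mentioned above.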

Best,
Tony Wei

2017-09-01 23:59 GMT+08:00 Aljoscha Krettek <al...@apache.org>:

> Hi Martin,
>
> I think with those requirements this is very hard (or maybe impossible) to
> do efficiently in a distributed setting. It might be that I'm
> misunderstanding things but let's look at an example. Assume that
> initially, we don't have any lambdas, so data can be sent to any machine
> because it doesn't matter where they go. Now, we get a new lambda f2 [A,
> C]. Say this gets routed to machine "0", now this means that messages with
> key A and C also need to be routed to machine "0". Now, we get a new lambda
> f1 [D, E], say this gets routed to machine "2", meaning that messages with
> key D and E are also routed to machine "2".
>
> Then, we get a new lambda f3 [C, D]. Do we now re-route all previous
> lambdas and inputs to different machines? They all have to go to the same
> machine, but which one? I'm currently thinking that there would need to be
> some component that does the routing, but this has to be global, so it's
> hard to do in a distributed setting.
>
> What do you think?
>
> Best,
> Aljoscha
>
> On 1. Sep 2017, at 07:17, Martin Eden <ma...@gmail.com> wrote:
>
> This might be a way forward but since side inputs are not there I will try
> and key the control stream by the keys in the first co flat map.
>
> I'll see how it goes.
>
> Thanks guys,
> M
>
> On Thu, Aug 31, 2017 at 5:16 PM, Tony Wei <to...@gmail.com> wrote:
>
>> Hi Martin,
>>
>> Yes, that is exactly what I thought.
>> But the first step also needs to be fulfilled  by SideInput. I'm not sure
>> how to achieve this in the current release.
>>
>> Best,
>> Tony Wei
>>
>> On Thu, Aug 31, 2017 at 11:32 PM, Martin Eden <ma...@gmail.com> wrote:
>>
>>> Hi Aljoscha, Tony,
>>>
>>> Aljoscha:
>>> Yes it's the first option you mentioned.
>>> Yes, the stream has multiple values in flight for A, B, C. f1 needs to
>>> be applied each time a new value for either A, B or C comes in. So we need
>>> to use state to cache the latest values. So using the example data stream
>>> in my first msg the emitted stream should be:
>>>
>>> 1. Data Stream:
>>> KEY VALUE TIME
>>> .
>>> .
>>> .
>>> C      V6        6
>>> B      V6        6
>>> A      V5        5
>>> A      V4        4
>>> C      V3        3
>>> A      V3        3
>>> B      V3        3
>>> B      V2        2
>>> A      V1        1
>>>
>>> 2. Control Stream:
>>> Lambda  ArgumentKeys TIME
>>> .
>>> .
>>> .
>>> f2            [A, C]                 4
>>> f1            [A, B, C]            1
>>>
>>> 3. Expected emitted stream:
>>> TIME    VALUE
>>> .
>>> .
>>> .
>>> 6          f1(V5, V6, V3)
>>>             f1(V5, V6, V6)
>>>             f2(V5, V6)
>>> 5          f1(V5, V3, V3)
>>>             f2(V5, V3)
>>> 4          f1(V4, V3, V3)
>>>             f2(V4, V3)
>>> 3          f1(V3, V3, V3)
>>> 2          -
>>> 1          -
>>>
>>> So essentially as soon as the argument list fills up then we apply the
>>> function/lambda at each new arriving message in the data stream for either
>>> argument key.
>>>
>>> Tony:
>>> Yes we need to group by and pass to the lambda.
>>> Ok, so what you are proposing might work. So your solution assumes that
>>> we have to connect with the control stream twice? Once for the tagging and
>>> another time re-connect-ing the control stream with the tagged stream for
>>> the actual application of the function/lambda?
>>>
>>> Thanks,
>>> Alex
>>>
>>>
>>>
>>> On Thu, Aug 31, 2017 at 2:57 PM, Aljoscha Krettek <al...@apache.org>
>>> wrote:
>>>
>>>> Hi Martin,
>>>>
>>>> In your original example, what does this syntax mean exactly:
>>>>
>>>> f1            [A, B, C]            1
>>>>
>>>> Does it mean that f1 needs one A, one B and one C from the main stream?
>>>> If yes, which ones, because there are multiple As and Bs and so on. Or does
>>>> it mean that f1 can apply to an A or a B or a C? If it's the first, then I
>>>> think it's quite hard to find a partitioning such that both f1, f2, and all
>>>> A, B, and C go to the same machine.
>>>>
>>>> Best,
>>>> Aljoscha
>>>>
>>>> On 31. Aug 2017, at 15:53, Tony Wei <to...@gmail.com> wrote:
>>>>
>>>> Hi Martin,
>>>>
>>>> So the problem is that you want to group those arguments in Data Stream
>>>> and pass them to the lambda function from Control Stream at the same time.
>>>> Am I right?
>>>>
>>>> If right, then you could give each lambda function an id as well. Use
>>>> these ids to tag those arguments to which they belong.
>>>> After that, keyBy function could be used to group those arguments
>>>> belonging to the same lambda function. Joining this stream with Control
>>>> Stream by function id could make arguments and function be in the same
>>>> instance.
>>>>
>>>> What do you think? Could this solution solve your problem?
>>>>
>>>> Best,
>>>> Tony Wei
>>>>
>>>> 2017-08-31 20:43 GMT+08:00 Martin Eden <ma...@gmail.com>:
>>>>
>>>>> Thanks for your reply Tony,
>>>>>
>>>>> Yes we are in the latter case, where the functions/lambdas come in the
>>>>> control stream. Think of them as strings containing the logic of the
>>>>> function. The values for each of the arguments to the function come from
>>>>> the data stream. That is why we need to co-locate the data stream messages
>>>>> for the corresponding keys with the control message that has the function
>>>>> to be applied.
>>>>>
>>>>> We have a way of interpreting the logic described in the string and
>>>>> executing it on the incoming values from the data stream. This is kicked
>>>>> off from within the Flink runtime (synchronous to a flatMap of the
>>>>> RichCoFlatMapFunction) but is not using Flink predefined operators or
>>>>> functions.
>>>>>
>>>>> So yeah, I see your point about mapping the arguments, but the problem
>>>>> is not really that. The problem is making sure that the values in the
>>>>> data stream are in the same instance of the task / keyed managed state
>>>>> as the actual control stream message. Once they are, we can pass them in.
>>>>>
>>>>> Any other thoughts?
>>>>>
>>>>> M
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Aug 31, 2017 at 12:06 PM, Tony Wei <to...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Martin,
>>>>>>
>>>>>> About problem 2. How were those lambda functions created? Pre-defined
>>>>>> functions / operators or automatically generated based on the message from
>>>>>> Control Stream?
>>>>>>
>>>>>> For the former, you could give each function one id and use flatMap
>>>>>> to duplicate data with multiple ids. Then, you could use a filter function
>>>>>> and send them to the corresponding operators.
>>>>>>
>>>>>> For the general case like the latter, because you had broadcasted the
>>>>>> messages to all tasks, it could always build a mapping table from argument
>>>>>> keys to lambda functions in each sub-task and use the map to process the
>>>>>> data. But I was wondering if it is possible to generate a completely new
>>>>>> function in the runtime.
>>>>>>
>>>>>> Best,
>>>>>> Tony Wei
>>>>>>
>>>>>> 2017-08-31 18:33 GMT+08:00 Martin Eden <ma...@gmail.com>:
>>>>>>
>>>>>>> Thanks for your reply Tony.
>>>>>>>
>>>>>>> So there are actually 2 problems to solve:
>>>>>>>
>>>>>>> 1. All control stream msgs need to be broadcasted to all tasks.
>>>>>>>
>>>>>>> 2. The data stream messages with the same keys as those specified in
>>>>>>> the control message need to go to the same task as well, so that all the
>>>>>>> values required for the lambda (i.e. functions f1, f2 ...) are there.
>>>>>>>
>>>>>>> In my understanding side inputs (which are actually not available in
>>>>>>> the current release) would address problem 1.
>>>>>>>
>>>>>>> To address problem 1 I also tried dataStream.keyBy(key).connect(
>>>>>>> controlStream.broadcast).flatMap(new RichCoFlatMapFunction) but I
>>>>>>> get a runtime exception telling me I still need to do a keyBy before the
>>>>>>> flatMap. So are the upcoming side inputs the only way to broadcast a
>>>>>>> control stream to all tasks of a coFlatMap? Or is there another way?
>>>>>>>
>>>>>>> As for problem 2, I am still pending a reply. Would appreciate if
>>>>>>> anyone has some suggestions.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> M
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Aug 31, 2017 at 9:59 AM, Tony Wei <to...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Martin,
>>>>>>>>
>>>>>>>> Let me understand your question first.
>>>>>>>> You have two Stream: Data Stream and Control Stream and you want to
>>>>>>>> select data in Data Stream based on the key set got from Control Stream.
>>>>>>>>
>>>>>>>> If I am not misunderstanding your question, I think SideInput is
>>>>>>>> what you want.
>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API#FLIP-17SideInputsforDataStreamAPI-StoringSide-InputData
>>>>>>>> It lets you define one stream as a SideInput and assign it to the
>>>>>>>> other stream; the data in the SideInput stream is then broadcast.
>>>>>>>>
>>>>>>>> So far, I have no idea if there is any solution to solve this
>>>>>>>> without SideInput.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Tony Wei
>>>>>>>>
>>>>>>>> 2017-08-31 16:10 GMT+08:00 Martin Eden <ma...@gmail.com>:
>>>>>>>>
>>>>>>>>> Hi all,
>>>>>>>>>
>>>>>>>>> I am trying to implement the following using Flink:
>>>>>>>>>
>>>>>>>>> I have 2 input message streams:
>>>>>>>>>
>>>>>>>>> 1. Data Stream:
>>>>>>>>> KEY VALUE TIME
>>>>>>>>> .
>>>>>>>>> .
>>>>>>>>> .
>>>>>>>>> C      V6        6
>>>>>>>>> B      V6        6
>>>>>>>>> A      V5        5
>>>>>>>>> A      V4        4
>>>>>>>>> C      V3        3
>>>>>>>>> A      V3        3
>>>>>>>>> B      V3        3
>>>>>>>>> B      V2        2
>>>>>>>>> A      V1        1
>>>>>>>>>
>>>>>>>>> 2. Control Stream:
>>>>>>>>> Lambda  ArgumentKeys TIME
>>>>>>>>> .
>>>>>>>>> .
>>>>>>>>> .
>>>>>>>>> f2            [A, C]                 4
>>>>>>>>> f1            [A, B, C]            1
>>>>>>>>>
>>>>>>>>> I want to apply the lambdas coming in the control stream to the
>>>>>>>>> selection of keys that are coming in the data stream.
>>>>>>>>>
>>>>>>>>> Since we have 2 streams I naturally thought of connecting them
>>>>>>>>> using .connect. For this I need to key both of them by a certain criteria.
>>>>>>>>> And here lies the problem, how can I make sure the messages with keys A,B,C
>>>>>>>>> specified in the control stream end up in the same task as well as the
>>>>>>>>> control message (f1, [A, B, C]) itself. Basically I don't know how to key
>>>>>>>>> by to achieve this.
>>>>>>>>>
>>>>>>>>> I suspect a custom partitioner is required that partitions the
>>>>>>>>> data stream based on the messages in the control stream? Is this even
>>>>>>>>> possible?
>>>>>>>>>
>>>>>>>>> Any suggestions welcomed!
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> M
>>>>>>>>>

Re: dynamically partitioned stream

Posted by Aljoscha Krettek <al...@apache.org>.
Hi Martin,

I think with those requirements this is very hard (or maybe impossible) to do efficiently in a distributed setting. It might be that I'm misunderstanding things but let's look at an example. Assume that initially, we don't have any lambdas, so data can be sent to any machine because it doesn't matter where they go. Now, we get a new lambda f2 [A, C]. Say this gets routed to machine "0", now this means that messages with key A and C also need to be routed to machine "0". Now, we get a new lambda f1 [D, E], say this gets routed to machine "2", meaning that messages with key D and E are also routed to machine "2".

Then, we get a new lambda f3 [C, D]. Do we now re-route all previous lambdas and inputs to different machines? They all have to go to the same machine, but which one? I'm currently thinking that there would need to be some component that does the routing, but this has to be global, so it's hard to do in a distributed setting.
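
One way to see the effect is to treat each lambda as a constraint that all of its argument keys live on one machine, and then compute which keys are forced together. The sketch below is plain Python rather than Flink code, the name colocation_groups is made up for illustration, and it assumes no records are duplicated, so every key lives on exactly one machine.

```python
def colocation_groups(lambdas):
    """Group keys that must share a machine, using a small union-find."""
    parent = {}

    def find(key):
        parent.setdefault(key, key)
        while parent[key] != key:
            parent[key] = parent[parent[key]]  # path halving
            key = parent[key]
        return key

    def union(a, b):
        parent[find(a)] = find(b)

    # All argument keys of one lambda end up in the same group.
    for arg_keys in lambdas.values():
        for key in arg_keys:
            union(arg_keys[0], key)

    groups = {}
    for key in list(parent):
        groups.setdefault(find(key), set()).add(key)
    return sorted(sorted(group) for group in groups.values())


before = colocation_groups({"f2": ["A", "C"], "f1": ["D", "E"]})
after = colocation_groups(
    {"f2": ["A", "C"], "f1": ["D", "E"], "f3": ["C", "D"]}
)
print(before)  # two independent groups
print(after)   # one merged group after f3 arrives
```

Before f3 there are two independent groups, which can sit on different machines. Once f3 [C, D] arrives the groups merge, so A, C, D and E would all have to move to a single machine, and deciding that needs knowledge of every lambda seen so far.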

What do you think?

Best,
Aljoscha

> On 1. Sep 2017, at 07:17, Martin Eden <ma...@gmail.com> wrote:
> 
> This might be a way forward but since side inputs are not there I will try and key the control stream by the keys in the first co flat map.
> 
> I'll see how it goes.
> 
> Thanks guys,
> M
> 
> On Thu, Aug 31, 2017 at 5:16 PM, Tony Wei <to...@gmail.com> wrote:
> Hi Martin,
> 
> Yes, that is exactly what I thought. 
> But the first step also needs to be fulfilled  by SideInput. I'm not sure how to achieve this in the current release. 
> 
> Best,
> Tony Wei
> 
> On Thu, Aug 31, 2017 at 11:32 PM, Martin Eden <ma...@gmail.com> wrote:
> Hi Aljoscha, Tony,
> 
> Aljoscha: 
> Yes it's the first option you mentioned.
> Yes, the stream has multiple values in flight for A, B, C. f1 needs to be applied each time a new value for either A, B or C comes in. So we need to use state to cache the latest values. So using the example data stream in my first msg the emitted stream should be:
> 
> 1. Data Stream:
> KEY VALUE TIME
> .
> .
> .
> C      V6        6
> B      V6        6
> A      V5        5
> A      V4        4
> C      V3        3
> A      V3        3
> B      V3        3
> B      V2        2
> A      V1        1
> 
> 2. Control Stream:
> Lambda  ArgumentKeys TIME
> .
> .
> .
> f2            [A, C]                 4
> f1            [A, B, C]            1
> 
> 3. Expected emitted stream:
> TIME    VALUE 
> .
> .
> .
> 6          f1(V5, V6, V3)
>             f1(V5, V6, V6)
>             f2(V5, V6)
> 5          f1(V5, V3, V3)
>             f2(V5, V3)
> 4          f1(V4, V3, V3)
>             f2(V4, V3)
> 3          f1(V3, V3, V3)
> 2          -
> 1          -
> 
> So essentially as soon as the argument list fills up then we apply the function/lambda at each new arriving message in the data stream for either argument key.
> 
> Tony:
> Yes we need to group by and pass to the lambda.
> Ok, so what you are proposing might work. So your solution assumes that we have to connect with the control stream twice? Once for the tagging and another time re-connect-ing the control stream with the tagged stream for the actual application of the function/lambda?
> 
> Thanks,
> Alex
> 
> 
> 
> On Thu, Aug 31, 2017 at 2:57 PM, Aljoscha Krettek <al...@apache.org> wrote:
> Hi Martin,
> 
> In your original example, what does this syntax mean exactly:
> 
> f1            [A, B, C]            1
> 
> Does it mean that f1 needs one A, one B and one C from the main stream? If yes, which ones, because there are multiple As and Bs and so on. Or does it mean that f1 can apply to an A or a B or a C? If it's the first, then I think it's quite hard to find a partitioning such that both f1, f2, and all A, B, and C go to the same machine.
> 
> Best,
> Aljoscha
> 
>> On 31. Aug 2017, at 15:53, Tony Wei <to...@gmail.com> wrote:
>> 
>> Hi Martin,
>> 
>> So the problem is that you want to group those arguments in Data Stream and pass them to the lambda function from Control Stream at the same time. Am I right?
>> 
>> If right, then you could give each lambda function an id as well. Use these ids to tag those arguments to which they belong.
>> After that, keyBy function could be used to group those arguments belonging to the same lambda function. Joining this stream with Control Stream by function id could make arguments and function be in the same instance.
>> 
>> What do you think? Could this solution solve your problem?
>> 
>> Best,
>> Tony Wei
>> 
>> 2017-08-31 20:43 GMT+08:00 Martin Eden <ma...@gmail.com>:
>> Thanks for your reply Tony,
>> 
>> Yes we are in the latter case, where the functions/lambdas come in the control stream. Think of them as strings containing the logic of the function. The values for each of the arguments to the function come from the data stream. That is why we need to co-locate the data stream messages for the corresponding keys with the control message that has the function to be applied.
>> 
>> We have a way of interpreting the logic described in the string and executing it on the incoming values from the data stream. This is kicked off from within the Flink runtime (synchronous to a flatMap of the RichCoFlatMapFunction) but is not using Flink predefined operators or functions.
>> 
>> So yeah, I see your point about mapping the arguments, but the problem is not really that. The problem is making sure that the values in the data stream are in the same instance of the task / keyed managed state as the actual control stream message. Once they are, we can pass them in.
>> 
>> Any other thoughts?
>> 
>> M
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> On Thu, Aug 31, 2017 at 12:06 PM, Tony Wei <to...@gmail.com> wrote:
>> Hi Martin,
>> 
>> About problem 2. How were those lambda functions created? Pre-defined functions / operators or automatically generated based on the message from Control Stream?
>> 
>> For the former, you could give each function one id and use flatMap to duplicate data with multiple ids. Then, you could use a filter function and send them to the corresponding operators.
>> 
>> For the general case like the latter, because you had broadcasted the messages to all tasks, it could always build a mapping table from argument keys to lambda functions in each sub-task and use the map to process the data. But I was wondering if it is possible to generate a completely new function in the runtime.
>> 
>> Best,
>> Tony Wei
>> 
>> 2017-08-31 18:33 GMT+08:00 Martin Eden <ma...@gmail.com>:
>> Thanks for your reply Tony.
>> 
>> So there are actually 2 problems to solve:
>> 
>> 1. All control stream msgs need to be broadcasted to all tasks.
>> 
>> 2. The data stream messages with the same keys as those specified in the control message need to go to the same task as well, so that all the values required for the lambda (i.e. functions f1, f2 ...) are there.
>> 
>> In my understanding side inputs (which are actually not available in the current release) would address problem 1.
>> 
>> To address problem 1 I also tried dataStream.keyBy(key).connect(controlStream.broadcast).flatMap(new RichCoFlatMapFunction) but I get a runtime exception telling me I still need to do a keyBy before the flatMap. So are the upcoming side inputs the only way to broadcast a control stream to all tasks of a coFlatMap? Or is there another way?
>> 
>> As for problem 2, I am still pending a reply. Would appreciate if anyone has some suggestions.
>> 
>> Thanks,
>> M
>> 
>> 
>> 
>> 
>> On Thu, Aug 31, 2017 at 9:59 AM, Tony Wei <tony19920430@gmail.com <ma...@gmail.com>> wrote:
>> Hi Martin,
>> 
>> Let me understand your question first.
>> You have two streams, a Data Stream and a Control Stream, and you want to select data in the Data Stream based on the key set obtained from the Control Stream.
>> 
>> If I am not misunderstanding your question, I think SideInput is what you want.
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API#FLIP-17SideInputsforDataStreamAPI-StoringSide-InputData
>> It lets you define one stream as a SideInput and assign it to the other stream; the data in the SideInput stream will then be broadcast.
>> 
>> So far, I don't know of any way to solve this without SideInput.
>> 
>> Best,
>> Tony Wei
>> 
>> 2017-08-31 16:10 GMT+08:00 Martin Eden <martineden131@gmail.com <ma...@gmail.com>>:
>> Hi all,
>> 
>> I am trying to implement the following using Flink:
>> 
>> I have 2 input message streams:
>> 
>> 1. Data Stream:
>> KEY VALUE TIME
>> .
>> .
>> .
>> C      V6        6
>> B      V6        6
>> A      V5        5
>> A      V4        4
>> C      V3        3
>> A      V3        3
>> B      V3        3
>> B      V2        2
>> A      V1        1
>> 
>> 2. Control Stream:
>> Lambda  ArgumentKeys TIME
>> .
>> .
>> .
>> f2            [A, C]                 4
>> f1            [A, B, C]            1
>> 
>> I want to apply the lambdas coming in the control stream to the selection of keys that are coming in the data stream.
>> 
>> Since we have 2 streams I naturally thought of connecting them using .connect. For this I need to key both of them by some criterion. And here lies the problem: how can I make sure the messages with keys A, B, C specified in the control stream end up in the same task as the control message (f1, [A, B, C]) itself? Basically I don't know what to key by to achieve this.
>> 
>> I suspect a custom partitioner is required that partitions the data stream based on the messages in the control stream? Is this even possible?
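
To make the co-location problem concrete, here is a small plain-Java illustration. It assumes a simplified partitioner (hash code modulo parallelism); Flink's actual keyBy assigns keys to key groups with its own hashing, so the concrete subtask numbers below are illustrative only. The class KeyRouting and its methods are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

class KeyRouting {
    /** Simplified stand-in for hash partitioning: hash code modulo parallelism. */
    static int subtaskFor(String routingKey, int parallelism) {
        return Math.floorMod(routingKey.hashCode(), parallelism);
    }

    /** The set of subtasks that receive at least one of the given routing keys. */
    static Set<Integer> subtasksUsed(List<String> routingKeys, int parallelism) {
        Set<Integer> used = new TreeSet<>();
        for (String key : routingKeys) {
            used.add(subtaskFor(key, parallelism));
        }
        return used;
    }

    public static void main(String[] args) {
        int parallelism = 4;
        // Keyed by the data key: the arguments of f1 are spread over several subtasks.
        System.out.println(subtasksUsed(Arrays.asList("A", "B", "C"), parallelism));
        // Keyed by the lambda id instead: every copy tagged "f1" lands on one subtask.
        System.out.println(subtasksUsed(Arrays.asList("f1", "f1", "f1"), parallelism));
    }
}
```

Under this simplified scheme, keying by the data key sends A, B and C to different subtasks, whereas re-keying each record by the id of the lambda that needs it puts all of that lambda's arguments in one place. The price is that a record has to be duplicated once per lambda that uses its key.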
>> 
>> Any suggestions welcomed!
>> 
>> Thanks,
>> M
>> 
> 
> 
>