Posted to user@flink.apache.org by TechnoMage <ml...@technomage.com> on 2018/04/05 02:50:55 UTC

KeyedStream question

I am new to Flink and trying to understand keyBy and KeyedStream. From the short description in the docs, I expected keyBy to partition the data so that the following flatMap would only see elements with the same key, that is, events with different keys would be presented to different instances of the FlatMapFunction. Instead, I am seeing all events in the stream presented to the same FlatMapFunction.

Michael

Re: KeyedStream question

Posted by Shailesh Jain <sh...@stellapps.com>.
I have a question related to KeyedStream, so I am asking it here instead
of starting a new thread.

If I assign timestamps on a keyed stream, the resulting stream is not
keyed, so I would need to apply the keyBy operator again after the
assign-timestamps operator.
Why should assigning timestamps to events change the stream from keyed to
non-keyed?

Thanks,
Shailesh

On Fri, Apr 6, 2018 at 4:31 PM, Michael Latta <ml...@technomage.com> wrote:

> Yes. It took a bit of digging on the website to find RichFlatMapFunction
> to get managed state.
>
> Michael
>
> Sent from my iPad
>
> On Apr 6, 2018, at 3:29 AM, Fabian Hueske <fh...@gmail.com> wrote:
>
> Hi,
>
> I think Flink does exactly what you are looking for.
> If you use keyed state [1], Flink always scopes the state to the key of
> the record currently being processed.
> So if you have a MapFunction with keyed state, and the map() method is
> called with a record that has key A, the state will be the state for key
> A. If the next record has key B, the state will be the state for key B.
>
> Best,
> Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/state.html#keyed-state
>
> 2018-04-05 14:08 GMT+02:00 Michael Latta <ml...@technomage.com>:
>
>> Thanks for the clarification. I was just trying to understand the
>> intended behavior. It would have been nice if Flink tracked state for
>> downstream operators by key, but I can do that with a map in the downstream
>> functions.
>>
>> Michael
>>
>> Sent from my iPad
>>
>> On Apr 5, 2018, at 2:30 AM, Fabian Hueske <fh...@gmail.com> wrote:
>>
>> Amit is correct. keyBy() ensures that all records with the same key are
>> processed by the same parallel instance of a function.
>> This is different from "a parallel instance only sees records of one key".
>>
>> I had a look at the docs [1].
>> I agree that "Logically partitions a stream into disjoint partitions,
>> each partition containing elements of the same key." can easily be
>> interpreted the way you did.
>> I've pushed a commit to clarify the description. The docs should be
>> updated soon.
>>
>> Best, Fabian
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/#datastream-transformations
>>
>> 2018-04-05 6:21 GMT+02:00 Amit Jain <aj...@gmail.com>:
>>
>>> Hi,
>>>
>>> The keyBy operation partitions the data on the given key and makes sure
>>> the same slot gets all future data belonging to the same key. In the
>>> default implementation, it can also map a subset of the keys in your
>>> DataStream to the same slot.
>>>
>>> Assuming the number of keys equals the number of running slots, you may
>>> specify a custom keyBy operation to achieve what you describe.
>>>
>>>
>>> Could you describe your use case?
>>>
>>> --
>>> Thanks
>>> Amit
>>>
>>>
>>>
>>> --
>>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>
>>
>>
>

Re: KeyedStream question

Posted by Michael Latta <ml...@technomage.com>.
Yes. It took a bit of digging on the website to find RichFlatMapFunction to get managed state.

Michael


Re: KeyedStream question

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

I think Flink does exactly what you are looking for.
If you use keyed state [1], Flink always scopes the state to the key of
the record currently being processed.
So if you have a MapFunction with keyed state, and the map() method is
called with a record that has key A, the state will be the state for key
A. If the next record has key B, the state will be the state for key B.

Best,
Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/state.html#keyed-state
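
The key-scoped state described above can be illustrated with a small plain-Java model. This is only a sketch and does not use the Flink API: the class KeyedStateSketch, its map-backed "backend", and setCurrentKey() are hypothetical stand-ins for what the Flink runtime does behind a keyed state handle such as ValueState in a RichFlatMapFunction. The point it shows is that the user function never names the key; the runtime selects it before each call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KeyedStateSketch {
    // Hypothetical stand-in for a keyed state backend: one value per key.
    private final Map<String, Long> backend = new HashMap<>();
    private String currentKey;

    // In this model, the "runtime" sets the key before each call to the user function.
    void setCurrentKey(String key) {
        currentKey = key;
    }

    // State access is implicitly scoped to the current key.
    Long value() {
        return backend.get(currentKey);
    }

    void update(Long v) {
        backend.put(currentKey, v);
    }

    // "User function": counts records per key without ever naming the key.
    long countRecord() {
        Long current = value();
        long next = (current == null) ? 1L : current + 1L;
        update(next);
        return next;
    }

    // Feeds a sequence of keys through one instance and collects the per-key counts.
    static List<Long> run(List<String> keys) {
        KeyedStateSketch state = new KeyedStateSketch();
        List<Long> out = new ArrayList<>();
        for (String key : keys) {
            state.setCurrentKey(key);
            out.add(state.countRecord());
        }
        return out;
    }

    public static void main(String[] args) {
        // One function instance handles both keys, yet each key has its own counter.
        System.out.println(run(Arrays.asList("A", "B", "A", "A", "B"))); // [1, 1, 2, 3, 2]
    }
}
```

A single instance processes records for both A and B, but the counter it reads and writes always belongs to the key of the current record, which is the behavior Fabian describes for keyed state.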


Re: KeyedStream question

Posted by Michael Latta <ml...@technomage.com>.
Thanks for the clarification. I was just trying to understand the intended behavior. It would have been nice if Flink tracked state for downstream operators by key, but I can do that with a map in the downstream functions. 

Michael


Re: KeyedStream question

Posted by Fabian Hueske <fh...@gmail.com>.
Amit is correct. keyBy() ensures that all records with the same key are
processed by the same parallel instance of a function.
This is different from "a parallel instance only sees records of one key".

I had a look at the docs [1].
I agree that "Logically partitions a stream into disjoint partitions, each
partition containing elements of the same key." can easily be interpreted
the way you did.
I've pushed a commit to clarify the description. The docs should be updated
soon.

Best, Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/#datastream-transformations
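
The distinction above (same key always goes to the same parallel instance, but one instance may see many keys) can be sketched in plain Java. This is a simplified model, not Flink code: the class KeyRoutingSketch and the helper instanceFor() are hypothetical, and the hashCode-modulo-parallelism rule is an assumption made for illustration. Flink's real assignment is more involved (it hashes keys into key groups), but the observable property is the same.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

public class KeyRoutingSketch {
    // Hypothetical stand-in for key-to-instance assignment: deterministic per key.
    static int instanceFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Records which keys each parallel instance ends up seeing.
    static Map<Integer, Set<String>> route(List<String> keys, int parallelism) {
        Map<Integer, Set<String>> seen = new TreeMap<>();
        for (String key : keys) {
            seen.computeIfAbsent(instanceFor(key, parallelism), i -> new TreeSet<>()).add(key);
        }
        return seen;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("A", "B", "C", "A", "D", "B");
        // Four distinct keys, two instances: each instance necessarily sees several keys.
        route(keys, 2).forEach((instance, ks) ->
                System.out.println("instance " + instance + " sees keys " + ks));
        // prints:
        // instance 0 sees keys [B, D]
        // instance 1 sees keys [A, C]
    }
}
```

With more distinct keys than parallel instances, at least one instance has to handle several keys, which is why a function after keyBy() can observe records with different keys.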


Re: KeyedStream question

Posted by Amit Jain <aj...@gmail.com>.
Hi,

The keyBy operation partitions the data on the given key and makes sure the
same slot gets all future data belonging to the same key. In the default
implementation, it can also map a subset of the keys in your DataStream to
the same slot.

Assuming the number of keys equals the number of running slots, you may
specify a custom keyBy operation to achieve what you describe.


Could you describe your use case?

--
Thanks 
Amit  



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/