Posted to user@flink.apache.org by Jürgen Thomann <ju...@innogames.com> on 2018/02/02 14:24:04 UTC

Getting Key from keyBy() in ProcessFunction

Hi,

I'm currently using a ProcessFunction after a keyBy() and can't find a
way to get the key. At the moment I store it in a ValueState<String>
within processElement, setting it on every call, so that I can access it
in onTimer(). Is there a better way to get the key? We are using Flink
1.3 at the moment.

Best,
Jürgen

Re: Getting Key from keyBy() in ProcessFunction

Posted by Piotr Nowojski <pi...@data-artisans.com>.
I think it’s not easily possible right now; however, adding an `OnTimerContext#getCurrentKey()` method might be a valid suggestion.

Besides using ValueState as you discussed before, as a kind of workaround you could copy KeyedProcessOperator and modify it to suit your needs, but this would be more complicated.

Piotrek

> On 4 Feb 2018, at 20:36, Ken Krugler <kk...@transpac.com> wrote:
> 
> Hi Jürgen,
> 
> That makes sense to me.
> 
> Anyone from the Flink team want to comment on (a) if there is a way to get the current key in the timer callback without using an explicit ValueState that’s maintained in the processElement() method, and (b) if not, whether that could be added to the context?
> 
> Thanks,
> 
> — Ken
> 


Re: Getting Key from keyBy() in ProcessFunction

Posted by Ken Krugler <kk...@transpac.com>.
Hi Jürgen,

That makes sense to me.

Anyone from the Flink team want to comment on (a) if there is a way to get the current key in the timer callback without using an explicit ValueState that’s maintained in the processElement() method, and (b) if not, whether that could be added to the context?

Thanks,

— Ken


> On Feb 4, 2018, at 6:14 AM, Jürgen Thomann <ju...@innogames.com> wrote:
> 
> Hi Ken,
> 
> thanks for your answer. You're right and I'm doing it already that way. I just hoped that I could avoid the ValueState (I'm using a MapState as well already, which does not store the key) and get the key from the provided Context of the ProcessFunction. This would avoid having the ValueState and setting it in the processElement just to know the key in the onTimer function. 
> In the current way I have to check the ValueState for every element if the key is already set or just set it every time again the processElement method is invoked.
> 
> Best,
> Jürgen
--------------------------
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr

Re: Getting Key from keyBy() in ProcessFunction

Posted by Jürgen Thomann <ju...@innogames.com>.
Hi Ken,

thanks for your answer. You're right and I'm doing it already that way. 
I just hoped that I could avoid the ValueState (I'm using a MapState as 
well already, which does not store the key) and get the key from the 
provided Context of the ProcessFunction. This would avoid having the 
ValueState and setting it in the processElement just to know the key in 
the onTimer function.

With the current approach I have to either check the ValueState for
every element to see whether the key is already set, or simply set it
again every time the processElement method is invoked.

Best,
Jürgen


On 02.02.2018 18:37, Ken Krugler wrote:
> Hi Jürgen,
>
>> On Feb 2, 2018, at 6:24 AM, Jürgen Thomann
>> <juergen.thomann@innogames.com> wrote:
>>
>> Hi,
>>
>> I'm currently using a ProcessFunction after a keyBy() and can't find 
>> a way to get the key.
>
> Doesn’t your keyBy() take a field (position, or name) to use as the key?
>
> So then that same field contains the key in the 
> ProcessFunction.processElement(in, …) parameter, yes?
>
>> I'm currently storing it in a ValueState<String> within processElement
>
> If you’re using a ValueState, then there’s one of those for each 
> unique key, not one for the operation.
>
> I.e. the ValueState for key = “one” is separate from the ValueState 
> for key = “two”.
>
> You typically store the key in the state so it’s accessible in the 
> onTimer method.
>
>> and set it all the time, so that I can access it in onTimer(). Is 
>> there a better way to get the key? We are using Flink 1.3 at the moment.
>
> The ValueState (what you used in processElement) that you’re accessing 
> in the onTimer() method is also scoped by the current key.
>
> So assuming you stored the key in the state inside of your 
> processElement() call, then you should have everything you need.
>
> — Ken
>
> PS - Check out 
> https://www.slideshare.net/dataArtisans/apache-flink-training-datastream-api-processfunction

Re: Getting Key from keyBy() in ProcessFunction

Posted by Ken Krugler <kk...@transpac.com>.
Hi Jürgen,

> On Feb 2, 2018, at 6:24 AM, Jürgen Thomann <juergen.thomann@innogames.com> wrote:
> 
> Hi,
> 
> I'm currently using a ProcessFunction after a keyBy() and can't find a way to get the key.

Doesn’t your keyBy() take a field (position, or name) to use as the key?

So then that same field contains the key in the ProcessFunction.processElement(in, …) parameter, yes?

> I'm currently storing it in a ValueState<String> within processElement

If you’re using a ValueState, then there’s one of those for each unique key, not one for the operation.

I.e. the ValueState for key = “one” is separate from the ValueState for key = “two”.

You typically store the key in the state so it’s accessible in the onTimer method.

> and set it all the time, so that I can access it in onTimer(). Is there a better way to get the key? We are using Flink 1.3 at the moment.

The ValueState (what you used in processElement) that you’re accessing in the onTimer() method is also scoped by the current key.

So assuming you stored the key in the state inside of your processElement() call, then you should have everything you need.

— Ken

PS - Check out https://www.slideshare.net/dataArtisans/apache-flink-training-datastream-api-processfunction
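
To make the key-scoping argument above concrete, here is a minimal plain-Java toy model. It is not Flink code: the class KeyScopedStateDemo, the ToyValueState type, and the deliverElement/fireTimer methods are hypothetical names invented for illustration, and the real runtime is far more involved. The sketch only assumes what the message describes: the runtime selects the state slot for the current key before invoking processElement() or onTimer(), so a key stored while processing an element can be read back in the timer callback for that same key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of key-scoped state. NOT Flink code; all names are hypothetical. */
public class KeyScopedStateDemo {

    /** Mimics a ValueState: reads and writes go to the slot of the current key. */
    static final class ToyValueState<T> {
        private final Map<String, T> slots = new HashMap<>();
        private String currentKey; // set by the toy "runtime" before each callback

        T value() { return slots.get(currentKey); }
        void update(T v) { slots.put(currentKey, v); }
    }

    private final ToyValueState<String> keyState = new ToyValueState<>();
    private final List<String> output = new ArrayList<>();

    // Element callback: the element is available, so the key can be taken from it.
    void processElement(String key, long payload) {
        if (keyState.value() == null) { // store the key only the first time it is seen
            keyState.update(key);
        }
    }

    // Timer callback: no element is available, so the key is read back from state.
    void onTimer(long timestamp) {
        output.add(keyState.value() + "@" + timestamp);
    }

    // Toy "runtime": select the state slot for the key, then call the user code.
    void deliverElement(String key, long payload) {
        keyState.currentKey = key;
        processElement(key, payload);
    }

    void fireTimer(String key, long timestamp) {
        keyState.currentKey = key;
        onTimer(timestamp);
    }

    /** Feeds elements for two keys, then fires one timer per key. */
    public static List<String> run() {
        KeyScopedStateDemo op = new KeyScopedStateDemo();
        op.deliverElement("one", 1L);
        op.deliverElement("two", 2L);
        op.deliverElement("one", 3L);
        op.fireTimer("two", 100L);
        op.fireTimer("one", 200L);
        return op.output;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [two@100, one@200]
    }
}
```

Each timer sees the key that was stored under its own slot, even though elements for "one" and "two" were interleaved. The null check in processElement() corresponds to the "check whether the key is already set" variant mentioned earlier in the thread; dropping the check and calling update() unconditionally gives the "set it every time" variant.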
