Posted to user@flink.apache.org by Joe Olson <te...@nododos.com> on 2017/01/23 13:46:01 UTC

Queryable State and Windows

From what I've read in the documentation, and from the examples I've seen, in order to make state queryable from outside Flink, the state descriptor variables need access to the Flink runtime context. 

This means the stream processor has to use the 'Rich' function variants, 'RichFlatMap' for example. All the 1.2-SNAPSHOT queryable state examples I have seen revolve around RichFlatMap. 

Is there a way to get the runtime context exposed so that you can have state descriptor variables queryable from within a Flink window, while the window is loading? 

My processor is built around the following: 

.addSource(new FlinkKafkaConsumer010()) 
.assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks()) 
.keyBy() 
.window(GlobalWindows.create()) 
.trigger(new myTrigger()) 
.apply(new myWindowFunction()) 
.addSink(new mySink()) 

The only rich object in this chain is the RichWindowFunction passed to apply(). But that is too late: I want to be able to query what's in the window while it is filling. I know I have access to onElement in the trigger, and I can set up the state descriptor variables there, but they don't seem to have access to the runtime environment within the trigger. 

Is there a way to get queryable state within a Flink window while it is filling? 


Re: Queryable State and Windows

Posted by Konstantin Knauf <ko...@tngtech.com>.
I just want to add another workaround, which does not need a
self-compiled version. You can use a TimeWindow with CountTrigger.of(1),
combined with a FoldFunction for pre-aggregation and a
RichWindowFunction that updates the queryable state. Additionally, you
need a separate TimeWindow for the final results. So you are doubling
the amount of state as well as computation, but depending on the
circumstances this might be preferable to tweaking Flink 1.2.
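To make the doubled state concrete, here is a plain-Java model of this workaround with no Flink dependency; all class and field names are hypothetical and are not Flink APIs. In Flink 1.2 terms, the first window would roughly be `.window(TumblingEventTimeWindows.of(...)).trigger(CountTrigger.of(1)).fold(initial, foldFunction, richWindowFunction)`, with the RichWindowFunction writing each partial result to queryable state, and the second window would use the same assigner with its default trigger. The model assumes tumbling event-time windows and a sum as the fold.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model (hypothetical names, no Flink dependency) of the workaround.
// Window 1: fires on every element (like CountTrigger.of(1)) over a running
//           sum (like a FoldFunction) and publishes the partial result per key
//           (like a RichWindowFunction writing queryable state).
// Window 2: an ordinary time window that emits the final sum once the
//           watermark reaches the window's last timestamp (end - 1).
class DoubleWindowModel {
    private final long windowSize;
    // Window 1 state: running partial sums per "key@windowStart".
    final Map<String, Long> partial = new TreeMap<>();
    // Window 2 state: a second copy of the same sums (the doubled state).
    final Map<String, Long> finalSums = new TreeMap<>();
    // Exclusive end timestamp of every open window, keyed the same way.
    private final Map<String, Long> windowEnds = new TreeMap<>();
    // What an external client could query while windows are still filling.
    final Map<String, Long> queryable = new TreeMap<>();
    // Final results emitted downstream.
    final List<String> emitted = new ArrayList<>();

    DoubleWindowModel(long windowSize) {
        this.windowSize = windowSize;
    }

    void onElement(String key, long timestamp, long value) {
        long start = timestamp - Math.floorMod(timestamp, windowSize);
        String id = key + "@" + start;
        windowEnds.put(id, start + windowSize);
        // Window 1: fold the element in, then "fire" immediately.
        long runningSum = partial.merge(id, value, Long::sum);
        queryable.put(key, runningSum);
        // Window 2: the same aggregation, kept separately for the final result.
        finalSums.merge(id, value, Long::sum);
    }

    void onWatermark(long watermark) {
        Iterator<Map.Entry<String, Long>> it = windowEnds.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> window = it.next();
            if (window.getValue() - 1 <= watermark) {
                String id = window.getKey();
                emitted.add(id + "=" + finalSums.remove(id));
                partial.remove(id); // window 1 is cleaned up at the same time
                it.remove();
            }
        }
    }
}
```

While a window is filling, `partial` and `finalSums` hold two copies of the same sums, which is the doubling of state described above. Once the watermark passes the window, the final result is emitted and both copies are dropped, while `queryable` keeps the last published partial sum per key.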

I think Jamie Grier did something similar in one of his presentations
on the topic.

Cheers,

Konstantin

On 23.01.2017 15:39, Ufuk Celebi wrote:
> This is not possible at the moment. We discussed this a couple of
> times before, but in the end did not want to expose it with the
> initial version, because the interfaces are still very raw. This is
> definitely on the agenda though.
> 
> As a workaround, you would have to build a custom Flink version that
> calls `setQueryable` on the state descriptors of the WindowOperator.
> If there is an easy, non-intrusive way to activate this for the
> upcoming 1.2 version, I will try to do it.

-- 
Konstantin Knauf * konstantin.knauf@tngtech.com * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082


Re: Queryable State and Windows

Posted by Ufuk Celebi <uc...@apache.org>.
This is not possible at the moment. We discussed this a couple of
times before, but in the end did not want to expose it with the
initial version, because the interfaces are still very raw. This is
definitely on the agenda though.

As a workaround, you would have to build a custom Flink version that
calls `setQueryable` on the state descriptors of the WindowOperator.
If there is an easy, non-intrusive way to activate this for the
upcoming 1.2 version, I will try to do it.
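The idea behind this workaround can be illustrated with a simplified plain-Java model: state is reachable by an external client only if its descriptor has been flagged queryable (in Flink, via `setQueryable(String)` on the state descriptor). Everything below, including the class names and the "window-contents-query" name, is hypothetical and is not Flink's internal code; the `patched` flag stands in for a custom build that calls `setQueryable` on the WindowOperator's descriptors.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model, NOT Flink's classes: a descriptor is only exposed to
// external queries if it was given a queryable name.
class StateDescriptorModel {
    final String name;
    String queryableName; // null means "not queryable" (the default)

    StateDescriptorModel(String name) { this.name = name; }

    void setQueryable(String queryableName) { this.queryableName = queryableName; }
}

// Stand-in for the registry an external query client looks state up in.
class QueryRegistryModel {
    private final Map<String, Map<String, List<Long>>> byName = new HashMap<>();

    void register(String queryableName, Map<String, List<Long>> state) {
        byName.put(queryableName, state);
    }

    // Returns the live per-key contents, so partially filled windows are visible.
    List<Long> query(String queryableName, String key) {
        Map<String, List<Long>> state = byName.get(queryableName);
        if (state == null) {
            throw new IllegalStateException("no queryable state named " + queryableName);
        }
        return state.getOrDefault(key, Collections.emptyList());
    }
}

// Window operator that buffers elements per key. Only the "patched" variant
// (the custom build) marks its own descriptor as queryable.
class WindowOperatorModel {
    private final Map<String, List<Long>> contents = new HashMap<>();

    WindowOperatorModel(QueryRegistryModel registry, boolean patched) {
        StateDescriptorModel descriptor = new StateDescriptorModel("window-contents");
        if (patched) {
            descriptor.setQueryable("window-contents-query"); // hypothetical name
        }
        if (descriptor.queryableName != null) {
            registry.register(descriptor.queryableName, contents);
        }
    }

    void onElement(String key, long value) {
        contents.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }
}
```

With `patched` set to false (the stock behaviour in this model), the lookup fails, which mirrors the point that the window's internal state is not exposed without changing the WindowOperator.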


