You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Vishal Santoshi <vi...@gmail.com> on 2017/12/01 18:38:13 UTC

aggregate does not allow RichAggregateFunction ?

I have a simple Aggregation with one caveat. For some reason I have to keep
a large amount of state till the window is GCed. The state is within the
Accumulator ( ACC ). I am hitting a memory bottleneck and would like to
offload the state  to the states backend ( ROCKSDB), keeping the between
checkpoint state in memory ( seems to be an obvious fix). I am not though
allowed to have a RichAggregateFunction in the aggregate method of a
windowed stream . That begs 2 questions

1. Why
2. Is there an alternative for stateful window aggregation where we manage
the state. ?

Thanks Vishal


Here is the code ( generics but it works  )

SingleOutputStreamOperator<OUT> retVal = input
        .keyBy(keySelector)
        .window(EventTimeSessionWindows.withGap(gap))
        .aggregate(
                new AggregateFunction<IN, ACC, OUT>() {

                    @Override
                    public ACC createAccumulator() {
                        ACC newInstance = (ACC) accumulator.clone();
                        newInstance.resetLocal();
                        return newInstance;
                    }

                    @Override
                    public void add(IN value, ACC accumulator) {
                        accumulator.add(value);

                    }

                    @Override
                    public OUT getResult(ACC accumulator) {
                        return accumulator.getLocalValue();
                    }

                    @Override
                    public ACC merge(ACC a, ACC b) {
                        a.merge(b);
                        return a;
                    }
                }, new WindowFunction<OUT, OUT, KEY, TimeWindow>() {
                    @Override
                    public void apply(KEY s, TimeWindow window,
Iterable<OUT> input, Collector<OUT> out) throws Exception {
                            out.collect(input.iterator().next());
                    }
                }, accType, aggregationResultType, aggregationResultType);

Re: aggregate does not allow RichAggregateFunction ?

Posted by chiggi_dev <ch...@yahoo.in>.
Hi Fabian,

We came across this issue while working on RichAggregateFunction. Isnt
generic state mergeable, similar to ACC merge?  

What if I need the Flink classLoader in the Aggregate function? Is there
anyway I can do that without RuntimeContext?

Thanks,

Chirag 



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: aggregate does not allow RichAggregateFunction ?

Posted by Vishal Santoshi <vi...@gmail.com>.
Hello Fabian,
                      We decided that it does not make sense to create
partitioned kakka partitions b'coz of hot spot considerations. So we
created a way to keep trimmed state in the Accumulator provided  we know
the current  watermark to keep the trimmed state time correct.  In essence
the paths we look for in a sequence of events in a session are eagerly
materialized and emitted using a periodic CountTrigger followed by
truncation of the state.

                 It requires us to know current watermark in the e
Accumulator ?  We do have the watermark in Trigger's onElement(),
onEventTime()  and onProcessingTime() through the TriggerContext , but I
see no way to pass it on to the Accumulator. A lazy setting of WM on the
element, which we thought was a shared instance between invocation of add()
on Accumulator and onElement() on the attached Trigger, does not seem to
work in a distributed environment.

I tried the ProcessWindowFunction too. It was promising as it's process
method has the Context and thus the WM, but it too suffers from the same
issue when using WindowState ( state keyed to window and key ) in session
window throwing

java.lang.UnsupportedOperationException: Per-window state is not
allowed when using merging windows.
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$MergingWindowStateStore.getState(WindowOperator.java:720)


Vishal



On Mon, Dec 11, 2017 at 10:13 AM, Vishal Santoshi <vishal.santoshi@gmail.com
> wrote:

> Perfect, f in our use case, the kafka partition  key and the keyBy use the
> same exact field and thus the order will be preserved.
>
> On Mon, Dec 11, 2017 at 4:34 AM, Fabian Hueske <fh...@gmail.com> wrote:
>
>> Hi,
>>
>> the order or records that are sent from one task to another task is
>> preserved (task refers to the parallel instance of an operator).
>> However, a task that receives records from multiple input tasks, consumes
>> records from its inputs in arbitrary order.
>>
>> If a job reads from a partitioned Kafka topic and does a keyBy on the
>> partitioning key of the Kafka topic, an operator task that follows the
>> keyBy consumes all records with the same key from exactly one input task
>> (the one reading the Kafka partition for the key).
>> However, since Flink's and Kafka's partitioning functions are not the
>> same, records from the same partition with different keys can be sent to
>> different tasks.
>>
>> So:
>> 1) Records from the same partition might not be processed by the same
>> operator (and hence not in order).
>> 2) Records with the same key are processed by the same operator in the
>> same order in which they were read from the partition.
>>
>> Best,
>> Fabian
>>
>> 2017-12-09 18:09 GMT+01:00 Vishal Santoshi <vi...@gmail.com>:
>>
>>> An additional question is that if the source is key partitioned  ( kafka
>>> ) does a keyBy retain the order of  a kafka partirion across a shuffle ?
>>>
>>> On Fri, Dec 8, 2017 at 1:12 PM, Vishal Santoshi <
>>> vishal.santoshi@gmail.com> wrote:
>>>
>>>> I understand that. Let me elaborate. The sequence of events is
>>>>
>>>> 1. Round robin dispatch to kafka cluster  ( it is not partitioned on
>>>> the key which we may ultimately do  and than I will have more questions on
>>>> how to key y and still keep order, pbly avoid shuffle :) ) .
>>>> 2. key by a high cardinality key
>>>> 3. Sessionize
>>>> 4. B'coz of the RR on kafka ( and even if partitioned on the key and a
>>>> subsequent key by ), the sort order is not retained and the ACC has to hold
>>>> on to the elements in a List . When the Window is finalized  we sort the in
>>>> ACC  List  and do pagination, We are looking for paths within a session
>>>> from . a source to a sink event based. I was hoping to use ROCKS DB state
>>>> as a final merged list and thus off heap and  use a Count based Trigger to
>>>> evaluate the ACC and merge the inter Trigger collection to the  master copy
>>>> rather than keeping all events in the ACC ( I would imagine a very general
>>>> pattern to use ).
>>>>
>>>> Does that make sense ?
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Fri, Dec 8, 2017 at 9:11 AM, Aljoscha Krettek <al...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> If you use an AggregatingFunction in this way (i.e. for a window) the
>>>>> ACC should in fact be kept in the state backend. Did you configure the job
>>>>> to use RocksDB? How are the memory problems manifesting?
>>>>>
>>>>> Best,
>>>>> Aljoscha
>>>>>
>>>>>
>>>>> On 6. Dec 2017, at 14:57, Fabian Hueske <fh...@gmail.com> wrote:
>>>>>
>>>>> Hi Vishal,
>>>>>
>>>>> you are right, it is not possible to use state in an AggregateFunction
>>>>> because windows need to be mergeable.
>>>>> An AggregateFunction knows how to merge its accumulators but merging
>>>>> generic state is not possible.
>>>>>
>>>>> I am not aware of an efficient and easy work around for this.
>>>>> If you want to use the provided session window logic, you can use a
>>>>> WindowFunction that performs all computations when the window is triggered.
>>>>> This means that aggregations do not happen eagerly and all events for a
>>>>> window are collected and held in state.
>>>>> Another approach could be to implement the whole logic (incl. the
>>>>> session windowing) using a ProcessFunction. This would be a major effort
>>>>> though.
>>>>>
>>>>> Best,
>>>>> Fabian
>>>>>
>>>>> 2017-12-06 3:52 GMT+01:00 Vishal Santoshi <vi...@gmail.com>:
>>>>>
>>>>>> It seems that this has to do with session windows tbat are mergeable
>>>>>> ? I tried the RixhWindow function and that seems to suggest that one cannot
>>>>>> use state ? Any ideas folks...
>>>>>>
>>>>>> On Dec 1, 2017 10:38 AM, "Vishal Santoshi" <vi...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I have a simple Aggregation with one caveat. For some reason I have
>>>>>>> to keep a large amount of state till the window is GCed. The state is
>>>>>>> within the Accumulator ( ACC ). I am hitting a memory bottleneck and would
>>>>>>> like to offload the state  to the states backend ( ROCKSDB), keeping the
>>>>>>> between checkpoint state in memory ( seems to be an obvious fix). I am not
>>>>>>> though allowed to have a RichAggregateFunction in the aggregate method of a
>>>>>>> windowed stream . That begs 2 questions
>>>>>>>
>>>>>>> 1. Why
>>>>>>> 2. Is there an alternative for stateful window aggregation where we
>>>>>>> manage the state. ?
>>>>>>>
>>>>>>> Thanks Vishal
>>>>>>>
>>>>>>>
>>>>>>> Here is the code ( generics but it works  )
>>>>>>>
>>>>>>> SingleOutputStreamOperator<OUT> retVal = input
>>>>>>>         .keyBy(keySelector)
>>>>>>>         .window(EventTimeSessionWindows.withGap(gap))
>>>>>>>         .aggregate(
>>>>>>>                 new AggregateFunction<IN, ACC, OUT>() {
>>>>>>>
>>>>>>>                     @Override
>>>>>>>                     public ACC createAccumulator() {
>>>>>>>                         ACC newInstance = (ACC) accumulator.clone();
>>>>>>>                         newInstance.resetLocal();
>>>>>>>                         return newInstance;
>>>>>>>                     }
>>>>>>>
>>>>>>>                     @Override
>>>>>>>                     public void add(IN value, ACC accumulator) {
>>>>>>>                         accumulator.add(value);
>>>>>>>
>>>>>>>                     }
>>>>>>>
>>>>>>>                     @Override
>>>>>>>                     public OUT getResult(ACC accumulator) {
>>>>>>>                         return accumulator.getLocalValue();
>>>>>>>                     }
>>>>>>>
>>>>>>>                     @Override
>>>>>>>                     public ACC merge(ACC a, ACC b) {
>>>>>>>                         a.merge(b);
>>>>>>>                         return a;
>>>>>>>                     }
>>>>>>>                 }, new WindowFunction<OUT, OUT, KEY, TimeWindow>() {
>>>>>>>                     @Override
>>>>>>>                     public void apply(KEY s, TimeWindow window, Iterable<OUT> input, Collector<OUT> out) throws Exception {
>>>>>>>                             out.collect(input.iterator().next());
>>>>>>>                     }
>>>>>>>                 }, accType, aggregationResultType, aggregationResultType);
>>>>>>>
>>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>

Re: aggregate does not allow RichAggregateFunction ?

Posted by Vishal Santoshi <vi...@gmail.com>.
Perfect, f in our use case, the kafka partition  key and the keyBy use the
same exact field and thus the order will be preserved.

On Mon, Dec 11, 2017 at 4:34 AM, Fabian Hueske <fh...@gmail.com> wrote:

> Hi,
>
> the order or records that are sent from one task to another task is
> preserved (task refers to the parallel instance of an operator).
> However, a task that receives records from multiple input tasks, consumes
> records from its inputs in arbitrary order.
>
> If a job reads from a partitioned Kafka topic and does a keyBy on the
> partitioning key of the Kafka topic, an operator task that follows the
> keyBy consumes all records with the same key from exactly one input task
> (the one reading the Kafka partition for the key).
> However, since Flink's and Kafka's partitioning functions are not the
> same, records from the same partition with different keys can be sent to
> different tasks.
>
> So:
> 1) Records from the same partition might not be processed by the same
> operator (and hence not in order).
> 2) Records with the same key are processed by the same operator in the
> same order in which they were read from the partition.
>
> Best,
> Fabian
>
> 2017-12-09 18:09 GMT+01:00 Vishal Santoshi <vi...@gmail.com>:
>
>> An additional question is that if the source is key partitioned  ( kafka
>> ) does a keyBy retain the order of  a kafka partirion across a shuffle ?
>>
>> On Fri, Dec 8, 2017 at 1:12 PM, Vishal Santoshi <
>> vishal.santoshi@gmail.com> wrote:
>>
>>> I understand that. Let me elaborate. The sequence of events is
>>>
>>> 1. Round robin dispatch to kafka cluster  ( it is not partitioned on the
>>> key which we may ultimately do  and than I will have more questions on how
>>> to key y and still keep order, pbly avoid shuffle :) ) .
>>> 2. key by a high cardinality key
>>> 3. Sessionize
>>> 4. B'coz of the RR on kafka ( and even if partitioned on the key and a
>>> subsequent key by ), the sort order is not retained and the ACC has to hold
>>> on to the elements in a List . When the Window is finalized  we sort the in
>>> ACC  List  and do pagination, We are looking for paths within a session
>>> from . a source to a sink event based. I was hoping to use ROCKS DB state
>>> as a final merged list and thus off heap and  use a Count based Trigger to
>>> evaluate the ACC and merge the inter Trigger collection to the  master copy
>>> rather than keeping all events in the ACC ( I would imagine a very general
>>> pattern to use ).
>>>
>>> Does that make sense ?
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Fri, Dec 8, 2017 at 9:11 AM, Aljoscha Krettek <al...@apache.org>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> If you use an AggregatingFunction in this way (i.e. for a window) the
>>>> ACC should in fact be kept in the state backend. Did you configure the job
>>>> to use RocksDB? How are the memory problems manifesting?
>>>>
>>>> Best,
>>>> Aljoscha
>>>>
>>>>
>>>> On 6. Dec 2017, at 14:57, Fabian Hueske <fh...@gmail.com> wrote:
>>>>
>>>> Hi Vishal,
>>>>
>>>> you are right, it is not possible to use state in an AggregateFunction
>>>> because windows need to be mergeable.
>>>> An AggregateFunction knows how to merge its accumulators but merging
>>>> generic state is not possible.
>>>>
>>>> I am not aware of an efficient and easy work around for this.
>>>> If you want to use the provided session window logic, you can use a
>>>> WindowFunction that performs all computations when the window is triggered.
>>>> This means that aggregations do not happen eagerly and all events for a
>>>> window are collected and held in state.
>>>> Another approach could be to implement the whole logic (incl. the
>>>> session windowing) using a ProcessFunction. This would be a major effort
>>>> though.
>>>>
>>>> Best,
>>>> Fabian
>>>>
>>>> 2017-12-06 3:52 GMT+01:00 Vishal Santoshi <vi...@gmail.com>:
>>>>
>>>>> It seems that this has to do with session windows tbat are mergeable ?
>>>>> I tried the RixhWindow function and that seems to suggest that one cannot
>>>>> use state ? Any ideas folks...
>>>>>
>>>>> On Dec 1, 2017 10:38 AM, "Vishal Santoshi" <vi...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I have a simple Aggregation with one caveat. For some reason I have
>>>>>> to keep a large amount of state till the window is GCed. The state is
>>>>>> within the Accumulator ( ACC ). I am hitting a memory bottleneck and would
>>>>>> like to offload the state  to the states backend ( ROCKSDB), keeping the
>>>>>> between checkpoint state in memory ( seems to be an obvious fix). I am not
>>>>>> though allowed to have a RichAggregateFunction in the aggregate method of a
>>>>>> windowed stream . That begs 2 questions
>>>>>>
>>>>>> 1. Why
>>>>>> 2. Is there an alternative for stateful window aggregation where we
>>>>>> manage the state. ?
>>>>>>
>>>>>> Thanks Vishal
>>>>>>
>>>>>>
>>>>>> Here is the code ( generics but it works  )
>>>>>>
>>>>>> SingleOutputStreamOperator<OUT> retVal = input
>>>>>>         .keyBy(keySelector)
>>>>>>         .window(EventTimeSessionWindows.withGap(gap))
>>>>>>         .aggregate(
>>>>>>                 new AggregateFunction<IN, ACC, OUT>() {
>>>>>>
>>>>>>                     @Override
>>>>>>                     public ACC createAccumulator() {
>>>>>>                         ACC newInstance = (ACC) accumulator.clone();
>>>>>>                         newInstance.resetLocal();
>>>>>>                         return newInstance;
>>>>>>                     }
>>>>>>
>>>>>>                     @Override
>>>>>>                     public void add(IN value, ACC accumulator) {
>>>>>>                         accumulator.add(value);
>>>>>>
>>>>>>                     }
>>>>>>
>>>>>>                     @Override
>>>>>>                     public OUT getResult(ACC accumulator) {
>>>>>>                         return accumulator.getLocalValue();
>>>>>>                     }
>>>>>>
>>>>>>                     @Override
>>>>>>                     public ACC merge(ACC a, ACC b) {
>>>>>>                         a.merge(b);
>>>>>>                         return a;
>>>>>>                     }
>>>>>>                 }, new WindowFunction<OUT, OUT, KEY, TimeWindow>() {
>>>>>>                     @Override
>>>>>>                     public void apply(KEY s, TimeWindow window, Iterable<OUT> input, Collector<OUT> out) throws Exception {
>>>>>>                             out.collect(input.iterator().next());
>>>>>>                     }
>>>>>>                 }, accType, aggregationResultType, aggregationResultType);
>>>>>>
>>>>>>
>>>>
>>>>
>>>
>>
>

Re: aggregate does not allow RichAggregateFunction ?

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

the order or records that are sent from one task to another task is
preserved (task refers to the parallel instance of an operator).
However, a task that receives records from multiple input tasks, consumes
records from its inputs in arbitrary order.

If a job reads from a partitioned Kafka topic and does a keyBy on the
partitioning key of the Kafka topic, an operator task that follows the
keyBy consumes all records with the same key from exactly one input task
(the one reading the Kafka partition for the key).
However, since Flink's and Kafka's partitioning functions are not the same,
records from the same partition with different keys can be sent to
different tasks.

So:
1) Records from the same partition might not be processed by the same
operator (and hence not in order).
2) Records with the same key are processed by the same operator in the same
order in which they were read from the partition.

Best,
Fabian

2017-12-09 18:09 GMT+01:00 Vishal Santoshi <vi...@gmail.com>:

> An additional question is that if the source is key partitioned  ( kafka )
> does a keyBy retain the order of  a kafka partirion across a shuffle ?
>
> On Fri, Dec 8, 2017 at 1:12 PM, Vishal Santoshi <vishal.santoshi@gmail.com
> > wrote:
>
>> I understand that. Let me elaborate. The sequence of events is
>>
>> 1. Round robin dispatch to kafka cluster  ( it is not partitioned on the
>> key which we may ultimately do  and than I will have more questions on how
>> to key y and still keep order, pbly avoid shuffle :) ) .
>> 2. key by a high cardinality key
>> 3. Sessionize
>> 4. B'coz of the RR on kafka ( and even if partitioned on the key and a
>> subsequent key by ), the sort order is not retained and the ACC has to hold
>> on to the elements in a List . When the Window is finalized  we sort the in
>> ACC  List  and do pagination, We are looking for paths within a session
>> from . a source to a sink event based. I was hoping to use ROCKS DB state
>> as a final merged list and thus off heap and  use a Count based Trigger to
>> evaluate the ACC and merge the inter Trigger collection to the  master copy
>> rather than keeping all events in the ACC ( I would imagine a very general
>> pattern to use ).
>>
>> Does that make sense ?
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> On Fri, Dec 8, 2017 at 9:11 AM, Aljoscha Krettek <al...@apache.org>
>> wrote:
>>
>>> Hi,
>>>
>>> If you use an AggregatingFunction in this way (i.e. for a window) the
>>> ACC should in fact be kept in the state backend. Did you configure the job
>>> to use RocksDB? How are the memory problems manifesting?
>>>
>>> Best,
>>> Aljoscha
>>>
>>>
>>> On 6. Dec 2017, at 14:57, Fabian Hueske <fh...@gmail.com> wrote:
>>>
>>> Hi Vishal,
>>>
>>> you are right, it is not possible to use state in an AggregateFunction
>>> because windows need to be mergeable.
>>> An AggregateFunction knows how to merge its accumulators but merging
>>> generic state is not possible.
>>>
>>> I am not aware of an efficient and easy work around for this.
>>> If you want to use the provided session window logic, you can use a
>>> WindowFunction that performs all computations when the window is triggered.
>>> This means that aggregations do not happen eagerly and all events for a
>>> window are collected and held in state.
>>> Another approach could be to implement the whole logic (incl. the
>>> session windowing) using a ProcessFunction. This would be a major effort
>>> though.
>>>
>>> Best,
>>> Fabian
>>>
>>> 2017-12-06 3:52 GMT+01:00 Vishal Santoshi <vi...@gmail.com>:
>>>
>>>> It seems that this has to do with session windows tbat are mergeable ?
>>>> I tried the RixhWindow function and that seems to suggest that one cannot
>>>> use state ? Any ideas folks...
>>>>
>>>> On Dec 1, 2017 10:38 AM, "Vishal Santoshi" <vi...@gmail.com>
>>>> wrote:
>>>>
>>>>> I have a simple Aggregation with one caveat. For some reason I have to
>>>>> keep a large amount of state till the window is GCed. The state is within
>>>>> the Accumulator ( ACC ). I am hitting a memory bottleneck and would like to
>>>>> offload the state  to the states backend ( ROCKSDB), keeping the between
>>>>> checkpoint state in memory ( seems to be an obvious fix). I am not though
>>>>> allowed to have a RichAggregateFunction in the aggregate method of a
>>>>> windowed stream . That begs 2 questions
>>>>>
>>>>> 1. Why
>>>>> 2. Is there an alternative for stateful window aggregation where we
>>>>> manage the state. ?
>>>>>
>>>>> Thanks Vishal
>>>>>
>>>>>
>>>>> Here is the code ( generics but it works  )
>>>>>
>>>>> SingleOutputStreamOperator<OUT> retVal = input
>>>>>         .keyBy(keySelector)
>>>>>         .window(EventTimeSessionWindows.withGap(gap))
>>>>>         .aggregate(
>>>>>                 new AggregateFunction<IN, ACC, OUT>() {
>>>>>
>>>>>                     @Override
>>>>>                     public ACC createAccumulator() {
>>>>>                         ACC newInstance = (ACC) accumulator.clone();
>>>>>                         newInstance.resetLocal();
>>>>>                         return newInstance;
>>>>>                     }
>>>>>
>>>>>                     @Override
>>>>>                     public void add(IN value, ACC accumulator) {
>>>>>                         accumulator.add(value);
>>>>>
>>>>>                     }
>>>>>
>>>>>                     @Override
>>>>>                     public OUT getResult(ACC accumulator) {
>>>>>                         return accumulator.getLocalValue();
>>>>>                     }
>>>>>
>>>>>                     @Override
>>>>>                     public ACC merge(ACC a, ACC b) {
>>>>>                         a.merge(b);
>>>>>                         return a;
>>>>>                     }
>>>>>                 }, new WindowFunction<OUT, OUT, KEY, TimeWindow>() {
>>>>>                     @Override
>>>>>                     public void apply(KEY s, TimeWindow window, Iterable<OUT> input, Collector<OUT> out) throws Exception {
>>>>>                             out.collect(input.iterator().next());
>>>>>                     }
>>>>>                 }, accType, aggregationResultType, aggregationResultType);
>>>>>
>>>>>
>>>
>>>
>>
>

Re: aggregate does not allow RichAggregateFunction ?

Posted by Vishal Santoshi <vi...@gmail.com>.
An additional question is that if the source is key partitioned  ( kafka )
does a keyBy retain the order of  a kafka partirion across a shuffle ?

On Fri, Dec 8, 2017 at 1:12 PM, Vishal Santoshi <vi...@gmail.com>
wrote:

> I understand that. Let me elaborate. The sequence of events is
>
> 1. Round robin dispatch to kafka cluster  ( it is not partitioned on the
> key which we may ultimately do  and than I will have more questions on how
> to key y and still keep order, pbly avoid shuffle :) ) .
> 2. key by a high cardinality key
> 3. Sessionize
> 4. B'coz of the RR on kafka ( and even if partitioned on the key and a
> subsequent key by ), the sort order is not retained and the ACC has to hold
> on to the elements in a List . When the Window is finalized  we sort the in
> ACC  List  and do pagination, We are looking for paths within a session
> from . a source to a sink event based. I was hoping to use ROCKS DB state
> as a final merged list and thus off heap and  use a Count based Trigger to
> evaluate the ACC and merge the inter Trigger collection to the  master copy
> rather than keeping all events in the ACC ( I would imagine a very general
> pattern to use ).
>
> Does that make sense ?
>
>
>
>
>
>
>
>
>
>
> On Fri, Dec 8, 2017 at 9:11 AM, Aljoscha Krettek <al...@apache.org>
> wrote:
>
>> Hi,
>>
>> If you use an AggregatingFunction in this way (i.e. for a window) the ACC
>> should in fact be kept in the state backend. Did you configure the job to
>> use RocksDB? How are the memory problems manifesting?
>>
>> Best,
>> Aljoscha
>>
>>
>> On 6. Dec 2017, at 14:57, Fabian Hueske <fh...@gmail.com> wrote:
>>
>> Hi Vishal,
>>
>> you are right, it is not possible to use state in an AggregateFunction
>> because windows need to be mergeable.
>> An AggregateFunction knows how to merge its accumulators but merging
>> generic state is not possible.
>>
>> I am not aware of an efficient and easy work around for this.
>> If you want to use the provided session window logic, you can use a
>> WindowFunction that performs all computations when the window is triggered.
>> This means that aggregations do not happen eagerly and all events for a
>> window are collected and held in state.
>> Another approach could be to implement the whole logic (incl. the session
>> windowing) using a ProcessFunction. This would be a major effort though.
>>
>> Best,
>> Fabian
>>
>> 2017-12-06 3:52 GMT+01:00 Vishal Santoshi <vi...@gmail.com>:
>>
>>> It seems that this has to do with session windows tbat are mergeable ? I
>>> tried the RixhWindow function and that seems to suggest that one cannot use
>>> state ? Any ideas folks...
>>>
>>> On Dec 1, 2017 10:38 AM, "Vishal Santoshi" <vi...@gmail.com>
>>> wrote:
>>>
>>>> I have a simple Aggregation with one caveat. For some reason I have to
>>>> keep a large amount of state till the window is GCed. The state is within
>>>> the Accumulator ( ACC ). I am hitting a memory bottleneck and would like to
>>>> offload the state  to the states backend ( ROCKSDB), keeping the between
>>>> checkpoint state in memory ( seems to be an obvious fix). I am not though
>>>> allowed to have a RichAggregateFunction in the aggregate method of a
>>>> windowed stream . That begs 2 questions
>>>>
>>>> 1. Why
>>>> 2. Is there an alternative for stateful window aggregation where we
>>>> manage the state. ?
>>>>
>>>> Thanks Vishal
>>>>
>>>>
>>>> Here is the code ( generics but it works  )
>>>>
>>>> SingleOutputStreamOperator<OUT> retVal = input
>>>>         .keyBy(keySelector)
>>>>         .window(EventTimeSessionWindows.withGap(gap))
>>>>         .aggregate(
>>>>                 new AggregateFunction<IN, ACC, OUT>() {
>>>>
>>>>                     @Override
>>>>                     public ACC createAccumulator() {
>>>>                         ACC newInstance = (ACC) accumulator.clone();
>>>>                         newInstance.resetLocal();
>>>>                         return newInstance;
>>>>                     }
>>>>
>>>>                     @Override
>>>>                     public void add(IN value, ACC accumulator) {
>>>>                         accumulator.add(value);
>>>>
>>>>                     }
>>>>
>>>>                     @Override
>>>>                     public OUT getResult(ACC accumulator) {
>>>>                         return accumulator.getLocalValue();
>>>>                     }
>>>>
>>>>                     @Override
>>>>                     public ACC merge(ACC a, ACC b) {
>>>>                         a.merge(b);
>>>>                         return a;
>>>>                     }
>>>>                 }, new WindowFunction<OUT, OUT, KEY, TimeWindow>() {
>>>>                     @Override
>>>>                     public void apply(KEY s, TimeWindow window, Iterable<OUT> input, Collector<OUT> out) throws Exception {
>>>>                             out.collect(input.iterator().next());
>>>>                     }
>>>>                 }, accType, aggregationResultType, aggregationResultType);
>>>>
>>>>
>>
>>
>

Re: aggregate does not allow RichAggregateFunction ?

Posted by Vishal Santoshi <vi...@gmail.com>.
I understand that. Let me elaborate. The sequence of events is

1. Round robin dispatch to kafka cluster  ( it is not partitioned on the
key which we may ultimately do  and than I will have more questions on how
to key y and still keep order, pbly avoid shuffle :) ) .
2. key by a high cardinality key
3. Sessionize
4. B'coz of the RR on kafka ( and even if partitioned on the key and a
subsequent key by ), the sort order is not retained and the ACC has to hold
on to the elements in a List . When the Window is finalized  we sort the in
ACC  List  and do pagination, We are looking for paths within a session
from . a source to a sink event based. I was hoping to use ROCKS DB state
as a final merged list and thus off heap and  use a Count based Trigger to
evaluate the ACC and merge the inter Trigger collection to the  master copy
rather than keeping all events in the ACC ( I would imagine a very general
pattern to use ).

Does that make sense ?










On Fri, Dec 8, 2017 at 9:11 AM, Aljoscha Krettek <al...@apache.org>
wrote:

> Hi,
>
> If you use an AggregatingFunction in this way (i.e. for a window) the ACC
> should in fact be kept in the state backend. Did you configure the job to
> use RocksDB? How are the memory problems manifesting?
>
> Best,
> Aljoscha
>
>
> On 6. Dec 2017, at 14:57, Fabian Hueske <fh...@gmail.com> wrote:
>
> Hi Vishal,
>
> you are right, it is not possible to use state in an AggregateFunction
> because windows need to be mergeable.
> An AggregateFunction knows how to merge its accumulators but merging
> generic state is not possible.
>
> I am not aware of an efficient and easy work around for this.
> If you want to use the provided session window logic, you can use a
> WindowFunction that performs all computations when the window is triggered.
> This means that aggregations do not happen eagerly and all events for a
> window are collected and held in state.
> Another approach could be to implement the whole logic (incl. the session
> windowing) using a ProcessFunction. This would be a major effort though.
>
> Best,
> Fabian
>
> 2017-12-06 3:52 GMT+01:00 Vishal Santoshi <vi...@gmail.com>:
>
>> It seems that this has to do with session windows tbat are mergeable ? I
>> tried the RixhWindow function and that seems to suggest that one cannot use
>> state ? Any ideas folks...
>>
>> On Dec 1, 2017 10:38 AM, "Vishal Santoshi" <vi...@gmail.com>
>> wrote:
>>
>>> I have a simple Aggregation with one caveat. For some reason I have to
>>> keep a large amount of state till the window is GCed. The state is within
>>> the Accumulator ( ACC ). I am hitting a memory bottleneck and would like to
>>> offload the state  to the states backend ( ROCKSDB), keeping the between
>>> checkpoint state in memory ( seems to be an obvious fix). I am not though
>>> allowed to have a RichAggregateFunction in the aggregate method of a
>>> windowed stream . That begs 2 questions
>>>
>>> 1. Why
>>> 2. Is there an alternative for stateful window aggregation where we
>>> manage the state. ?
>>>
>>> Thanks Vishal
>>>
>>>
>>> Here is the code ( generics but it works  )
>>>
>>> SingleOutputStreamOperator<OUT> retVal = input
>>>         .keyBy(keySelector)
>>>         .window(EventTimeSessionWindows.withGap(gap))
>>>         .aggregate(
>>>                 new AggregateFunction<IN, ACC, OUT>() {
>>>
>>>                     @Override
>>>                     public ACC createAccumulator() {
>>>                         ACC newInstance = (ACC) accumulator.clone();
>>>                         newInstance.resetLocal();
>>>                         return newInstance;
>>>                     }
>>>
>>>                     @Override
>>>                     public void add(IN value, ACC accumulator) {
>>>                         accumulator.add(value);
>>>
>>>                     }
>>>
>>>                     @Override
>>>                     public OUT getResult(ACC accumulator) {
>>>                         return accumulator.getLocalValue();
>>>                     }
>>>
>>>                     @Override
>>>                     public ACC merge(ACC a, ACC b) {
>>>                         a.merge(b);
>>>                         return a;
>>>                     }
>>>                 }, new WindowFunction<OUT, OUT, KEY, TimeWindow>() {
>>>                     @Override
>>>                     public void apply(KEY s, TimeWindow window, Iterable<OUT> input, Collector<OUT> out) throws Exception {
>>>                             out.collect(input.iterator().next());
>>>                     }
>>>                 }, accType, aggregationResultType, aggregationResultType);
>>>
>>>
>
>

Re: aggregate does not allow RichAggregateFunction ?

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,

If you use an AggregatingFunction in this way (i.e. for a window) the ACC should in fact be kept in the state backend. Did you configure the job to use RocksDB? How are the memory problems manifesting?

Best,
Aljoscha

> On 6. Dec 2017, at 14:57, Fabian Hueske <fh...@gmail.com> wrote:
> 
> Hi Vishal,
> 
> you are right, it is not possible to use state in an AggregateFunction because windows need to be mergeable. 
> An AggregateFunction knows how to merge its accumulators but merging generic state is not possible. 
> 
> I am not aware of an efficient and easy work around for this. 
> If you want to use the provided session window logic, you can use a WindowFunction that performs all computations when the window is triggered. This means that aggregations do not happen eagerly and all events for a window are collected and held in state.
> Another approach could be to implement the whole logic (incl. the session windowing) using a ProcessFunction. This would be a major effort though.
> 
> Best,
> Fabian
> 
> 2017-12-06 3:52 GMT+01:00 Vishal Santoshi <vishal.santoshi@gmail.com <ma...@gmail.com>>:
> It seems that this has to do with session windows tbat are mergeable ? I tried the RixhWindow function and that seems to suggest that one cannot use state ? Any ideas folks...
> 
> On Dec 1, 2017 10:38 AM, "Vishal Santoshi" <vishal.santoshi@gmail.com <ma...@gmail.com>> wrote:
> I have a simple Aggregation with one caveat. For some reason I have to keep a large amount of state till the window is GCed. The state is within the Accumulator ( ACC ). I am hitting a memory bottleneck and would like to offload the state  to the states backend ( ROCKSDB), keeping the between checkpoint state in memory ( seems to be an obvious fix). I am not though allowed to have a RichAggregateFunction in the aggregate method of a windowed stream . That begs 2 questions  
> 
> 1. Why 
> 2. Is there an alternative for stateful window aggregation where we manage the state. ?
> 
> Thanks Vishal
> 
> 
> Here is the code ( generics but it works  ) 
> SingleOutputStreamOperator<OUT> retVal = input
>         .keyBy(keySelector)
>         .window(EventTimeSessionWindows.withGap(gap))
>         .aggregate(
>                 new AggregateFunction<IN, ACC, OUT>() {
> 
>                     @Override
>                     public ACC createAccumulator() {
>                         ACC newInstance = (ACC) accumulator.clone();
>                         newInstance.resetLocal();
>                         return newInstance;
>                     }
> 
>                     @Override
>                     public void add(IN value, ACC accumulator) {
>                         accumulator.add(value);
> 
>                     }
> 
>                     @Override
>                     public OUT getResult(ACC accumulator) {
>                         return accumulator.getLocalValue();
>                     }
> 
>                     @Override
>                     public ACC merge(ACC a, ACC b) {
>                         a.merge(b);
>                         return a;
>                     }
>                 }, new WindowFunction<OUT, OUT, KEY, TimeWindow>() {
>                     @Override
>                     public void apply(KEY s, TimeWindow window, Iterable<OUT> input, Collector<OUT> out) throws Exception {
>                             out.collect(input.iterator().next());
>                     }
>                 }, accType, aggregationResultType, aggregationResultType);
> 


Re: aggregate does not allow RichAggregateFunction ?

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Vishal,

you are right, it is not possible to use state in an AggregateFunction
because windows need to be mergeable.
An AggregateFunction knows how to merge its accumulators but merging
generic state is not possible.

I am not aware of an efficient and easy work around for this.
If you want to use the provided session window logic, you can use a
WindowFunction that performs all computations when the window is triggered.
This means that aggregations do not happen eagerly and all events for a
window are collected and held in state.
Another approach could be to implement the whole logic (incl. the session
windowing) using a ProcessFunction. This would be a major effort though.

Best,
Fabian

2017-12-06 3:52 GMT+01:00 Vishal Santoshi <vi...@gmail.com>:

> It seems that this has to do with session windows tbat are mergeable ? I
> tried the RixhWindow function and that seems to suggest that one cannot use
> state ? Any ideas folks...
>
> On Dec 1, 2017 10:38 AM, "Vishal Santoshi" <vi...@gmail.com>
> wrote:
>
>> I have a simple Aggregation with one caveat. For some reason I have to
>> keep a large amount of state till the window is GCed. The state is within
>> the Accumulator ( ACC ). I am hitting a memory bottleneck and would like to
>> offload the state  to the states backend ( ROCKSDB), keeping the between
>> checkpoint state in memory ( seems to be an obvious fix). I am not though
>> allowed to have a RichAggregateFunction in the aggregate method of a
>> windowed stream . That begs 2 questions
>>
>> 1. Why
>> 2. Is there an alternative for stateful window aggregation where we
>> manage the state. ?
>>
>> Thanks Vishal
>>
>>
>> Here is the code ( generics but it works  )
>>
>> SingleOutputStreamOperator<OUT> retVal = input
>>         .keyBy(keySelector)
>>         .window(EventTimeSessionWindows.withGap(gap))
>>         .aggregate(
>>                 new AggregateFunction<IN, ACC, OUT>() {
>>
>>                     @Override
>>                     public ACC createAccumulator() {
>>                         ACC newInstance = (ACC) accumulator.clone();
>>                         newInstance.resetLocal();
>>                         return newInstance;
>>                     }
>>
>>                     @Override
>>                     public void add(IN value, ACC accumulator) {
>>                         accumulator.add(value);
>>
>>                     }
>>
>>                     @Override
>>                     public OUT getResult(ACC accumulator) {
>>                         return accumulator.getLocalValue();
>>                     }
>>
>>                     @Override
>>                     public ACC merge(ACC a, ACC b) {
>>                         a.merge(b);
>>                         return a;
>>                     }
>>                 }, new WindowFunction<OUT, OUT, KEY, TimeWindow>() {
>>                     @Override
>>                     public void apply(KEY s, TimeWindow window, Iterable<OUT> input, Collector<OUT> out) throws Exception {
>>                             out.collect(input.iterator().next());
>>                     }
>>                 }, accType, aggregationResultType, aggregationResultType);
>>
>>

Re: aggregate does not allow RichAggregateFunction ?

Posted by Vishal Santoshi <vi...@gmail.com>.
It seems that this has to do with session windows tbat are mergeable ? I
tried the RixhWindow function and that seems to suggest that one cannot use
state ? Any ideas folks...

On Dec 1, 2017 10:38 AM, "Vishal Santoshi" <vi...@gmail.com>
wrote:

> I have a simple Aggregation with one caveat. For some reason I have to
> keep a large amount of state till the window is GCed. The state is within
> the Accumulator ( ACC ). I am hitting a memory bottleneck and would like to
> offload the state  to the states backend ( ROCKSDB), keeping the between
> checkpoint state in memory ( seems to be an obvious fix). I am not though
> allowed to have a RichAggregateFunction in the aggregate method of a
> windowed stream . That begs 2 questions
>
> 1. Why
> 2. Is there an alternative for stateful window aggregation where we manage
> the state. ?
>
> Thanks Vishal
>
>
> Here is the code ( generics but it works  )
>
> SingleOutputStreamOperator<OUT> retVal = input
>         .keyBy(keySelector)
>         .window(EventTimeSessionWindows.withGap(gap))
>         .aggregate(
>                 new AggregateFunction<IN, ACC, OUT>() {
>
>                     @Override
>                     public ACC createAccumulator() {
>                         ACC newInstance = (ACC) accumulator.clone();
>                         newInstance.resetLocal();
>                         return newInstance;
>                     }
>
>                     @Override
>                     public void add(IN value, ACC accumulator) {
>                         accumulator.add(value);
>
>                     }
>
>                     @Override
>                     public OUT getResult(ACC accumulator) {
>                         return accumulator.getLocalValue();
>                     }
>
>                     @Override
>                     public ACC merge(ACC a, ACC b) {
>                         a.merge(b);
>                         return a;
>                     }
>                 }, new WindowFunction<OUT, OUT, KEY, TimeWindow>() {
>                     @Override
>                     public void apply(KEY s, TimeWindow window, Iterable<OUT> input, Collector<OUT> out) throws Exception {
>                             out.collect(input.iterator().next());
>                     }
>                 }, accType, aggregationResultType, aggregationResultType);
>
>