Posted to user@flink.apache.org by Michael Radford <mu...@gmail.com> on 2016/04/13 00:29:14 UTC

FoldFunction accumulator checkpointing

I'm wondering whether the accumulator value maintained by a
FoldFunction is automatically checkpointed.

I'm asking in general, but specifically about the WindowedStream.apply
variant that takes a FoldFunction:

public <R> DataStream<R> apply(R initialValue,
                      FoldFunction<T,R> foldFunction,
                      WindowFunction<R,R,K,W> function,
                      TypeInformation<R> evidence$7)

If not, then Flink 1.0.1 still has the issue that you can't pass a
RichFoldFunction to WindowedStream.apply
(java.lang.UnsupportedOperationException: ReduceFunction of apply can
not be a RichFunction).

But also, if not, it seems like this would be a common pattern when
doing complex (keyed / multi-valued) aggregations, and if the
accumulator type R is serializable, there could be a convenience
method for a checkpointed fold, like the mapWithState mentioned in the
State section of the streaming guide.
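The kind of convenience method described above can be sketched as a toy model in plain Java (not Flink code). The user function stays pure: it receives the previous state for the current key (empty for a new key) and returns both an output and the new state. A runner that owns the per-key state decides what to store. `Result` and `KeyedRunner` are hypothetical names. Flink's Scala `KeyedStream.mapWithState` follows a similar shape, taking a `(T, Option[S]) => (R, Option[S])` function.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiFunction;

// Hypothetical pair returned by the user function: the value to emit and
// the state to keep for this key (Optional.empty() clears the state).
final class Result<R, S> {
    final R output;
    final Optional<S> newState;

    Result(R output, Optional<S> newState) {
        this.output = output;
        this.newState = newState;
    }
}

// Toy stand-in for a keyed operator. It, not the user function, owns the
// per-key state, so a runtime could snapshot this map at checkpoints.
final class KeyedRunner<K, T, R, S> {
    private final Map<K, S> state = new HashMap<>();
    private final BiFunction<T, Optional<S>, Result<R, S>> fn;

    KeyedRunner(BiFunction<T, Optional<S>, Result<R, S>> fn) {
        this.fn = fn;
    }

    R process(K key, T value) {
        Result<R, S> r = fn.apply(value, Optional.ofNullable(state.get(key)));
        if (r.newState.isPresent()) {
            state.put(key, r.newState.get());
        } else {
            state.remove(key);
        }
        return r.output;
    }
}
```

For example, a per-key running sum would be `new KeyedRunner<String, Long, Long, Long>((v, s) -> { long total = s.orElse(0L) + v; return new Result<>(total, Optional.of(total)); })`. Because the function never holds state itself, nothing about it needs to be a RichFunction.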

Thanks,
Mike

Re: Cannot compile Wikipedia Edit Stream example

Posted by Rong Rong <wa...@gmail.com>.
Hi

Can you elaborate on how to reproduce the error? Which Maven archetype did
you use to generate the job, which Flink version are you using, and which
Java version did you use in IntelliJ?

I suspect a mix-up between the Scala and Java scaffolds, since your Tuple
should be
org.apache.flink.api.java.tuple.Tuple2<...>

Also, please do not reply directly to an ongoing email thread, as that
discussion is pretty much unrelated to your question.

Thanks,
Rong

In reply to Mar_zieh <m....@gmail.com>, Sat, Sep 1, 2018 at 10:56 PM.

Re: FoldFunction accumulator checkpointing

Posted by Hequn Cheng <ch...@gmail.com>.
Hi Marzieh,

Did you import `org.apache.flink.api.java.tuple.Tuple2`? It seems that
you imported the wrong class (scala.Tuple2). You can copy the code from
the tutorial [1].

Best, Hequn

[1]
https://ci.apache.org/projects/flink/flink-docs-master/tutorials/datastream_api.html


In reply to Mar_zieh <m....@gmail.com>, Sun, Sep 2, 2018 at 1:56 PM.

Re: FoldFunction accumulator checkpointing

Posted by Mar_zieh <m....@gmail.com>.
Hello

I wrote the "Monitoring the Wikipedia Edit Stream" example in IntelliJ
IDEA, but I got errors on "acc.f0" and "acc.f1" like this:

Error:(39, 28) java: cannot find symbol
  symbol:   variable f0
  location: variable acc of type scala.Tuple2<java.lang.String,java.lang.Long>

Would you please tell me how to solve this problem?

Thank you in advance.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: FoldFunction accumulator checkpointing

Posted by Aljoscha Krettek <al...@apache.org>.
Hi Ron,
I see that this leads to a bit of a hassle for you. I'm very reluctant to
allow the general RichFunction interface in functions that are used inside
state, because doing so has significant implications. Maybe we can add a
simplified interface just for functions that are used inside state to allow
them to open/close connections to outside systems. You don't need the whole
power of the RichFunction, such as access to the RuntimeContext, do you?
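The simplified interface floated above could look roughly like the following sketch. It is entirely hypothetical: `LifecycleFoldFunction` and `FoldDriver` are illustrative names, not Flink APIs. The driver merely plays the part of the operator that would call `open()` once before the first element and `close()` on shutdown. The interface exposes no RuntimeContext, so it offers no path to registering further state.

```java
// Hypothetical interface (not part of Flink): a fold function with only
// lifecycle hooks, e.g. for opening a connection to an external system.
// It deliberately offers no RuntimeContext and hence no access to state.
interface LifecycleFoldFunction<T, A> {
    default void open() throws Exception {}

    default void close() throws Exception {}

    A fold(A accumulator, T value) throws Exception;
}

// Toy driver standing in for the operator: open() runs once before the
// first fold, close() runs once when the driver is closed.
final class FoldDriver<T, A> implements AutoCloseable {
    private final LifecycleFoldFunction<T, A> fn;
    private A accumulator;

    FoldDriver(LifecycleFoldFunction<T, A> fn, A initialValue) throws Exception {
        this.fn = fn;
        this.accumulator = initialValue;
        fn.open();
    }

    A add(T value) throws Exception {
        accumulator = fn.fold(accumulator, value);
        return accumulator;
    }

    @Override
    public void close() throws Exception {
        fn.close();
    }
}
```

With such hooks, the lazy `backingStore == null` check inside `fold()` would move into `open()`, and the connection could be released in `close()`.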

Just out of curiosity, are you using the InfiniteResponseFilterFolder in a
KeyedStream.fold() or a WindowedStream.fold()?

Cheers,
Aljoscha

In reply to Ron Crocker <rc...@newrelic.com>, Tue, 19 Apr 2016 at 18:53.

Re: FoldFunction accumulator checkpointing

Posted by Ron Crocker <rc...@newrelic.com>.
Aljoscha -

I want to use a RichFoldFunction to get the open() hook. I cheat and use
this structure instead, with a plain (non-rich) FoldFunction:

import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.java.tuple.Tuple2;

public class InfiniteResponseFilterFolder implements
        FoldFunction<Tuple2<Long, Long>, InfiniteResponseFilter> {
    // transient: the connection is never serialized with the function;
    // each parallel instance opens its own on first use
    private transient BackingStore backingStore;

    @Override
    public InfiniteResponseFilter fold(InfiniteResponseFilter accumulator,
            Tuple2<Long, Long> value) throws Exception {
        if (backingStore == null) { // emulate the open() hook on first call
            initializeBackingStore();
        }
        if (accumulator == null) { // null initial value: build the filter now
            accumulator = new InfiniteResponseFilter(backingStore, value);
        }
        return accumulator.incrementFilter(value);
    }

    private void initializeBackingStore() {
        // connect to database
        backingStore = ...
    }
}

Note that what I want to do is connect to a backing store to read the
initial state for a fold operation. The particular operation I’m trying to
do is an infinite impulse response filter, specifically a triple
exponential smoother
<https://en.wikipedia.org/wiki/Exponential_smoothing#Triple_exponential_smoothing>,
where the various coefficients for a start state are pre-calculated (and
stored in that BackingStore).
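To make the operation concrete, the sketch below shows one fold step of an additive triple exponential (Holt-Winters) smoother in plain Java. It assumes additive seasonality and smoothing parameters alpha, beta and gamma. `HoltWintersState` is a hypothetical stand-in for `InfiniteResponseFilter`, and its constructor arguments play the role of the pre-calculated start-state coefficients read from the BackingStore. Other formulations, for example multiplicative seasonality, update the components differently.

```java
// Hypothetical accumulator for additive Holt-Winters (triple exponential)
// smoothing; it stands in for InfiniteResponseFilter. Update rules
// (one common additive formulation, season length m = seasonal.length):
//   level' = alpha*(y - s) + (1 - alpha)*(level + trend)
//   trend' = beta*(level' - level) + (1 - beta)*trend
//   s'     = gamma*(y - level - trend) + (1 - gamma)*s
// where s is the seasonal component from one season (m steps) earlier.
final class HoltWintersState {
    final double alpha, beta, gamma;
    double level;
    double trend;
    final double[] seasonal; // ring buffer of the last m seasonal components
    int pos;                 // slot used by the next observation

    HoltWintersState(double alpha, double beta, double gamma,
                     double level, double trend, double[] seasonal) {
        this.alpha = alpha;
        this.beta = beta;
        this.gamma = gamma;
        this.level = level;
        this.trend = trend;
        this.seasonal = seasonal.clone();
    }

    // One fold step: consume observation y and return the updated accumulator.
    HoltWintersState update(double y) {
        double prevLevel = level;
        double prevTrend = trend;
        double s = seasonal[pos];
        level = alpha * (y - s) + (1 - alpha) * (prevLevel + prevTrend);
        trend = beta * (level - prevLevel) + (1 - beta) * prevTrend;
        seasonal[pos] = gamma * (y - prevLevel - prevTrend) + (1 - gamma) * s;
        pos = (pos + 1) % seasonal.length;
        return this;
    }

    // Forecast h >= 1 steps ahead of the last observation.
    double forecast(int h) {
        return level + h * trend + seasonal[(pos + h - 1) % seasonal.length];
    }
}
```

Mutating and returning the accumulator mirrors the fold contract (`T fold(T accumulator, O value)`). The whole object, coefficients included, would then be part of the fold state that Flink checkpoints.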

Further, I want to checkpoint the entire state (including the coefficients)
to both Flink’s checkpointing system and the backing store. The former is
handled here; the latter is handled by another transform in my graph.

Is there a better approach?

Ron
—
Ron Crocker
Principal Engineer & Architect
( ( •)) New Relic
rcrocker@newrelic.com
M: +1 630 363 8835

In reply to Aljoscha Krettek <al...@apache.org>, Apr 13, 2016, at 1:25 AM.

Re: FoldFunction accumulator checkpointing

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
there are two cases where a FoldFunction can be used in the streaming API:
KeyedStream.fold() and WindowedStream.fold()/apply(). In both cases we
internally use the partitioned state abstraction of Flink to keep the
state. So yes, the accumulator value is consistently maintained and will
survive failures.
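A toy model can illustrate this answer. Because the accumulator is held in state owned by the operator and scoped to the current key, it is captured by every checkpoint and reloaded on recovery without any cooperation from the fold function. `KeyedFoldOperator` is a hypothetical simplification, not Flink's implementation. Real Flink checkpoints are coordinated across the whole job with barriers and use type serializers, and the snapshot below assumes immutable accumulators.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Toy model (not Flink's implementation) of a keyed fold. The accumulators
// live in operator-owned, per-key state, not in the fold function itself.
final class KeyedFoldOperator<K, T, A> {
    private final BiFunction<A, T, A> foldFn; // plain, stateless function
    private final A initialValue;
    private Map<K, A> state = new HashMap<>(); // stands in for partitioned state

    KeyedFoldOperator(BiFunction<A, T, A> foldFn, A initialValue) {
        this.foldFn = foldFn;
        this.initialValue = initialValue;
    }

    A process(K key, T value) {
        A next = foldFn.apply(state.getOrDefault(key, initialValue), value);
        state.put(key, next);
        return next;
    }

    // "Checkpoint": copy the per-key accumulators (assumes immutable values).
    Map<K, A> snapshot() {
        return new HashMap<>(state);
    }

    // "Recovery": discard in-memory state and reload the last snapshot.
    void restore(Map<K, A> snapshot) {
        state = new HashMap<>(snapshot);
    }
}
```

Replaying an element that arrived after the checkpoint yields the same accumulator as the failure-free run, which is the sense in which the value survives failures.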

Right now, the accumulation function of a window cannot be a rich function:
the underlying state primitives that the windowing system uses only accept
plain functions, because supporting rich functions there could have
problematic implications. The most obvious one, to me, is that users might
try to keep state in the ReduceFunction of a ReducingState if given the
chance, and a RichFunction would give them that chance.

This is just off the top of my head, but I can go into detail if you want.

Cheers,
Aljoscha

In reply to Michael Radford <mu...@gmail.com>, Wed, 13 Apr 2016 at 00:29.