Posted to dev@flink.apache.org by Gyula Fóra <gy...@apache.org> on 2015/07/24 17:38:43 UTC

Extending the streaming scala api with stateful functions

Hey,

I would like to propose extending the standard Streaming Scala API
methods (map, flatMap, filter, etc.) with versions that take stateful
functions as lambdas. I think this would remove the awkwardness of
implementing RichFunctions in Scala and make statefulness more explicit:

*For example:*
def map( statefulMap: (I, Option[S]) => (O, Option[S]) )
def flatMap( statefulFlatMap: (I, Option[S]) => (Traversable[O], Option[S]) )

These would be translated into RichMapFunctions and RichFlatMapFunctions
that store Option[S] as OperatorState for fault tolerance.
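
To make the translation more concrete, here is a minimal plain-Scala sketch
of the wrapping idea. It does not use Flink at all: StatefulMapper and its
members are hypothetical names, and the local variable only stands in for
what would be fault-tolerant OperatorState in the real translation.

```scala
// Plain-Scala stand-in for a rich function; this is NOT Flink's RichMapFunction.
// `StatefulMapper` and `state` are hypothetical names used for illustration.
final class StatefulMapper[I, O, S](f: (I, Option[S]) => (O, Option[S])) {
  // Assumption: in Flink this would be checkpointed operator state;
  // here it is only a local variable.
  private var state: Option[S] = None

  // Apply the user lambda to the element and the current state,
  // remember the new state, and emit the output.
  def map(in: I): O = {
    val (out, next) = f(in, state)
    state = next
    out
  }
}

object StatefulMapperDemo {
  def main(args: Array[String]): Unit = {
    // Tag each element with the number of elements seen before it.
    val indexer = new StatefulMapper[String, (Long, String), Long](
      (s, count) => {
        val n = count.getOrElse(0L)
        ((n, s), Some(n + 1))
      })
    val out = List("a", "b", "c").map(indexer.map)
    println(out) // List((0,a), (1,b), (2,c))
  }
}
```

The user only ever writes the lambda; the wrapper owns the state handling,
which is where the sugar would pay off.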

*Example rolling sum by key:*
val input: DataStream[Long] = ...
val sumByKey: DataStream[Long] =
  input.keyBy(...).map( (next: Long, sum: Option[Long]) =>
    sum match {
      case Some(s) => (next + s, Some(next + s))
      case None    => (next, Some(next))
    })
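
The per-key behaviour of this example can be checked without a cluster.
The sketch below simulates it on an ordinary Seq of (key, value) pairs;
mapWithKeyedState is a hypothetical helper rather than a Flink method, and
treating a returned None as "clear the state for this key" is an assumption
on my side, since the proposal does not pin that down.

```scala
object KeyedRollingSum {
  // Applies a stateful function per key by threading a Map of states
  // through the input. `mapWithKeyedState` is a hypothetical helper.
  def mapWithKeyedState[K, I, O, S](input: Seq[(K, I)])(
      f: (I, Option[S]) => (O, Option[S])): Seq[O] = {
    val (_, out) = input.foldLeft((Map.empty[K, S], Vector.empty[O])) {
      case ((states, acc), (key, value)) =>
        val (o, next) = f(value, states.get(key))
        // Assumption: returning None clears the state for this key.
        val updated = next match {
          case Some(s) => states.updated(key, s)
          case None    => states - key
        }
        (updated, acc :+ o)
    }
    out
  }

  def main(args: Array[String]): Unit = {
    val input = Seq(("a", 1L), ("b", 10L), ("a", 2L), ("b", 20L), ("a", 3L))
    // Same lambda as in the rolling sum example above.
    val sums = mapWithKeyedState[String, Long, Long, Long](input) {
      (next, sum) =>
        sum match {
          case Some(s) => (next + s, Some(next + s))
          case None    => (next, Some(next))
        }
    }
    println(sums) // Vector(1, 10, 3, 30, 6)
  }
}
```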

What do you think?

Gyula

Re: Extending the streaming scala api with stateful functions

Posted by Gyula Fóra <gy...@gmail.com>.
I opened a PR: https://github.com/apache/flink/pull/936
Feel free to comment :)

Gyula

Till Rohrmann <tr...@apache.org> wrote (on Fri, Jul 24, 2015, 18:15):

> We have something similar for broadcast variables in FlinkML. It allows you
> to write ds.mapWithBcVariable(bcDS){ (dsElement, bcVar) => ... }.
>
> I like the idea to make the life of a Scala programmer a little bit less
> javaesque :-)
>
> On Fri, Jul 24, 2015 at 5:45 PM, Stephan Ewen <se...@apache.org> wrote:
>
> > This is really syntactic sugar in the Scala API, rather than a system
> > feature.
> >
> > Which is good: it needs no extra runtime constructs.
> >
> > On Fri, Jul 24, 2015 at 5:43 PM, Aljoscha Krettek <al...@apache.org>
> > wrote:
> >
> > > Yes, this might be nice. Till and I had similar ideas about using the
> > > pattern to make broadcast variables more useable in Scala, in fact. :D
> > >
> > > On Fri, 24 Jul 2015 at 17:39 Gyula Fóra <gy...@apache.org> wrote:
> > >
> > > > Hey,
> > > >
> > > > I would like to propose a way to extend the standard Streaming Scala API
> > > > methods (map, flatmap, filter etc) with versions that take stateful
> > > > functions as lambdas. I think this would eliminate the awkwardness of
> > > > implementing RichFunctions in Scala and make statefulness more explicit:
> > > >
> > > > *For example:*
> > > > def map( statefulMap: (I, Option[S]) => (O, Option[S]) )
> > > > def flatMap( statefulFlatMap: (I, Option[S] ) => (Traversable[O],
> > > > Option[S]))
> > > >
> > > > This would be translated into RichMap and RichFlatMapFunctions that store
> > > > Option[S] as OperatorState for fault tolerance.
> > > >
> > > > *Example rolling sum by key:*
> > > > val input: DataStream[Long] = ...
> > > > val sumByKey: DataStream[Long] =
> > > >     input.keyBy(...).map( (next: Long, sum: Option[Long]) =>
> > > >          sum match {
> > > >                    case Some(s) => (next + s, Some(next + s))
> > > >                    case None => (next, Some(next))
> > > >           })
> > > >
> > > > What do you think?
> > > >
> > > > Gyula
> > > >
> > >
> >
>

Re: Extending the streaming scala api with stateful functions

Posted by Till Rohrmann <tr...@apache.org>.
We have something similar for broadcast variables in FlinkML. It allows you
to write ds.mapWithBcVariable(bcDS){ (dsElement, bcVar) => ... }.
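
For readers who have not seen that pattern, here is a rough plain-Scala
sketch of the call shape only. It enriches Seq instead of a DataSet and is
not the FlinkML implementation; RichSeq and the single-element broadcast
Seq are assumptions made purely for illustration.

```scala
object BroadcastSugar {
  // Hypothetical enrichment on Seq. `bc` stands in for a broadcast data set
  // holding a single element; this is not how FlinkML implements it.
  implicit class RichSeq[T](val ds: Seq[T]) extends AnyVal {
    def mapWithBcVariable[B, O](bc: Seq[B])(fun: (T, B) => O): Seq[O] = {
      val bcVar = bc.head // assumes the broadcast side is non-empty
      ds.map(elem => fun(elem, bcVar))
    }
  }

  def main(args: Array[String]): Unit = {
    val ds = Seq(1.0, 2.0, 3.0)
    val bcDS = Seq(10.0)
    // The lambda receives the element and the broadcast value side by side.
    val scaled = ds.mapWithBcVariable(bcDS) { (dsElement, bcVar) => dsElement * bcVar }
    println(scaled) // List(10.0, 20.0, 30.0)
  }
}
```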

I like the idea of making the life of a Scala programmer a little bit less
Java-esque :-)


Re: Extending the streaming scala api with stateful functions

Posted by Stephan Ewen <se...@apache.org>.
This is really syntactic sugar in the Scala API, rather than a system
feature.

Which is good: it needs no extra runtime constructs.


Re: Extending the streaming scala api with stateful functions

Posted by Aljoscha Krettek <al...@apache.org>.
Yes, this might be nice. Till and I had similar ideas about using the
pattern to make broadcast variables more useable in Scala, in fact. :D
