Posted to dev@flink.apache.org by Stefano Baghino <st...@radicalbit.io> on 2016/01/28 14:05:19 UTC

Case style anonymous functions not supported by Scala API

Hello everybody,

as I'm getting familiar with Flink I've found a possible improvement to the
Scala APIs: in Scala it's a common pattern to perform tuple extraction
using pattern matching, making functions that work on tuples more readable,
like this:

// referring to the mail count example in the training
// assuming `mails` is a DataSet[(String, String)]
// a pair of date and a string with username and email
val monthsAndEmails =
  mails.map {
    case (date, sender) =>
      (extractMonth(date), extractEmail(sender))
  }

However, this is not possible when using the Scala APIs because of the
overloading of the `map` function in the `DataSet` and `DataStream` classes
(along with other higher-order functions such as `flatMap` and `filter`). My
understanding is that the main reason to have two different overloaded
functions is to provide support for `RichFunction`s.
I've found that there has been some interest around the issue in the past
([FLINK-1159] <https://issues.apache.org/jira/browse/FLINK-1159>).
In the past couple of days my colleague Andrea and I have tried several
ways to address the problem, coming to two possible solutions:

   1. don't overload and use different names, e.g. `map` taking a Scala
   function and `mapWith` taking a Flink MapFunction (see the sketch after
   this list)
   2. keep only the method taking a Scala function (which would be more
   idiomatic from a Scala perspective, IMHO) and provide an implicit
   conversion from the Flink function to the Scala function within the
   `org.apache.flink.api.scala` package object
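
To make the first option concrete, here is a minimal, self-contained sketch
of why the overload breaks case-style literals and how separate names avoid
it (`Box` and `FlinkMap` are toy stand-ins for `DataSet` and Flink's
`MapFunction`, not actual Flink code):

// Toy stand-in for org.apache.flink.api.common.functions.MapFunction
trait FlinkMap[T, R] { def map(value: T): R }

class Box[T](val value: T) {
  // Non-overloaded: the expected type `T => R` is unambiguous, so a
  // pattern-matching anonymous function { case ... => ... } compiles.
  def map[R](fun: T => R): Box[R] = new Box(fun(value))
  // A separate name for the Flink-style function interface avoids the
  // overload ambiguity altogether.
  def mapWith[R](fun: FlinkMap[T, R]): Box[R] = new Box(fun.map(value))
}

object Demo extends App {
  val pair = new Box(("2016-01-28", "Stefano <stefano@example.org>"))
  // Case-style extraction works because `map` is not overloaded:
  val month = pair.map { case (date, _) => date.take(7) }
  println(month.value) // prints: 2016-01
}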

We've also evaluated several other approaches using union types and type
classes, but we've found them to be too complex. Regarding the two
approaches I've cited, the first would imply a breaking change to the APIs,
while the second is giving me a hard time: I'm still figuring out some
compilation errors in `flink-libraries` and `flink-contrib`, and when we
tested it we found that `RichMapFunction`s lose state (possibly because of
the double conversion: first to a Scala function, then to a simple
`MapFunction`).
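
To illustrate the suspected cause of the state loss (this is a hypothesis,
sketched with simplified code rather than the actual conversion in the
branch): turning a RichMapFunction into a plain Scala function keeps only
its map logic, so when the API later re-wraps that function, the rich
lifecycle of the original instance is never invoked.

import org.apache.flink.api.common.functions.RichMapFunction

def asScalaFunction[T, R](rich: RichMapFunction[T, R]): T => R =
  (t: T) => rich.map(t)
  // From here on, only `map` survives: open(), close() and
  // getRuntimeContext() of `rich` are invisible to whatever
  // MapFunction the API wraps this closure into.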

You can have a look at the code I've written so far here (last 2 commits):
https://github.com/radicalbit/flink/commits/1159

We had a little exchange of ideas and thought that the first solution would
be easier and also interesting from the standpoint of the ergonomics of the
API (e.g. `line mapWith new LineSplitter`) and would like to gather some
feedback on the feasibility of this change.

Would this still be regarded as a relevant improvement? What do you think
about it? Do you think there's time to work on this before the 1.0 release?
What do you think about introducing breaking changes to make this pattern
available to Scala users?

Thank you all in advance for your feedback.

-- 
BR,
Stefano Baghino

Software Engineer @ Radicalbit

Re: Case style anonymous functions not supported by Scala API

Posted by Stephan Ewen <se...@apache.org>.
I think that if we make the class part of the default ".api.scala" package
object, we effectively add these methods to DataSet.scala, because they
will always be available on the data set.

If we want to retain the liberty to not commit to this change now, then we
should probably ask users to explicitly decide to use this extension (via a
separate import).
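
As a rough sketch of what the opt-in route could look like from a user's
perspective (package and method names taken from the work-in-progress
branch linked below, so they may still change):

import org.apache.flink.api.scala._
// Explicit opt-in: without this extra import, DataSet keeps its
// current interface and nothing changes for existing programs.
import org.apache.flink.api.scala.extensions._

val monthsAndEmails =
  mails.mapWith {
    case (date, sender) => (extractMonth(date), extractEmail(sender))
  }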

On Tue, Feb 9, 2016 at 10:53 PM, Stefano Baghino <stefano.baghino@radicalbit.io> wrote:

> Hi Till,
>
> I do agree with your point, so much so that for the time being I'd
> suggest keeping these additions as optional, up to the end user to opt in.
> Adding them by default would effectively be an addition to the DataSet API
> (despite being separated at a source file level).
> I think your solution works best right now. We can proceed on this path
> unless we see more interest around support for case-style functions.

On Tue, Feb 9, 2016 at 7:31 PM, Till Rohrmann <trohrmann@apache.org> wrote:

> What we could do is to add the implicit class to the package object of
> org.apache.flink.api.scala. Since one always has to import this package in
> order to have the proper TypeInformations, you wouldn’t have to import the
> extension explicitly.
>
> The thing with the API is that you don’t want to break it too often. Once
> people are used to it and have code written against it, every breaking
> change forces them to rewrite that code. This can be really annoying for
> users.
>
> Cheers,
> Till

On Tue, Feb 9, 2016 at 2:51 PM, Stefano Baghino <stefano.baghino@radicalbit.io> wrote:

> I agree with you, but I acknowledge that there may be concerns regarding
> the stability of the API. Perhaps the rationale behind Stephan and Till's
> proposal is to provide it as an extension to test how developers feel
> about it. It would be ideal to have broader feedback from the community.
> However, I have to admit I like the approach.

On Tue, Feb 9, 2016 at 2:43 PM, Theodore Vasiloudis <theodoros.vasiloudis@gmail.com> wrote:

> Thanks for bringing this up Stefano, it would be a very welcome addition
> indeed.
>
> I like the approach of having extensions through implicits as well. IMHO,
> though, this should be the default behavior, without the need to add
> another import.

On Tue, Feb 9, 2016 at 1:29 PM, Stefano Baghino <stefano.baghino@radicalbit.io> wrote:

> I see, thanks for the tip! I'll work on it; meanwhile, I've added some
> functions and Scaladoc:
>
> https://github.com/radicalbit/flink/blob/1159-implicit/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/package.scala

On Tue, Feb 9, 2016 at 12:01 PM, Till Rohrmann <trohrmann@apache.org> wrote:

> Not the DataSet, but the JoinDataSet and the CoGroupDataSet do, in the
> form of an apply function.

On Tue, Feb 9, 2016 at 11:09 AM, Stefano Baghino <stefano.baghino@radicalbit.io> wrote:

> Sure, it was just a draft. I agree that filter and mapPartition make
> sense, but coGroup and join don't look like they take a function.

On Tue, Feb 9, 2016 at 10:08 AM, Till Rohrmann <trohrmann@apache.org> wrote:

> This looks like a good design to me :-) The only thing is that it is not
> complete. For example, the filter, mapPartition, coGroup and join
> functions are missing.
>
> Cheers,
> Till

On Tue, Feb 9, 2016 at 1:18 AM, Stefano Baghino <stefano.baghino@radicalbit.io> wrote:

> What do you think of something like this?
>
> https://github.com/radicalbit/flink/commit/21a889a437875c88921c93e87d88a378c6b4299e
>
> In this way, several extensions can be collected in this package object
> and picked up altogether or à la carte (e.g. import
> org.apache.flink.api.scala.extensions.AcceptPartialFunctions).

On Mon, Feb 8, 2016 at 2:51 PM, Till Rohrmann <trohrmann@apache.org> wrote:

> I like the idea of supporting partial functions with Flink’s Scala API.
> However, I think that breaking the API and making it inconsistent with
> respect to the Java API is not the best option. I would rather be in
> favour of the first proposal, where we add a new method xxxWith via
> implicit conversions.
>
> Cheers,
> Till

On Sun, Feb 7, 2016 at 12:44 PM, Stefano Baghino <stefano.baghino@radicalbit.io> wrote:

> It took me a little time but I was able to put together some code.
>
> In this commit I just added a few methods, renamed to prevent overloading
> and thus usable with PartialFunction instead of plain functions:
>
> https://github.com/radicalbit/flink/commit/aacd59e0ce98cccb66d48a30d07990ac8f345748
>
> In this other commit I coded the original proposal, renaming the methods
> to obtain the same effect as before, but with lower friction for Scala
> developers (and provided some usage examples):
>
> https://github.com/radicalbit/flink/commit/33403878eebba70def42f73a1cb671d13b1521b5

On Thu, Jan 28, 2016 at 6:16 PM, Stefano Baghino <stefano.baghino@radicalbit.io> wrote:

> Hi Stephan,
>
> thank you for the quick reply and for your feedback; I agree with you
> that breaking changes have to be taken very seriously.
>
> The rationale behind my proposal is that Scala users are already
> accustomed to higher-order functions that manipulate collections, and it
> would be beneficial for them to have an API that adheres as closely as
> possible to the interface provided by the Scala Collections API. IMHO,
> being able to manipulate a DataSet or DataStream idiomatically, like a
> Scala collection, would appeal to developers and reduce the friction for
> them to learn Flink.
>
> If we want to pursue the renaming path, I think these changes (and
> porting the rest of the codebase, like `flink-ml` and `flink-contrib`, to
> the new method names) can be done in relatively little time. Since Flink
> is approaching a major release, I think it's a good time to consider this
> change, if the community deems it relevant.
>
> While we await feedback on the proposal, I can start working on both
> paths to see how each would affect the codebase; what do you think?

On Thu, Jan 28, 2016 at 2:14 PM, Stephan Ewen <sewen@apache.org> wrote:

> Hi!
>
> Would be nice to support that, agreed.
>
> Such a fundamental break in the API worries me a bit, though - I would
> opt for a non-breaking addition.
> Wrapping the RichFunctions into Scala functions (which are actually
> wrapped as rich functions) with implicits seems like a workaround for
> something that should be very simple. It would probably also cost a bit
> of performance.
>
> I like the idea of "mapWith(...)" - if that were a simple, non-overloaded
> function accepting a Scala function, it should accept case-style
> functions, right?
> Simply adding that would probably solve things, but it would add a second
> variant of each function to the DataSet. An implicit conversion from
> DataSet to DataSetExtended (which implements the mapWith, reduceWith, ...
> methods) could help there...
>
> What do you think?
>
> Greetings,
> Stephan

On Thu, Jan 28, 2016 at 2:05 PM, Stefano Baghino <stefano.baghino@radicalbit.io> wrote:

> [...]

Re: Case style anonymous functions not supported by Scala API

Posted by Stefano Baghino <st...@radicalbit.io>.
Hi Till,

I do agree with your point, so much so that for the time being I'd
suggest keeping these additions as optional, up to the end user to
opt in.
Adding them by default would effectively be an addition to the DataSet API
(despite being separated at a source file level).
I think your solution works best right now. We can proceed on this path
unless we see more interest around support for case-style functions.

On Tue, Feb 9, 2016 at 7:31 PM, Till Rohrmann <tr...@apache.org> wrote:

> [...]



-- 
BR,
Stefano Baghino

Software Engineer @ Radicalbit

Re: Case style anonymous functions not supported by Scala API

Posted by Till Rohrmann <tr...@apache.org>.
What we could do is to add the implicit class to the package object of
org.apache.flink.api.scala. Since one always has to import this package in
order to have the proper TypeInformations, you wouldn’t have to import the
extension explicitly.
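
A minimal sketch of that idea (the class name comes from the draft commit;
the exact signature here is illustrative, not the final code):

package org.apache.flink.api

import org.apache.flink.api.common.typeinfo.TypeInformation
import scala.reflect.ClassTag

package object scala {
  // Users already write `import org.apache.flink.api.scala._` to get
  // their TypeInformation instances, so this implicit class would be
  // in scope automatically, with no extra import.
  implicit class AcceptPartialFunctions[T](ds: DataSet[T]) {
    def mapWith[R: TypeInformation: ClassTag](fun: T => R): DataSet[R] =
      ds.map(fun)
  }
}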

The thing with the API is that you don’t want to break it too often. Once
people are used to it and have code written against it, every breaking
change forces them to rewrite that code. This can be really annoying for
users.

Cheers,
Till

On Tue, Feb 9, 2016 at 2:51 PM, Stefano Baghino <stefano.baghino@radicalbit.io> wrote:

> [...]

Re: Case style anonymous functions not supported by Scala API

Posted by Stefano Baghino <st...@radicalbit.io>.
I agree with you, but I acknowledge that there may be concerns regarding
the stability of the API. Perhaps the rationale behind Stephan and Till's
proposal is to provide it as an extension to test how developers feel
about it. It would be ideal to have broader feedback from the community.
However, I have to admit I like the approach.

On Tue, Feb 9, 2016 at 2:43 PM, Theodore Vasiloudis <
theodoros.vasiloudis@gmail.com> wrote:

> Thanks for bringing this up Stefano, it would a very welcome addition
> indeed.
>
> I like the approach of having extensions through implicits as well. IMHO
> though this should be the default
> behavior, without the need to add another import.

-- 
BR,
Stefano Baghino

Software Engineer @ Radicalbit

Re: Case style anonymous functions not supported by Scala API

Posted by Theodore Vasiloudis <th...@gmail.com>.
Thanks for bringing this up, Stefano; it would be a very welcome addition
indeed.

I like the approach of having extensions through implicits as well. IMHO,
though, this should be the default behavior, without the need to add
another import.
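
For reference, the mechanism I have in mind is the usual implicit wrapper.
A rough sketch (names are illustrative, the actual branch may differ); if
an implicit class like this sat in the `org.apache.flink.api.scala` package
object, the methods would be available with the standard import alone:

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import scala.reflect.ClassTag

object extensions {
  // adds non-overloaded variants next to the overloaded map/filter,
  // so partial functions type-check without ambiguity
  implicit class OnDataSet[T](ds: DataSet[T]) {
    def mapWith[R: TypeInformation: ClassTag](fun: T => R): DataSet[R] =
      ds.map(fun)
    def filterWith(fun: T => Boolean): DataSet[T] =
      ds.filter(fun)
  }
}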

On Tue, Feb 9, 2016 at 1:29 PM, Stefano Baghino <
stefano.baghino@radicalbit.io> wrote:

> I see, thanks for the tip! I'll work on it; meanwhile, I've added some
> functions and Scaladoc:
>
> https://github.com/radicalbit/flink/blob/1159-implicit/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/package.scala

Re: Case style anonymous functions not supported by Scala API

Posted by Till Rohrmann <tr...@apache.org>.
Not the DataSet, but the JoinDataSet and the CoGroupDataSet do, in the form
of an apply function.
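
For example, a quick sketch with made-up data (the join is completed by
JoinDataSet's apply, which already takes a plain Scala function):

import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment
val persons = env.fromElements((1, "Alice"), (2, "Bob"))
val cities  = env.fromElements((1, "Berlin"), (2, "Milan"))

// `apply` takes (L, R) => O, so a partial-function-friendly variant
// could be added analogously to mapWith
val joined = persons.join(cities).where(0).equalTo(0) {
  (p, c) => (p._2, c._2)
}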

On Tue, Feb 9, 2016 at 11:09 AM, Stefano Baghino <
stefano.baghino@radicalbit.io> wrote:

> Sure, it was just a draft. I agree that filter and mapPartition make sense,
> but coGroup and join don't look like they take a function.

Re: Case style anonymous functions not supported by Scala API

Posted by Stefano Baghino <st...@radicalbit.io>.
Sure, it was just a draft. I agree that filter and mapPartition make sense,
but coGroup and join don't look like they take a function.

On Tue, Feb 9, 2016 at 10:08 AM, Till Rohrmann <tr...@apache.org> wrote:

> This looks like a good design to me :-) The only thing is that it is not
> complete. For example, the filter, mapPartition, coGroup and join functions
> are missing.
>
> Cheers,
> Till

-- 
BR,
Stefano Baghino

Software Engineer @ Radicalbit

Re: Case style anonymous functions not supported by Scala API

Posted by Till Rohrmann <tr...@apache.org>.
This looks like a good design to me :-) The only thing is that it is not
complete. For example, the filter, mapPartition, coGroup and join functions
are missing.
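
For instance, the missing ones could mirror the existing signatures along
these lines (a rough sketch with hypothetical names):

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import scala.reflect.ClassTag

object moreExtensions {
  implicit class OnDataSetMore[T](ds: DataSet[T]) {
    def mapPartitionWith[R: TypeInformation: ClassTag](
        fun: Iterator[T] => TraversableOnce[R]): DataSet[R] =
      ds.mapPartition(fun)
  }
  implicit class OnCoGroupDataSet[L, R](ds: CoGroupDataSet[L, R]) {
    // a non-overloaded alias for `apply`, so case-style functions fit too
    def coGroupWith[O: TypeInformation: ClassTag](
        fun: (Iterator[L], Iterator[R]) => O): DataSet[O] =
      ds.apply(fun)
  }
}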

Cheers,
Till

On Tue, Feb 9, 2016 at 1:18 AM, Stefano Baghino <
stefano.baghino@radicalbit.io> wrote:

> What do you think of something like this?
>
>
> https://github.com/radicalbit/flink/commit/21a889a437875c88921c93e87d88a378c6b4299e
>
> In this way, several extensions can be collected in this package object and
> picked altogether or à la carte (e.g. import
> org.apache.flink.api.scala.extensions.AcceptPartialFunctions).
>
> --
> BR,
> Stefano Baghino
>
> Software Engineer @ Radicalbit
>

Re: Case style anonymous functions not supported by Scala API

Posted by Stefano Baghino <st...@radicalbit.io>.
What do you think of something like this?

https://github.com/radicalbit/flink/commit/21a889a437875c88921c93e87d88a378c6b4299e

In this way, several extensions can be collected in this package object and
picked all together or à la carte (e.g. import
org.apache.flink.api.scala.extensions.AcceptPartialFunctions).
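
At the call site it would look more or less like this (a minimal sketch:
it assumes the import above brings the implicit class into scope as in
the commit, and the sample data is made up):

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.extensions.AcceptPartialFunctions

object Example {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val mails = env.fromElements(
      ("2016-01-28 14:05:19", "Stefano <stefano@example.org>"))
    // mapWith is not overloaded, so the case-style literal compiles as-is
    mails.mapWith {
      case (date, sender) => (date.take(7), sender)
    }.print()
  }
}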

On Mon, Feb 8, 2016 at 2:51 PM, Till Rohrmann <tr...@apache.org> wrote:

> I like the idea of supporting partial functions with Flink’s Scala API.
> However, I think that breaking the API and making it inconsistent with
> respect to the Java API is not the best option. I would rather be in favour
> of the first proposal where we add a new method xxxWith via implicit
> conversions.
>
> Cheers,
> Till



-- 
BR,
Stefano Baghino

Software Engineer @ Radicalbit

Re: Case style anonymous functions not supported by Scala API

Posted by Till Rohrmann <tr...@apache.org>.
I like the idea of supporting partial functions with Flink’s Scala API.
However, I think that breaking the API and making it inconsistent with
respect to the Java API is not the best option. I would rather be in favour
of the first proposal where we add a new method xxxWith via implicit
conversions.

Cheers,
Till

On Sun, Feb 7, 2016 at 12:44 PM, Stefano Baghino <
stefano.baghino@radicalbit.io> wrote:

> It took me a little time but I was able to put together some code.
>
> In this commit I just added a few renamed methods to prevent overloading,
> making them usable with a PartialFunction instead of a plain function:
>
> https://github.com/radicalbit/flink/commit/aacd59e0ce98cccb66d48a30d07990ac8f345748
>
> In this other commit I coded the original proposal, renaming the methods to
> obtain the same effect as before, but with lower friction for Scala
> developers (and provided some usage examples):
>
> https://github.com/radicalbit/flink/commit/33403878eebba70def42f73a1cb671d13b1521b5
>
> --
> BR,
> Stefano Baghino
>
> Software Engineer @ Radicalbit
>

Re: Case style anonymous functions not supported by Scala API

Posted by Stefano Baghino <st...@radicalbit.io>.
It took me a little time but I was able to put together some code.

In this commit I just added a few renamed methods to prevent overloading,
making them usable with a PartialFunction instead of a plain function:
https://github.com/radicalbit/flink/commit/aacd59e0ce98cccb66d48a30d07990ac8f345748
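
The gist of it is a signature along these lines (a simplified sketch,
not the exact code from the commit):

import scala.reflect.ClassTag

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._

object AcceptPartialFunctionsSketch {
  implicit class OnDataSet[T](ds: DataSet[T]) {
    // PartialFunction[T, R] extends T => R, so a case-style literal
    // is accepted here and simply forwarded to the regular map
    def mapWith[R: TypeInformation: ClassTag](
        fun: PartialFunction[T, R]): DataSet[R] =
      ds.map(fun)
  }
}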

In this other commit I coded the original proposal, renaming the methods to
obtain the same effect as before, but with lower friction for Scala
developers (and provided some usage examples):
https://github.com/radicalbit/flink/commit/33403878eebba70def42f73a1cb671d13b1521b5
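
In that variant the surface of DataSet itself changes, roughly like this
(RenamedDataSet is just a stand-in to sketch the intent, not the actual
diff):

import scala.reflect.ClassTag

import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.typeinfo.TypeInformation

// One name per parameter type instead of overloading `map`
trait RenamedDataSet[T] {
  def map[R: TypeInformation: ClassTag](fun: T => R): RenamedDataSet[R]
  def mapWith[R: TypeInformation: ClassTag](
      mapper: MapFunction[T, R]): RenamedDataSet[R]
}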

On Thu, Jan 28, 2016 at 6:16 PM, Stefano Baghino <
stefano.baghino@radicalbit.io> wrote:

> Hi Stephan,
>
> thank you for the quick reply and for your feedback; I agree with you that
> breaking changes have to be taken very seriously.
>
> The rationale behind my proposal is that Scala users are already
> accustomed to higher-order functions that manipulate collections and it
> would be beneficial for them to have an API that tries to adhere as much as
> possible to the interface provided by the Scala Collections API. IMHO being
> able to manipulate a DataSet or DataStream like a Scala collection
> idiomatically would appeal to developers and reduce the friction for them
> to learn Flink.
>
> If we want to pursue the renaming path, I think these changes (and porting
> the rest of the codebase, like `flink-ml` and `flink-contrib`, to the new
> method names) can be done in relatively little time. Since Flink is
> approaching a major release, I think it's a good time to consider this
> change, if the community deems it relevant.
>
> While we await feedback on the proposal, I can start working on both
> paths to see how they would affect the codebase. What do you think?
>
> --
> BR,
> Stefano Baghino
>
> Software Engineer @ Radicalbit
>



-- 
BR,
Stefano Baghino

Software Engineer @ Radicalbit

Re: Case style anonymous functions not supported by Scala API

Posted by Stefano Baghino <st...@radicalbit.io>.
Hi Stephan,

thank you for the quick reply and for your feedback; I agree with you that
breaking changes have to be taken very seriously.

The rationale behind my proposal is that Scala users are already accustomed
to higher-order functions that manipulate collections and it would
be beneficial for them to have an API that tries to adhere as much as possible
to the interface provided by the Scala Collections API. IMHO being able to
manipulate a DataSet or DataStream like a Scala collection idiomatically
would appeal to developers and reduce the friction for them to learn Flink.

If we want to pursue the renaming path, I think these changes (and porting
the rest of the codebase, like `flink-ml` and `flink-contrib`, to the new
method names) can be done in relatively little time. Since Flink is
approaching a major release, I think it's a good time to consider this
change, if the community deems it relevant.

While we await feedback on the proposal, I can start working on both
paths to see how they would affect the codebase. What do you think?

On Thu, Jan 28, 2016 at 2:14 PM, Stephan Ewen <se...@apache.org> wrote:

> Hi!
>
> Would be nice to support that, agreed.
>
> Such a fundamental break in the API worries me a bit, though - I would opt
> for a non-breaking addition.
> Wrapping the RichFunctions into Scala functions (which are then wrapped
> back into rich functions) with implicits seems like a workaround for something
> that should be very simple. Would probably also cost a bit of performance.
>
>
> I like the idea of "mapWith(...)" - if that were a simple, non-overloaded
> function accepting a Scala function, it should accept case-style functions,
> right?
> Simply adding that would probably solve things, but would add a second variant
> of each function to the DataSet. An implicit conversion from DataSet to
> DataSetExtended (which implements the mapWith, reduceWith, ... methods)
> could help there...
>
> What do you think?
>
> Greetings,
> Stephan
>



-- 
BR,
Stefano Baghino

Software Engineer @ Radicalbit

Re: Case style anonymous functions not supported by Scala API

Posted by Stephan Ewen <se...@apache.org>.
Hi!

Would be nice to support that, agreed.

Such a fundamental break in the API worries me a bit, though - I would opt
for a non-breaking addition.
Wrapping the RichFunctions into Scala functions (which are then wrapped
back into rich functions) with implicits seems like a workaround for something
that should be very simple. Would probably also cost a bit of performance.


I like the idea of "mapWith(...)" - if that were a simple, non-overloaded
function accepting a Scala function, it should accept case-style functions,
right?
Simply adding that would probably solve things, but would add a second variant
of each function to the DataSet. An implicit conversion from DataSet to
DataSetExtended (which implements the mapWith, reduceWith, ... methods)
could help there...
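
Roughly something like this, I imagine (just a sketch; DataSetExtended
and the method names are placeholders):

import scala.reflect.ClassTag

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._

object DataSetExtensions {
  implicit class DataSetExtended[T](val ds: DataSet[T]) extends AnyVal {
    // Not overloaded, so case-style literals compile against T => R
    def mapWith[R: TypeInformation: ClassTag](fun: T => R): DataSet[R] =
      ds.map(fun)

    def reduceWith(fun: (T, T) => T): DataSet[T] =
      ds.reduce(fun)
  }
}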

What do you think?

Greetings,
Stephan


On Thu, Jan 28, 2016 at 2:05 PM, Stefano Baghino <
stefano.baghino@radicalbit.io> wrote:

> Hello everybody,
>
> as I'm getting familiar with Flink I've found a possible improvement to the
> Scala APIs: in Scala it's a common pattern to perform tuple extraction
> using pattern matching, making functions working on tuples more readable,
> like this:
>
> // referring to the mail count example in the training
> // assuming `mails` is a DataSet[(String, String)]
> // a pair of date and a string with username and email
> val monthsAndEmails =
>   mails.map {
>     case (date, sender) =>
>       (extractMonth(date), extractEmail(sender))
>   }
>
> However, this is not possible when using the Scala APIs because of the
> overloading of the `map` function in the `DataSet` and `DataStream` classes
> (along with other higher-order functions such as `flatMap` and `filter`). My
> understanding is that the main reason to have two different overloaded
> functions is to provide support for `RichFunction`s.
> I've found out there has been some interest around the issue in the past (
> [FLINK-1159] <https://issues.apache.org/jira/browse/FLINK-1159>).
> In the past couple of days my colleague Andrea and I have tried several
> ways to address the problem, coming to two possible solutions:
>
>    1. don't overload and use different names, e.g. `map` taking a Scala
>    function and `mapWith` taking a Flink MapFunction
>    2. keep only the method taking a Scala function (which would be more
>    idiomatic from a Scala perspective, IMHO) and provide an implicit
>    conversion from the Flink function to the Scala function within the
>    `org.apache.flink.api.scala` package object
>
> We've also evaluated several other approaches using union types and type
> classes but we've found them to be too complex. Regarding the two
> approaches I've cited, the first would imply a breaking change to the APIs,
> while the second is giving me a hard time figuring out some compilation
> errors in `flink-libraries` and `flink-contrib` and as we tested it we
> found out `RichMapFunction`s lose state (possibly because of the double
> conversion, first to a Scala function, then to a simple `MapFunction`).
>
> You can have a look at the code I've written so far here (last 2 commits):
> https://github.com/radicalbit/flink/commits/1159
>
> We had a little exchange of ideas and thought that the first solution would
> be easier and also interesting from the standpoint of the ergonomics of the
> API (e.g. `line mapWith new LineSplitter`) and would like to gather some
> feedback on the feasibility of this change.
>
> Would this still be regarded as a relevant improvement? What do you think
> about it? Do you think there's time to work on them before the 1.0 release?
> What do you think about introducing breaking changes to make this pattern
> available to Scala users?
>
> Thank you all in advance for your feedback.
>
> --
> BR,
> Stefano Baghino
>
> Software Engineer @ Radicalbit
>