You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Antoine Philippot <an...@teads.tv> on 2017/12/28 15:37:51 UTC

RichAsyncFunction in scala

Hi,

It lacks a version of RichAsyncFunction class in the scala API or the
possibility to handle a class which extends AbstractRichFunction and
implements AsyncFunction (from the scala API).

I made a small dev on our current flink fork because we need to use the
open method to add our custom metrics from getRuntimeContext.getMetricGroup
method.
https://github.com/aphilippot/flink/commit/acbd49cf2d64163040f2f954ce55917155b408ba

Do you already plan to release this feature soon ? Do you want me to create
a new Jira ticket, propose a pull request ?

Antoine

Re: RichAsyncFunction in scala

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,

There is this Jira issue: https://issues.apache.org/jira/browse/FLINK-6756 <https://issues.apache.org/jira/browse/FLINK-6756>

Best,
Aljoscha

> On 28. Dec 2017, at 17:10, Antoine Philippot <an...@teads.tv> wrote:
> 
> Hi Ufuk,
> 
> I don't think it is possible as I use this function as a parameter of AsyncDataStream (from the scala API) which is mandatory to use with the scala DataStream.
> 
> 
> 
> Le jeu. 28 déc. 2017 à 16:55, Ufuk Celebi <uce@apache.org <ma...@apache.org>> a écrit :
> Hey Antoine,
> 
> isn't it possible to use the Java RichAsyncFunction from Scala like this:
> 
> class Test extends RichAsyncFunction[Int, Int] {
> 
>   override def open(parameters: Configuration): Unit = super.open(parameters)
> 
>   override def asyncInvoke(input: Int, resultFuture:
> functions.async.ResultFuture[Int]): Unit = ???
> }
> 
> – Ufuk
> 
> 
> 
> On Thu, Dec 28, 2017 at 4:37 PM, Antoine Philippot
> <antoine.philippot@teads.tv <ma...@teads.tv>> wrote:
> > Hi,
> >
> > It lacks a version of RichAsyncFunction class in the scala API or the
> > possibility to handle a class which extends AbstractRichFunction and
> > implements AsyncFunction (from the scala API).
> >
> > I made a small dev on our current flink fork because we need to use the open
> > method to add our custom metrics from getRuntimeContext.getMetricGroup
> > method.
> > https://github.com/aphilippot/flink/commit/acbd49cf2d64163040f2f954ce55917155b408ba <https://github.com/aphilippot/flink/commit/acbd49cf2d64163040f2f954ce55917155b408ba>
> >
> > Do you already plan to release this feature soon ? Do you want me to create
> > a new Jira ticket, propose a pull request ?
> >
> > Antoine


Re: RichAsyncFunction in scala

Posted by Antoine Philippot <an...@teads.tv>.
Hi Ufuk,

I don't think it is possible as I use this function as a parameter
of AsyncDataStream (from the scala API) which is mandatory to use with the
scala DataStream.



Le jeu. 28 déc. 2017 à 16:55, Ufuk Celebi <uc...@apache.org> a écrit :

> Hey Antoine,
>
> isn't it possible to use the Java RichAsyncFunction from Scala like this:
>
> class Test extends RichAsyncFunction[Int, Int] {
>
>   override def open(parameters: Configuration): Unit =
> super.open(parameters)
>
>   override def asyncInvoke(input: Int, resultFuture:
> functions.async.ResultFuture[Int]): Unit = ???
> }
>
> – Ufuk
>
>
>
> On Thu, Dec 28, 2017 at 4:37 PM, Antoine Philippot
> <an...@teads.tv> wrote:
> > Hi,
> >
> > It lacks a version of RichAsyncFunction class in the scala API or the
> > possibility to handle a class which extends AbstractRichFunction and
> > implements AsyncFunction (from the scala API).
> >
> > I made a small dev on our current flink fork because we need to use the
> open
> > method to add our custom metrics from getRuntimeContext.getMetricGroup
> > method.
> >
> https://github.com/aphilippot/flink/commit/acbd49cf2d64163040f2f954ce55917155b408ba
> >
> > Do you already plan to release this feature soon ? Do you want me to
> create
> > a new Jira ticket, propose a pull request ?
> >
> > Antoine
>

Re: RichAsyncFunction in scala

Posted by Ufuk Celebi <uc...@apache.org>.
Hey Antoine,

isn't it possible to use the Java RichAsyncFunction from Scala like this:

class Test extends RichAsyncFunction[Int, Int] {

  override def open(parameters: Configuration): Unit = super.open(parameters)

  override def asyncInvoke(input: Int, resultFuture:
functions.async.ResultFuture[Int]): Unit = ???
}

– Ufuk



On Thu, Dec 28, 2017 at 4:37 PM, Antoine Philippot
<an...@teads.tv> wrote:
> Hi,
>
> It lacks a version of RichAsyncFunction class in the scala API or the
> possibility to handle a class which extends AbstractRichFunction and
> implements AsyncFunction (from the scala API).
>
> I made a small dev on our current flink fork because we need to use the open
> method to add our custom metrics from getRuntimeContext.getMetricGroup
> method.
> https://github.com/aphilippot/flink/commit/acbd49cf2d64163040f2f954ce55917155b408ba
>
> Do you already plan to release this feature soon ? Do you want me to create
> a new Jira ticket, propose a pull request ?
>
> Antoine