Posted to user@flink.apache.org by Shannon Carey <sc...@expedia.com> on 2019/05/14 21:29:35 UTC

RichAsyncFunction for Scala?

I have some awkward code in a few Flink jobs that converts a Scala stream into a Java stream in order to pass it to AsyncDataStream.unorderedWait() with a Java RichAsyncFunction, because older versions of Flink could not do async operations on a Scala stream.

In newer versions of Flink, I see that org.apache.flink.streaming.api.scala.AsyncDataStream is available. However, it accepts only org.apache.flink.streaming.api.scala.async.AsyncFunction, and there does not appear to be an AbstractRichFunction subclass of that trait as I expected. Is there a way to use the Scala interfaces but provide a rich AsyncFunction to AsyncDataStream.unorderedWait()? If not, I will leave the old code as-is.

Thanks,
Shannon

Re: RichAsyncFunction for Scala?

Posted by Rong Rong <wa...@gmail.com>.
Hi Shannon,

I think RichAsyncFunction [1] extends the normal AsyncFunction, so from
the API perspective you should be able to use it.

The problem, I think, is with Scala anonymous functions, which I believe go
through a different code path when the Scala RichAsyncFunction is wrapped
[2].
Is your problem specifically with a rich anonymous async function, or do
you also have a problem with a regular function that extends RichAsyncFunction?

--
Rong

[1]
https://github.com/apache/flink/blob/release-1.8/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/RichAsyncFunction.scala
[2]
https://github.com/apache/flink/blob/release-1.8/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AsyncDataStream.scala#L289
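
For readers following the thread, here is a minimal sketch of the approach suggested above: a regular class extending the Scala RichAsyncFunction from [1] and passed to the Scala AsyncDataStream.unorderedWait(). It assumes the release-1.8 API linked in [1] and [2]. The class name LengthLookup, the thread pool size, the timeout, and the capacity are hypothetical values chosen for illustration, and the sketch has not been run against the jobs discussed in this thread.

```scala
import java.util.concurrent.{ExecutorService, Executors, TimeUnit}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.async.{ResultFuture, RichAsyncFunction}

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

// Hypothetical function: pairs each word with its length, computed off the task thread.
class LengthLookup extends RichAsyncFunction[String, (String, Int)] {

  // Created in open() rather than in the constructor, so the function
  // instance itself stays serializable when the job is submitted.
  @transient private var pool: ExecutorService = _
  @transient private var ec: ExecutionContext = _

  // Rich lifecycle hook (signature as of release-1.8): acquire resources once per task.
  override def open(parameters: Configuration): Unit = {
    pool = Executors.newFixedThreadPool(4) // assumed pool size
    ec = ExecutionContext.fromExecutorService(pool)
  }

  // Rich lifecycle hook: release resources when the task shuts down.
  override def close(): Unit = {
    if (pool != null) pool.shutdown()
  }

  override def asyncInvoke(input: String, resultFuture: ResultFuture[(String, Int)]): Unit = {
    implicit val executor: ExecutionContext = ec
    Future {
      (input, input.length) // stand-in for a real asynchronous client call
    }.onComplete {
      case Success(value) => resultFuture.complete(Iterable(value))
      case Failure(error) => resultFuture.completeExceptionally(error)
    }
  }
}

object RichAsyncSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val words: DataStream[String] = env.fromElements("flink", "scala", "async")

    // Scala AsyncDataStream: no conversion to a Java stream is needed.
    val enriched: DataStream[(String, Int)] =
      AsyncDataStream.unorderedWait(words, new LengthLookup, 1000L, TimeUnit.MILLISECONDS, 100)

    enriched.print()
    env.execute("rich-async-sketch")
  }
}
```

The function is written as a named class rather than an anonymous function, since the anonymous-function overload is the case the message above suspects of taking a different code path.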

On Thu, May 16, 2019 at 12:26 AM Fabian Hueske <fh...@gmail.com> wrote:

> Hi Shannon,
>
> That's a good observation. To be honest, I don't know why the Scala
> AsyncFunction does not implement RichFunction.
> Maybe this was not intentional and just overlooked when porting the
> functionality to Scala.
>
> Would you mind creating a Jira ticket for this?
>
> Thank you,
> Fabian

Re: RichAsyncFunction for Scala?

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Shannon,

That's a good observation. To be honest, I don't know why the Scala AsyncFunction
does not implement RichFunction.
Maybe this was not intentional and just overlooked when porting the
functionality to Scala.

Would you mind creating a Jira ticket for this?

Thank you,
Fabian
