Posted to user@flink.apache.org by Kevin Lam <ke...@shopify.com> on 2021/12/14 21:41:00 UTC

Python Interop with Java defined org.apache.flink.api.common.functions.Function classes?

Hi all,

We currently operate several Flink applications using the Scala API, and
run them on Kubernetes in Application mode. We're interested in researching
the Python API and how we can support application developers who prefer
to use Python.

We have a common library which implements a number of useful sources and
sinks, as well as some implementations
of org.apache.flink.api.common.functions.Function, e.g. a MapFunction for
computing and reporting latency metrics. We'd like to continue to use the
common library, and make it available to Python developers.

We understand that Java sources and sinks can be used in the Python API. Is
there a way to call Java org.apache.flink.api.common.functions.Function
implementations (e.g. MapFunction, ProcessFunction classes) from the Python
API [1]? If not, are there any plans to support this?

Thanks in advance!


[1] imagining something like this:

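```
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
# MySource and JavaMapFunction are placeholders for the desired API
ds = env.add_source(MySource())
# process the data stream with a Java function
ds = (ds
    .map(JavaMapFunction("com.example.MyJavaMapFunction"))
)
```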
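Java sources and sinks, like any Java `Function` class, are only reachable from PyFlink once the JAR that contains them is on the job's classpath. Below is a minimal sketch, assuming the common library is packaged as a local JAR. `jar_urls` and `register_jars` are hypothetical helper names. `StreamExecutionEnvironment.add_jars`, available in recent PyFlink releases, expects `file://` URL strings rather than bare paths.

```python
from pathlib import Path


def jar_urls(*paths):
    """Convert local JAR paths into absolute file:// URLs."""
    urls = []
    for p in paths:
        path = Path(p).expanduser().resolve()
        if path.suffix != ".jar":
            raise ValueError(f"expected a .jar file, got: {p}")
        urls.append(path.as_uri())
    return urls


def register_jars(env, *paths):
    """Make the classes in the given JARs loadable by the job's JVM.

    `env` is assumed to be a pyflink.datastream.StreamExecutionEnvironment;
    its add_jars() method takes one or more URL strings.
    """
    env.add_jars(*jar_urls(*paths))


# Example (hypothetical path to the shared Java/Scala library):
# register_jars(env, "~/libs/common-flink-lib.jar")
```

Equivalently, the same URLs can be joined with `;` and set as the `pipeline.jars` configuration option.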

Re: Python Interop with Java defined org.apache.flink.api.common.functions.Function classes?

Posted by Dian Fu <di...@gmail.com>.

Hi Kevin,

You could try using it as follows:

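```
from pyflink.datastream import DataStream
from pyflink.java_gateway import get_gateway

# Access the JVM that backs the PyFlink job through py4j.
jvm = get_gateway().jvm
# Instantiate the Java MapFunction (its JAR must be on the job's classpath),
# apply it to the underlying Java DataStream, and wrap the result back into
# a Python DataStream. Note that _j_data_stream is a private attribute.
ds = DataStream(ds._j_data_stream.map(jvm.com.example.MyJavaMapFunction()))
```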
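Kevin's imagined `JavaMapFunction("com.example.MyJavaMapFunction")` takes the class name as a string, while the `get_gateway()` approach uses attribute access on the py4j JVM view. A small wrapper can bridge the two. This is a sketch only: `resolve_java_class` and `java_map` are hypothetical names, not PyFlink APIs. `java_map` also inherits the reliance on the private `_j_data_stream` attribute and the requirement that the JAR be on the classpath.

```python
from functools import reduce


def resolve_java_class(jvm, class_name):
    """Look up a fully qualified name such as 'com.example.MyJavaMapFunction'
    by walking the py4j JVM view one package segment at a time."""
    return reduce(getattr, class_name.split("."), jvm)


def java_map(ds, class_name, *ctor_args):
    """Apply a Java MapFunction, identified by class name, to a PyFlink DataStream.

    Hypothetical convenience wrapper around the get_gateway() approach; it uses
    the private DataStream._j_data_stream attribute, which may change between
    releases.
    """
    # Imported lazily so resolve_java_class can be used without PyFlink installed.
    from pyflink.datastream import DataStream
    from pyflink.java_gateway import get_gateway

    # Construct the Java function with any constructor arguments it needs.
    j_map_function = resolve_java_class(get_gateway().jvm, class_name)(*ctor_args)
    return DataStream(ds._j_data_stream.map(j_map_function))


# Usage (inside a PyFlink job):
# ds = java_map(ds, "com.example.MyJavaMapFunction")
```

If the class is not on the classpath, py4j resolves the name to a package rather than a class, and the constructor call fails with a `'JavaPackage' object is not callable` error. That error is usually the first sign that the JAR was not registered.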

Regards,
Dian
