Posted to user@flink.apache.org by John Tipper <jo...@hotmail.com> on 2022/06/24 13:05:20 UTC

How to use connectors in PyFlink 1.15.0 when not defined in Python API?

Hi all,

There are a number of connectors which do not appear to be in the Python API v1.15.0, e.g. Kinesis. I can see that it's possible to use these connectors by using the Table API:

CREATE TABLE my_table (...)
WITH ('connector' = 'kinesis' ...)

I guess if you wanted the stream as a DataStream you'd create the Table and then convert it into a DataStream?
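For what it's worth, that round trip could be sketched roughly as follows (the `WITH` options and field names are placeholders, and it assumes the Kinesis SQL connector jar is on the classpath):

```python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(stream_execution_environment=env)

# Hypothetical table definition; the connector options are placeholders.
t_env.execute_sql("""
    CREATE TABLE my_table (
        `data` STRING
    ) WITH (
        'connector' = 'kinesis',
        'stream' = 'kinesis_stream_name',
        'aws.region' = 'us-east-1',
        'format' = 'raw'
    )
""")

# Convert the Table into a DataStream for further DataStream-API processing.
ds = t_env.to_data_stream(t_env.from_path("my_table"))
```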

Is there a way of directly instantiating these connectors in PyFlink without needing to use SQL like this (and without having to wait until v1.16)? For example, the Java API looks like this:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
    "kinesis_stream_name", new SimpleStringSchema(), consumerConfig));

Many thanks,

John

Re: How to use connectors in PyFlink 1.15.0 when not defined in Python API?

Posted by Levan Huyen <lv...@gmail.com>.
Hi all,

I'm trying to follow the code in 1.16 SNAPSHOT to have a Kinesis sink in
PyFlink 1.15, to write the output of a KeyedCoProcessFunction to Kinesis.

1. If I use ".set_serialization_schema(SimpleStringSchema())", then I got
the error message:
java.lang.ClassCastException: class [B cannot be cast to class java.lang.String ([B and java.lang.String are in module java.base of loader 'bootstrap')
    at org.apache.flink.api.common.serialization.SimpleStringSchema.serialize(SimpleStringSchema.java:36)

2. I then tried to write my own implementation of the `SerializationSchema`:

public class BytesSerDeSchema
        implements DeserializationSchema<byte[]>, SerializationSchema<byte[]>

Then the code ran, but 16 bytes were added to the beginning of every event
before it was sent to Kinesis (debugging showed that the `element` argument
of `BytesSerDeSchema`'s `public byte[] serialize(byte[] element)` method
already carries those bytes). I guess those padded bytes are a side product
of Python pickle.

What I have to do now is to remove those 16 bytes in that `serialize`
method.
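For what it's worth, the stripping I ended up with amounts to this (sketched in Python for brevity; the 16-byte length is an empirical observation from debugging, not a documented constant):

```python
HEADER_LEN = 16  # empirically observed prefix length, not a documented constant


def strip_header(element: bytes, header_len: int = HEADER_LEN) -> bytes:
    """Drop the fixed-size prefix observed on each serialized element."""
    if len(element) < header_len:
        raise ValueError(f"element shorter than expected header: {len(element)} bytes")
    return element[header_len:]
```

In the Java `BytesSerDeSchema.serialize(byte[] element)` the equivalent is an `Arrays.copyOfRange(element, 16, element.length)`.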

Could you please suggest a proper solution?

Thanks a lot.
Regards,
Huyen


Re: How to use connectors in PyFlink 1.15.0 when not defined in Python API?

Posted by Dian Fu <di...@gmail.com>.
Hi John,

Kinesis and most of the other connectors will be supported in 1.16, see [1]
for more details about kinesis.

For versions prior to 1.16, you could try just as Andrew suggested or refer
to the implementations which are already available in the master as
examples.

Regards,
Dian

[1]
https://github.com/apache/flink/blob/master/flink-python/pyflink/datastream/connectors/kinesis.py
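For anyone reading along, the module linked in [1] suggests usage roughly like the following in 1.16 (a sketch based on the master branch; the class name, options, and availability are assumptions and may change before release):

```python
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kinesis import FlinkKinesisConsumer

env = StreamExecutionEnvironment.get_execution_environment()

# Placeholder consumer properties.
consumer_config = {
    'aws.region': 'us-east-1',
    'flink.stream.initpos': 'LATEST',
}

kinesis = env.add_source(
    FlinkKinesisConsumer("kinesis_stream_name", SimpleStringSchema(), consumer_config))
```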


Re: How to use connectors in PyFlink 1.15.0 when not defined in Python API?

Posted by Andrew Otto <ot...@wikimedia.org>.
I've had success using the Java in pyflink via pyflink.java_gateway.
Something like:

from pyflink.java_gateway import get_gateway
jvm = get_gateway().jvm

# then perhaps something like:
FlinkKinesisConsumer = jvm.org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer

There also seems to be a nice java_utils.py
<https://github.com/apache/flink/blob/release-1.15.0/flink-python/pyflink/util/java_utils.py>
with helpers that may, uh, help.

Not sure if this will work; you might need to use the Python env's
underlying Java StreamTableEnvironment to do it?  Here's an example
<https://github.com/apache/flink/blob/release-1.15.0/flink-python/pyflink/datastream/stream_execution_environment.py#L922-L937>
of how the python StreamTableEnvironment calls out to the Java one.
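A slightly fuller sketch of that idea (untested; the `SourceFunction` wrapper, the `_j_deserialization_schema` attribute, and the need to have the flink-connector-kinesis jar on the classpath, e.g. via `env.add_jars`, are all assumptions based on reading the 1.15 source):

```python
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import SourceFunction
from pyflink.java_gateway import get_gateway

env = StreamExecutionEnvironment.get_execution_environment()
jvm = get_gateway().jvm

# Consumer config goes into a java.util.Properties instance.
props = jvm.java.util.Properties()
props.setProperty("aws.region", "us-east-1")  # placeholder region

# Instantiate the Java connector class directly through py4j.
j_consumer = jvm.org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer(
    "kinesis_stream_name",
    SimpleStringSchema()._j_deserialization_schema,
    props)

# Wrap the Java object so the Python add_source accepts it.
ds = env.add_source(SourceFunction(j_consumer))
```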

BTW: I'm not an authority, nor have I really tried this, so take this
advice with a grain of salt!  :)

Good luck!