Posted to user@flink.apache.org by Levan Huyen <lv...@gmail.com> on 2022/10/24 02:26:03 UTC

Re: How to use connectors in PyFlink 1.15.0 when not defined in Python API?

Hi all,

I'm trying to follow the code in the 1.16 SNAPSHOT to build a Kinesis sink
in PyFlink 1.15, writing the output of a KeyedCoProcessFunction to Kinesis.

1. If I use `.set_serialization_schema(SimpleStringSchema())`, I get this
error message:
java.lang.ClassCastException: class [B cannot be cast to class
java.lang.String ([B and java.lang.String are in module java.base of loader
'bootstrap')
at
org.apache.flink.api.common.serialization.SimpleStringSchema.serialize(SimpleStringSchema.java:36)
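
(For context, the sink setup I'm following looks roughly like this, based on
the 1.16 snapshot builder API; the region, stream name and client properties
below are placeholders:)

from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream.connectors.kinesis import (KinesisStreamsSink,
                                                   PartitionKeyGenerator)

# Placeholder client properties and stream name:
sink = KinesisStreamsSink.builder() \
    .set_kinesis_client_properties({'aws.region': 'ap-southeast-2'}) \
    .set_serialization_schema(SimpleStringSchema()) \
    .set_partition_key_generator(PartitionKeyGenerator.fixed()) \
    .set_stream_name('my-output-stream') \
    .build()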

2. I then tried writing my own implementation of `SerializationSchema`:

public class BytesSerDeSchema
        implements DeserializationSchema<byte[]>, SerializationSchema<byte[]>

The code then ran, but 16 extra bytes were prepended to every event before
it was sent to Kinesis. Debugging showed that the `element` argument of
`public byte[] serialize(byte[] element)` in BytesSerDeSchema already
arrives with those bytes attached, so I guess they are a by-product of
Python pickling.

For now, my workaround is to strip those 16 bytes inside that `serialize`
method.
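
If the pickle guess is right, declaring an explicit output type on the
process step should make Flink hand the sink real strings instead of
pickled bytes. A minimal sketch of what I mean (untested; `connected_stream`
and `MyKeyedCoProcessFunction` stand in for my actual stream and function):

from pyflink.common import Types

# Untested: declare the output type explicitly so that elements are
# converted to Java Strings rather than pickled into byte[].
result = connected_stream.process(MyKeyedCoProcessFunction(),
                                  output_type=Types.STRING())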

Could you please suggest a proper solution?

Thanks a lot.
Regards,
Huyen

On Mon, 27 Jun 2022 at 11:42, Dian Fu <di...@gmail.com> wrote:

> Hi John,
>
> Kinesis and most of the other connectors will be supported in 1.16; see
> [1] for more details on the Kinesis connector.
>
> For versions prior to 1.16, you could try what Andrew suggested, or refer
> to the implementations already available in master as examples.
>
> Regards,
> Dian
>
> [1]
> https://github.com/apache/flink/blob/master/flink-python/pyflink/datastream/connectors/kinesis.py
>
> On Fri, Jun 24, 2022 at 9:20 PM Andrew Otto <ot...@wikimedia.org> wrote:
>
>> I've had success using Java in PyFlink via pyflink.java_gateway.
>> Something like:
>>
>> from pyflink.java_gateway import get_gateway
>> jvm = get_gateway()
>>
>> # then perhaps something like:
>> FlinkKinesisConsumer = \
>>     jvm.org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer
>>
>> There also seems to be a nice java_utils.py
>> <https://github.com/apache/flink/blob/release-1.15.0/flink-python/pyflink/util/java_utils.py>
>>  with helpers that may uh, help.
>>
>> Not sure if this will work; you might need to go through the Python env's
>> underlying Java StreamExecutionEnvironment to do it?  Here's an example
>> <https://github.com/apache/flink/blob/release-1.15.0/flink-python/pyflink/datastream/stream_execution_environment.py#L922-L937>
>> of how the Python StreamExecutionEnvironment calls out to the Java one.
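>>
>> Putting the pieces together, it might look something like this (totally
>> untested; the `_j_...` attributes are private internals I'm guessing at
>> from the 1.15 source, and the kinesis connector jar has to be on the
>> classpath):
>>
>> from pyflink.common.serialization import SimpleStringSchema
>> from pyflink.datastream import DataStream, StreamExecutionEnvironment
>> from pyflink.java_gateway import get_gateway
>>
>> env = StreamExecutionEnvironment.get_execution_environment()
>> jvm = get_gateway().jvm
>>
>> # Build the java.util.Properties the Java constructor expects.
>> consumer_config = jvm.java.util.Properties()
>> consumer_config.setProperty("aws.region", "us-east-1")
>>
>> # Instantiate the Java connector class directly through py4j, unwrapping
>> # the Python schema to its underlying Java object.
>> j_consumer = jvm.org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer(
>>     "kinesis_stream_name",
>>     SimpleStringSchema()._j_deserialization_schema,
>>     consumer_config)
>>
>> # Add the source on the wrapped Java environment and re-wrap the result
>> # as a Python DataStream.
>> kinesis = DataStream(env._j_stream_execution_environment.addSource(j_consumer))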
>>
>> BTW: I'm not an authority, nor have I really tried this, so take this
>> advice with a grain of salt!  :)
>>
>> Good luck!
>>
>>
>> On Fri, Jun 24, 2022 at 9:06 AM John Tipper <jo...@hotmail.com>
>> wrote:
>>
>>> Hi all,
>>>
>>> There are a number of connectors which do not appear to be in the Python
>>> API v1.15.0, e.g. Kinesis. I can see that it's possible to use these
>>> connectors by using the Table API:
>>>
>>> CREATE TABLE my_table (...)
>>> WITH ('connector' = 'kinesis' ...)
>>>
>>>
>>> I guess if you wanted the stream as a DataStream you'd create the Table
>>> and then convert it into a DataStream?
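>>>
>>> e.g. I'd imagine something like this (schema and connector options made
>>> up purely for illustration):
>>>
>>> from pyflink.datastream import StreamExecutionEnvironment
>>> from pyflink.table import StreamTableEnvironment
>>>
>>> env = StreamExecutionEnvironment.get_execution_environment()
>>> t_env = StreamTableEnvironment.create(env)
>>>
>>> # Made-up schema and options, just to make the example concrete:
>>> t_env.execute_sql("""
>>>     CREATE TABLE my_table (
>>>         `data` STRING
>>>     ) WITH (
>>>         'connector' = 'kinesis',
>>>         'stream' = 'my_stream',
>>>         'aws.region' = 'us-east-1',
>>>         'format' = 'raw'
>>>     )
>>> """)
>>>
>>> ds = t_env.to_data_stream(t_env.from_path("my_table"))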
>>>
>>> Is there a way of directly instantiating these connectors in PyFlink
>>> without needing to use SQL like this (and without having to wait until
>>> v1.16)? e.g. the Java API looks like this:
>>>
>>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>> DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
>>>     "kinesis_stream_name", new SimpleStringSchema(), consumerConfig));
>>>
>>>
>>> Many thanks,
>>>
>>> John
>>>
>>