Posted to user@flink.apache.org by Joe Malt <jm...@yelp.com> on 2018/08/07 00:05:13 UTC

Using a custom DeserializationSchema with Kafka and Python

Hi,

I'm trying to write a pipeline using the new Python streaming API, which
reads from Kafka using FlinkKafkaConsumer010.

This works fine with an existing deserializer such as SimpleStringSchema,
but I need to define my own deserializer to process a custom format. I've
written a class that extends SimpleStringSchema, but I get an ImportError
when trying to use it.

The class is as follows:

from org.apache.flink.api.common.serialization import SimpleStringSchema

class MyCustomKafkaDeserializer(SimpleStringSchema):

    def __init__(self):
        SimpleStringSchema.__init__(self)
        print "created MyKafkaDeserializer"

    def deserialize(self, *args):
        # snip

I instantiate the Kafka consumer like this:

consumer = FlinkKafkaConsumer010([configs['kafkaTopic']],
                                 MyCustomKafkaDeserializer(), props)


When I start the pipeline I see the message printed in the constructor (so
the deserializer is being created) but once env.execute() is called I get
this error:

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot
instantiate user function.
	at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:239)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:104)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:748)
Caused by: ImportError: No module named MyCustomKafkaDeserializer

	at org.python.core.Py.ImportError(Py.java:328)


The issue is the same whether MyCustomKafkaDeserializer is defined in
the same file as the pipeline, or in another file and imported. It
seems that the internals of Flink can't find the class for some
reason.


The command I'm using to run the pipeline:

./pyflink-stream.sh /Users/jmalt/flink-python/KafkaRead.py \
    /Users/jmalt/flink-python/MyCustomKafkaDeserializer.py - --local

How can I make Flink see the custom deserializer?

Thanks,

Joe Malt

Software Engineering Intern, Stream Processing
Yelp

Re: Using a custom DeserializationSchema with Kafka and Python

Posted by Chesnay Schepler <ch...@apache.org>.
This doesn't work because FlinkKafkaConsumer010 isn't aware that the
given deserializer is a Jython class.
Jython classes have to be serialized in a specific way (as seen in
AbstractPythonUDF).

For this to work you'll need to create a (Java!) wrapper around
FlinkKafkaConsumer010 that serializes the schema appropriately.
This applies to every connector that accepts user-defined classes.

Note that we haven't really looked at how the Streaming Python API 
interacts with existing connectors.
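
To illustrate the failure mode described above, here is a loose analogy in
plain Python using pickle. It is only a sketch and is not how Flink or Jython
actually serialize user classes: pickle stores a class by module name, so a
process that cannot import that module fails with an ImportError, much like
the one in the stack trace. The module name my_custom_deserializer and the
helper load_with_source are hypothetical names made up for this example.

```python
import pickle
import sys
import types

# Hypothetical module name, standing in for the user's deserializer module.
MODULE_NAME = "my_custom_deserializer"

# Source of the user-defined class, kept as text so it can be shipped around.
SOURCE = '''
class MyCustomKafkaDeserializer(object):
    def deserialize(self, message):
        return message.decode("utf-8").upper()
'''

# "Client side": build the module in memory and register it so that the
# class is importable by name, then serialize an instance.
module = types.ModuleType(MODULE_NAME)
exec(SOURCE, module.__dict__)
sys.modules[MODULE_NAME] = module
payload = pickle.dumps(module.MyCustomKafkaDeserializer())

# "Worker side": simulate a process that never loaded the module.
del sys.modules[MODULE_NAME]

try:
    pickle.loads(payload)
    error = None
except ImportError as exc:
    # The payload only names the class; the worker cannot resolve it.
    error = exc


def load_with_source(payload, module_name, source):
    """Recreate the module from its source before deserializing.

    This plays the role of the wrapper: it knows the payload needs
    special handling and makes the class resolvable first.
    """
    recreated = types.ModuleType(module_name)
    exec(source, recreated.__dict__)
    sys.modules[module_name] = recreated
    return pickle.loads(payload)


restored = load_with_source(payload, MODULE_NAME, SOURCE)
result = restored.deserialize(b"hello")
```

The point of the analogy is that the component doing the deserialization has
to know the object needs special treatment. A connector that just applies
default serialization to whatever schema it is handed has no way to do that,
which is why a wrapper is needed.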
