You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Joe Malt <jm...@yelp.com> on 2018/08/14 23:16:07 UTC
ClassCastException when writing to a Kafka stream with Flink + Python
Hi,
I'm trying to write to a Kafka stream in a Flink job using the new Python
streaming API.
My program looks like this:
def main(factory):
props = Properties()
props.setProperty("bootstrap.servers",configs['kafkaBroker'])
consumer = FlinkKafkaConsumer010([configs['kafkaReadTopic']],
SimpleStringSchema(), props)
producer = FlinkKafkaProducer010(configs['kafkaWriteTopic'],
SimpleStringSchema(), props)
env = factory.get_execution_environment()
stream = env.add_java_source(consumer)
stream.output() # this works (prints to a .out file)
stream.add_sink(producer) # producing to this causes the exception
env.execute()
I'm getting a ClassCastException when trying to output to the
FlinkKafkaProducer:
java.lang.ClassCastException: org.python.core.PyUnicode cannot be cast
to java.lang.String
at org.apache.flink.api.common.serialization.SimpleStringSchema.serialize(SimpleStringSchema.java:36)
at org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper.serializeValue(KeyedSerializationSchemaWrapper.java:46)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invoke(FlinkKafkaProducer010.java:355)
at org.apache.flink.streaming.python.api.functions.PythonSinkFunction.invoke(PythonSinkFunction.java:48)
at org.apache.flink.streaming.python.api.functions.PythonSinkFunction.invoke(PythonSinkFunction.java:37)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
It seems that the Python string isn't getting converted to a
java.lang.String, which should happen automatically in Jython.
I've tried adding a MapFunction that maps each input to String(input)where
String is the constructor for java.lang.String. This made no difference; I
get the same error.
Any ideas?
Thanks,
Joe Malt
Software Engineering Intern
Yelp
Re: ClassCastException when writing to a Kafka stream with Flink +
Python
Posted by Chesnay Schepler <ch...@apache.org>.
As seen in the stacktrace every sink added via StreamExEnv#add_source is
wrapped in a PythonSinkFunction which internally converts things to
PyObjects, that's why the mapper had no effect.
Currently we don't differentiate between java/python sinks, contrary to
sources where we have an explicit StreamExEnv#add_java_source method.
There are 2 ways to approach this issue:
* As alluded in a previous mail, create a python wrapper around the
kafka consumer class.
* extend PythonDataStream class with a separate method for kafka.
Unfortunately I don't think we can solve this in a generic matter (i.e.
add_java_source) since the java types wouldn't fit at compile time.
On 15.08.2018 04:15, vino yang wrote:
> Hi Joe,
>
> ping Chesnay for you, please wait for the reply.
>
> Thanks, vino.
>
> Joe Malt <jmalt@yelp.com <ma...@yelp.com>> 于2018年8月15日周三
> 上午7:16写道:
>
> Hi,
>
> I'm trying to write to a Kafka stream in a Flink job using the new
> Python streaming API.
>
> My program looks like this:
>
> def main(factory):
>
> props = Properties()
> props.setProperty("bootstrap.servers",configs['kafkaBroker'])
>
> consumer = FlinkKafkaConsumer010([configs['kafkaReadTopic']], SimpleStringSchema(), props)
> producer = FlinkKafkaProducer010(configs['kafkaWriteTopic'], SimpleStringSchema(), props)
>
> env = factory.get_execution_environment()
>
> stream = env.add_java_source(consumer)
>
> stream.output()# this works (prints to a .out file) stream.add_sink(producer)# producing to this causes the exception env.execute()
>
> I'm getting a ClassCastException when trying to output to the
> FlinkKafkaProducer:
>
> java.lang.ClassCastException: org.python.core.PyUnicode cannot be cast to java.lang.String
> at org.apache.flink.api.common.serialization.SimpleStringSchema.serialize(SimpleStringSchema.java:36)
> at org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper.serializeValue(KeyedSerializationSchemaWrapper.java:46)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invoke(FlinkKafkaProducer010.java:355)
> at org.apache.flink.streaming.python.api.functions.PythonSinkFunction.invoke(PythonSinkFunction.java:48)
> at org.apache.flink.streaming.python.api.functions.PythonSinkFunction.invoke(PythonSinkFunction.java:37)
> at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>
>
> It seems that the Python string isn't getting converted to a
> java.lang.String, which should happen automatically in Jython.
>
> I've tried adding a MapFunction that maps each input to
> String(input)where String is the constructor for java.lang.String.
> This made no difference; I get the same error.
>
> Any ideas?
>
> Thanks,
>
> Joe Malt
>
> Software Engineering Intern
> Yelp
>
Re: ClassCastException when writing to a Kafka stream with Flink + Python
Posted by vino yang <ya...@gmail.com>.
Hi Joe,
ping Chesnay for you, please wait for the reply.
Thanks, vino.
Joe Malt <jm...@yelp.com> 于2018年8月15日周三 上午7:16写道:
> Hi,
>
> I'm trying to write to a Kafka stream in a Flink job using the new Python
> streaming API.
>
> My program looks like this:
>
> def main(factory):
>
> props = Properties()
> props.setProperty("bootstrap.servers",configs['kafkaBroker'])
>
> consumer = FlinkKafkaConsumer010([configs['kafkaReadTopic']], SimpleStringSchema(), props)
> producer = FlinkKafkaProducer010(configs['kafkaWriteTopic'], SimpleStringSchema(), props)
>
> env = factory.get_execution_environment()
>
> stream = env.add_java_source(consumer)
>
> stream.output() # this works (prints to a .out file)
> stream.add_sink(producer) # producing to this causes the exception
>
> env.execute()
>
> I'm getting a ClassCastException when trying to output to the
> FlinkKafkaProducer:
>
> java.lang.ClassCastException: org.python.core.PyUnicode cannot be cast to java.lang.String
> at org.apache.flink.api.common.serialization.SimpleStringSchema.serialize(SimpleStringSchema.java:36)
> at org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper.serializeValue(KeyedSerializationSchemaWrapper.java:46)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invoke(FlinkKafkaProducer010.java:355)
> at org.apache.flink.streaming.python.api.functions.PythonSinkFunction.invoke(PythonSinkFunction.java:48)
> at org.apache.flink.streaming.python.api.functions.PythonSinkFunction.invoke(PythonSinkFunction.java:37)
> at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>
>
> It seems that the Python string isn't getting converted to a
> java.lang.String, which should happen automatically in Jython.
>
> I've tried adding a MapFunction that maps each input to String(input)where
> String is the constructor for java.lang.String. This made no difference;
> I get the same error.
>
> Any ideas?
>
> Thanks,
>
> Joe Malt
>
> Software Engineering Intern
> Yelp
>