You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "Akshay Nagpal (JIRA)" <ji...@apache.org> on 2018/08/03 05:50:00 UTC

[jira] [Created] (FLINK-10039) FlinkKafkaProducer - Serializer Error

Akshay Nagpal created FLINK-10039:
-------------------------------------

             Summary: FlinkKafkaProducer - Serializer Error
                 Key: FLINK-10039
                 URL: https://issues.apache.org/jira/browse/FLINK-10039
             Project: Flink
          Issue Type: Bug
          Components: Streaming Connectors
    Affects Versions: 1.4.2
            Reporter: Akshay Nagpal


I am working on a use case where I input the data using Kafka's console producer, read the same data in my program using FlinkKafkaConsumer and write it back to another Kafka topic using FlinkKafkaProducer. 

I am using 1.4.2 version of the following dependencies:

flink-java

flink-streaming-java_2.11

flink-connector-kafka-0.10_2.11

 

The codes are as follows:

KafkaConsoleProducer:
{code:java}
./bin/kafka-console-producer --broker-list xxx:9092 --topic test1 --property "parse.key=true" --property "key.separator=:" --key-serializer org.apache.kafka.common.serialization.StringSerializer --value-serializer org.apache.kafka.common.serialization.StringSerializer
{code}
KafkaFlinkConsumer:
{code:java}
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "xxx:9092");
properties.setProperty("zookeeper.connect", "xxx:2181");
properties.setProperty("group.id", "test");
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");


FlinkKafkaConsumer010<String> myConsumer = new FlinkKafkaConsumer010<String>("test1", 
new SimpleStringSchema(),
properties);

DataStream<String> stream = env.addSource(myConsumer);
{code}
KafkaFlinkProducer:
{code:java}
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "xxx:9092");
properties.setProperty("zookeeper.connect", "xxx:2181");
properties.setProperty("group.id", "test");
properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties1.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");


FlinkKafkaProducer010<String> myProducer = new FlinkKafkaProducer010<String>("my-topic", 
new SimpleStringSchema(), 
properties);

stream.addSink(myProducer);
{code}
When I specify key and value serializer as StringSerializer in FlinkKafkaProducer, it gives me the following error in the logs:

 
{code:java}
org.apache.kafka.common.errors.SerializationException: Can't convert value of class [B to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
{code}
Though it's giving me this error, it's still producing the data in the topic.

When I am using ByteArraySerializer though with the producer, it is not giving me the error in the logs. It is also giving me the output.

Moreover, DataStream's print method is not printing the data on the console.

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)