Posted to dev@flink.apache.org by Alexandru Gutan <al...@gmail.com> on 2017/08/27 13:50:41 UTC

Serialization Issue

My stream is producing records of type Tuple2<String,String>

*.toString()* output *(usr12345,{"_key":"usr12345","_temperature":46.6})*

where the key is *usr12345* and value is
*{"_key":"usr12345","_temperature":46.6}*

The *.print()* on the stream outputs the value correctly:
*(usr12345,{"_key":"usr12345","_temperature":46.6})*

But when I write the stream to Kafka the key becomes *" usr12345"* and
the value *"({"_key":"usr12345","_temperature":46.6}"*

Notice the space at the beginning of the key and the left parenthesis
at the beginning of the value.

Very strange. Why might this happen?


Here is the serialization code:

TypeInformation<String> resultType = TypeInformation.of(String.class);

KeyedSerializationSchema<Tuple2<String, String>> schema =
      new TypeInformationKeyValueSerializationSchema<>(
            resultType, resultType, env.getConfig());

FlinkKafkaProducer010.FlinkKafkaProducer010Configuration flinkKafkaProducerConfig =
      FlinkKafkaProducer010.writeToKafkaWithTimestamps(
            stream,
            "topic",
            schema,
            kafkaProducerProperties);

Re: Serialization Issue

Posted by Alexx <al...@gmail.com>.
Thanks!

I tested the KeyedSerializationSchema and it indeed works. 
I tried to use TypeInformationKeyValueSerializationSchema just to make
things a bit more automated :)
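
For anyone finding this thread later, here is a rough sketch of the kind of schema that writes plain Strings. It is not the exact class I used. To keep it compilable without Flink on the classpath, Pair is a hypothetical stand-in for Tuple2<String, String>; in a real job the class would declare "implements KeyedSerializationSchema<Tuple2<String, String>>" and take Tuple2 arguments. The method names follow that interface as I understand it.

```java
import java.nio.charset.StandardCharsets;

public class PlainStringKeyValueSchema {

    // Hypothetical stand-in for Flink's Tuple2<String, String>,
    // used only so this sketch compiles without Flink.
    public static final class Pair {
        public final String f0;
        public final String f1;

        public Pair(String f0, String f1) {
            this.f0 = f0;
            this.f1 = f1;
        }
    }

    // Key bytes are the raw UTF-8 bytes of the first field, with no length prefix.
    public byte[] serializeKey(Pair element) {
        return element.f0.getBytes(StandardCharsets.UTF_8);
    }

    // Value bytes are the raw UTF-8 bytes of the second field.
    public byte[] serializeValue(Pair element) {
        return element.f1.getBytes(StandardCharsets.UTF_8);
    }

    // Returning null is assumed to mean "use the producer's default topic".
    public String getTargetTopic(Pair element) {
        return null;
    }

    public static void main(String[] args) {
        PlainStringKeyValueSchema schema = new PlainStringKeyValueSchema();
        Pair record = new Pair("usr12345", "{\"_key\":\"usr12345\",\"_temperature\":46.6}");

        // Round-trip the bytes the way a plain String consumer would read them.
        System.out.println(new String(schema.serializeKey(record), StandardCharsets.UTF_8));
        System.out.println(new String(schema.serializeValue(record), StandardCharsets.UTF_8));
    }
}
```

With this, a consumer using a plain String deserializer should see the key and value without any extra leading character.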



--
View this message in context: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Serialization-Issue-tp19400p19407.html
Sent from the Apache Flink Mailing List archive. mailing list archive at Nabble.com.

Re: Serialization Issue

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,

TypeInformationKeyValueSerializationSchema uses Flink TypeSerializers to serialize those Strings. What you're seeing is the binary representation of those Strings which, admittedly, resembles the original Strings. If you want to have the data in Kafka with real Strings, I think you need to have a custom KeyedSerializationSchema for your type.
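
To illustrate where the extra leading character comes from, here is a small standalone sketch. It assumes the String serializer writes (length + 1) as a variable-length integer, followed by each char encoded the same way; this is modeled on my understanding of the encoding, it is not the Flink code itself, and the class and method names are made up for the example.

```java
import java.io.ByteArrayOutputStream;

public class FlinkStringEncodingDemo {

    private static final int HIGH_BIT = 0x80;

    // Writes an int 7 bits at a time; the high bit marks "more bytes follow".
    private static void writeVarInt(ByteArrayOutputStream out, int value) {
        while (value >= HIGH_BIT) {
            out.write(value | HIGH_BIT);
            value >>>= 7;
        }
        out.write(value);
    }

    // Assumed layout: (length + 1) as a var-int, then each char as a var-int.
    // The +1 offset is assumed to leave 0 free to mark a null String.
    public static byte[] flinkStyleBytes(String s) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeVarInt(out, s.length() + 1);
        for (int i = 0; i < s.length(); i++) {
            writeVarInt(out, s.charAt(i));
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        String key = "usr12345";
        String value = "{\"_key\":\"usr12345\",\"_temperature\":46.6}";

        byte[] k = flinkStyleBytes(key);
        byte[] v = flinkStyleBytes(value);

        // The key has 8 chars, so the prefix byte is 9, which is a tab
        // and shows up as whitespace in most consumers.
        System.out.println((int) k[0]);   // prints 9

        // The value has 39 chars, so the prefix byte is 40, which is '(' in ASCII.
        System.out.println((char) v[0]);  // prints (
    }
}
```

Under that assumption the "space" in front of the key is really a tab (byte 9) and the "(" in front of the value is byte 40, so both are length prefixes that happen to be printable. The rest of the bytes coincide with the ASCII text, which is why the output resembles the original Strings.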

Best,
Aljoscha

> On 27. Aug 2017, at 15:59, Alexx <al...@gmail.com> wrote:
> 
> My stream is producing records of type *Tuple2<String,String>*
> *.toString()* output *(usr12345,{"_key":"usr12345","_temperature":46.6})*
> where the key is *usr12345* and value is
> *{"_key":"usr12345","_temperature":46.6}*
> The *.print()* on the stream outputs the value correctly:
> *(usr12345,{"_key":"usr12345","_temperature":46.6})*
> But when I write the stream to Kafka the key becomes *" usr12345"* and the
> value *"({"_key":"usr12345","_temperature":46.6}"*
> Notice the space at the beginning of the key and the left parenthesis at the
> beginning of the value.
> Very strange. Why might this happen?
> 
> Here is the serialization code:
> 
> TypeInformation<String> resultType = TypeInformation.of(String.class);
> 
> KeyedSerializationSchema<Tuple2<String, String>> schema =
>      new TypeInformationKeyValueSerializationSchema<>(resultType,
> resultType, env.getConfig());
> 
> FlinkKafkaProducer010.FlinkKafkaProducer010Configuration
> flinkKafkaProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
>      stream,
>      "topic",
>      schema,
>      kafkaProducerProperties);
> 
> 
> 


Re: Serialization Issue

Posted by Alexx <al...@gmail.com>.
My stream is producing records of type *Tuple2<String,String>*
*.toString()* output *(usr12345,{"_key":"usr12345","_temperature":46.6})*
where the key is *usr12345* and value is
*{"_key":"usr12345","_temperature":46.6}*
The *.print()* on the stream outputs the value correctly:
*(usr12345,{"_key":"usr12345","_temperature":46.6})*
But when I write the stream to Kafka the key becomes *" usr12345"* and the
value *"({"_key":"usr12345","_temperature":46.6}"*
Notice the space at the beginning of the key and the left parenthesis at the
beginning of the value.
Very strange. Why might this happen?

Here is the serialization code:

TypeInformation<String> resultType = TypeInformation.of(String.class);

KeyedSerializationSchema<Tuple2<String, String>> schema =
      new TypeInformationKeyValueSerializationSchema<>(resultType,
resultType, env.getConfig());

FlinkKafkaProducer010.FlinkKafkaProducer010Configuration
flinkKafkaProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
      stream,
      "topic",
      schema,
      kafkaProducerProperties);


