Posted to user@flink.apache.org by rss rss <rs...@gmail.com> on 2016/08/28 16:16:39 UTC

different Kafka serialization for keyed and non keyed messages

Hello,

  why does Flink implement different serialization schemes for keyed and
non-keyed messages for Kafka?

  I'm loading messages into Kafka in two ways. The first is on-the-fly
loading without Flink, using Kafka's producer API directly. In that case I'm
using something like:

props.put("partitioner.class", KafkaPartitioner.class.getCanonicalName());
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");

producer = new KafkaProducer<>(props);
String key = event.getUserId();
String  value = DummyEvent.eventToString(event);
producer.send(new ProducerRecord<>(topic, key, value));


 And from the Flink side I can read it, without the key, with code like:

DataStream<String> dataStream = env
        .addSource(new FlinkKafkaConsumer08<String>(
                "topic",
                new SimpleStringSchema(), kafkaProps));

As a result I get the plain message without a key. Actually I need the key
only for partitioning by Kafka, and I have an appropriate class:
https://github.com/rssdev10/flink-kafka-streaming/blob/master/src/main/java/KafkaPartitioner.java
It is the standard Java hash for the String class, as sketched below.
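
For reference, a minimal sketch of what such a partitioner might look like
against the new producer's org.apache.kafka.clients.producer.Partitioner
interface (the class name and hashing details here are illustrative; the
linked KafkaPartitioner.java is the authoritative version):

import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

// Illustrative sketch only; the real class lives in the linked repository.
public class StringHashPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        // standard Java String hash, mapped into the valid partition range
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    @Override
    public void configure(Map<String, ?> configs) {}

    @Override
    public void close() {}
}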


  I also have another case where messages are loaded from Hadoop into Kafka.
I'm using Flink for this purpose. Everything works when I use

dataStream.addSink(new FlinkKafkaProducer08<>(
        config.getProperty("topic", Config.INPUT_TOPIC_NAME),
        new SimpleStringSchema(),
        kafkaProps));

But I need key-based partitioning in Kafka, so I changed it to

TypeInformation<Tuple2<String, String>> stringStringInfo =
        TypeInfoParser.parse("org.apache.flink.api.java.tuple.Tuple2<String, String>");

KeyedSerializationSchema<Tuple2<String, String>> schema =
        new TypeInformationKeyValueSerializationSchema<>(String.class, String.class, env.getConfig());

dataStream
        .map(json -> {
            Event event = gson.fromJson(json, Event.class);
            return new Tuple2<String, String>(event.getUserId(), json);
        }).returns(stringStringInfo)
        .setParallelism(partitions)
        .addSink(new FlinkKafkaProducer08<>(
                config.getProperty("topic", Config.INPUT_TOPIC_NAME),
                schema,
                kafkaProps));

As a result I see that a message serialized by
TypeInformationKeyValueSerializationSchema can be deserialized by Flink's
SimpleStringSchema or by Kafka's StringDeserializer only with an additional
leading symbol. I guess this is the String length prefix added by
org.apache.flink.types.StringValue#writeString. That means the value of a
message is no longer readable by Spark, Storm, or a plain Kafka consumer with
standard deserialization.
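
A quick way to see that prefix is the following illustrative snippet, which
uses plain java.io streams as the DataOutput (the comment values assume a
short ASCII string; longer strings get a variable-length prefix):

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;

import org.apache.flink.types.StringValue;

ByteArrayOutputStream buffer = new ByteArrayOutputStream();
StringValue.writeString("hello", new DataOutputStream(buffer));
byte[] out = buffer.toByteArray();
// For a short ASCII string the first byte is a length prefix (length + 1),
// followed by the characters; a plain StringDeserializer sees that prefix
// as an extra leading symbol.
System.out.println(out.length + " bytes, first byte = " + out[0]);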

   The question is: is this correct behavior for Flink? And should I
implement my own serializer and partitioner for Flink's Kafka sink if I want
plain String serialization that can be read by all other tools without Flink?

   And a second question: why does Flink require implementing a custom
partitioner over the serialized byte[] stream instead of over the original
objects, as Kafka's partitioner does? Or why not simply allow using Kafka's
partitioner class?

  PS: I can give a link to the sources if you have access to the
https://github.com/stratosphere/ private repos.

Thanks,
best regards

Re: different Kafka serialization for keyed and non keyed messages

Posted by Robert Metzger <rm...@apache.org>.
Hi Rss,

> why does Flink implement different serialization schemes for keyed and
non-keyed messages for Kafka?

The non-keyed serialization schema is a basic schema, which works for most
use cases.
For advanced users who need access to the key, offsets, the partition, or the
topic, there is the keyed serialization schema.
The keyed schema is richer and can completely subsume the simple, non-keyed
one, as the wrapper sketch below illustrates.
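
For example, a minimal sketch (assuming the connector's
KeyedSerializationSchemaWrapper from
org.apache.flink.streaming.util.serialization is available in your version)
that lifts the non-keyed SimpleStringSchema into a keyed schema:

import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

// The wrapper serializes only the value and leaves the Kafka key null.
KeyedSerializationSchema<String> wrapped =
        new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema());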

> As a result I see that a message serialized by
TypeInformationKeyValueSerializationSchema can be deserialized by Flink's
SimpleStringSchema or by Kafka's StringDeserializer only with an additional
leading symbol.

The TypeInformationKeyValueSerializationSchema is only meant to be used for
Flink <--> Flink communication through Kafka, because it depends on Flink's
internal serializers (it might even depend on the exact ExecutionConfig
settings).
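
For Flink-to-Flink use, the consuming job would use the same key/value schema
(a sketch, assuming the same topic and kafkaProps as in your example):

// Flink <--> Flink: the consumer uses the same schema, so the Kafka key and
// value arrive together as a Tuple2<String, String>.
DataStream<Tuple2<String, String>> keyedStream = env
        .addSource(new FlinkKafkaConsumer08<>(
                "topic",
                new TypeInformationKeyValueSerializationSchema<>(
                        String.class, String.class, env.getConfig()),
                kafkaProps));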


> The question is: is this correct behavior for Flink? And should I
implement my own serializer and partitioner for Flink's Kafka sink if I want
plain String serialization that can be read by all other tools without Flink?

The behavior is correct. If the SimpleStringSchema is not sufficient for the
other systems, you need to implement your own serializer.
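
As a concrete illustration (a hypothetical sketch, not part of the
connector), a KeyedSerializationSchema that writes the key and value of a
Tuple2<String, String> as plain UTF-8 bytes, so that any consumer with a
standard StringDeserializer can read them:

import java.nio.charset.StandardCharsets;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

public class PlainStringKeyValueSchema
        implements KeyedSerializationSchema<Tuple2<String, String>> {

    @Override
    public byte[] serializeKey(Tuple2<String, String> element) {
        // f0 is the Kafka key (the user id in the example above)
        return element.f0.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public byte[] serializeValue(Tuple2<String, String> element) {
        // f1 is the raw JSON value, written without any length prefix
        return element.f1.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String getTargetTopic(Tuple2<String, String> element) {
        return null; // use the topic configured on the producer
    }
}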

> And a second question: why does Flink require implementing a custom
partitioner over the serialized byte[] stream instead of over the original
objects, as Kafka's partitioner does? Or why not simply allow using Kafka's
partitioner class?

If you do not specify a Flink partitioner, we'll use the configured Kafka
partitioner.
The advantage of using Flink's own partitioner is that you can access
information such as the subtask index and the number of subtasks, as sketched
below.
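
As a rough sketch (assuming the 0.8 connector's KafkaPartitioner base class
with open(parallelInstanceId, parallelInstances, partitions) and
partition(next, serializedKey, serializedValue, numPartitions); check the
exact signatures in your connector version), a partitioner that uses the
producing subtask:

import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;

// Hypothetical example: each producing subtask sticks to one partition.
public class SubtaskAwarePartitioner<T> extends KafkaPartitioner<T> {

    private int subtaskIndex;

    @Override
    public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
        // Flink tells the partitioner which parallel subtask it runs in.
        this.subtaskIndex = parallelInstanceId;
    }

    @Override
    public int partition(T next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
        return subtaskIndex % numPartitions;
    }
}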

Regards,
Robert



