Posted to user@flink.apache.org by Sanne de Roever <sa...@gmail.com> on 2016/12/06 13:23:22 UTC

Flink Kafka producer with a topic per message

Hi,

Kafka producer clients for 0.10 allow the following syntax:

producer.send(new ProducerRecord<String, String>("my-topic",
Integer.toString(i), Integer.toString(i)));

The gist is that one producer can send messages to different topics; this is
useful for event routing, among other things. It makes the creation of generic
endpoints easier. If I am right, Flink currently does not support this; would
this be a useful addition?

Cheers,

Sanne

Re: Flink Kafka producer with a topic per message

Posted by Sanne de Roever <sa...@gmail.com>.
Hi Gordon,

Yes, this has been addressed in 1.0.0; and in a very nice way. Thank you.

</CloseThread>

Cheers,

Sanne


Re: Flink Kafka producer with a topic per message

Posted by Sanne de Roever <sa...@gmail.com>.
Hi Gordon,

Sounds very close, I will have a look; thx.

Cheers,

Sanne


Re: Flink Kafka producer with a topic per message

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi,

The FlinkKafkaProducers currently support a per-record topic through the
user-provided serialization schema, which has a "getTargetTopic(T element)"
method that is called for each record. The partition a record is sent to is
decided by a custom KafkaPartitioner, which is also provided by the user when
creating a FlinkKafkaProducer.
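
To make the per-record topic idea concrete, here is a minimal, self-contained
Java sketch. It deliberately does not use the Flink or Kafka classes:
TopicSchema, EventSchema and the route helper are hypothetical stand-ins, and
only the method name getTargetTopic is taken from the description above. The
"type:payload" message format is likewise just an assumption for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class PerRecordTopicSketch {

    // Hypothetical stand-in for a user-provided serialization schema;
    // only the method name getTargetTopic comes from the mail above.
    interface TopicSchema<T> {
        byte[] serializeValue(T element);

        String getTargetTopic(T element); // asked once per record
    }

    // Example schema: events of the assumed form "type:payload"
    // are sent to the topic "events-<type>".
    static class EventSchema implements TopicSchema<String> {
        @Override
        public byte[] serializeValue(String element) {
            return element.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public String getTargetTopic(String element) {
            int sep = element.indexOf(':');
            return sep < 0 ? "events-unknown" : "events-" + element.substring(0, sep);
        }
    }

    // Simulates the producer side: ask the schema for the topic of each
    // record and group the records by that topic.
    static <T> Map<String, List<T>> route(List<T> records, TopicSchema<T> schema) {
        Map<String, List<T>> byTopic = new LinkedHashMap<>();
        for (T record : records) {
            byTopic.computeIfAbsent(schema.getTargetTopic(record), t -> new ArrayList<>())
                   .add(record);
        }
        return byTopic;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("click:a", "view:b", "click:c");
        System.out.println(route(records, new EventSchema()));
    }
}
```

The point of the sketch is only that the topic decision lives in the schema,
so a single producer instance can serve any number of topics.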

Does this already provide the functionality you’ve mentioned? Or have I misunderstood what you have in mind?

Cheers,
Gordon



Re: Flink Kafka producer with a topic per message

Posted by Sanne de Roever <sa...@gmail.com>.
Having had a glass of water, the following option came up.

Having more advanced Sink integrations is likely to be a more general
concern. It would be better to have a smoother path from the cleaner
abstraction to the advanced case. A more general proposal would be to alter
the Sink interface such that an optional key-value map can be passed with
each message. This optional key-value map would allow the sink to alter
its behavior given the hints in the map.
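
A rough sketch of what such an interface could look like, in plain Java.
Everything here is hypothetical: HintedSink, TopicHintSink and the "topic"
hint key are made-up names for illustration and are not part of Flink's Sink
API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

class HintedSinkSketch {

    // Hypothetical sink interface: each message may carry an optional hint map.
    interface HintedSink<T> {
        void invoke(T value, Map<String, String> hints);

        // Simple case: no hints, so the sink falls back to its defaults.
        default void invoke(T value) {
            invoke(value, Collections.emptyMap());
        }
    }

    // Example sink that honors a "topic" hint and otherwise uses its default topic.
    static class TopicHintSink implements HintedSink<String> {
        private final String defaultTopic;
        final List<String> sent = new ArrayList<>(); // "topic <- value" entries, for inspection

        TopicHintSink(String defaultTopic) {
            this.defaultTopic = defaultTopic;
        }

        @Override
        public void invoke(String value, Map<String, String> hints) {
            String topic = hints.getOrDefault("topic", defaultTopic);
            sent.add(topic + " <- " + value);
        }
    }

    public static void main(String[] args) {
        TopicHintSink sink = new TopicHintSink("default-topic");
        sink.invoke("plain message");
        sink.invoke("routed message", Collections.singletonMap("topic", "alerts"));
        sink.sent.forEach(System.out::println);
    }
}
```

Callers that do not care about hints keep using the one-argument form, which
is the smooth path from the simple case to the advanced one.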


Re: Flink Kafka producer with a topic per message

Posted by Sanne de Roever <sa...@gmail.com>.
A first sketch

Central to this functionality is Kafka's ProducerRecord. ProducerRecord was
introduced for Kafka 0.8. This means that any functionality could be
introduced for all Flink-Kafka connectors listed at
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/connectors/kafka.html

ProducerRecord does two things:

   - It allows a Kafka producer to send messages to different topics in
   Kafka; this can be very helpful for message routing (I can make a more
   formal example later)
   - It also allows setting a key that determines the partition of the
   message; introducing this would give Flink a more generic interface to
   Kafka, which is a good thing. A partition can be identified by an
   integer or by a key String that will be hashed
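
The two ways of identifying a partition can be sketched as follows. This is a
simplified model, not Kafka code: Record is a hypothetical stand-in for
ProducerRecord, and String.hashCode is used as a placeholder hash (Kafka's own
partitioner uses a different hash function and also handles records without a
key, which this sketch does not).

```java
class RecordPartitionSketch {

    // Hypothetical, simplified stand-in for Kafka's ProducerRecord.
    static class Record {
        final String topic;
        final Integer partition; // may be null: then the key decides
        final String key;
        final String value;

        Record(String topic, Integer partition, String key, String value) {
            this.topic = topic;
            this.partition = partition;
            this.key = key;
            this.value = value;
        }
    }

    // An explicit partition id wins; otherwise the key is hashed onto
    // the available partitions.
    static int resolvePartition(Record record, int numPartitions) {
        if (record.partition != null) {
            return record.partition;
        }
        if (record.key == null) {
            throw new IllegalArgumentException("sketch needs a partition id or a key");
        }
        // Placeholder hash, NOT the function Kafka uses.
        return Math.floorMod(record.key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        Record explicit = new Record("my-topic", 3, "ignored", "v1");
        Record keyed = new Record("my-topic", null, "user-42", "v2");
        System.out.println(resolvePartition(explicit, 8));
        System.out.println(resolvePartition(keyed, 8));
    }
}
```

Records with the same key always resolve to the same partition, which is what
makes the key useful for keeping related messages in order.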

The next step would be to determine the impact on the interface of a Sink.
Currently a Kafka sink has one topic, for example:

.addSink(new FlinkKafkaProducer09[String](
  outputTopic, new SimpleStringSchema(), producerProps))

In the new scenario one would like to pass not only the message to be sent,
but also a topic string and a partition id or key (tuple-ish?). The next
suggestion is just to start the thinking a bit; a shot in the dark. A
somewhat blunt approach would be to map all messages to a valid
ProducerRecord, and then to pass this ProducerRecord to the Sink, and
the rest is history. No attempt at abstraction is made, the reasoning
being as follows.

Evaluating this, I see the following. The current KafkaSink abstracts the Kafka
functionality out on the Flink side. This is a good thing, and will work
for most cases. Providing a tighter integration with Kafka will probably
break down the abstraction. This seems to point in the direction of
creating an advanced Kafka Sink. This sink gives more control, but less
abstraction; it is for advanced applications. Any abstraction attempts will
only create less transparency as far as I can see. The contract would
likely not work with other queuing providers.
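
The blunt approach described above (map every message to a record, then hand
the record to the sink) could look roughly like this in plain Java. OutRecord
and RecordSink are hypothetical stand-ins for ProducerRecord and the advanced
sink, and the "user|text" message format is an assumption made only for this
example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class RecordSinkSketch {

    // Hypothetical stand-in for Kafka's ProducerRecord (topic, key, value only).
    static class OutRecord {
        final String topic;
        final String key;
        final String value;

        OutRecord(String topic, String key, String value) {
            this.topic = topic;
            this.key = key;
            this.value = value;
        }
    }

    // Hypothetical "advanced" sink: it does no routing of its own and
    // forwards each record exactly as it was built upstream.
    static class RecordSink {
        final List<String> log = new ArrayList<>(); // "topic/key=value" entries

        void invoke(OutRecord record) {
            log.add(record.topic + "/" + record.key + "=" + record.value);
        }
    }

    // The map step: the job, not the sink, decides topic and key.
    // Assumes messages of the form "user|text".
    static OutRecord toRecord(String message) {
        int sep = message.indexOf('|');
        String user = message.substring(0, sep);
        String text = message.substring(sep + 1);
        String topic = text.startsWith("ERROR") ? "errors" : "events";
        return new OutRecord(topic, user, text);
    }

    public static void main(String[] args) {
        RecordSink sink = new RecordSink();
        for (String message : Arrays.asList("alice|ERROR disk full", "bob|login")) {
            sink.invoke(toRecord(message));
        }
        sink.log.forEach(System.out::println);
    }
}
```

The sink stays trivial, but every job using it has to know about topics and
keys, which is the loss of abstraction discussed above.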




Re: Flink Kafka producer with a topic per message

Posted by Sanne de Roever <sa...@gmail.com>.
Good questions, I will follow up piece-wise to address the different
questions. Could a Wiki section be an idea, before I spread the information
across several posts?


Re: Flink Kafka producer with a topic per message

Posted by Stephan Ewen <se...@apache.org>.
You are right, it does not exist, and it would be a nice addition.

Can you sketch some details on how to do that?

  - Will it be a new type of producer? If yes, can as much as possible of
the code be shared between the current and the new producer?
  - Will it only be part of the Flink Kafka 0.10 producer?

Thanks,
Stephan


