Posted to user@beam.apache.org by mi...@gmail.com on 2018/10/19 21:05:44 UTC

write to a kafka topic that is set in data

Hi guys!!

I'm trying to find a way to write to a Kafka topic using KafkaIO.write(), but I need to be able to get the topic name dynamically based on the data received. For example, I would like to send data for one tenant to topic "data_feed_1" and for another tenant to topic "data_feed_999".
I'm coming from Flink where it's possible via KeyedSerializationSchema.getTargetTopic().
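For reference, here is roughly what this looks like on our Flink side (just a sketch; the Event type and its accessors are simplified placeholders for our real classes):

import java.nio.charset.StandardCharsets;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

public class TenantRoutingSchema implements KeyedSerializationSchema<Event> {
    @Override
    public byte[] serializeKey(Event event) {
        return event.getTenantId().getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public byte[] serializeValue(Event event) {
        return event.getPayload().getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String getTargetTopic(Event event) {
        // the topic is chosen per record, based on the data itself
        return "data_feed_" + event.getTenantId();
    }
}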
Is there anything similar in KafkaIO?

Thanks,
Dmitry

Re: write to a kafka topic that is set in data

Posted by Alexey Romanenko <ar...@gmail.com>.
Yes, I see. I think it should work here too. Agreed to discuss details in the Jira/PR.
Thank you very much!

> On 26 Oct 2018, at 19:13, Raghu Angadi <ra...@google.com> wrote:
> 
> On Fri, Oct 26, 2018 at 7:50 AM Alexey Romanenko <aromanenko.dev@gmail.com> wrote:
>> Perhaps, the optimal solution would be to have a common way to transfer metadata along with key/values to the KafkaIO transform. Don't you think that we could use KafkaRecord (currently used for reading) for this purpose?
>> 
>> I agree, I think this is the Right Thing to do. It addresses a few other issues too (support for Kafka headers while writing requires this: BEAM-4038 <https://issues.apache.org/jira/browse/BEAM-4038>).
>> 
>> Thank you for pointing to this Jira. I see that some work had been done there, but it has not been finished yet. The idea was to use PCollection<ProducerRecord<K, V>> instead of the current PCollection<KV<K, V>>, right?
>> 
>> It sounds good to me since it perfectly fits the goal of this thread (dynamic topics), but it seems that we need to change the contract of KafkaIO.Write to use ProducerRecord<> instead of KV<>. So, it would be a breaking change in the user API. Do you have an idea how we can make it backward-compatible?
> 
> Interface would be pretty straightforward. See 'Write<K, V>.values()', which takes PCollection<V> instead of PCollection<KV<K, V>>. I think a similar technique would work. All the existing code works without any changes, and it will be a one-line change for users who want to write ProducerRecords.
> 
> Of course, the internal implementation will have more changes since we will be carrying ProducerRecords rather than KV<K, V>. I think that is fine and safe. We can discuss more details in a follow-up Jira or PR. Thanks for proposing this solution.
> 
> Raghu.


Re: write to a kafka topic that is set in data

Posted by Raghu Angadi <ra...@google.com>.
On Fri, Oct 26, 2018 at 7:50 AM Alexey Romanenko <ar...@gmail.com>
wrote:

>> Perhaps, the optimal solution would be to have a common way to transfer
>> metadata along with key/values to the KafkaIO transform. Don't you think
>> that we could use KafkaRecord (currently used for reading) for this purpose?
>>
>> I agree, I think this is the Right Thing to do. It addresses a few other
>> issues too (support for Kafka headers while writing requires this:
>> BEAM-4038 <https://issues.apache.org/jira/browse/BEAM-4038>).
>
> Thank you for pointing to this Jira. I see that some work had been done
> there, but it has not been finished yet. The idea was to use
> PCollection<ProducerRecord<K, V>> instead of the current
> PCollection<KV<K, V>>, right?
>
> It sounds good to me since it perfectly fits the goal of this thread
> (dynamic topics), but it seems that we need to change the contract of
> KafkaIO.Write to use ProducerRecord<> instead of KV<>. So, it would be a
> breaking change in the user API. Do you have an idea how we can make it
> backward-compatible?
>

Interface would be pretty straightforward. See 'Write<K, V>.values()', which
takes PCollection<V> instead of PCollection<KV<K, V>>. I think a similar
technique would work. All the existing code works without any changes, and it
will be a one-line change for users who want to write ProducerRecords.
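
To sketch the shape (names below are illustrative only; writeRecords() is not
an existing entry point):

// Existing usage stays exactly as it is; input is a PCollection<KV<String, String>>.
input.apply(KafkaIO.<String, String>write()
    .withBootstrapServers("broker:9092")
    .withTopic("data_feed_1")
    .withKeySerializer(StringSerializer.class)
    .withValueSerializer(StringSerializer.class));

// Hypothetical opt-in, mirroring values(): a transform over
// PCollection<ProducerRecord<String, String>>, where each record can name
// its own topic (and carry headers).
records.apply(KafkaIO.<String, String>writeRecords()
    .withBootstrapServers("broker:9092")
    .withKeySerializer(StringSerializer.class)
    .withValueSerializer(StringSerializer.class));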

Of course, the internal implementation will have more changes since we will
be carrying ProducerRecords rather than KV<K, V>. I think that is fine and
safe. We can discuss more details in a follow-up Jira or PR. Thanks for
proposing this solution.

Raghu.




Re: write to a kafka topic that is set in data

Posted by Alexey Romanenko <ar...@gmail.com>.
> On 25 Oct 2018, at 19:19, Raghu Angadi <ra...@google.com> wrote:
> 
> On Thu, Oct 25, 2018 at 8:31 AM Alexey Romanenko <aromanenko.dev@gmail.com> wrote:
>> > The primary concern is with the API. E.g. what if the record does not encode enough information about topic? Say we want to select topic name based on aggregation window.
>> > E.g. it could be KafkaIO.Writer<KV<topic, KV<key, value>>>.
>> 
>> Well, I think even with this API, using a user-provided function, this information (topic name) can be encapsulated into the key or value, but then it should be filtered out (if needed) by the user's Serializer… Not a nice solution.
> 
> Interesting. It works :).

Yes, but it's not very convenient for the user, imo, so I propose not to follow this way.

> Perhaps, the optimal solution would be to have a common way to transfer metadata along with key/values to the KafkaIO transform. Don't you think that we could use KafkaRecord (currently used for reading) for this purpose?
> 
> I agree, I think this is the Right Thing to do. It addresses a few other issues too (support for Kafka headers while writing requires this: BEAM-4038 <https://issues.apache.org/jira/browse/BEAM-4038>).

Thank you for pointing to this Jira. I see that some work had been done there, but it has not been finished yet. The idea was to use PCollection<ProducerRecord<K, V>> instead of the current PCollection<KV<K, V>>, right?

It sounds good to me since it perfectly fits the goal of this thread (dynamic topics), but it seems that we need to change the contract of KafkaIO.Write to use ProducerRecord<> instead of KV<>. So, it would be a breaking change in the user API. Do you have an idea how we can make it backward-compatible?
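
For the record, ProducerRecord already carries everything we would need per element: topic, optional partition, key/value, and headers (the BEAM-4038 case). A sketch, with tenantId, key and value as placeholders:

// Assumes org.apache.kafka.clients.producer.ProducerRecord and
// org.apache.kafka.common.header.internals.RecordHeader from kafka-clients.
ProducerRecord<String, String> record = new ProducerRecord<>(
    "data_feed_" + tenantId,  // per-record topic
    null,                     // partition: let Kafka decide
    key,
    value,
    Collections.singletonList(
        new RecordHeader("tenant", tenantId.getBytes(StandardCharsets.UTF_8))));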




Re: write to a kafka topic that is set in data

Posted by Raghu Angadi <ra...@google.com>.
On Thu, Oct 25, 2018 at 8:31 AM Alexey Romanenko <ar...@gmail.com>
wrote:

> > The primary concern is with the API. E.g. what if the record does not
> > encode enough information about topic? Say we want to select topic name
> > based on aggregation window.
> > E.g. it could be KafkaIO.Writer<KV<topic, KV<key, value>>>.
>
> Well, I think even with this API, using a user-provided function, this
> information (topic name) can be encapsulated into the key or value, but then
> it should be filtered out (if needed) by the user's Serializer… Not a nice
> solution.
>
Interesting. It works :).


> Perhaps, the optimal solution would be to have a common way to transfer
> metadata along with key/values to the KafkaIO transform. Don't you think
> that we could use KafkaRecord (currently used for reading) for this purpose?
>

I agree, I think this is the Right Thing to do. It addresses a few other
issues too (support for Kafka headers while writing requires this:
BEAM-4038 <https://issues.apache.org/jira/browse/BEAM-4038>).


> > Actually I take this back. I don't think it is coupled with the output
> > topic and partitions. It might just work (assuming Kafka can handle
> > individual transactions spanning many topics well).
>
> Do you mean that we can just take a topic name based on KV (using a
> SerializableFunction or another way discussed above) and use it instead of
> the current spec.getTopic()?
>

Yes, something along those lines. In practice it might hit limitations of
Kafka's transaction support. E.g. if a bundle has 1000 records going to 200
distinct topics, all of those will be written in a single transaction. Not
sure how well that would work in practice, but theoretically it should.
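
For illustration with the plain Kafka producer API (broker address and ids
below are placeholders), a single transaction can indeed span topics:

Properties props = new Properties();
props.put("bootstrap.servers", "broker:9092");
props.put("transactional.id", "beam-eos-shard-0");
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
producer.beginTransaction();
producer.send(new ProducerRecord<>("data_feed_1", "k1", "v1"));
producer.send(new ProducerRecord<>("data_feed_999", "k2", "v2"));
producer.commitTransaction();  // both topics commit (or abort) atomically
producer.close();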

Raghu.


Re: write to a kafka topic that is set in data

Posted by Alexey Romanenko <ar...@gmail.com>.
> The primary concern is with the API. E.g. what if the record does not encode enough information about topic? Say we want to select topic name based on aggregation window.
> E.g. it could be KafkaIO.Writer<KV<topic, KV<key, value>>>.

Well, I think even with this API, using a user-provided function, this information (topic name) can be encapsulated into the key or value, but then it should be filtered out (if needed) by the user's Serializer… Not a nice solution.
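
To show why it is not nice, a sketch of that workaround (the "topic|payload" value format is made up): the topic rides inside the value, and a custom value Serializer has to strip it off again:

public class StripTopicPrefixSerializer implements Serializer<String> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {}

    @Override
    public byte[] serialize(String topic, String data) {
        // drop the smuggled "topic|" prefix before the bytes hit the wire
        return data.substring(data.indexOf('|') + 1).getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public void close() {}
}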

Perhaps, the optimal solution would be to have a common way to transfer metadata along with key/values to the KafkaIO transform. Don't you think that we could use KafkaRecord (currently used for reading) for this purpose?

> Actually I take this back. I don't think it is coupled with the output topic and partitions. It might just work (assuming Kafka can handle individual transactions spanning many topics well).

Do you mean that we can just take a topic name based on KV (using a SerializableFunction or another way discussed above) and use it instead of the current spec.getTopic()?




Re: write to a kafka topic that is set in data

Posted by Raghu Angadi <ra...@google.com>.
On Wed, Oct 24, 2018 at 10:47 AM Raghu Angadi <ra...@google.com> wrote:

> My bad Alexey, I will review today. I had skimmed through the patch on my
> phone. You are right, exactly-once sink support is not required for now.
>



> It is quite a different beast and necessarily coupled with transactions
> on specific topic-partitions for correctness.
>
Actually I take this back. I don't think it is coupled with the output topic
and partitions. It might just work (assuming Kafka can handle individual
transactions spanning many topics well). As you mentioned, we would still
need to plumb it through. As such, we don't know if the exactly-once sink is
being used much... (I would love to hear about it if anyone is using it).


>
> The primary concern is with the API. The user provides a function to map
> an output record to its topic. We have found that such an API is usually
> problematic. E.g. what if the record does not encode enough information
> about topic? Say we want to select topic name based on aggregation window.
> It might be a bit more code, but simpler to let the user decide the topic
> for each record _before_ writing to the sink. E.g. it could be
> KafkaIO.Writer<KV<topic, KV<key, value>>>.
> I wanted to think a little bit more about this, but didn't get around to
> it. I will comment on the PR today.
>
> thanks for the initiative and the PR.
> Raghu.

Re: write to a kafka topic that is set in data

Posted by Raghu Angadi <ra...@google.com>.
My bad Alexey, I will review today. I had skimmed through the patch on my
phone. You are right, exactly-once sink support is not required for now. It
is quite a different beast and necessarily coupled with transactions on
specific topic-partitions for correctness.

The primary concern is with the API. The user provides a function to map an
output record to its topic. We have found that such an API is usually
problematic. E.g. what if the record does not encode enough information
about topic? Say we want to select topic name based on aggregation window.
It might be a bit more code, but simpler to let the user decide the topic for
each record _before_ writing to the sink. E.g. it could be
KafkaIO.Writer<KV<topic, KV<key, value>>>.
I wanted to think a little bit more about this, but didn't get around to
it. I will comment on the PR today.
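
A sketch of that shape (tenantOf() below is a made-up helper): the user
assigns the topic up front, so the sink itself needs no user callback:

PCollection<KV<String, KV<String, String>>> withTopics = input.apply(
    MapElements.via(
        new SimpleFunction<KV<String, String>, KV<String, KV<String, String>>>() {
          @Override
          public KV<String, KV<String, String>> apply(KV<String, String> kv) {
            // the topic is computed from the element itself, before the sink
            return KV.of("data_feed_" + tenantOf(kv), kv);
          }
        }));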

thanks for the initiative and the PR.
Raghu.

Re: write to a kafka topic that is set in data

Posted by Alexey Romanenko <ar...@gmail.com>.
I added simple support for this for the usual type of Kafka sink (PR: https://github.com/apache/beam/pull/6776 , welcome for review, btw :) )

At the same time, there is another, more complicated type of sink - EOS (Exactly Once Sink). In this case the data is partitioned among a fixed number of shards, and it creates one ShardWriter per shard. In its turn, ShardWriter depends on the Kafka topic. So, it seems that in the case of multiple and dynamic sink topics, we need to create a new ShardWriter for every new topic per shard.

Is my assumption correct, or did I miss/misunderstand something?
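
Something like this inside the shard's writer, I mean (a sketch only; newShardWriter() stands in for the existing creation logic):

// One writer per (shard, topic), created lazily on first use.
private final Map<String, ShardWriter<K, V>> writersByTopic = new HashMap<>();

private ShardWriter<K, V> writerFor(String topic) {
    return writersByTopic.computeIfAbsent(topic, t -> newShardWriter(t));
}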



Re: write to a kafka topic that is set in data

Posted by Lukasz Cwik <lc...@google.com>.
Thanks Raghu, added starter and newbie labels to the issue.


Re: write to a kafka topic that is set in data

Posted by Raghu Angadi <ra...@google.com>.
It will be a good starter feature for someone interested in Beam & Kafka.
The writer is very simple in Beam; it is little more than a ParDo.
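
To illustrate "little more than a ParDo", the core of such a writer could
look along these lines (a sketch: no error handling, metrics or flushing,
and the broker address is a placeholder):

class KafkaWriteFn extends DoFn<KV<String, String>, Void> {
  private transient KafkaProducer<String, String> producer;

  @Setup
  public void setup() {
    Properties props = new Properties();
    props.put("bootstrap.servers", "broker:9092");
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());
    producer = new KafkaProducer<>(props);
  }

  @ProcessElement
  public void process(ProcessContext c) {
    KV<String, String> kv = c.element();
    producer.send(new ProducerRecord<>("data_feed_1", kv.getKey(), kv.getValue()));
  }

  @Teardown
  public void teardown() {
    if (producer != null) {
      producer.close();
    }
  }
}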


Re: write to a kafka topic that is set in data

Posted by Dmitry Minaev <mi...@gmail.com>.
Lukasz, I appreciate the quick response and filing the JIRA ticket. Thanks
for the suggestion; unfortunately, I don't have a fixed number of topics.
Still, we'll probably use your approach for a limited number of topics
until the functionality is added, thank you!

Thanks,
Dmitry


Re: write to a kafka topic that is set in data

Posted by Lukasz Cwik <lc...@google.com>.
If there are a fixed number of topics, you could partition your write by
structuring your pipeline as such:
ParDo(PartitionByTopic) ----> KafkaIO.write(topicA)
                        \---> KafkaIO.write(topicB)
                        \---> KafkaIO.write(...)

There is currently no support for writing to Kafka dynamically based upon a
destination that is part of the data.
I filed https://issues.apache.org/jira/browse/BEAM-5798 for the issue.
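
A sketch with the Partition transform (String key/values and a two-way
tenant split are assumed here):

PCollectionList<KV<String, String>> parts = input.apply(
    Partition.of(2, (KV<String, String> kv, int numPartitions) ->
        "tenant_1".equals(kv.getKey()) ? 0 : 1));

parts.get(0).apply(KafkaIO.<String, String>write()
    .withBootstrapServers("broker:9092")
    .withTopic("data_feed_1")
    .withKeySerializer(StringSerializer.class)
    .withValueSerializer(StringSerializer.class));

parts.get(1).apply(KafkaIO.<String, String>write()
    .withBootstrapServers("broker:9092")
    .withTopic("data_feed_999")
    .withKeySerializer(StringSerializer.class)
    .withValueSerializer(StringSerializer.class));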
