Posted to user@beam.apache.org by Juan Romero <js...@gmail.com> on 2023/04/13 18:15:29 UTC

Doubts about kafka connector

Hi Apache Beam team. I have been working on a POC for the company I work
for, and I'm using the Apache Beam Kafka connector to read from one Kafka
topic and write to another. The source and target topics each have 3
partitions, and it is mandatory to preserve ordering for certain message
keys. I have two questions about this:

1. How can I write a specific message to a specific Kafka partition?
2. How can we commit a message's offset back to the source topic only once
the pipeline has processed the message?

I look forward to your reply and hope you can help me with these questions.

Re: Doubts about kafka connector

Posted by Pavel Solomin <p....@gmail.com>.
I might be missing something, but it sounds like you want to preserve the
ordering of events that belong to the same id, rather than to hard-code a
partition number for writing.

If so, would it be enough to create a keyed PCollection with the id as the
key and use KafkaIO.writeRecords()?
I am basing this only on a quick check of the Javadoc; I will try it out
when I have time.

https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.html#writeRecords--

https://www.javadoc.io/doc/org.apache.kafka/kafka-clients/2.4.1/org/apache/kafka/clients/producer/ProducerRecord.html#ProducerRecord-java.lang.String-K-V-
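
The idea behind keying by id can be illustrated with a small, self-contained
sketch. It is a toy model only: the class KeyedPartitionSketch and its
methods are hypothetical, and the hash used here is a stand-in rather than
Kafka's actual default partitioner. It only shows that when the partition
is derived from the key, all updates for one id land in the same partition
in the order they were sent.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model of key-based partitioning. The hash below is a stand-in,
// NOT Kafka's actual default partitioner.
final class KeyedPartitionSketch {

    // Hypothetical helper: map a record key to one of numPartitions partitions.
    static int partitionFor(String key, int numPartitions) {
        int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));
        return Math.floorMod(hash, numPartitions);
    }

    // Append each {key, value} pair to the log of the partition chosen for its key.
    static Map<Integer, List<String>> write(List<String[]> records, int numPartitions) {
        Map<Integer, List<String>> partitions = new TreeMap<>();
        for (String[] record : records) {
            int partition = partitionFor(record[0], numPartitions);
            partitions.computeIfAbsent(partition, unused -> new ArrayList<>()).add(record[1]);
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<String[]> updates = List.of(
            new String[] {"123456", "Romero"},
            new String[] {"123456", "Romero2"},
            new String[] {"123456", "Romero3"});
        // All three updates share a key, so they end up in a single partition,
        // in the order they were written.
        System.out.println(write(updates, 3));
    }
}
```

In this model the per-key order holds only as long as the number of
partitions stays fixed and the records for one key are sent in order.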

Best Regards,
Pavel Solomin

Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
<https://www.linkedin.com/in/pavelsolomin>






Re: Doubts about kafka connector

Posted by John Casey via user <us...@beam.apache.org>.
Unfortunately, we don't have a feature to publish to a specific partition.
We tried to design with Kafka conventions in mind, and I don't believe we
plan to add this functionality.


Re: Doubts about kafka connector

Posted by Juan Romero <js...@gmail.com>.
Hi John. Thanks for your response!

Point 2 is clear to me now. I had been reading a lot of documentation about
it and only wanted to confirm with you.

Regarding point 1, I know the drawbacks: we have to avoid hot partitions
(for this purpose we can define a hash function that evenly distributes
the messages across the three partitions). However, in our specific case we
are using a tool that ingests the data from transactional databases (this
tool allows us to define the partitioner function), and we want to persist
the last state of a specific database row in a target OLAP database. For
example, we could have the following events, which indicate that a specific
row was updated:

("Id": "123456 ", "name": "Juan", "last_name": "Romero", "timestamp":
"13-04-2023 13:44:00.100", "Event": "Update")
("Id": "123456 ", "name": "Juan", "last_name": "Romero2", "timestamp":
"13-04-2023 13:44:00.200", "Event": "Update")
("Id": "123456 ", "name": "Juan", "last_name": "Romero3", "timestamp":
"13-04-2023 13:44:00.300", "Event": "Update")

Suppose all these events are processed by different workers (each worker is
both a consumer and a producer in this case, and at the end pushes the data
to another target topic). At the source we can guarantee that updates with
the same id go to the same partition, so up to that point the messages are
ordered. The problem is that we will have many consumers and producers in
the middle. We can read the messages in order from the first topic in the
chain (because the same key always goes to the same partition), but after
that we have to process each message and push it to another topic that has
3 partitions (as I said before). In that case the producer will spread the
row updates across the different partitions, and we can potentially lose
the original message order, because we are pushing the different updates
for a specific row across multiple partitions.

Let me know if my case is clear. If you want, I can send you a diagram to
explain it.

The general idea is that we need to push the last valid event from the
transactional database to the target, and in the middle we have
different Beam pipelines that act as consumers and producers.
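
To make the goal concrete, here is a minimal sketch of "keep only the last
state per row", assuming the event timestamp from the source database can
be trusted to decide which update is newest. The class LatestStateSketch
and the RowUpdate shape are hypothetical and only mirror the fields of the
example events; this is an illustration of the requirement, not a Beam or
Kafka API.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model: pick the newest update per row id, whatever the arrival order.
final class LatestStateSketch {

    // Timestamp layout taken from the example events: "13-04-2023 13:44:00.100".
    static final DateTimeFormatter FORMAT =
        DateTimeFormatter.ofPattern("dd-MM-uuuu HH:mm:ss.SSS");

    // Hypothetical event shape with only the fields needed for the example.
    static final class RowUpdate {
        final String id;
        final String lastName;
        final LocalDateTime eventTime;

        RowUpdate(String id, String lastName, String timestamp) {
            this.id = id;
            this.lastName = lastName;
            this.eventTime = LocalDateTime.parse(timestamp, FORMAT);
        }
    }

    // For each id, keep the update with the greatest event time.
    static Map<String, RowUpdate> latestById(List<RowUpdate> updates) {
        Map<String, RowUpdate> latest = new HashMap<>();
        for (RowUpdate update : updates) {
            latest.merge(update.id, update, (current, candidate) ->
                candidate.eventTime.isAfter(current.eventTime) ? candidate : current);
        }
        return latest;
    }

    public static void main(String[] args) {
        // The updates arrive out of order on purpose.
        List<RowUpdate> arrived = List.of(
            new RowUpdate("123456", "Romero3", "13-04-2023 13:44:00.300"),
            new RowUpdate("123456", "Romero", "13-04-2023 13:44:00.100"),
            new RowUpdate("123456", "Romero2", "13-04-2023 13:44:00.200"));
        System.out.println(latestById(arrived).get("123456").lastName);
    }
}
```

A comparison like this depends on the timestamps being unique and
monotonic per row, which is an assumption about the source system.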

Thanks!!






Re: Doubts about kafka connector

Posted by John Casey via user <us...@beam.apache.org>.
Hi Juan,

Under normal usage, Kafka will maintain ordering within a partition without
any extra work by you.

For 2, you can use .commitOffsetsInFinalize to only commit back to the
source topic once the pipeline has persisted the message, at which point it
may not be fully processed, but it is guaranteed that it will be processed.
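
The ordering described here (persist first, commit the offset second) can
be sketched with a toy model. The class CommitAfterPersistSketch is
hypothetical and is not how Beam or Kafka implement this; it only shows
why a failure before the commit leads to the same records being read
again rather than being skipped.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: not Beam's or Kafka's implementation.
final class CommitAfterPersistSketch {
    private final List<String> log;                    // records of one source partition
    private int committedOffset = 0;                   // where reading resumes after a restart
    final List<String> persisted = new ArrayList<>();  // what the pipeline has durably stored

    CommitAfterPersistSketch(List<String> log) {
        this.log = log;
    }

    int committedOffset() {
        return committedOffset;
    }

    // Read a batch from the committed offset; advance the offset only if persisting succeeded.
    boolean processBatch(int batchSize, boolean persistSucceeds) {
        int start = committedOffset;
        int end = Math.min(log.size(), start + batchSize);
        if (!persistSucceeds) {
            // Nothing is committed, so the same records are read again next time.
            return false;
        }
        persisted.addAll(log.subList(start, end));
        committedOffset = end;
        return true;
    }

    public static void main(String[] args) {
        CommitAfterPersistSketch source =
            new CommitAfterPersistSketch(List.of("update-1", "update-2", "update-3"));
        source.processBatch(2, false); // simulated failure: offset stays where it was
        source.processBatch(2, true);  // retry succeeds: the first two records are persisted
        source.processBatch(2, true);  // the remaining record is persisted
        System.out.println(source.committedOffset() + " " + source.persisted);
    }
}
```

In this model a record is never skipped, but it can be read more than
once after a failure.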

For 1, I would generally recommend against trying to write to a specific
partition. This undermines the load balancing that Kafka enables with
partitioning. I'd recommend separate topics instead. We don't support
writing to specific partitions in the current IO.

John
