Posted to users@kafka.apache.org by Vincent Maurin <vi...@gmail.com> on 2023/05/30 15:41:48 UTC

Consuming an entire partition with control messages

Hello !

I am working on an exactly-once stream processor in Python, using the
aiokafka client library. My program keeps its state in memory and
recovers it from a changelog topic, as Kafka Streams does.

On each processing loop, I consume messages and produce messages
to an output topic and to my changelog topic, all within a transaction.

When I need to restart a runner, to restore the state in memory, I
have a routine that consumes the changelog topic from the beginning to the
"end" with the read_committed isolation level. Here I am struggling to
define when to stop my recovery:
* my current (maybe) working solution is to loop over "poll" until
poll no longer returns any messages
* I tried something based on the end offsets, checking them against
the consumer position, but with control messages at the end of the
partition, I run into an issue where the position is one below the end
offset and does not go any further
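
To make the symptom concrete, here is a minimal sketch in plain Python. It is a toy model, not aiokafka code: `ToyConsumer`, `restore` and `max_idle_polls` are names invented for this illustration, and a `None` entry in the log stands in for a transaction control record. The toy consumer only advances its position past data records it has delivered, so with a marker at the end of the partition the position stops one below the end offset. The idle-poll guard is an assumption added here so the loop terminates anyway; it combines the two stop conditions from the list above.

```python
from typing import Dict, List, Optional, Tuple

# One entry per offset: a (key, value) tuple for a data record,
# or None for a transaction control record (commit/abort marker).
Record = Optional[Tuple[str, str]]


class ToyConsumer:
    """Toy consumer whose position only moves past delivered data records."""

    def __init__(self, log: List[Record], batch_size: int = 2) -> None:
        self.log = log
        self.batch_size = batch_size
        self.position = 0

    def poll(self) -> List[Tuple[int, Tuple[str, str]]]:
        batch = []
        offset = self.position
        while offset < len(self.log) and len(batch) < self.batch_size:
            record = self.log[offset]
            if record is not None:  # control records are never delivered
                batch.append((offset, record))
            offset += 1
        if batch:
            # Position = last delivered data offset + 1; a trailing
            # control record is never stepped over.
            self.position = batch[-1][0] + 1
        return batch


def restore(consumer: ToyConsumer, end_offset: int, max_idle_polls: int = 3):
    """Replay until the position reaches end_offset or polls stay empty."""
    state: Dict[str, str] = {}
    idle_polls = 0
    while consumer.position < end_offset and idle_polls < max_idle_polls:
        batch = consumer.poll()
        if not batch:
            idle_polls += 1
            continue
        idle_polls = 0
        for _offset, (key, value) in batch:
            state[key] = value
    return state, consumer.position


# Two transactions, each closed by a control record (offsets 2 and 4).
log: List[Record] = [("a", "1"), ("b", "2"), None, ("a", "3"), None]
state, position = restore(ToyConsumer(log), end_offset=len(log))
print(state, position, len(log))
```

In this model the state is fully restored, yet the position ends at 4 while the end offset is 5, so a loop that waits only for `position == end_offset` would never finish.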

I had a quick look at
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
but it is a bit hard to figure out what is going on there

Best regards,
Vincent

Re: Consuming an entire partition with control messages

Posted by "Matthias J. Sax" <mj...@apache.org>.
Well, `kafka-consumer-groups.sh` can only display the difference between 
"committed offset" and "end offset". It cannot know what the "right" 
offset to be committed is. It's really the responsibility of the 
consumers to commit correctly.
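
As a worked example of that arithmetic, here is a small sketch. The function name `reported_lag` and the offsets are made up for illustration; this is not the tool's code, only the subtraction described above. It assumes a partition whose last record, at offset 4, is a transaction marker, so the end offset is 5.

```python
def reported_lag(committed_offset: int, end_offset: int) -> int:
    """Lag as the plain difference between end offset and committed offset."""
    return end_offset - committed_offset


end_offset = 5  # offsets 0..4 exist; offset 4 is the transaction marker

# Consumer commits "last data record + 1" and never steps over the marker.
lag_without_stepping = reported_lag(committed_offset=4, end_offset=end_offset)

# Consumer commits the position after the marker.
lag_with_stepping = reported_lag(committed_offset=5, end_offset=end_offset)

print(lag_without_stepping, lag_with_stepping)
```

The first consumer has processed every data record, but the subtraction still yields 1 because its committed offset sits before the marker.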

-Matthias

On 7/27/23 1:03 AM, Vincent Maurin wrote:
> Thank you Matthias for your answer. I opened an issue on the aiokafka 
> project as a follow-up; let's see how we can resolve it there:
> https://github.com/aio-libs/aiokafka/issues/911
> 
> As mentioned in the issue, some tools like kafka-consumer-groups.sh also 
> display a lag of "1" in this kind of situation
> 
> Best regards,
> 
> Vincent
> 
> On 13/06/2023 17:27, Matthias J. Sax wrote:
>> Sounds like a bug in the aiokafka library to me.
>>
>> If the last message in a topic partition is a tx-marker, the consumer 
>> should step over it and report the correct position after the marker.
>>
>> The official KafkaConsumer (i.e., the Java one) does exactly that.
>>
>>
>> -Matthias
>>
>> On 5/30/23 8:41 AM, Vincent Maurin wrote:
>>> Hello !
>>>
>>> I am working on an exactly-once stream processor in Python, using the
>>> aiokafka client library. My program keeps its state in memory and
>>> recovers it from a changelog topic, as Kafka Streams does.
>>>
>>> On each processing loop, I consume messages and produce messages
>>> to an output topic and to my changelog topic, all within a transaction.
>>>
>>> When I need to restart a runner, to restore the state in memory, I
>>> have a routine that consumes the changelog topic from the beginning to
>>> the "end" with the read_committed isolation level. Here I am struggling
>>> to define when to stop my recovery:
>>> * my current (maybe) working solution is to loop over "poll" until
>>> poll no longer returns any messages
>>> * I tried something based on the end offsets, checking them against
>>> the consumer position, but with control messages at the end of the
>>> partition, I run into an issue where the position is one below the end
>>> offset and does not go any further
>>>
>>> I had a quick look at
>>> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
>>> but it is a bit hard to figure out what is going on there
>>>
>>> Best regards,
>>> Vincent

Re: Consuming an entire partition with control messages

Posted by Vincent Maurin <vi...@gmail.com>.
Thank you Matthias for your answer. I opened an issue on the aiokafka 
project as a follow-up; let's see how we can resolve it there:
https://github.com/aio-libs/aiokafka/issues/911

As mentioned in the issue, some tools like kafka-consumer-groups.sh also 
display a lag of "1" in this kind of situation.

Best regards,

Vincent


Re: Consuming an entire partition with control messages

Posted by "Matthias J. Sax" <mj...@apache.org>.
Sounds like a bug in the aiokafka library to me.

If the last message in a topic partition is a tx-marker, the consumer 
should step over it and report the correct position after the marker.

The official KafkaConsumer (i.e., the Java one) does exactly that.
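
A rough sketch of what "stepping over" means, as a toy model in plain Python. This is neither the Java client's nor aiokafka's implementation; `SteppingConsumer` and `restore` are hypothetical names, and `None` stands in for a tx-marker. The position advances past every offset the consumer scans, whether or not a record is delivered, so comparing the position with the end offset becomes a reliable stop condition.

```python
from typing import Dict, List, Optional, Tuple

# One entry per offset: a (key, value) tuple for a data record,
# or None for a transaction control record (tx-marker).
Record = Optional[Tuple[str, str]]


class SteppingConsumer:
    """Toy consumer that advances its position past control records too."""

    def __init__(self, log: List[Record], batch_size: int = 2) -> None:
        self.log = log
        self.batch_size = batch_size
        self.position = 0

    def poll(self) -> List[Tuple[int, Tuple[str, str]]]:
        batch = []
        while self.position < len(self.log) and len(batch) < self.batch_size:
            record = self.log[self.position]
            if record is not None:  # only data records are delivered
                batch.append((self.position, record))
            self.position += 1  # step over data and control records alike
        return batch


def restore(consumer: SteppingConsumer, end_offset: int) -> Dict[str, str]:
    """Replay the changelog until the position reaches end_offset."""
    state: Dict[str, str] = {}
    while consumer.position < end_offset:
        # An empty batch is fine: the position may still have moved
        # forward over a control record.
        for _offset, (key, value) in consumer.poll():
            state[key] = value
    return state


# Two transactions, each closed by a tx-marker (offsets 2 and 4).
log: List[Record] = [("a", "1"), ("b", "2"), None, ("a", "3"), None]
consumer = SteppingConsumer(log)
state = restore(consumer, end_offset=len(log))
print(state, consumer.position)
```

Here the loop needs no idle-poll heuristic: it stops exactly when the position equals the end offset, even though the last record in the partition is a marker.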


-Matthias
