Posted to users@kafka.apache.org by Stig Rohde Døssing <st...@gmail.com> on 2017/11/03 18:50:14 UTC

Manual offset control and topic compaction

Hi,

I'm working on the Kafka connector for Apache Storm, which pulls messages
from Kafka and emits them into a Storm topology. The connector uses manual
offset control since message processing happens asynchronously to pulling
messages from Kafka, and we hit an issue a while back related to topic
compaction. I think we can solve it, but I'd like confirmation that the way
we're going about it isn't wrong.

The connector keeps track of which offsets have been emitted into the
topology, along with other information such as how many times they've been
retried. When an offset should be retried, the connector fetches the message
from Kafka again (it is not kept in memory once emitted). We only clean up
the state for an offset once it is fully processed.

The issue we hit is that if topic compaction is enabled, we need to know
that the offset is no longer available so we can delete the corresponding
state. Would the approach described here
https://issues.apache.org/jira/browse/STORM-2546?focusedCommentId=16151172&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16151172
be reasonable for this, or is there another way to check if an offset has been
deleted?

Thanks.

Re: Manual offset control and topic compaction

Posted by Stig Rohde Døssing <st...@gmail.com>.
I should probably have been a little clearer about what I'm asking, so here's
an example.

Let's say that I have a consumer on a topic partition, and I'm doing manual
commits. Because I'm doing manual commits and the messages are processed
asynchronously from the consumer poll loop, I keep track of which offsets I
have received from Kafka, and mark them as done once they have been
processed. I don't commit offset X until all messages X-1, X-2... have been
marked as done.
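
The bookkeeping described above could be sketched roughly as follows. This is
only an illustration, not the Storm connector's actual code: the class name
OffsetTracker and its methods are hypothetical, and it assumes the Kafka
convention that the committed offset is the offset of the next message to
read.

```java
import java.util.TreeSet;

// Hypothetical tracker for one topic partition (illustrative only).
class OffsetTracker {
    // The offset that is safe to commit: every offset below it is done.
    private long nextCommitOffset;
    // Offsets marked done while some earlier offset is still pending.
    private final TreeSet<Long> doneAhead = new TreeSet<>();

    OffsetTracker(long startOffset) {
        this.nextCommitOffset = startOffset;
    }

    // Mark one offset as fully processed and advance the commit position
    // across any contiguous run of done offsets.
    void markDone(long offset) {
        if (offset < nextCommitOffset) {
            return; // already covered by the commit position
        }
        doneAhead.add(offset);
        while (!doneAhead.isEmpty() && doneAhead.first() == nextCommitOffset) {
            doneAhead.pollFirst();
            nextCommitOffset++;
        }
    }

    long nextCommitOffset() {
        return nextCommitOffset;
    }
}
```

With this sketch, marking offsets 0, 1 and 3 as done leaves the commit
position at 2; it only moves to 4 once offset 2 is also marked as done.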

Say that I've previously received offsets 0-10 and realize that I need to
retry offset 2. I seek the consumer to offset 2 on that partition. I poll
once and check the resulting records, and find that the earliest received
message has offset 5. Is it now correct to assume that offsets 2-4 must have
been compacted away, so I should forget about those messages, stop trying
to fetch them, and mark them as done in my offset tracker?
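
To make the check in question concrete, here is a minimal sketch of the gap
computation. It is deliberately free of Kafka calls: the helper name
missingOffsets is hypothetical, seekOffset stands for the offset passed to
KafkaConsumer.seek, and polledOffsets stands for the ConsumerRecord.offset()
values returned by the following poll for that partition. Whether the
resulting offsets can safely be treated as compacted away is exactly the
assumption being asked about, so the sketch only computes the gap.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper (illustrative only).
final class CompactionGap {
    private CompactionGap() {
    }

    // Returns the offsets in [seekOffset, earliest polled offset).
    // An empty poll result says nothing about missing offsets,
    // so it yields an empty list.
    static List<Long> missingOffsets(long seekOffset, List<Long> polledOffsets) {
        List<Long> missing = new ArrayList<>();
        if (polledOffsets.isEmpty()) {
            return missing;
        }
        long earliest = Collections.min(polledOffsets);
        for (long offset = seekOffset; offset < earliest; offset++) {
            missing.add(offset);
        }
        return missing;
    }
}
```

For the example above, seeking to offset 2 and receiving offsets starting at
5 gives the gap 2, 3 and 4. Under the assumption in question, each of those
would then be marked as done in the offset tracker.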
