Posted to dev@kafka.apache.org by "Jason Rosenberg (JIRA)" <ji...@apache.org> on 2013/10/03 17:45:04 UTC

[jira] [Commented] (KAFKA-966) Allow high level consumer to 'nak' a message and force Kafka to close the KafkaStream without losing that message

    [ https://issues.apache.org/jira/browse/KAFKA-966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13785291#comment-13785291 ] 

Jason Rosenberg commented on KAFKA-966:
---------------------------------------

I think a nice api would be to have an enhanced version of 'auto.commit.enable' that allows you to 'mark for commit' a message after you are done processing it.  So, you'd call 'markForCommit()' once you have successfully processed a message (this would imply, of course, that you are marking all previous messages up to that offset).  Then, when a subsequent invocation of commitOffsets() runs, it will only commit up to the last offset marked for commit.  This would apply both to auto-initiated commits (if auto.commit.enable is on) and to manual invocations of commitOffsets().
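
In code, the consumer loop might look something like this (just a sketch: markForCommit() is the hypothetical new call being proposed, the rest is the existing 0.8 high-level consumer API, and process() stands in for the application's own logic):

    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.message.MessageAndMetadata;

    public class MarkForCommitSketch {
        // Stand-in for the application's own processing (e.g. a DB write).
        static void process(byte[] payload) { /* business logic */ }

        static void consume(KafkaStream<byte[], byte[]> stream) {
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (it.hasNext()) {
                MessageAndMetadata<byte[], byte[]> msg = it.next();
                try {
                    process(msg.message());
                    // Hypothetical new call: everything up to and including
                    // this message's offset is now safe to commit.
                    stream.markForCommit();
                } catch (Exception e) {
                    // Not marked: commitOffsets(), auto or manual, won't
                    // advance past the last marked offset.
                    break;
                }
            }
        }
    }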

This would be a change in the PartitionTopicInfo class, I'd expect: in addition to tracking 'fetchedOffset' and 'consumedOffset', it could track a 'markedForCommitOffset' (maybe with a better name :))
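
Something roughly like this (a Java-flavored sketch only; the real class is Scala internal to the consumer, and the new field name is a placeholder):

    import java.util.concurrent.atomic.AtomicLong;

    class PartitionTopicInfo {
        final AtomicLong fetchedOffset = new AtomicLong(-1);          // existing
        final AtomicLong consumedOffset = new AtomicLong(-1);         // existing
        final AtomicLong markedForCommitOffset = new AtomicLong(-1);  // proposed

        // Invoked by the hypothetical markForCommit(): remember the highest
        // offset the application has declared fully processed.
        void markForCommit(long offset) {
            markedForCommitOffset.set(offset);
        }
    }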

Then, when commitOffsets() runs with 'reliable.auto.commit.enable' (or whatever we call it) enabled, it would use the 'markedForCommitOffset' rather than the 'consumedOffset' when committing.
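
Using the class sketched above, the commit-time decision could be as simple as (again just a sketch, assuming the proposed config is a boolean flag; the real commitOffsets() would walk every PartitionTopicInfo and write the chosen offset to ZooKeeper):

    long offsetToCommit(PartitionTopicInfo info, boolean reliableCommit) {
        return reliableCommit
            ? info.markedForCommitOffset.get()   // only what the app marked
            : info.consumedOffset.get();         // today's behavior
    }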

Thoughts?

> Allow high level consumer to 'nak' a message and force Kafka to close the KafkaStream without losing that message
> -----------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-966
>                 URL: https://issues.apache.org/jira/browse/KAFKA-966
>             Project: Kafka
>          Issue Type: Improvement
>          Components: consumer
>    Affects Versions: 0.8
>            Reporter: Chris Curtin
>            Assignee: Neha Narkhede
>            Priority: Minor
>
> Enhancement request.
> The high level consumer is very close to handling a lot of situations a 'typical' client would need, except for when the message received from Kafka is valid but the business logic that wants to consume it has a problem.
> For example, if I want to write the value to a MongoDB or Cassandra database and the database is not available: I won't know until I go to do the write that the database isn't available, but by then it is too late NOT to read the message from Kafka. Thus if I call shutdown() to stop reading, that message is lost, since the offset Kafka writes to ZooKeeper is the next offset.
> Ideally I'd like to be able to tell Kafka: close the KafkaStream, but set the next offset to read for this partition to this message when I start up again. And if there are any messages in the BlockingQueue for other partitions, find the lowest # and use it for that partition's offset, since I haven't consumed them yet.
> Thus I can cleanly shutdown my processing, resolve whatever the issue is and restart the process.
> Another idea might be to allow a 'peek' into the next message and if I succeed in writing to the database call 'next' to remove it from the queue. 
> I understand this won't deal with a 'kill -9' or hard failure of the JVM leading to the latest offsets not being written to ZooKeeper but it addresses a likely common scenario for consumers. Nor will it add true transactional support since the ZK update could fail.
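
For what it's worth, the peek/next idea from the description above might look roughly like this from the application side (a fragment only: peek() does not exist on the iterator in this form today, and writeToDatabase() is a stand-in for the application's own write):

    MessageAndMetadata<byte[], byte[]> msg = it.peek();  // look, don't consume
    writeToDatabase(msg.message());                      // may throw
    it.next();                                           // success: consume it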


