Posted to issues@spark.apache.org by "Dan Dutrow (JIRA)" <ji...@apache.org> on 2015/08/13 22:34:46 UTC

[jira] [Comment Edited] (SPARK-6249) Get Kafka offsets from consumer group in ZK when using direct stream

    [ https://issues.apache.org/jira/browse/SPARK-6249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14695887#comment-14695887 ] 

Dan Dutrow edited comment on SPARK-6249 at 8/13/15 8:34 PM:
------------------------------------------------------------

I have read many of the tickets, documentation, and source code relating to this subject, but haven't found a solid answer to what appears to be a common question: how to reinitialize a Kafka Direct API consumer after a restart. I have a rapidly evolving application that uses checkpoints and the like for 24/7 operations. However, when we deploy a new jar, we have to blow away the checkpoint because otherwise the changes won't take effect cleanly. We have an approach to maintain the data between restarts, but not the Kafka offsets, and doing the Kafka bookkeeping in our application requires deeper knowledge and proficiency to make sure it is done correctly. Not being familiar with the ZooKeeper API, I was hoping for a Spark-abstracted approach where I could configure something in the API, or pass a parameter like "auto.offset.reset" -> "remember", that would do its best to resume processing at the previous offset location for that consumer group. Short of that, a link to a blog post, document, or GitHub example that describes the ZK API needed to accomplish this in detail would be greatly appreciated.

On a related topic, it would be awesome if the Streaming Statistics page could be updated so that you can review the message rates while using the Kafka Direct API. If making that work is another ZooKeeper trick, some example code would be immensely helpful.

http://apache-spark-user-list.1001560.n3.nabble.com/Maintaining-Kafka-Direct-API-Offsets-tt24246.html
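
In the meantime, the high-level consumer's committed offsets live at a well-known ZooKeeper path, which is the "ZK trick" in question. Below is a minimal sketch in plain Python of building those paths and assembling the startup offset map a direct stream would need. It is illustrative only: the `read_znode` callable is a hypothetical stand-in for a real ZooKeeper client get (e.g. kazoo or Curator), and the `/consumers/<group>/offsets/<topic>/<partition>` layout is the conventional one used by the high-level consumer.

```python
# Sketch of reading consumer-group offsets from ZooKeeper's conventional layout.
# A real job would replace read_znode with an actual ZooKeeper client call.

def offset_znode(group, topic, partition):
    """Return the znode path holding the committed offset for one partition."""
    return "/consumers/%s/offsets/%s/%d" % (group, topic, partition)

def parse_offset(znode_bytes):
    """Offsets are stored as ASCII-encoded integers; parse the znode payload."""
    return int(znode_bytes.decode("utf-8").strip())

def initial_offsets(group, topic, partitions, read_znode):
    """Build the (topic, partition) -> offset map a direct stream needs at startup.

    read_znode is a callable (path -> bytes or None) standing in for a real
    ZooKeeper get(); partitions with no committed offset fall back to 0.
    """
    offsets = {}
    for p in partitions:
        data = read_znode(offset_znode(group, topic, p))
        offsets[(topic, p)] = parse_offset(data) if data is not None else 0
    return offsets
```

With this map in hand, a Spark job could pass it as the explicit starting offsets to the direct stream instead of relying on latest/earliest.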



> Get Kafka offsets from consumer group in ZK when using direct stream
> --------------------------------------------------------------------
>
>                 Key: SPARK-6249
>                 URL: https://issues.apache.org/jira/browse/SPARK-6249
>             Project: Spark
>          Issue Type: Improvement
>          Components: Streaming
>            Reporter: Tathagata Das
>
> This is the proposal. 
> The simpler direct API (the one that does not take explicit offsets) can be modified to also pick up the initial offset from ZK if group.id is specified. This is exactly analogous to how we find the latest or earliest offset in that API, except that instead of the latest/earliest offset of the topic we want to find the offset of the consumer group. The group offsets in ZK are not used at all for any further processing or restarting, so the exactly-once semantics are not broken. 
> The use case where this is useful is simplified code upgrade. If the user wants to upgrade the code, he/she can stop the context gracefully, which will ensure the ZK consumer group offsets are updated with the last offsets processed. Then the new code, started fresh (not restarted from checkpoint), can pick up the consumer group offsets from ZK and continue where the previous code left off. 
> Without the ability to pick up consumer group offsets at start (that is, currently), the only way to do this is for users to save the offsets somewhere (file, database, etc.) and manage the offsets themselves. I just want to simplify this process. 
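
The write-back half of this proposal (a graceful stop leaving the last processed offsets in ZK) can be sketched similarly. This is illustrative plain Python, not Spark code: `write_znode` is a hypothetical stand-in for a real ZooKeeper set/create, and the `(topic, partition, until_offset)` tuples mirror the per-partition offset-range information a batch would report.

```python
# Sketch of committing processed offsets back to ZooKeeper after a batch,
# so a freshly started job (see initial-offset lookup above) can resume there.
# write_znode stands in for a real ZooKeeper set()/create().

def commit_offsets(group, offset_ranges, write_znode):
    """offset_ranges: iterable of (topic, partition, until_offset) tuples.

    Writes each partition's next offset to the conventional
    /consumers/<group>/offsets/<topic>/<partition> znode.
    """
    for topic, partition, until in offset_ranges:
        path = "/consumers/%s/offsets/%s/%d" % (group, topic, partition)
        write_znode(path, str(until).encode("utf-8"))
```

In a streaming job this would run at the end of each batch (or at graceful shutdown), after the batch's output has been successfully written.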



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org