Posted to dev@kafka.apache.org by "Neha Narkhede (JIRA)" <ji...@apache.org> on 2012/06/12 01:50:43 UTC

[jira] [Created] (KAFKA-364) Consumer re-design

Neha Narkhede created KAFKA-364:
-----------------------------------

             Summary: Consumer re-design
                 Key: KAFKA-364
                 URL: https://issues.apache.org/jira/browse/KAFKA-364
             Project: Kafka
          Issue Type: New Feature
            Reporter: Neha Narkhede
            Assignee: Neha Narkhede


We've received quite a lot of feedback on consumer-side features over the past few months. Some of it suggests improvements to the current consumer design, and some of it is simply new feature/API requests. I have attempted to write up the requirements I've heard on this wiki -

https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Client+Re-Design

This would involve some significant changes to the consumer APIs, so we would like to collect feedback on the proposal from our community. Since the list of changes is not small, we would like to understand whether some features are preferred over others and, more importantly, whether some features are not required at all.


[jira] [Commented] (KAFKA-364) Consumer re-design

Posted by "Ross Black (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13397486#comment-13397486 ] 

Ross Black commented on KAFKA-364:
----------------------------------

I send batched messages with compression, and use the offsets retrieved by the consumer to get exactly-once semantics (by persisting consumer state together with the offsets).  When using the message set iterator, for a batch of, say, 5 messages, the offset returned for messages 1-4 is the start of the *current* batch, and the offset for message 5 is the start of the *next* batch.  My code has to wait for the offset to change from the previous message before it persists (so that my consumer state is only persisted when a batch has been completed).  To me this feels awkward, in that it is not very explicit in the API (you have to know about internals to understand the processing required).  I think it could be useful to expose a flag that indicates batch-end, or to directly expose message batches (similar to the way shallowIterator does?).

Thanks,
Ross
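
For readers following along, here is a minimal sketch of the persist-on-offset-change pattern Ross describes. The types and hooks (MessageAndOffset, process, persistStateAndOffset) are simplified stand-ins rather than the real Kafka classes:

    import java.util.List;

    public class BatchBoundaryPersist {

        // Simplified stand-in for what the iterator hands back: the message
        // payload plus the offset reported alongside it.
        record MessageAndOffset(byte[] message, long offset) {}

        private long previousOffset = -1L;

        void consume(List<MessageAndOffset> fetched) {
            for (MessageAndOffset mo : fetched) {
                process(mo.message());
                // Within a compressed batch every message carries the batch's
                // start offset; only the last message carries the start of the
                // *next* batch.  A change in the attached offset therefore marks
                // the end of a batch, and only then is it safe to persist state
                // and offset together.
                if (previousOffset >= 0 && mo.offset() != previousOffset) {
                    persistStateAndOffset(mo.offset());
                }
                previousOffset = mo.offset();
            }
        }

        void process(byte[] message) { /* application logic */ }

        void persistStateAndOffset(long offset) { /* store state + offset atomically */ }
    }

A batch-end flag or a batch-level iterator, as suggested above, would make this offset-change heuristic unnecessary.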

                

[jira] [Commented] (KAFKA-364) Consumer re-design

Posted by "Ross Black (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13397471#comment-13397471 ] 

Ross Black commented on KAFKA-364:
----------------------------------

In the API redesign, it would be nice to somehow allow flexible/pluggable control over the allocation of [broker:partition], both the partitions a producer writes to and those a consumer reads from, when using ZooKeeper management.

I currently use SyncProducer and SimpleConsumer to directly control the set of [broker:partition] that a producer writes to and that a consumer reads from.

I need this for a scenario where the consumer holds some state (like a cache) on local disk.  It is expensive to discard the local state - the consumer must then instead perform a remote lookup with very high latency (> 5 mins).  I need the partitioning performed by a producer to remain fixed until explicitly changed (the number of producers is relatively static, and each producer sends messages into a dedicated broker).  I need each consumer to fetch the same partitions unless a consumer has failed for more than some period of time (approx 5 mins), so that if it recovers quickly I have not wastefully discarded local state.

Currently, if I use Producer with ZooKeeper, the Partitioner API allows me to partition messages, but then Kafka code in the Producer controls allocation of the Partitioner result to a physical [broker:partition].  If I use Producer with fixed brokers, messages are allocated to random partitions.  If I use the high-level consumer, Kafka code in ZookeeperConsumerConnector controls the allocation of [broker:partition] to available consumers.

I understand if this is an over-specialised use-case to cater for.  At a minimum, I would like the equivalent functionality of SyncProducer and SimpleConsumer to be preserved in a public API.

Thanks,
Ross
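
To make the ask concrete, one hypothetical shape for such a pluggable allocation hook is sketched below. Nothing like this interface exists in Kafka; the names and signature are purely illustrative:

    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    // Hypothetical plug-in point: decide which [broker:partition] entries each
    // consumer in a group owns after a rebalance.
    interface PartitionAssignmentStrategy {

        /**
         * @param liveConsumers consumer ids currently registered (e.g. in ZooKeeper)
         * @param allPartitions every known "broker-partition" id for the topic
         * @param previous      the assignment produced by the last rebalance, if any
         * @return map of consumer id to the set of "broker-partition" ids it should fetch
         */
        Map<String, Set<String>> assign(List<String> liveConsumers,
                                        List<String> allPartitions,
                                        Map<String, Set<String>> previous);
    }

A "sticky" implementation would return the previous entry for every consumer that is still live and only redistribute partitions whose owner has been gone longer than a grace period, which is essentially the behaviour described above.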


                

[jira] [Commented] (KAFKA-364) Consumer re-design

Posted by "Neha Narkhede (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13295340#comment-13295340 ] 

Neha Narkhede commented on KAFKA-364:
-------------------------------------

Throwing a +1 on "Allow the consumer to reset its offset to some arbitrary value, and then write that offset into ZK".

We're currently running into a scenario where we would like 100% reliability: we're losing a few messages when a connection is broken while there are still messages in the OS TCP buffers. So we're planning to shift the ZK offset a few seconds "back in time" if we detect that a broker has gone down, to make sure all the messages are actually delivered to the end consumer when that broker comes back up, even if there's a small amount of overlap.

Thanks,

Marcos
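
As an illustration of the rewind described above, the sketch below overwrites a group's committed offset directly in ZooKeeper, assuming the 0.7-era layout /consumers/<group>/offsets/<topic>/<broker-partition>. The group, topic, partition id and target offset are placeholders, and the group's consumers should be stopped before running it:

    import java.nio.charset.StandardCharsets;
    import org.apache.zookeeper.ZooKeeper;

    public class RewindOffset {
        public static void main(String[] args) throws Exception {
            String path = "/consumers/my-group/offsets/my-topic/0-0"; // placeholder ids
            long rewoundOffset = 123456L; // an offset a few seconds "back in time"

            ZooKeeper zk = new ZooKeeper("localhost:2181", 30_000, event -> { });
            try {
                // version -1 means "any version"; a live consumer could overwrite
                // this value on its next commit, hence stopping the group first.
                zk.setData(path,
                           Long.toString(rewoundOffset).getBytes(StandardCharsets.UTF_8),
                           -1);
            } finally {
                zk.close();
            }
        }
    }

Picking the rewound value by timestamp is the harder part, and is itself a good argument for first-class rewind support in the new consumer API.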
                

[jira] [Commented] (KAFKA-364) Consumer re-design

Posted by "Neha Narkhede (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13295339#comment-13295339 ] 

Neha Narkhede commented on KAFKA-364:
-------------------------------------

I would like to throw in a couple of use cases:

  - Allow the new consumer to reset its offset to either the current largest or smallest offset.  This would be a great way to restart a process that has fallen behind.  The only way I know how to do this today, with the high-level consumer, is to delete the ZK nodes manually and restart the consumer (see the sketch after this comment).
  - Allow the consumer to reset its offset to some arbitrary value, and then write that offset into ZK.  Kind of like the first case, but it would make rewinding/replays much easier.

Modularity (the ability to layer the ZK infrastructure on top of the simple interface) would be great.

thanks,
Evan
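
For context, the manual workaround in the first bullet (delete the group's offset nodes in ZK and restart the consumer) looks roughly like the sketch below, again assuming the 0.7-era /consumers/<group>/offsets/<topic> layout; the group and topic names are placeholders:

    import java.util.List;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.ZooKeeper;

    public class ResetGroupOffsets {

        // Delete a znode and everything under it using only the core ZooKeeper API.
        static void deleteRecursively(ZooKeeper zk, String path)
                throws KeeperException, InterruptedException {
            List<String> children = zk.getChildren(path, false);
            for (String child : children) {
                deleteRecursively(zk, path + "/" + child);
            }
            zk.delete(path, -1); // -1: ignore the node's version
        }

        public static void main(String[] args) throws Exception {
            // Stop every consumer in the group first, otherwise the nodes are
            // recreated as soon as a consumer commits again.
            ZooKeeper zk = new ZooKeeper("localhost:2181", 30_000, event -> { });
            try {
                deleteRecursively(zk, "/consumers/my-group/offsets/my-topic");
            } finally {
                zk.close();
            }
        }
    }

Where the restarted consumer then begins (smallest vs. largest) depends on how it is configured to behave when no committed offset is found, which is exactly why a first-class reset API would be cleaner.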
                

[jira] [Comment Edited] (KAFKA-364) Consumer re-design

Posted by "Ross Black (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13397471#comment-13397471 ] 

Ross Black edited comment on KAFKA-364 at 6/20/12 1:22 PM:
-----------------------------------------------------------

In the API redesign, it would be nice to somehow allow flexible/pluggable control over the allocation of [broker:partition], both the partitions a producer writes to and those a consumer reads from, when using ZooKeeper management.
(I was not certain whether "Manual partition assignment" covered this - it did not mention producer partition control)

I currently use SyncProducer and SimpleConsumer to directly control the set of [broker:partition] that a producer writes to and that a consumer reads from.

I need this for a scenario where the consumer holds some state (like a cache) on local disk.  It is expensive to discard the local state - the consumer must then instead perform a remote lookup with very high latency (> 5 mins).  I need the partitioning performed by a producer to remain fixed until explicitly changed (the number of producers is relatively static, and each producer sends messages into a dedicated broker).  I need each consumer to fetch the same partitions unless a consumer has failed for more than some period of time (approx 5 mins), so that if it recovers quickly I have not wastefully discarded local state.

Currently, if I use Producer with ZooKeeper, the Partitioner API allows me to partition messages, but then Kafka code in the Producer controls allocation of the Partitioner result to a physical [broker:partition].  If I use Producer with fixed brokers, messages are allocated to random partitions.  If I use the high-level consumer, Kafka code in ZookeeperConsumerConnector controls the allocation of [broker:partition] to available consumers.

I understand if this is an over-specialised use-case to cater for.  At a minimum, I would like the equivalent functionality of SyncProducer and SimpleConsumer to be preserved in a public API.

Thanks,
Ross


                