Posted to jira@kafka.apache.org by "Stu (JIRA)" <ji...@apache.org> on 2017/12/18 21:06:00 UTC

[jira] [Comment Edited] (KAFKA-2359) New consumer - partitions auto assigned only on poll

    [ https://issues.apache.org/jira/browse/KAFKA-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16295659#comment-16295659 ] 

Stu edited comment on KAFKA-2359 at 12/18/17 9:05 PM:
------------------------------------------------------

Granted, I may be using Kafka in a place where another queue would be a better fit, but all our other use cases already use Kafka (and more appropriately so), so I stuck with it. I ran across this issue (in 0.10.1!) when I created a service that needed to push an item onto a shared queue, and then pop it off when later asked for the "next" work item. Something like this:

POST /item  -> produce record onto a topic
GET /item/next -> pop the next item to work on

I want a shared, HA/LB queue so the service can be a very simple, stateless service that I scale out (stateless in that all the state is in Kafka).

The problem is that the simplest way to do this is:

POST -> produce( item )
GET -> poll() w/ max.poll.records = 1, auto-assignment
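For concreteness, the consumer side of this setup corresponds to properties along these lines (the broker address and group id are placeholders I made up, not from the original report):

```properties
# Placeholders for illustration only
bootstrap.servers=localhost:9092
group.id=work-queue-service

# One record per GET
max.poll.records=1

# Note: auto.offset.reset defaults to "latest", which is what makes
# records produced before the first assignment invisible to a consumer
# that has not yet called poll()
```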

This doesn't work if you start the service, POST an item, and then GET an item. (After the first GET, everything is fine).

I've repeatedly tried this, and the first item posted is never consumed. Once I post another item, everything works fine.

Based on what I can see, this is because the consumer only ever receives new items produced *after* the first assignment happens.
So until poll() is called, any items a POST produces to the topic are not made available to my consumer.
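The behavior described above can be sketched with a toy model. This is not real Kafka: `Log` stands in for a topic partition, and the consumer's position is only initialized at its first poll (to the log end, mirroring the default `auto.offset.reset=latest`), mimicking the lazy assignment in question.

```java
import java.util.ArrayList;
import java.util.List;

// Toy stand-in for a topic partition
class Log {
    final List<String> records = new ArrayList<>();
    void produce(String r) { records.add(r); }
}

// Toy consumer: assignment (and offset initialization) happens lazily,
// on the first poll, and starts at the current end of the log
class LazyConsumer {
    private final Log log;
    private int position = -1; // unassigned until first poll
    LazyConsumer(Log log) { this.log = log; }
    String poll() {
        if (position < 0) {
            // First poll: assignment happens now; with latest-offset
            // semantics, we start at the current end of the log, so
            // anything produced earlier is skipped
            position = log.records.size();
            return null;
        }
        if (position < log.records.size()) return log.records.get(position++);
        return null;
    }
}

public class Demo {
    public static void main(String[] args) {
        Log log = new Log();
        log.produce("item-1");              // POST before any GET
        LazyConsumer c = new LazyConsumer(log);
        System.out.println(c.poll());       // null: item-1 is skipped
        log.produce("item-2");
        System.out.println(c.poll());       // item-2
    }
}
```

The first POST lands before the consumer has a position, so the first GET returns nothing and item-1 is never seen, matching the symptom above.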

I've confirmed this theory by doing an initial poll() in the constructor of the class that is hooked up to the resources for my service. 
(And, yes, I always call flush() after producing an item).
This does fix the issue, but then I have to do some special handling, checking whether that initial poll() already retrieved an item (so I don't lose it).
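The special handling mentioned above can be sketched generically: wrap any poll-style source so an eager warm-up poll runs at construction time (which, against real Kafka, forces partition assignment), and buffer whatever that warm-up poll happens to return instead of losing it. The `Supplier` here is my stand-in for `consumer.poll()`, not anything from the Kafka API.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Supplier;

// Sketch of the workaround: poll once eagerly at construction and
// buffer any record that comes back, so it is not lost
class EagerPoller<T> {
    private final Supplier<T> poll;       // stands in for consumer.poll()
    private final Deque<T> buffer = new ArrayDeque<>();

    EagerPoller(Supplier<T> poll) {
        this.poll = poll;
        T early = poll.get();             // warm-up poll at construction
        if (early != null) buffer.add(early); // keep an early record
    }

    T next() {
        // Serve any buffered warm-up record before polling again
        if (!buffer.isEmpty()) return buffer.poll();
        return poll.get();
    }
}
```

Callers just use next(); they never need to know whether a given record came from the warm-up poll or a later one.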

Granted, this only happens if you have no service instances running, start one, produce an item, and try to get an item. If multiple instances are running, one of the others *should* pick up the produced item.

But it does illustrate the use case for having some sort of special "ensurePartitionAssignment" call that ensures the consumer is up and ready to go.
Whether it's a valid use case is debatable, I suppose. I'm sure someone will tell me to just "use Rabbit" or "use ZeroMQ" or something instead.



> New consumer - partitions auto assigned only on poll
> ----------------------------------------------------
>
>                 Key: KAFKA-2359
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2359
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.9.0.0
>            Reporter: Stevo Slavic
>            Priority: Minor
>
> In the new consumer I encountered unexpected behavior. After constructing {{KafkaConsumer}} instance with configured consumer rebalance callback handler, and subscribing to a topic with "consumer.subscribe(topic)", retrieving subscriptions would return empty set and callback handler would not get called (no partitions ever assigned or revoked), no matter how long instance was up.
> Then I found by inspecting {{KafkaConsumer}} code that partition assignment will only be triggered on first {{poll}}, since {{pollOnce}} has:
> {noformat}
> // ensure we have partitions assigned if we expect to
> if (subscriptions.partitionsAutoAssigned())
>     coordinator.ensurePartitionAssignment();
> {noformat}
> I'm proposing to fix this by including the same {{ensurePartitionAssignment}} fragment in the {{KafkaConsumer.subscriptions}} accessor as well.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)