Posted to dev@kafka.apache.org by Jason Gustafson <ja...@confluent.io> on 2015/06/05 03:21:07 UTC

Re: Review Request 33196: Patch for KAFKA-2123


> On May 31, 2015, 9:10 p.m., Guozhang Wang wrote:
> > Thanks for the explanation, Ewen. I agree that a delayed scheduler would be a good fit here, but I was originally more concerned about the complexity introduced by adding two queues (one for delayed actions and another for handling order-preserving commits). After thinking about it a bit more, I feel the complexity mainly comes from the places where we need to make blocking calls that must also trigger polling of the delayed tasks:
> > 
> > 
> > 1. Consumer.poll() where we will "block" until the timeout has elapsed.
> > 2. Consumer.awaitMetadataUpdate() where we block until the metadata is refreshed.
> > 3. CommitOffsetHandler.poll() where we block until this request has completed via client.completeAll().
> > 
> > Basically we need to remember each of those places and make sure the delayed tasks get polled while we are blocking. So I am wondering if we could refactor this patch a bit as follows:
> > 
> > 1. Move DelayedTask / DelayedTaskQueue class to o.a.k.common.
> > 
> > 2. Add the delayedTask to KafkaClient with a new API alongside send(); more specifically we can:
> >   a. Rename send() to scheduleOnce(request), which queues up the given request to be sent in the next poll;
> >   b. Add scheduleRecurring(request, interval), which triggers scheduleOnce every interval;
> >   c. In poll(), check whether we should schedule a request via scheduleOnce, as we did in this patch.
> > 
> > As for the commitOffsetRequests queue and the #.retry config, if we expect more requests like the sync commit to be added to the consumer in the future, we may want to make them more general, for example a ConsumerConfig.RETRIES_CONFIG analogous to ProducerConfig.RETRIES_CONFIG, and a Queue<RequestCompletionHandler> scheduledRequests.
> 
> Guozhang Wang wrote:
>     Also I am curious how KAFKA-2168 will be leveraging this patch to add the wakeup call, could you elaborate a bit?
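For illustration, the scheduleOnce/scheduleRecurring idea above might look roughly like the following minimal sketch. DelayedScheduler, the millisecond timestamps passed in by the caller, and msUntilNext() are hypothetical names chosen here; this is neither the DelayedTaskQueue from the patch nor KafkaClient's API, just a deadline-ordered priority queue whose poll(now) runs whatever is due, with recurring tasks rescheduling themselves after each run.

```java
import java.util.PriorityQueue;
import java.util.function.LongConsumer;

/**
 * Hypothetical scheduler illustrating the proposed scheduleOnce/scheduleRecurring API.
 * Tasks receive the current time in ms. This is not Kafka's actual code.
 */
final class DelayedScheduler {
    private static final class Entry implements Comparable<Entry> {
        final LongConsumer task;
        final long dueMs;
        final long seq; // tie-breaker so tasks with equal deadlines run in FIFO order

        Entry(LongConsumer task, long dueMs, long seq) {
            this.task = task;
            this.dueMs = dueMs;
            this.seq = seq;
        }

        @Override
        public int compareTo(Entry other) {
            int byTime = Long.compare(dueMs, other.dueMs);
            return byTime != 0 ? byTime : Long.compare(seq, other.seq);
        }
    }

    private final PriorityQueue<Entry> queue = new PriorityQueue<>();
    private long nextSeq = 0;

    /** Runs task once, on the first poll() at or after dueMs. */
    void scheduleOnce(LongConsumer task, long dueMs) {
        queue.add(new Entry(task, dueMs, nextSeq++));
    }

    /** Runs task every intervalMs (first at nowMs + intervalMs); each run schedules the next one. */
    void scheduleRecurring(LongConsumer task, long intervalMs, long nowMs) {
        if (intervalMs <= 0) {
            throw new IllegalArgumentException("intervalMs must be positive");
        }
        scheduleOnce(new LongConsumer() {
            @Override
            public void accept(long now) {
                task.accept(now);
                scheduleOnce(this, now + intervalMs); // re-arm relative to when it actually ran
            }
        }, nowMs + intervalMs);
    }

    /** Runs every task whose deadline has passed and returns how many ran. */
    int poll(long nowMs) {
        int ran = 0;
        while (!queue.isEmpty() && queue.peek().dueMs <= nowMs) {
            queue.poll().task.accept(nowMs);
            ran++;
        }
        return ran;
    }

    /** How long a blocking call may wait before the next task is due (Long.MAX_VALUE if none). */
    long msUntilNext(long nowMs) {
        return queue.isEmpty() ? Long.MAX_VALUE : Math.max(0, queue.peek().dueMs - nowMs);
    }
}
```

Passing the current time in explicitly, rather than reading a clock, keeps the sketch deterministic; msUntilNext() is what each of the blocking calls listed above would use to bound how long it waits.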

My patch for KAFKA-2168 (if accepted) may actually make this simpler since it centralizes all of the blocking calls in KafkaConsumer. Then it might not be necessary to push the task queue into the networking layer. 
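If the blocking calls are centralized in KafkaConsumer as described, the three blocking points listed in the quoted review could, in principle, become different exit conditions for a single loop that services the delayed tasks. A minimal sketch under that assumption; BlockingLoop, DelayedTasks, Network and pollUntil() are hypothetical stand-ins, not code from either patch:

```java
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;

/**
 * Hypothetical sketch: all blocking consumer calls funnel through pollUntil(),
 * so delayed tasks are serviced in exactly one place.
 */
final class BlockingLoop {
    /** Hypothetical stand-in for a delayed-task queue. */
    interface DelayedTasks {
        void runDue(long nowMs);
        long msUntilNext(long nowMs); // Long.MAX_VALUE when nothing is scheduled
    }

    /** Hypothetical stand-in for one network poll that waits at most maxWaitMs. */
    interface Network {
        void poll(long maxWaitMs);
    }

    private final LongSupplier clock;
    private final DelayedTasks tasks;
    private final Network network;

    BlockingLoop(LongSupplier clock, DelayedTasks tasks, Network network) {
        this.clock = clock;
        this.tasks = tasks;
        this.network = network;
    }

    /**
     * Blocks until done is true or timeoutMs elapses. poll(timeout),
     * awaitMetadataUpdate() and a synchronous commit could each be a
     * different done condition. Returns whether the condition was met.
     */
    boolean pollUntil(BooleanSupplier done, long timeoutMs) {
        long start = clock.getAsLong();
        // Saturate instead of overflowing for very large timeouts.
        long deadline = timeoutMs > Long.MAX_VALUE - start ? Long.MAX_VALUE : start + timeoutMs;
        while (true) {
            long now = clock.getAsLong();
            tasks.runDue(now); // delayed tasks (e.g. auto-commit) run on every iteration
            if (done.getAsBoolean()) return true;
            if (now >= deadline) return false;
            // Never wait past the caller's deadline or the next delayed task.
            network.poll(Math.min(deadline - now, tasks.msUntilNext(now)));
        }
    }
}
```

The design point is that the "remember every blocking place" problem goes away: a new blocking API only needs a new done condition, not its own delayed-task handling.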

I wonder, however, whether we can do without the task queue by just letting asynchronous commit requests fail fast. If the consumer is using auto-commits, we will retry the commit at the next interval anyway. And if the user is the one doing the commit and actually cares about the result, we provide a callback, which forces them to handle errors regardless of whether we retry under the covers. (There's also the fact that having to deal with a queue of pending commits feels odd, given that only the most recent commit is really the one you care about.)
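The fail-fast alternative could be sketched as follows, under stated assumptions: offsets are keyed by a plain "topic-partition" string, the transport is a hypothetical function that performs one commit request and returns its error (or null), the request completes synchronously to keep the sketch short, and CommitCallback is a hypothetical stand-in rather than the patch's ConsumerCommitCallback. A failed commit is reported to the caller once and never retried internally; auto-commit simply drops the failure.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/**
 * Hypothetical sketch of fail-fast offset commits: one request per commit,
 * no internal retry, no queue of pending commits.
 */
final class FailFastCommitter {
    /** Hypothetical callback; not necessarily the shape of the patch's ConsumerCommitCallback. */
    interface CommitCallback {
        void onComplete(Map<String, Long> offsets, Exception error); // error is null on success
    }

    // Hypothetical transport: performs one commit request, returns its error or null.
    private final Function<Map<String, Long>, Exception> transport;
    private int attempts = 0;

    FailFastCommitter(Function<Map<String, Long>, Exception> transport) {
        this.transport = transport;
    }

    /** Sends exactly once; any failure goes straight to the callback. */
    void commitAsync(Map<String, Long> offsets, CommitCallback callback) {
        attempts++;
        Exception error = transport.apply(offsets);
        if (callback != null) {
            callback.onComplete(offsets, error);
        }
    }

    /** Auto-commit tick: a failure is dropped because the next tick commits newer positions anyway. */
    void autoCommitTick(Map<String, Long> positions) {
        commitAsync(new HashMap<>(positions), null);
    }

    int attempts() {
        return attempts;
    }
}
```

For example, if a commit of offset 10 fails, nothing is retried; the next auto-commit tick sends whatever the position is by then (say 25), which is the only value that matters.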


- Jason


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33196/#review85920
-----------------------------------------------------------


On May 29, 2015, 6:11 p.m., Ewen Cheslack-Postava wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/33196/
> -----------------------------------------------------------
> 
> (Updated May 29, 2015, 6:11 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2123
>     https://issues.apache.org/jira/browse/KAFKA-2123
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-2123: Add queuing of offset commit requests.
> 
> 
> KAFKA-2123: Add scheduler for delayed tasks in new consumer, add backoff for commit retries, and simplify auto commit by using delayed tasks.
> 
> 
> KAFKA-2123: Make synchronous offset commits wait for previous requests to finish in order.
> 
> 
> KAFKA-2123: Remove redundant calls to ensureNotClosed
> 
> 
> KAFKA-2123: Address review comments.
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 8f587bc0705b65b3ef37c86e0c25bb43ab8803de 
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java bdff518b732105823058e6182f445248b45dc388 
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java d301be4709f7b112e1f3a39f3c04cfa65f00fa60 
>   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java f50da825756938c193d7f07bee953e000e2627d9 
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b2764df11afa7a99fce46d1ff48960d889032d14 
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/errors/OffsetLoadInProgressException.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 5b898c8f8ad5d0469f469b600c4b2eb13d1fc662 
>   clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb 
>   clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueueTest.java PRE-CREATION 
>   core/src/test/scala/integration/kafka/api/ConsumerTest.scala a1eed965a148eb19d9a6cefbfce131f58aaffc24 
> 
> Diff: https://reviews.apache.org/r/33196/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Ewen Cheslack-Postava
> 
>