You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/09 08:20:12 UTC

[GitHub] [kafka] ocadaruma commented on pull request #7539: KAFKA-6968: Adds calls to listener on rebalance of MockConsumer

ocadaruma commented on pull request #7539:
URL: https://github.com/apache/kafka/pull/7539#issuecomment-626127671


   Yeah, rebalance listener is called upon `onJoinComplete/onJoinPrepare`, and it happens as part of `KafkaConsumer#poll` as stated in `ConsumerRebalanceListener`'s javadoc. (https://github.com/apache/kafka/blob/2.5.0/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java#L48)
   
   as like below:
   ```
   KafkaConsumer#poll() =>
     KafkaConsumer#updateAssignmentMetadataIfNeeded() =>
       ConsumerCoordinator#poll() =>
         AbstractCoordinator#ensureActiveGroup() =>
           AbstractCoordinator#joinGroupIfNeeded() =>
             ConsumerCoordinator#onJoinPrepare()
   ```
   
   So, if we respect real KafkaConsumer behavior, rebalance listener invocation should be happen in `poll()` rather than `MockConsumer#rebalance()`, which might be called separately from poll().
   
   However, after reconsideration, though my initial intention was like above, I have different thought about this now.
   
   Even if we call rebalance listener in `MockConsumer#rebalance()` (as @efgpinto 's original patch), still we can simulate poll()-driven rebalance invocation (described above) as like below:
   ```java
   @Test
   public void testRebalance() {
       MockConsumer<String, String> consumer = new MockConsumer<>();
   
       consumer.subscribe(Arrays.asList("topic"), new SomeRebalanceListener());
   
       // simulate rebalance happens after first poll()
       consumer.scheduleNopPollTask();
       consumer.schedulePollTask(() => {
           consumer.rebalance(newAssignment);
       });
   
       consumer.poll(100L);
       consumer.poll(100L); // => rebalance will be triggered
   }
   ```
   
   So reverting my patch (https://github.com/apache/kafka/pull/7539/commits/4b62aa09eacfea70daa55151ee0928720c772e48) would be fine.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org