You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2020/08/16 13:59:06 UTC

[kafka] branch 2.5 updated: KAFKA-10404; Use higher poll timeout to avoid rebalance in testCoordinatorFailover (#9183)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new 786b885  KAFKA-10404; Use higher poll timeout to avoid rebalance in testCoordinatorFailover (#9183)
786b885 is described below

commit 786b885d01ce55a913f413b0aa1d326d7fe966de
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Sun Aug 16 14:43:21 2020 +0100

    KAFKA-10404; Use higher poll timeout to avoid rebalance in testCoordinatorFailover (#9183)
    
    Reviewers: Manikumar Reddy <ma...@gmail.com>
---
 core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index 2b0700b..01cc5b3 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -52,7 +52,9 @@ abstract class BaseConsumerTest extends AbstractConsumerTest {
   def testCoordinatorFailover(): Unit = {
     val listener = new TestConsumerReassignmentListener()
     this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "5001")
-    this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "2000")
+    this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000")
+    // Use higher poll timeout to avoid consumer leaving the group due to timeout
+    this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "15000")
     val consumer = createConsumer()
 
     consumer.subscribe(List(topic).asJava, listener)