You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2015/08/04 00:42:47 UTC

kafka git commit: KAFKA-2386; increase timeouts for transient test failure in ConsumerCoordinatorResponseTests

Repository: kafka
Updated Branches:
  refs/heads/trunk cd3dc7a5c -> 3c0963084


KAFKA-2386; increase timeouts for transient test failure in ConsumerCoordinatorResponseTests

There are two race conditions in the test case "testGenerationIdIncrementsOnRebalance". First, a delay before the second join group request can time out the initial group and cause the generationId to unexpectedly reset. Second, a delay in the join group request handling will time out the request itself and cause the test to fail. This commit doesn't address these race conditions, but increases the timeouts to make them less likely. If the problem recurs, then we'll probably need a better solution.

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Gwen Shapira <cs...@gmail.com>

Closes #107 from hachikuji/KAFKA-2386 and squashes the following commits:

a53460a [Jason Gustafson] KAFKA-2386; increase timeouts for transient test failure in ConsumerCoordinatorResponseTest


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3c096308
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3c096308
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3c096308

Branch: refs/heads/trunk
Commit: 3c09630844f6e70793f53a9d4f0ef562fe9d91d3
Parents: cd3dc7a
Author: Jason Gustafson <ja...@confluent.io>
Authored: Mon Aug 3 15:42:33 2015 -0700
Committer: Chen Shapira <gw...@macbook-pro.gateway.sonic.net>
Committed: Mon Aug 3 15:42:33 2015 -0700

----------------------------------------------------------------------
 .../kafka/coordinator/ConsumerCoordinatorResponseTest.scala | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3c096308/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala
index 87a5330..058daef 100644
--- a/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala
@@ -43,8 +43,8 @@ class ConsumerCoordinatorResponseTest extends JUnitSuite {
   type HeartbeatCallback = Short => Unit
 
   val ConsumerMinSessionTimeout = 10
-  val ConsumerMaxSessionTimeout = 100
-  val DefaultSessionTimeout = 20
+  val ConsumerMaxSessionTimeout = 200
+  val DefaultSessionTimeout = 100
   var consumerCoordinator: ConsumerCoordinator = null
   var offsetManager : OffsetManager = null
 
@@ -238,7 +238,7 @@ class ConsumerCoordinatorResponseTest extends JUnitSuite {
 
     // First start up a group (with a slightly larger timeout to give us time to heartbeat when the rebalance starts)
     val joinGroupResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_CONSUMER_ID, partitionAssignmentStrategy,
-      100, isCoordinatorForGroup = true)
+      DefaultSessionTimeout, isCoordinatorForGroup = true)
     val assignedConsumerId = joinGroupResult._2
     val initialGenerationId = joinGroupResult._3
     val joinGroupErrorCode = joinGroupResult._4
@@ -310,7 +310,8 @@ class ConsumerCoordinatorResponseTest extends JUnitSuite {
                         sessionTimeout: Int,
                         isCoordinatorForGroup: Boolean): JoinGroupCallbackParams = {
     val responseFuture = sendJoinGroup(groupId, consumerId, partitionAssignmentStrategy, sessionTimeout, isCoordinatorForGroup)
-    Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS))
+    // should only have to wait as long as session timeout, but allow some extra time in case of an unexpected delay
+    Await.result(responseFuture, Duration(sessionTimeout+100, TimeUnit.MILLISECONDS))
   }
 
   private def heartbeat(groupId: String,