You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/07/10 03:16:55 UTC

kafka git commit: KAFKA-1740 follow-up: add state checking in handling heartbeat request; reviewed by Onur Karaman, Ewen Cheslack-Postava and Guozhang Wang

Repository: kafka
Updated Branches:
  refs/heads/trunk ee88dbb67 -> 9ca61d179


KAFKA-1740 follow-up: add state checking in handling heartbeat request; reviewed by Onur Karaman, Ewen Cheslack-Postava and Guozhang Wang


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9ca61d17
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9ca61d17
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9ca61d17

Branch: refs/heads/trunk
Commit: 9ca61d17915f09b8010fa1da5ad0285b076a96e1
Parents: ee88dbb
Author: Jason Gustafson <ja...@confluent.io>
Authored: Thu Jul 9 18:15:31 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Jul 9 18:15:45 2015 -0700

----------------------------------------------------------------------
 .../kafka/coordinator/ConsumerCoordinator.scala |  2 +-
 .../ConsumerCoordinatorResponseTest.scala       | 45 +++++++++++++++++---
 2 files changed, 40 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9ca61d17/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
index 476973b..6c2df4c 100644
--- a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
@@ -210,7 +210,7 @@ class ConsumerCoordinator(val brokerId: Int,
             responseCallback(Errors.UNKNOWN_CONSUMER_ID.code)
           } else if (!group.has(consumerId)) {
             responseCallback(Errors.UNKNOWN_CONSUMER_ID.code)
-          } else if (generationId != group.generationId) {
+          } else if (generationId != group.generationId || !group.is(Stable)) {
             responseCallback(Errors.ILLEGAL_GENERATION.code)
           } else {
             val consumer = group.get(consumerId)

http://git-wip-us.apache.org/repos/asf/kafka/blob/9ca61d17/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala
index 3cd726d..87a5330 100644
--- a/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala
@@ -43,7 +43,7 @@ class ConsumerCoordinatorResponseTest extends JUnitSuite {
   type HeartbeatCallback = Short => Unit
 
   val ConsumerMinSessionTimeout = 10
-  val ConsumerMaxSessionTimeout = 30
+  val ConsumerMaxSessionTimeout = 100
   val DefaultSessionTimeout = 20
   var consumerCoordinator: ConsumerCoordinator = null
   var offsetManager : OffsetManager = null
@@ -232,6 +232,30 @@ class ConsumerCoordinatorResponseTest extends JUnitSuite {
   }
 
   @Test
+  def testHeartbeatDuringRebalanceCausesIllegalGeneration() {
+    val groupId = "groupId"
+    val partitionAssignmentStrategy = "range"
+
+    // First start up a group (with a slightly larger timeout to give us time to heartbeat when the rebalance starts)
+    val joinGroupResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_CONSUMER_ID, partitionAssignmentStrategy,
+      100, isCoordinatorForGroup = true)
+    val assignedConsumerId = joinGroupResult._2
+    val initialGenerationId = joinGroupResult._3
+    val joinGroupErrorCode = joinGroupResult._4
+    assertEquals(Errors.NONE.code, joinGroupErrorCode)
+
+    // Then join with a new consumer to trigger a rebalance
+    EasyMock.reset(offsetManager)
+    sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_CONSUMER_ID, partitionAssignmentStrategy,
+      DefaultSessionTimeout, isCoordinatorForGroup = true)
+
+    // We should be in the middle of a rebalance, so the heartbeat should return illegal generation
+    EasyMock.reset(offsetManager)
+    val heartbeatResult = heartbeat(groupId, assignedConsumerId, initialGenerationId, isCoordinatorForGroup = true)
+    assertEquals(Errors.ILLEGAL_GENERATION.code, heartbeatResult)
+  }
+
+  @Test
   def testGenerationIdIncrementsOnRebalance() {
     val groupId = "groupId"
     val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
@@ -267,16 +291,25 @@ class ConsumerCoordinatorResponseTest extends JUnitSuite {
     (responseFuture, responseCallback)
   }
 
-  private def joinGroup(groupId: String,
-                        consumerId: String,
-                        partitionAssignmentStrategy: String,
-                        sessionTimeout: Int,
-                        isCoordinatorForGroup: Boolean): JoinGroupCallbackParams = {
+  private def sendJoinGroup(groupId: String,
+                            consumerId: String,
+                            partitionAssignmentStrategy: String,
+                            sessionTimeout: Int,
+                            isCoordinatorForGroup: Boolean): Future[JoinGroupCallbackParams] = {
     val (responseFuture, responseCallback) = setupJoinGroupCallback
     EasyMock.expect(offsetManager.partitionFor(groupId)).andReturn(1)
     EasyMock.expect(offsetManager.leaderIsLocal(1)).andReturn(isCoordinatorForGroup)
     EasyMock.replay(offsetManager)
     consumerCoordinator.handleJoinGroup(groupId, consumerId, Set.empty, sessionTimeout, partitionAssignmentStrategy, responseCallback)
+    responseFuture
+  }
+
+  private def joinGroup(groupId: String,
+                        consumerId: String,
+                        partitionAssignmentStrategy: String,
+                        sessionTimeout: Int,
+                        isCoordinatorForGroup: Boolean): JoinGroupCallbackParams = {
+    val responseFuture = sendJoinGroup(groupId, consumerId, partitionAssignmentStrategy, sessionTimeout, isCoordinatorForGroup)
     Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS))
   }