You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/07/10 03:16:55 UTC
kafka git commit: KAFKA-1740 follow-up: add state checking in
handling heartbeat request;
reviewed by Onur Karaman, Ewen Cheslack-Postava and Guozhang Wang
Repository: kafka
Updated Branches:
refs/heads/trunk ee88dbb67 -> 9ca61d179
KAFKA-1740 follow-up: add state checking in handling heartbeat request; reviewed by Onur Karaman, Ewen Cheslack-Postava and Guozhang Wang
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9ca61d17
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9ca61d17
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9ca61d17
Branch: refs/heads/trunk
Commit: 9ca61d17915f09b8010fa1da5ad0285b076a96e1
Parents: ee88dbb
Author: Jason Gustafson <ja...@confluent.io>
Authored: Thu Jul 9 18:15:31 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Jul 9 18:15:45 2015 -0700
----------------------------------------------------------------------
.../kafka/coordinator/ConsumerCoordinator.scala | 2 +-
.../ConsumerCoordinatorResponseTest.scala | 45 +++++++++++++++++---
2 files changed, 40 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/9ca61d17/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
index 476973b..6c2df4c 100644
--- a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
@@ -210,7 +210,7 @@ class ConsumerCoordinator(val brokerId: Int,
responseCallback(Errors.UNKNOWN_CONSUMER_ID.code)
} else if (!group.has(consumerId)) {
responseCallback(Errors.UNKNOWN_CONSUMER_ID.code)
- } else if (generationId != group.generationId) {
+ } else if (generationId != group.generationId || !group.is(Stable)) {
responseCallback(Errors.ILLEGAL_GENERATION.code)
} else {
val consumer = group.get(consumerId)
http://git-wip-us.apache.org/repos/asf/kafka/blob/9ca61d17/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala
index 3cd726d..87a5330 100644
--- a/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala
@@ -43,7 +43,7 @@ class ConsumerCoordinatorResponseTest extends JUnitSuite {
type HeartbeatCallback = Short => Unit
val ConsumerMinSessionTimeout = 10
- val ConsumerMaxSessionTimeout = 30
+ val ConsumerMaxSessionTimeout = 100
val DefaultSessionTimeout = 20
var consumerCoordinator: ConsumerCoordinator = null
var offsetManager : OffsetManager = null
@@ -232,6 +232,30 @@ class ConsumerCoordinatorResponseTest extends JUnitSuite {
}
@Test
+ def testHeartbeatDuringRebalanceCausesIllegalGeneration() {
+ val groupId = "groupId"
+ val partitionAssignmentStrategy = "range"
+
+ // First start up a group (with a slightly larger timeout to give us time to heartbeat when the rebalance starts)
+ val joinGroupResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_CONSUMER_ID, partitionAssignmentStrategy,
+ 100, isCoordinatorForGroup = true)
+ val assignedConsumerId = joinGroupResult._2
+ val initialGenerationId = joinGroupResult._3
+ val joinGroupErrorCode = joinGroupResult._4
+ assertEquals(Errors.NONE.code, joinGroupErrorCode)
+
+ // Then join with a new consumer to trigger a rebalance
+ EasyMock.reset(offsetManager)
+ sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_CONSUMER_ID, partitionAssignmentStrategy,
+ DefaultSessionTimeout, isCoordinatorForGroup = true)
+
+ // We should be in the middle of a rebalance, so the heartbeat should return illegal generation
+ EasyMock.reset(offsetManager)
+ val heartbeatResult = heartbeat(groupId, assignedConsumerId, initialGenerationId, isCoordinatorForGroup = true)
+ assertEquals(Errors.ILLEGAL_GENERATION.code, heartbeatResult)
+ }
+
+ @Test
def testGenerationIdIncrementsOnRebalance() {
val groupId = "groupId"
val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
@@ -267,16 +291,25 @@ class ConsumerCoordinatorResponseTest extends JUnitSuite {
(responseFuture, responseCallback)
}
- private def joinGroup(groupId: String,
- consumerId: String,
- partitionAssignmentStrategy: String,
- sessionTimeout: Int,
- isCoordinatorForGroup: Boolean): JoinGroupCallbackParams = {
+ private def sendJoinGroup(groupId: String,
+ consumerId: String,
+ partitionAssignmentStrategy: String,
+ sessionTimeout: Int,
+ isCoordinatorForGroup: Boolean): Future[JoinGroupCallbackParams] = {
val (responseFuture, responseCallback) = setupJoinGroupCallback
EasyMock.expect(offsetManager.partitionFor(groupId)).andReturn(1)
EasyMock.expect(offsetManager.leaderIsLocal(1)).andReturn(isCoordinatorForGroup)
EasyMock.replay(offsetManager)
consumerCoordinator.handleJoinGroup(groupId, consumerId, Set.empty, sessionTimeout, partitionAssignmentStrategy, responseCallback)
+ responseFuture
+ }
+
+ private def joinGroup(groupId: String,
+ consumerId: String,
+ partitionAssignmentStrategy: String,
+ sessionTimeout: Int,
+ isCoordinatorForGroup: Boolean): JoinGroupCallbackParams = {
+ val responseFuture = sendJoinGroup(groupId, consumerId, partitionAssignmentStrategy, sessionTimeout, isCoordinatorForGroup)
Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS))
}