You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/09/22 23:28:59 UTC
kafka git commit: KAFKA-2557: separate REBALANCE_IN_PROGRESS and
ILLEGAL_GENERATION error codes
Repository: kafka
Updated Branches:
refs/heads/trunk 4833d8a8c -> 2837fa5a3
KAFKA-2557: separate REBALANCE_IN_PROGRESS and ILLEGAL_GENERATION error codes
Author: Onur Karaman <ok...@linkedin.com>
Reviewers: Jiangjie Qin, Jason Gustafson, Guozhang Wang
Closes #222 from onurkaraman/KAFKA-2557
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2837fa5a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2837fa5a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2837fa5a
Branch: refs/heads/trunk
Commit: 2837fa5a34420ea1ede90901ded6e9f3c94a3e7b
Parents: 4833d8a
Author: Onur Karaman <ok...@linkedin.com>
Authored: Tue Sep 22 14:32:28 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Sep 22 14:32:28 2015 -0700
----------------------------------------------------------------------
.../apache/kafka/clients/consumer/internals/Coordinator.java | 4 ++++
.../src/main/java/org/apache/kafka/common/protocol/Errors.java | 4 +++-
.../src/main/scala/kafka/coordinator/ConsumerCoordinator.scala | 4 +++-
.../kafka/coordinator/ConsumerCoordinatorResponseTest.scala | 6 +++---
4 files changed, 13 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/2837fa5a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
index 374eceb..5efe300 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
@@ -642,6 +642,10 @@ public final class Coordinator {
log.info("Attempt to heart beat failed since coordinator is either not started or not valid, marking it as dead.");
coordinatorDead();
future.raise(Errors.forCode(error));
+ } else if (error == Errors.REBALANCE_IN_PROGRESS.code()) {
+ log.info("Attempt to heart beat failed since the group is rebalancing, try to re-join group.");
+ subscriptions.needReassignment();
+ future.raise(Errors.REBALANCE_IN_PROGRESS);
} else if (error == Errors.ILLEGAL_GENERATION.code()) {
log.info("Attempt to heart beat failed since generation id is not legal, try to re-join group.");
subscriptions.needReassignment();
http://git-wip-us.apache.org/repos/asf/kafka/blob/2837fa5a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index b3415c3..220132f 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -88,7 +88,9 @@ public enum Errors {
new ApiException("Some of the committing partitions are not assigned the committer")),
INVALID_COMMIT_OFFSET_SIZE(28,
new ApiException("The committing offset data size is not valid")),
- AUTHORIZATION_FAILED(29, new ApiException("Request is not authorized."));
+ AUTHORIZATION_FAILED(29, new ApiException("Request is not authorized.")),
+ REBALANCE_IN_PROGRESS(30,
+ new ApiException("The group is rebalancing, so a rejoin is needed."));
private static final Logger log = LoggerFactory.getLogger(Errors.class);
http://git-wip-us.apache.org/repos/asf/kafka/blob/2837fa5a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
index 1bceb43..64e21c5 100644
--- a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
@@ -210,8 +210,10 @@ class ConsumerCoordinator(val brokerId: Int,
responseCallback(Errors.UNKNOWN_CONSUMER_ID.code)
} else if (!group.has(consumerId)) {
responseCallback(Errors.UNKNOWN_CONSUMER_ID.code)
- } else if (generationId != group.generationId || !group.is(Stable)) {
+ } else if (generationId != group.generationId) {
responseCallback(Errors.ILLEGAL_GENERATION.code)
+ } else if (!group.is(Stable)) {
+ responseCallback(Errors.REBALANCE_IN_PROGRESS.code)
} else {
val consumer = group.get(consumerId)
completeAndScheduleNextHeartbeatExpiration(group, consumer)
http://git-wip-us.apache.org/repos/asf/kafka/blob/2837fa5a/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala
index 42ffdde..07f7326 100644
--- a/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala
@@ -232,7 +232,7 @@ class ConsumerCoordinatorResponseTest extends JUnitSuite {
}
@Test
- def testHeartbeatDuringRebalanceCausesIllegalGeneration() {
+ def testHeartbeatDuringRebalanceCausesRebalanceInProgress() {
val groupId = "groupId"
val partitionAssignmentStrategy = "range"
@@ -249,10 +249,10 @@ class ConsumerCoordinatorResponseTest extends JUnitSuite {
sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_CONSUMER_ID, partitionAssignmentStrategy,
DefaultSessionTimeout, isCoordinatorForGroup = true)
- // We should be in the middle of a rebalance, so the heartbeat should return illegal generation
+ // We should be in the middle of a rebalance, so the heartbeat should return rebalance in progress
EasyMock.reset(offsetManager)
val heartbeatResult = heartbeat(groupId, assignedConsumerId, initialGenerationId, isCoordinatorForGroup = true)
- assertEquals(Errors.ILLEGAL_GENERATION.code, heartbeatResult)
+ assertEquals(Errors.REBALANCE_IN_PROGRESS.code, heartbeatResult)
}
@Test