You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/09/22 23:28:59 UTC

kafka git commit: KAFKA-2557: separate REBALANCE_IN_PROGRESS and ILLEGAL_GENERATION error codes

Repository: kafka
Updated Branches:
  refs/heads/trunk 4833d8a8c -> 2837fa5a3


KAFKA-2557: separate REBALANCE_IN_PROGRESS and ILLEGAL_GENERATION error codes

Author: Onur Karaman <ok...@linkedin.com>

Reviewers: Jiangjie Qin, Jason Gustafson, Guozhang Wang

Closes #222 from onurkaraman/KAFKA-2557


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2837fa5a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2837fa5a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2837fa5a

Branch: refs/heads/trunk
Commit: 2837fa5a34420ea1ede90901ded6e9f3c94a3e7b
Parents: 4833d8a
Author: Onur Karaman <ok...@linkedin.com>
Authored: Tue Sep 22 14:32:28 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Sep 22 14:32:28 2015 -0700

----------------------------------------------------------------------
 .../apache/kafka/clients/consumer/internals/Coordinator.java   | 4 ++++
 .../src/main/java/org/apache/kafka/common/protocol/Errors.java | 4 +++-
 .../src/main/scala/kafka/coordinator/ConsumerCoordinator.scala | 4 +++-
 .../kafka/coordinator/ConsumerCoordinatorResponseTest.scala    | 6 +++---
 4 files changed, 13 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2837fa5a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
index 374eceb..5efe300 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
@@ -642,6 +642,10 @@ public final class Coordinator {
                 log.info("Attempt to heart beat failed since coordinator is either not started or not valid, marking it as dead.");
                 coordinatorDead();
                 future.raise(Errors.forCode(error));
+            } else if (error == Errors.REBALANCE_IN_PROGRESS.code()) {
+                log.info("Attempt to heart beat failed since the group is rebalancing, try to re-join group.");
+                subscriptions.needReassignment();
+                future.raise(Errors.REBALANCE_IN_PROGRESS);
             } else if (error == Errors.ILLEGAL_GENERATION.code()) {
                 log.info("Attempt to heart beat failed since generation id is not legal, try to re-join group.");
                 subscriptions.needReassignment();

http://git-wip-us.apache.org/repos/asf/kafka/blob/2837fa5a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index b3415c3..220132f 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -88,7 +88,9 @@ public enum Errors {
             new ApiException("Some of the committing partitions are not assigned the committer")),
     INVALID_COMMIT_OFFSET_SIZE(28,
             new ApiException("The committing offset data size is not valid")),
-    AUTHORIZATION_FAILED(29, new ApiException("Request is not authorized."));
+    AUTHORIZATION_FAILED(29, new ApiException("Request is not authorized.")),
+    REBALANCE_IN_PROGRESS(30,
+            new ApiException("The group is rebalancing, so a rejoin is needed."));
 
     private static final Logger log = LoggerFactory.getLogger(Errors.class);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/2837fa5a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
index 1bceb43..64e21c5 100644
--- a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
@@ -210,8 +210,10 @@ class ConsumerCoordinator(val brokerId: Int,
             responseCallback(Errors.UNKNOWN_CONSUMER_ID.code)
           } else if (!group.has(consumerId)) {
             responseCallback(Errors.UNKNOWN_CONSUMER_ID.code)
-          } else if (generationId != group.generationId || !group.is(Stable)) {
+          } else if (generationId != group.generationId) {
             responseCallback(Errors.ILLEGAL_GENERATION.code)
+          } else if (!group.is(Stable)) {
+            responseCallback(Errors.REBALANCE_IN_PROGRESS.code)
           } else {
             val consumer = group.get(consumerId)
             completeAndScheduleNextHeartbeatExpiration(group, consumer)

http://git-wip-us.apache.org/repos/asf/kafka/blob/2837fa5a/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala
index 42ffdde..07f7326 100644
--- a/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala
@@ -232,7 +232,7 @@ class ConsumerCoordinatorResponseTest extends JUnitSuite {
   }
 
   @Test
-  def testHeartbeatDuringRebalanceCausesIllegalGeneration() {
+  def testHeartbeatDuringRebalanceCausesRebalanceInProgress() {
     val groupId = "groupId"
     val partitionAssignmentStrategy = "range"
 
@@ -249,10 +249,10 @@ class ConsumerCoordinatorResponseTest extends JUnitSuite {
     sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_CONSUMER_ID, partitionAssignmentStrategy,
       DefaultSessionTimeout, isCoordinatorForGroup = true)
 
-    // We should be in the middle of a rebalance, so the heartbeat should return illegal generation
+    // We should be in the middle of a rebalance, so the heartbeat should return rebalance in progress
     EasyMock.reset(offsetManager)
     val heartbeatResult = heartbeat(groupId, assignedConsumerId, initialGenerationId, isCoordinatorForGroup = true)
-    assertEquals(Errors.ILLEGAL_GENERATION.code, heartbeatResult)
+    assertEquals(Errors.REBALANCE_IN_PROGRESS.code, heartbeatResult)
   }
 
   @Test