Posted to commits@kafka.apache.org by gu...@apache.org on 2019/06/11 16:49:10 UTC
[kafka] branch trunk updated: KAFKA-8487: Only request re-join on
REBALANCE_IN_PROGRESS in CommitOffsetResponse (#6894)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new bebcbe3 KAFKA-8487: Only request re-join on REBALANCE_IN_PROGRESS in CommitOffsetResponse (#6894)
bebcbe3 is described below
commit bebcbe3a049f78c4184404f2dfb8b4150233856e
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Tue Jun 11 09:48:43 2019 -0700
KAFKA-8487: Only request re-join on REBALANCE_IN_PROGRESS in CommitOffsetResponse (#6894)
Plus some minor cleanups on AbstractCoordinator.
Reviewers: Boyang Chen <bo...@confluent.io>, Jason Gustafson <ja...@confluent.io>
---
.../consumer/internals/AbstractCoordinator.java | 10 +--
.../consumer/internals/ConsumerCoordinator.java | 16 ++++-
.../internals/ConsumerCoordinatorTest.java | 34 ++++++++-
.../kafka/coordinator/group/GroupCoordinator.scala | 84 +++++++++++++---------
.../coordinator/group/GroupCoordinatorTest.scala | 46 ++++++++++--
5 files changed, 141 insertions(+), 49 deletions(-)
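For readers skimming the diff, the consumer-facing effect of this change can be illustrated with a plain poll/commit loop. The sketch below is not part of the commit; the bootstrap address, group id, and topic are placeholders. The point it shows: when commitSync() hits REBALANCE_IN_PROGRESS, the consumer still throws CommitFailedException, but it now only requests a re-join instead of resetting its generation, so continuing to poll() re-joins the group under the existing member id.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class CommitDuringRebalanceExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singleton("example-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                // ... process records ...
                try {
                    consumer.commitSync();
                } catch (CommitFailedException e) {
                    // The group is rebalancing. After this commit, the consumer has only
                    // requested a re-join (its member id and generation are kept), so the
                    // next poll() re-joins the group under the existing identity.
                }
            }
        }
    }
}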
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 73563fd..30277b3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -555,7 +555,7 @@ public abstract class AbstractCoordinator implements Closeable {
// reset the member id and retry immediately
resetGeneration();
log.debug("Attempt to join group failed due to unknown member id.");
- future.raise(Errors.UNKNOWN_MEMBER_ID);
+ future.raise(error);
} else if (error == Errors.COORDINATOR_NOT_AVAILABLE
|| error == Errors.NOT_COORDINATOR) {
// re-discover the coordinator and retry with backoff
@@ -592,7 +592,7 @@ public abstract class AbstractCoordinator implements Closeable {
AbstractCoordinator.this.rejoinNeeded = true;
AbstractCoordinator.this.state = MemberState.UNJOINED;
}
- future.raise(Errors.MEMBER_ID_REQUIRED);
+ future.raise(error);
} else {
// unexpected error, throw the exception
log.error("Attempt to join group failed due to unexpected error: {}", error.message());
@@ -940,18 +940,18 @@ public abstract class AbstractCoordinator implements Closeable {
} else if (error == Errors.REBALANCE_IN_PROGRESS) {
log.info("Attempt to heartbeat failed since group is rebalancing");
requestRejoin();
- future.raise(Errors.REBALANCE_IN_PROGRESS);
+ future.raise(error);
} else if (error == Errors.ILLEGAL_GENERATION) {
log.info("Attempt to heartbeat failed since generation {} is not current", generation.generationId);
resetGeneration();
- future.raise(Errors.ILLEGAL_GENERATION);
+ future.raise(error);
} else if (error == Errors.FENCED_INSTANCE_ID) {
log.error("Received fatal exception: group.instance.id gets fenced");
future.raise(error);
} else if (error == Errors.UNKNOWN_MEMBER_ID) {
log.info("Attempt to heartbeat failed for since member id {} is not valid.", generation.memberId);
resetGeneration();
- future.raise(Errors.UNKNOWN_MEMBER_ID);
+ future.raise(error);
} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
future.raise(new GroupAuthorizationException(groupId));
} else {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 64bf17d..bacb960 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -881,10 +881,20 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
log.error("Received fatal exception: group.instance.id gets fenced");
future.raise(error);
return;
+ } else if (error == Errors.REBALANCE_IN_PROGRESS) {
+ /* The consumer never tries to commit offsets in between join-group and sync-group,
+ * so the broker is not expected to see a commit-offset request during the
+ * CompletingRebalance phase; if it ever happens, the broker returns this error.
+ * In this case we treat it as a fatal CommitFailedException. However, we do not
+ * need to reset the generation: we only request a re-join, so that if the caller
+ * decides to proceed and poll, it will re-join the group normally.
+ */
+ requestRejoin();
+ future.raise(new CommitFailedException());
+ return;
} else if (error == Errors.UNKNOWN_MEMBER_ID
- || error == Errors.ILLEGAL_GENERATION
- || error == Errors.REBALANCE_IN_PROGRESS) {
- // need to re-join group
+ || error == Errors.ILLEGAL_GENERATION) {
+ // need to reset generation and re-join group
resetGeneration();
future.raise(new CommitFailedException());
return;
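The same contract applies on the asynchronous commit path, where the callback receives the exception rather than it being thrown. A minimal sketch, assuming an already-configured and subscribed consumer as in the earlier example (the helper method name is illustrative):

import java.time.Duration;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.KafkaConsumer;

static void pollAndCommitAsync(KafkaConsumer<String, String> consumer) {
    while (true) {
        consumer.poll(Duration.ofSeconds(1));
        // ... process records ...
        consumer.commitAsync((offsets, exception) -> {
            if (exception instanceof CommitFailedException) {
                // The group is rebalancing; only a re-join was requested, so the
                // consumer keeps its member id and generation and re-joins on the
                // next poll() instead of starting over with an unknown member id.
            }
        });
    }
}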
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 6f62bbf..9595639 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -1678,15 +1678,43 @@ public class ConsumerCoordinatorTest {
new OffsetAndMetadata(100L, "metadata")), time.timer(Long.MAX_VALUE));
}
- @Test(expected = CommitFailedException.class)
+ @Test
public void testCommitOffsetRebalanceInProgress() {
// we cannot retry if a rebalance occurs before the commit completes
+ final String consumerId = "leader";
+
+ subscriptions.subscribe(singleton(topic1), rebalanceListener);
+
+ // ensure metadata is up-to-date for leader
+ client.updateMetadata(metadataResponse);
+
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+ // normal join group
+ Map<String, List<String>> memberSubscriptions = singletonMap(consumerId, singletonList(topic1));
+ partitionAssignor.prepare(singletonMap(consumerId, singletonList(t1p)));
+
+ client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE));
+ client.prepareResponse(body -> {
+ SyncGroupRequest sync = (SyncGroupRequest) body;
+ return sync.data.memberId().equals(consumerId) &&
+ sync.data.generationId() == 1 &&
+ sync.groupAssignments().containsKey(consumerId);
+ }, syncGroupResponse(singletonList(t1p), Errors.NONE));
+ coordinator.poll(time.timer(Long.MAX_VALUE));
+
+ AbstractCoordinator.Generation expectedGeneration = new AbstractCoordinator.Generation(1, consumerId, partitionAssignor.name());
+ assertFalse(coordinator.rejoinNeededOrPending());
+ assertEquals(expectedGeneration, coordinator.generation());
+
prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.REBALANCE_IN_PROGRESS);
- coordinator.commitOffsetsSync(singletonMap(t1p,
- new OffsetAndMetadata(100L, "metadata")), time.timer(Long.MAX_VALUE));
+
+ assertThrows(CommitFailedException.class, () -> coordinator.commitOffsetsSync(singletonMap(t1p,
+ new OffsetAndMetadata(100L, "metadata")), time.timer(Long.MAX_VALUE)));
+
+ assertTrue(coordinator.rejoinNeededOrPending());
+ assertEquals(expectedGeneration, coordinator.generation());
}
@Test(expected = KafkaException.class)
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 5d40e9b..e601d34 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -180,38 +180,48 @@ class GroupCoordinator(val brokerId: Int,
if (group.hasStaticMember(groupInstanceId)) {
val oldMemberId = group.getStaticMemberId(groupInstanceId)
- if (group.is(Stable)) {
- info(s"Static member $groupInstanceId with unknown member id rejoins, assigning new member id $newMemberId, while " +
- s"old member $oldMemberId will be removed. No rebalance will be triggered.")
-
- val oldMember = group.replaceGroupInstance(oldMemberId, newMemberId, groupInstanceId)
-
- // Heartbeat of old member id will expire without affection since the group no longer contains that member id.
- // New heartbeat shall be scheduled with new member id.
- completeAndScheduleNextHeartbeatExpiration(group, oldMember)
-
- responseCallback(JoinGroupResult(
- members = if (group.isLeader(newMemberId)) {
- group.currentMemberMetadata
- } else {
- List.empty
- },
- memberId = newMemberId,
- generationId = group.generationId,
- subProtocol = group.protocolOrNull,
- leaderId = group.leaderOrNull,
- error = Errors.NONE))
- } else {
- val knownStaticMember = group.get(oldMemberId)
- updateMemberAndRebalance(group, knownStaticMember, protocols, responseCallback)
+ group.currentState match {
+ case Stable =>
+ info(s"Static member $groupInstanceId with unknown member id rejoins group ${group.groupId} " +
+ s"in ${group.currentState} state. Assigning new member id $newMemberId, while old member $oldMemberId " +
+ "will be removed. No rebalance will be triggered.")
+
+ val oldMember = group.replaceGroupInstance(oldMemberId, newMemberId, groupInstanceId)
+
+ // Heartbeat of the old member id will expire without effect since the group no longer contains that member id.
+ // A new heartbeat shall be scheduled with the new member id.
+ completeAndScheduleNextHeartbeatExpiration(group, oldMember)
+
+ responseCallback(JoinGroupResult(
+ members = if (group.isLeader(newMemberId)) {
+ group.currentMemberMetadata
+ } else {
+ List.empty
+ },
+ memberId = newMemberId,
+ generationId = group.generationId,
+ subProtocol = group.protocolOrNull,
+ leaderId = group.leaderOrNull,
+ error = Errors.NONE))
+
+ case _ =>
+ info(s"Static member $groupInstanceId with unkonwn member id rejoins group ${group.groupId} " +
+ s"in ${group.currentState} state. Update its membership with the pre-registered old member id $oldMemberId.")
+
+ val knownStaticMember = group.get(oldMemberId)
+ updateMemberAndRebalance(group, knownStaticMember, protocols, responseCallback)
}
} else if (requireKnownMemberId) {
// If member id required (dynamic membership), register the member in the pending member list
// and send back a response to call for another join group request with allocated member id.
- group.addPendingMember(newMemberId)
- addPendingMemberExpiration(group, newMemberId, sessionTimeoutMs)
- responseCallback(joinError(newMemberId, Errors.MEMBER_ID_REQUIRED))
+ debug(s"Dynamic member with unknown member id rejoins group ${group.groupId} in " +
+ s"${group.currentState} state. Created a new member id $newMemberId and request the member to rejoin with this id.")
+ group.addPendingMember(newMemberId)
+ addPendingMemberExpiration(group, newMemberId, sessionTimeoutMs)
+ responseCallback(joinError(newMemberId, Errors.MEMBER_ID_REQUIRED))
} else {
+ debug(s"Dynamic member with unknown member id rejoins group ${group.groupId} in " +
+ s"${group.currentState} state. Created a new member id $newMemberId for this member and add to the group.")
addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, newMemberId, groupInstanceId,
clientId, clientHost, protocolType, protocols, group, responseCallback)
@@ -613,16 +623,26 @@ class GroupCoordinator(val brokerId: Int,
// The group is only using Kafka to store offsets.
// Also, for transactional offset commits we don't need to validate group membership and the generation.
groupManager.storeOffsets(group, memberId, offsetMetadata, responseCallback, producerId, producerEpoch)
- } else if (group.is(CompletingRebalance)) {
- responseCallback(offsetMetadata.mapValues(_ => Errors.REBALANCE_IN_PROGRESS))
} else if (!group.has(memberId)) {
responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID))
} else if (generationId != group.generationId) {
responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION))
} else {
- val member = group.get(memberId)
- completeAndScheduleNextHeartbeatExpiration(group, member)
- groupManager.storeOffsets(group, memberId, offsetMetadata, responseCallback)
+ group.currentState match {
+ case Stable | PreparingRebalance =>
+ // During the PreparingRebalance phase we still allow commit requests, since we rely
+ // on the heartbeat response to eventually convey the rebalance-in-progress signal to the consumer
+ val member = group.get(memberId)
+ completeAndScheduleNextHeartbeatExpiration(group, member)
+ groupManager.storeOffsets(group, memberId, offsetMetadata, responseCallback)
+
+ case CompletingRebalance =>
+ // We should not receive a commit request if the group has not completed the rebalance;
+ // but since the consumer's member.id and generation are valid, it means it has received
+ // the latest group generation information from the JoinResponse.
+ // So let's return a REBALANCE_IN_PROGRESS to let the consumer handle it gracefully.
+ responseCallback(offsetMetadata.mapValues(_ => Errors.REBALANCE_IN_PROGRESS))
+ }
}
}
}
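To make the broker-side reordering in the hunk above explicit: the member-id and generation checks now run before the state check, so a commit sent during CompletingRebalance by a member with a valid id and generation receives REBALANCE_IN_PROGRESS, while an unknown member or a stale generation receives the more specific error. A condensed sketch of the resulting decision table (the enum and method are illustrative stand-ins, not the GroupCoordinator API):

// Illustrative only: mirrors the validation order in handleCommitOffsets above.
public class OffsetCommitValidationSketch {
    enum GroupState { STABLE, PREPARING_REBALANCE, COMPLETING_REBALANCE }

    static String validate(boolean memberKnown, boolean generationMatches, GroupState state) {
        if (!memberKnown)
            return "UNKNOWN_MEMBER_ID";      // now checked before the group-state check
        if (!generationMatches)
            return "ILLEGAL_GENERATION";
        switch (state) {
            case STABLE:
            case PREPARING_REBALANCE:
                // commits are still accepted while a rebalance is being prepared;
                // the rebalance itself is signaled through the heartbeat response
                return "NONE";
            case COMPLETING_REBALANCE:
            default:
                // valid member and generation, but the sync phase has not finished
                return "REBALANCE_IN_PROGRESS";
        }
    }
}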
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 5a1b20a..cd1ebc5 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -1176,25 +1176,25 @@ class GroupCoordinatorTest {
val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols,
rebalanceTimeout = sessionTimeout, sessionTimeout = sessionTimeout)
- val assignedConsumerId = joinGroupResult.memberId
+ val assignedMemberId = joinGroupResult.memberId
val generationId = joinGroupResult.generationId
val joinGroupError = joinGroupResult.error
assertEquals(Errors.NONE, joinGroupError)
EasyMock.reset(replicaManager)
- val (_, syncGroupError) = syncGroupLeader(groupId, generationId, assignedConsumerId, Map(assignedConsumerId -> Array[Byte]()))
+ val (_, syncGroupError) = syncGroupLeader(groupId, generationId, assignedMemberId, Map(assignedMemberId -> Array[Byte]()))
assertEquals(Errors.NONE, syncGroupError)
timer.advanceClock(sessionTimeout / 2)
EasyMock.reset(replicaManager)
- val commitOffsetResult = commitOffsets(groupId, assignedConsumerId, generationId, Map(tp -> offset))
+ val commitOffsetResult = commitOffsets(groupId, assignedMemberId, generationId, Map(tp -> offset))
assertEquals(Errors.NONE, commitOffsetResult(tp))
timer.advanceClock(sessionTimeout / 2 + 100)
EasyMock.reset(replicaManager)
- val heartbeatResult = heartbeat(groupId, assignedConsumerId, 1)
+ val heartbeatResult = heartbeat(groupId, assignedMemberId, 1)
assertEquals(Errors.NONE, heartbeatResult)
}
@@ -2159,6 +2159,40 @@ class GroupCoordinatorTest {
}
@Test
+ def testCommitOffsetInCompletingRebalanceFromUnknownMemberId() {
+ val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+ val tp = new TopicPartition("topic", 0)
+ val offset = offsetAndMetadata(0)
+
+ val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
+ val assignedMemberId = joinGroupResult.memberId
+ val generationId = joinGroupResult.generationId
+ val joinGroupError = joinGroupResult.error
+ assertEquals(Errors.NONE, joinGroupError)
+
+ EasyMock.reset(replicaManager)
+ val commitOffsetResult = commitOffsets(groupId, memberId, generationId, Map(tp -> offset))
+ assertEquals(Errors.UNKNOWN_MEMBER_ID, commitOffsetResult(tp))
+ }
+
+ @Test
+ def testCommitOffsetInCompletingRebalanceFromIllegalGeneration() {
+ val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+ val tp = new TopicPartition("topic", 0)
+ val offset = offsetAndMetadata(0)
+
+ val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
+ val assignedMemberId = joinGroupResult.memberId
+ val generationId = joinGroupResult.generationId
+ val joinGroupError = joinGroupResult.error
+ assertEquals(Errors.NONE, joinGroupError)
+
+ EasyMock.reset(replicaManager)
+ val commitOffsetResult = commitOffsets(groupId, assignedMemberId, generationId + 1, Map(tp -> offset))
+ assertEquals(Errors.ILLEGAL_GENERATION, commitOffsetResult(tp))
+ }
+
+ @Test
def testHeartbeatDuringRebalanceCausesRebalanceInProgress() {
// First start up a group (with a slightly larger timeout to give us time to heartbeat when the rebalance starts)
val joinGroupResult = dynamicJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
@@ -2666,7 +2700,7 @@ class GroupCoordinatorTest {
}
private def commitOffsets(groupId: String,
- consumerId: String,
+ memberId: String,
generationId: Int,
offsets: Map[TopicPartition, OffsetAndMetadata],
groupInstanceId: Option[String] = None): CommitOffsetCallbackParams = {
@@ -2692,7 +2726,7 @@ class GroupCoordinatorTest {
EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(RecordBatch.MAGIC_VALUE_V1)).anyTimes()
EasyMock.replay(replicaManager)
- groupCoordinator.handleCommitOffsets(groupId, consumerId, groupInstanceId, generationId, offsets, responseCallback)
+ groupCoordinator.handleCommitOffsets(groupId, memberId, groupInstanceId, generationId, offsets, responseCallback)
Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS))
}