You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/04/07 22:29:48 UTC
[kafka] branch 2.5 updated: KAFKA-9801: Still trigger rebalance
when static member joins in CompletingRebalance phase (#8405)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.5 by this push:
new 146811a KAFKA-9801: Still trigger rebalance when static member joins in CompletingRebalance phase (#8405)
146811a is described below
commit 146811ab2964860adc8139cd3518719b2a377612
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Tue Apr 7 15:29:15 2020 -0700
KAFKA-9801: Still trigger rebalance when static member joins in CompletingRebalance phase (#8405)
Fix the direct cause of the observed issue on the client side: when the heartbeat receives an error and resets the generation, we only need to set the state to UNJOINED if it was not already REBALANCING; otherwise, the join-group handler would throw the retriable UnjoinedGroupException and force the consumer to re-send the join-group request unnecessarily.
Fix the root cause of the issue on the broker side: we should still trigger a rebalance when a static member joins in the CompletingRebalance phase; otherwise the member.id would already have been replaced by the time the assignment is received from the leader, causing the new member.id's assignment to be empty.
Reviewers: Boyang Chen <bo...@confluent.io>, Jason Gustafson <ja...@confluent.io>
---
.../consumer/internals/AbstractCoordinator.java | 39 +++++---
.../consumer/internals/ConsumerCoordinator.java | 7 ++
.../clients/consumer/internals/Heartbeat.java | 35 +++++---
.../internals/AbstractCoordinatorTest.java | 46 ++++++++++
.../kafka/coordinator/group/GroupCoordinator.scala | 100 ++++++++++++---------
.../kafka/coordinator/group/GroupMetadata.scala | 8 +-
.../kafka/api/PlaintextConsumerTest.scala | 7 +-
.../coordinator/group/GroupCoordinatorTest.scala | 26 +++---
8 files changed, 183 insertions(+), 85 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index a9d1fd6..58071ce 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -451,8 +451,9 @@ public abstract class AbstractCoordinator implements Closeable {
return false;
}
} else {
- resetJoinGroupFuture();
final RuntimeException exception = future.exception();
+ log.info("Join group failed with {}", exception.toString());
+ resetJoinGroupFuture();
if (exception instanceof UnknownMemberIdException ||
exception instanceof RebalanceInProgressException ||
exception instanceof IllegalGenerationException ||
@@ -891,17 +892,26 @@ public abstract class AbstractCoordinator implements Closeable {
}
private synchronized void resetGeneration() {
+ this.rejoinNeeded = true;
this.generation = Generation.NO_GENERATION;
- resetStateAndRejoin();
}
synchronized void resetGenerationOnResponseError(ApiKeys api, Errors error) {
- log.debug("Resetting generation after encountering {} from {} response", error, api);
+ log.debug("Resetting generation after encountering {} from {} response and requesting re-join", error, api);
+
+ // only reset the state to un-joined when it is not already in rebalancing
+ if (state != MemberState.REBALANCING)
+ state = MemberState.UNJOINED;
+
resetGeneration();
}
synchronized void resetGenerationOnLeaveGroup() {
log.debug("Resetting generation due to consumer pro-actively leaving the group");
+
+ // always set the state to un-joined
+ state = MemberState.UNJOINED;
+
resetGeneration();
}
@@ -1008,7 +1018,8 @@ public abstract class AbstractCoordinator implements Closeable {
// visible for testing
synchronized RequestFuture<Void> sendHeartbeatRequest() {
- log.debug("Sending Heartbeat request to coordinator {}", coordinator);
+ log.debug("Sending Heartbeat request with generation {} and member id {} to coordinator {}",
+ generation.generationId, generation.memberId, coordinator);
HeartbeatRequest.Builder requestBuilder =
new HeartbeatRequest.Builder(new HeartbeatRequestData()
.setGroupId(rebalanceConfig.groupId)
@@ -1016,10 +1027,16 @@ public abstract class AbstractCoordinator implements Closeable {
.setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
.setGenerationId(this.generation.generationId));
return client.send(coordinator, requestBuilder)
- .compose(new HeartbeatResponseHandler());
+ .compose(new HeartbeatResponseHandler(generation));
}
private class HeartbeatResponseHandler extends CoordinatorResponseHandler<HeartbeatResponse, Void> {
+ private final Generation sentGeneration;
+
+ private HeartbeatResponseHandler(final Generation generation) {
+ this.sentGeneration = generation;
+ }
+
@Override
public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) {
sensors.heartbeatSensor.record(response.requestLatencyMs());
@@ -1029,7 +1046,7 @@ public abstract class AbstractCoordinator implements Closeable {
future.complete(null);
} else if (error == Errors.COORDINATOR_NOT_AVAILABLE
|| error == Errors.NOT_COORDINATOR) {
- log.info("Attempt to heartbeat failed since coordinator {} is either not started or not valid.",
+ log.info("Attempt to heartbeat failed since coordinator {} is either not started or not valid",
coordinator());
markCoordinatorUnknown();
future.raise(error);
@@ -1038,14 +1055,14 @@ public abstract class AbstractCoordinator implements Closeable {
requestRejoin();
future.raise(error);
} else if (error == Errors.ILLEGAL_GENERATION) {
- log.info("Attempt to heartbeat failed since generation {} is not current", generation.generationId);
+ log.info("Attempt to heartbeat failed since generation {} is not current", sentGeneration.generationId);
resetGenerationOnResponseError(ApiKeys.HEARTBEAT, error);
future.raise(error);
} else if (error == Errors.FENCED_INSTANCE_ID) {
log.error("Received fatal exception: group.instance.id gets fenced");
future.raise(error);
} else if (error == Errors.UNKNOWN_MEMBER_ID) {
- log.info("Attempt to heartbeat failed for since member id {} is not valid.", generation.memberId);
+ log.info("Attempt to heartbeat failed for since member id {} is not valid.", sentGeneration.memberId);
resetGenerationOnResponseError(ApiKeys.HEARTBEAT, error);
future.raise(error);
} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
@@ -1287,8 +1304,8 @@ public abstract class AbstractCoordinator implements Closeable {
AbstractCoordinator.this.wait(rebalanceConfig.retryBackoffMs);
} else {
heartbeat.sentHeartbeat(now);
-
- sendHeartbeatRequest().addListener(new RequestFutureListener<Void>() {
+ final RequestFuture<Void> heartbeatFuture = sendHeartbeatRequest();
+ heartbeatFuture.addListener(new RequestFutureListener<Void>() {
@Override
public void onSuccess(Void value) {
synchronized (AbstractCoordinator.this) {
@@ -1426,6 +1443,4 @@ public abstract class AbstractCoordinator implements Closeable {
final boolean hasValidMemberId() {
return generation != Generation.NO_GENERATION && generation.hasMemberId();
}
-
-
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 8faa34a..d4a399f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -346,6 +346,13 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
Set<TopicPartition> ownedPartitions = new HashSet<>(subscriptions.assignedPartitions());
+ // should at least encode the short version
+ if (assignmentBuffer.remaining() < 2)
+ throw new IllegalStateException("There is insufficient bytes available to read assignment from the sync-group response (" +
+ "actual byte size " + assignmentBuffer.remaining() + ") , this is not expected; " +
+ "it is possible that the leader's assign function is buggy and did not return any assignment for this member, " +
+ "or because static member is configured and the protocol is buggy hence did not get the assignment for this member");
+
Assignment assignment = ConsumerProtocol.deserializeAssignment(assignmentBuffer);
Set<TopicPartition> assignedPartitions = new HashSet<>(assignment.partitions());
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
index 4d19ef4..2e9a5ad 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
@@ -32,6 +32,7 @@ public final class Heartbeat {
private final Timer pollTimer;
private volatile long lastHeartbeatSend = 0L;
+ private volatile boolean heartbeatInFlight = false;
public Heartbeat(GroupRebalanceConfig config,
Time time) {
@@ -56,60 +57,66 @@ public final class Heartbeat {
pollTimer.reset(maxPollIntervalMs);
}
- public void sentHeartbeat(long now) {
- this.lastHeartbeatSend = now;
+ boolean hasInflight() {
+ return heartbeatInFlight;
+ }
+
+ void sentHeartbeat(long now) {
+ lastHeartbeatSend = now;
+ heartbeatInFlight = true;
update(now);
heartbeatTimer.reset(rebalanceConfig.heartbeatIntervalMs);
}
- public void failHeartbeat() {
+ void failHeartbeat() {
update(time.milliseconds());
+ heartbeatInFlight = false;
heartbeatTimer.reset(rebalanceConfig.retryBackoffMs);
}
- public void receiveHeartbeat() {
+ void receiveHeartbeat() {
update(time.milliseconds());
+ heartbeatInFlight = false;
sessionTimer.reset(rebalanceConfig.sessionTimeoutMs);
}
- public boolean shouldHeartbeat(long now) {
+ boolean shouldHeartbeat(long now) {
update(now);
return heartbeatTimer.isExpired();
}
- public long lastHeartbeatSend() {
+ long lastHeartbeatSend() {
return this.lastHeartbeatSend;
}
- public long timeToNextHeartbeat(long now) {
+ long timeToNextHeartbeat(long now) {
update(now);
return heartbeatTimer.remainingMs();
}
- public boolean sessionTimeoutExpired(long now) {
+ boolean sessionTimeoutExpired(long now) {
update(now);
return sessionTimer.isExpired();
}
- public void resetTimeouts() {
+ void resetTimeouts() {
update(time.milliseconds());
sessionTimer.reset(rebalanceConfig.sessionTimeoutMs);
pollTimer.reset(maxPollIntervalMs);
heartbeatTimer.reset(rebalanceConfig.heartbeatIntervalMs);
}
- public void resetSessionTimeout() {
+ void resetSessionTimeout() {
update(time.milliseconds());
sessionTimer.reset(rebalanceConfig.sessionTimeoutMs);
}
- public boolean pollTimeoutExpired(long now) {
+ boolean pollTimeoutExpired(long now) {
update(now);
return pollTimer.isExpired();
}
- public long lastPollTime() {
+ long lastPollTime() {
return pollTimer.currentTimeMs();
}
-
-}
\ No newline at end of file
+}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index e2315cd..eb88e3a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -466,6 +466,52 @@ public class AbstractCoordinatorTest {
}
@Test
+ public void testHeartbeatUnknownMemberResponseDuringRebalancing() throws InterruptedException {
+ setupCoordinator();
+ mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+
+ final int generation = 1;
+
+ mockClient.prepareResponse(joinGroupFollowerResponse(generation, memberId, JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.NONE));
+ mockClient.prepareResponse(syncGroupResponse(Errors.NONE));
+
+ coordinator.ensureActiveGroup();
+
+ final AbstractCoordinator.Generation currGen = coordinator.generation();
+
+ // let the heartbeat request to send out a request
+ mockTime.sleep(HEARTBEAT_INTERVAL_MS);
+
+ TestUtils.waitForCondition(() -> coordinator.heartbeat().hasInflight(), 2000,
+ "The heartbeat request was not sent in time after 2000ms elapsed");
+
+ assertTrue(coordinator.heartbeat().hasInflight());
+
+ // set the client to re-join group
+ mockClient.respond(heartbeatResponse(Errors.UNKNOWN_MEMBER_ID));
+
+ coordinator.requestRejoin();
+
+ TestUtils.waitForCondition(() -> {
+ coordinator.ensureActiveGroup(new MockTime(1L).timer(100L));
+ return !coordinator.heartbeat().hasInflight();
+ },
+ 2000,
+ "The heartbeat response was not been received in time after 2000ms elapsed");
+
+ assertFalse(coordinator.heartbeat().hasInflight());
+
+ // the generation should be reset but the rebalance should still proceed
+ assertEquals(AbstractCoordinator.Generation.NO_GENERATION, coordinator.generation());
+
+ mockClient.respond(joinGroupFollowerResponse(generation, memberId, JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.NONE));
+ mockClient.prepareResponse(syncGroupResponse(Errors.NONE));
+
+ coordinator.ensureActiveGroup();
+ assertEquals(currGen, coordinator.generation());
+ }
+
+ @Test
public void testHeartbeatRequestWithFencedInstanceIdException() throws InterruptedException {
setupCoordinator();
mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index bf8f991..06f07d3 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -202,41 +202,7 @@ class GroupCoordinator(val brokerId: Int,
val newMemberId = group.generateMemberId(clientId, groupInstanceId)
if (group.hasStaticMember(groupInstanceId)) {
- val oldMemberId = group.getStaticMemberId(groupInstanceId)
- info(s"Static member $groupInstanceId of group ${group.groupId} with unknown member id rejoins, assigning new member id $newMemberId, while " +
- s"old member id $oldMemberId will be removed.")
-
- val currentLeader = group.leaderOrNull
- val member = group.replaceGroupInstance(oldMemberId, newMemberId, groupInstanceId)
- // Heartbeat of old member id will expire without effect since the group no longer contains that member id.
- // New heartbeat shall be scheduled with new member id.
- completeAndScheduleNextHeartbeatExpiration(group, member)
-
- val knownStaticMember = group.get(newMemberId)
- group.updateMember(knownStaticMember, protocols, responseCallback)
-
- group.currentState match {
- case Stable | CompletingRebalance =>
- info(s"Static member joins during ${group.currentState} stage will not trigger rebalance.")
- group.maybeInvokeJoinCallback(member, JoinGroupResult(
- members = List.empty,
- memberId = newMemberId,
- generationId = group.generationId,
- protocolType = group.protocolType,
- protocolName = group.protocolName,
- // We want to avoid current leader performing trivial assignment while the group
- // is in stable/awaiting sync stage, because the new assignment in leader's next sync call
- // won't be broadcast by a stable/awaiting sync group. This could be guaranteed by
- // always returning the old leader id so that the current leader won't assume itself
- // as a leader based on the returned message, since the new member.id won't match
- // returned leader id, therefore no assignment will be performed.
- leaderId = currentLeader,
- error = Errors.NONE))
- case Empty | Dead =>
- throw new IllegalStateException(s"Group ${group.groupId} was not supposed to be " +
- s"in the state ${group.currentState} when the unknown static member $groupInstanceId rejoins.")
- case PreparingRebalance =>
- }
+ updateStaticMemberAndRebalance(group, newMemberId, groupInstanceId, protocols, responseCallback)
} else if (requireKnownMemberId) {
// If member id required (dynamic membership), register the member in the pending member list
// and send back a response to call for another join group request with allocated member id.
@@ -246,7 +212,7 @@ class GroupCoordinator(val brokerId: Int,
addPendingMemberExpiration(group, newMemberId, sessionTimeoutMs)
responseCallback(JoinGroupResult(newMemberId, Errors.MEMBER_ID_REQUIRED))
} else {
- debug(s"${if (groupInstanceId.isDefined) "Static" else "Dynamic"} Member with unknown member id joins group ${group.groupId} in " +
+ info(s"${if (groupInstanceId.isDefined) "Static" else "Dynamic"} Member with unknown member id joins group ${group.groupId} in " +
s"${group.currentState} state. Created a new member id $newMemberId for this member and add to the group.")
addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, newMemberId, groupInstanceId,
clientId, clientHost, protocolType, protocols, group, responseCallback)
@@ -287,7 +253,7 @@ class GroupCoordinator(val brokerId: Int,
}
} else {
val groupInstanceIdNotFound = groupInstanceId.isDefined && !group.hasStaticMember(groupInstanceId)
- if (group.isStaticMemberFenced(memberId, groupInstanceId)) {
+ if (group.isStaticMemberFenced(memberId, groupInstanceId, "join-group")) {
// given member id doesn't match with the groupInstanceId. Inform duplicate instance to shut down immediately.
responseCallback(JoinGroupResult(memberId, Errors.FENCED_INSTANCE_ID))
} else if (!group.has(memberId) || groupInstanceIdNotFound) {
@@ -397,7 +363,7 @@ class GroupCoordinator(val brokerId: Int,
// coordinator OR the group is in a transient unstable phase. Let the member retry
// finding the correct coordinator and rejoin.
responseCallback(SyncGroupResult(Errors.COORDINATOR_NOT_AVAILABLE))
- } else if (group.isStaticMemberFenced(memberId, groupInstanceId)) {
+ } else if (group.isStaticMemberFenced(memberId, groupInstanceId, "sync-group")) {
responseCallback(SyncGroupResult(Errors.FENCED_INSTANCE_ID))
} else if (!group.has(memberId)) {
responseCallback(SyncGroupResult(Errors.UNKNOWN_MEMBER_ID))
@@ -483,7 +449,7 @@ class GroupCoordinator(val brokerId: Int,
val memberId = leavingMember.memberId
val groupInstanceId = Option(leavingMember.groupInstanceId)
if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID
- && group.isStaticMemberFenced(memberId, groupInstanceId)) {
+ && group.isStaticMemberFenced(memberId, groupInstanceId, "leave-group")) {
memberLeaveError(leavingMember, Errors.FENCED_INSTANCE_ID)
} else if (group.isPendingMember(memberId)) {
if (groupInstanceId.isDefined) {
@@ -640,7 +606,7 @@ class GroupCoordinator(val brokerId: Int,
// coordinator OR the group is in a transient unstable phase. Let the member retry
// finding the correct coordinator and rejoin.
responseCallback(Errors.COORDINATOR_NOT_AVAILABLE)
- } else if (group.isStaticMemberFenced(memberId, groupInstanceId)) {
+ } else if (group.isStaticMemberFenced(memberId, groupInstanceId, "heartbeat")) {
responseCallback(Errors.FENCED_INSTANCE_ID)
} else if (!group.has(memberId)) {
responseCallback(Errors.UNKNOWN_MEMBER_ID)
@@ -739,7 +705,7 @@ class GroupCoordinator(val brokerId: Int,
// coordinator OR the group is in a transient unstable phase. Let the member retry
// finding the correct coordinator and rejoin.
responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.COORDINATOR_NOT_AVAILABLE })
- } else if (group.isStaticMemberFenced(memberId, groupInstanceId)) {
+ } else if (group.isStaticMemberFenced(memberId, groupInstanceId, "txn-commit-offsets")) {
responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.FENCED_INSTANCE_ID })
} else if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID && !group.has(memberId)) {
// Enforce member id when it is set.
@@ -766,7 +732,7 @@ class GroupCoordinator(val brokerId: Int,
// coordinator OR the group is in a transient unstable phase. Let the member retry
// finding the correct coordinator and rejoin.
responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.COORDINATOR_NOT_AVAILABLE })
- } else if (group.isStaticMemberFenced(memberId, groupInstanceId)) {
+ } else if (group.isStaticMemberFenced(memberId, groupInstanceId, "commit-offsets")) {
responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.FENCED_INSTANCE_ID })
} else if (generationId < 0 && group.is(Empty)) {
// The group is only using Kafka to store offsets.
@@ -1025,7 +991,55 @@ class GroupCoordinator(val brokerId: Int,
} else {
group.removePendingMember(memberId)
}
- maybePrepareRebalance(group, s"Adding new member $memberId with group instanceid $groupInstanceId")
+ maybePrepareRebalance(group, s"Adding new member $memberId with group instance id $groupInstanceId")
+ }
+
+ private def updateStaticMemberAndRebalance(group: GroupMetadata,
+ newMemberId: String,
+ groupInstanceId: Option[String],
+ protocols: List[(String, Array[Byte])],
+ responseCallback: JoinCallback): Unit = {
+ val oldMemberId = group.getStaticMemberId(groupInstanceId)
+ info(s"Static member $groupInstanceId of group ${group.groupId} with unknown member id rejoins, assigning new member id $newMemberId, while " +
+ s"old member id $oldMemberId will be removed.")
+
+ val currentLeader = group.leaderOrNull
+ val member = group.replaceGroupInstance(oldMemberId, newMemberId, groupInstanceId)
+ // Heartbeat of old member id will expire without effect since the group no longer contains that member id.
+ // New heartbeat shall be scheduled with new member id.
+ completeAndScheduleNextHeartbeatExpiration(group, member)
+
+ val knownStaticMember = group.get(newMemberId)
+ group.updateMember(knownStaticMember, protocols, responseCallback)
+
+ group.currentState match {
+ case Stable =>
+ info(s"Static member joins during Stable stage will not trigger rebalance.")
+ group.maybeInvokeJoinCallback(member, JoinGroupResult(
+ members = List.empty,
+ memberId = newMemberId,
+ generationId = group.generationId,
+ protocolType = group.protocolType,
+ protocolName = group.protocolName,
+ // We want to avoid current leader performing trivial assignment while the group
+ // is in stable stage, because the new assignment in leader's next sync call
+ // won't be broadcast by a stable group. This could be guaranteed by
+ // always returning the old leader id so that the current leader won't assume itself
+ // as a leader based on the returned message, since the new member.id won't match
+ // returned leader id, therefore no assignment will be performed.
+ leaderId = currentLeader,
+ error = Errors.NONE))
+ case CompletingRebalance =>
+ // if the group is in after-sync stage, upon getting a new join-group of a known static member
+ // we should still trigger a new rebalance, since the old member may already be sent to the leader
+ // for assignment, and hence when the assignment gets back there would be a mismatch of the old member id
+ // with the new replaced member id. As a result the new member id would not get any assignment.
+ prepareRebalance(group, s"Updating metadata for static member ${member.memberId} with instance id $groupInstanceId")
+ case Empty | Dead =>
+ throw new IllegalStateException(s"Group ${group.groupId} was not supposed to be " +
+ s"in the state ${group.currentState} when the unknown static member $groupInstanceId rejoins.")
+ case PreparingRebalance =>
+ }
}
private def updateMemberAndRebalance(group: GroupMetadata,
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
index dde76c3..ec35784 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
@@ -385,11 +385,13 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
* 2. group stored member.id doesn't match with given member.id
*/
def isStaticMemberFenced(memberId: String,
- groupInstanceId: Option[String]): Boolean = {
+ groupInstanceId: Option[String],
+ operation: String): Boolean = {
if (hasStaticMember(groupInstanceId)
&& getStaticMemberId(groupInstanceId) != memberId) {
- error(s"given member.id $memberId is identified as a known static member ${groupInstanceId.get}," +
- s"but not matching the expected member.id ${getStaticMemberId(groupInstanceId)}")
+ error(s"given member.id $memberId is identified as a known static member ${groupInstanceId.get}, " +
+ s"but not matching the expected member.id ${getStaticMemberId(groupInstanceId)} during $operation, will " +
+ s"respond with instance fenced error")
true
} else
false
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 9df468a..e4ab403 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -173,7 +173,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
@Test
def testMaxPollIntervalMs(): Unit = {
- this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 3000.toString)
+ this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 1000.toString)
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 500.toString)
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 2000.toString)
@@ -187,9 +187,10 @@ class PlaintextConsumerTest extends BaseConsumerTest {
assertEquals(1, listener.callsToAssigned)
assertEquals(0, listener.callsToRevoked)
- Thread.sleep(3500)
+ // after we extend longer than max.poll a rebalance should be triggered
+ // NOTE we need to have a relatively much larger value than max.poll to let heartbeat expired for sure
+ Thread.sleep(3000)
- // we should fall out of the group and need to rebalance
awaitRebalance(consumer, listener)
assertEquals(2, listener.callsToAssigned)
assertEquals(1, listener.callsToRevoked)
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 2170f4a..cf8dd00 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -616,10 +616,10 @@ class GroupCoordinatorTest {
groupId,
CompletingRebalance,
Some(protocolType))
- assertEquals(leaderJoinGroupResult.leaderId, leaderJoinGroupResult.memberId)
+ assertEquals(rebalanceResult.leaderId, leaderJoinGroupResult.memberId)
assertEquals(rebalanceResult.leaderId, leaderJoinGroupResult.leaderId)
- // Old member shall be getting a successful join group response.
+ // Old follower shall be getting a successful join group response.
val oldFollowerJoinGroupResult = Await.result(oldFollowerJoinGroupFuture, Duration(1, TimeUnit.MILLISECONDS))
checkJoinGroupResult(oldFollowerJoinGroupResult,
Errors.NONE,
@@ -629,31 +629,37 @@ class GroupCoordinatorTest {
CompletingRebalance,
Some(protocolType),
expectedLeaderId = leaderJoinGroupResult.memberId)
+ assertEquals(rebalanceResult.followerId, oldFollowerJoinGroupResult.memberId)
+ assertEquals(rebalanceResult.leaderId, oldFollowerJoinGroupResult.leaderId)
+ assertTrue(getGroup(groupId).is(CompletingRebalance))
+ // Duplicate follower joins group with unknown member id will trigger member.id replacement,
+ // and will also trigger a rebalance under CompletingRebalance state; the old follower sync callback
+ // will return fenced exception while broker replaces the member identity with the duplicate follower joins.
EasyMock.reset(replicaManager)
val oldFollowerSyncGroupFuture = sendSyncGroupFollower(groupId, oldFollowerJoinGroupResult.generationId,
oldFollowerJoinGroupResult.memberId, Some(protocolType), Some(protocolName), followerInstanceId)
-
- // Duplicate follower joins group with unknown member id will trigger member.id replacement.
EasyMock.reset(replicaManager)
val duplicateFollowerJoinFuture =
sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, groupInstanceId = followerInstanceId)
timer.advanceClock(1)
-
- // Old follower sync callback will return fenced exception while broker replaces the member identity.
val oldFollowerSyncGroupResult = Await.result(oldFollowerSyncGroupFuture, Duration(1, TimeUnit.MILLISECONDS))
assertEquals(Errors.FENCED_INSTANCE_ID, oldFollowerSyncGroupResult.error)
+ assertTrue(getGroup(groupId).is(PreparingRebalance))
+
+ timer.advanceClock(GroupInitialRebalanceDelay + 1)
+ timer.advanceClock(DefaultRebalanceTimeout + 1)
- // Duplicate follower will get the same response as old follower.
val duplicateFollowerJoinGroupResult = Await.result(duplicateFollowerJoinFuture, Duration(1, TimeUnit.MILLISECONDS))
checkJoinGroupResult(duplicateFollowerJoinGroupResult,
Errors.NONE,
- rebalanceResult.generation + 1,
- Set.empty,
+ rebalanceResult.generation + 2,
+ Set(followerInstanceId), // this follower will become the new leader, and hence it would have the member list
groupId,
CompletingRebalance,
Some(protocolType),
- expectedLeaderId = leaderJoinGroupResult.memberId)
+ expectedLeaderId = duplicateFollowerJoinGroupResult.memberId)
+ assertTrue(getGroup(groupId).is(CompletingRebalance))
}
@Test