You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2019/04/26 18:44:52 UTC
[kafka] branch trunk updated: KAFKA-7862 & KIP-345 part-one: Add
static membership logic to JoinGroup protocol (#6177)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0f995ba KAFKA-7862 & KIP-345 part-one: Add static membership logic to JoinGroup protocol (#6177)
0f995ba is described below
commit 0f995ba6be1c0f949320b0879241d9a7c9578436
Author: Boyang Chen <bo...@confluent.io>
AuthorDate: Fri Apr 26 11:44:38 2019 -0700
KAFKA-7862 & KIP-345 part-one: Add static membership logic to JoinGroup protocol (#6177)
This is the first diff for the implementation of JoinGroup logic for static membership. The goal of this diff contains:
* Add group.instance.id to be unique identifier for consumer instances, provided by end user;
* Modify group coordinator to accept JoinGroupRequest with/without static membership, refactor the logic for readability and code reusability.
* Add client side support for incorporating static membership changes, including new config for group.instance.id, apply stream thread client id by default, and new join group exception handling.
* Increase max session timeout to 30 min for more user flexibility, for users who would rather tolerate partial unavailability than bear the burden of a rebalance.
* Unit tests for each module changed, especially on the group coordinator logic, covering combinations such as:
6.1 Dynamic/Static member
6.2 Known/Unknown member id
6.3 Group stable/unstable
6.4 Leader/Follower
The rest of the 345 change will be broken down to 4 separate diffs:
* Avoid kicking out members through rebalance.timeout, only do the kick out through session timeout.
* Changes around LeaveGroup logic, including version bumping, broker logic, client logic, etc.
* Admin client changes to add ability to batch remove static members
* Deprecate group.initial.rebalance.delay
Reviewers: Liquan Pei <li...@gmail.com>, Stanislav Kozlovski <fa...@windowslive.com>, Jason Gustafson <ja...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
checkstyle/suppressions.xml | 6 +-
.../kafka/clients/consumer/ConsumerConfig.java | 21 +-
.../kafka/clients/consumer/KafkaConsumer.java | 12 +
.../consumer/internals/AbstractCoordinator.java | 23 +-
.../consumer/internals/ConsumerCoordinator.java | 2 +
.../common/errors/FencedInstanceIdException.java | 29 ++
.../org/apache/kafka/common/protocol/Errors.java | 5 +-
.../kafka/common/requests/JoinGroupRequest.java | 36 ++
.../resources/common/message/JoinGroupRequest.json | 5 +-
.../common/message/JoinGroupResponse.json | 5 +-
.../kafka/clients/consumer/KafkaConsumerTest.java | 63 +--
.../internals/AbstractCoordinatorTest.java | 102 +++-
.../internals/ConsumerCoordinatorTest.java | 63 ++-
.../common/requests/JoinGroupRequestTest.java | 65 +++
.../runtime/distributed/WorkerCoordinator.java | 4 +-
core/src/main/scala/kafka/api/ApiVersion.scala | 11 +-
.../kafka/coordinator/group/GroupCoordinator.scala | 200 ++++---
.../kafka/coordinator/group/GroupMetadata.scala | 64 ++-
.../coordinator/group/GroupMetadataManager.scala | 42 +-
.../kafka/coordinator/group/MemberMetadata.scala | 11 +-
core/src/main/scala/kafka/server/KafkaApis.scala | 22 +-
core/src/main/scala/kafka/server/KafkaConfig.scala | 2 +-
.../kafka/api/AuthorizerIntegrationTest.scala | 1 +
.../group/GroupCoordinatorConcurrencyTest.scala | 2 +-
.../coordinator/group/GroupCoordinatorTest.scala | 573 ++++++++++++++++++---
.../group/GroupMetadataManagerTest.scala | 56 +-
.../coordinator/group/GroupMetadataTest.scala | 17 +-
.../coordinator/group/MemberMetadataTest.scala | 19 +-
.../scala/unit/kafka/server/RequestQuotaTest.scala | 1 +
.../streams/processor/internals/StreamThread.java | 1 +
.../FineGrainedAutoResetIntegrationTest.java | 8 +-
tests/kafkatest/services/verifiable_consumer.py | 21 +-
tests/kafkatest/tests/client/consumer_test.py | 76 ++-
tests/kafkatest/tests/verifiable_consumer_test.py | 4 +-
.../org/apache/kafka/tools/VerifiableConsumer.java | 13 +
35 files changed, 1300 insertions(+), 285 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index ca103a2..0c65edc 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -23,7 +23,7 @@
files="AbstractResponse.java"/>
<suppress checks="MethodLength"
- files="KerberosLogin.java|RequestResponseTest.java|ConnectMetricsRegistry.java"/>
+ files="KerberosLogin.java|RequestResponseTest.java|ConnectMetricsRegistry.java|KafkaConsumer.java"/>
<suppress checks="ParameterNumber"
files="NetworkClient.java"/>
@@ -48,10 +48,10 @@
files="(Errors|SaslAuthenticatorTest|AgentTest|CoordinatorTest).java"/>
<suppress checks="BooleanExpressionComplexity"
- files="(Utils|Topic|KafkaLZ4BlockOutputStream|AclData).java"/>
+ files="(Utils|Topic|KafkaLZ4BlockOutputStream|AclData|JoinGroupRequest).java"/>
<suppress checks="CyclomaticComplexity"
- files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|SchemaGenerator).java"/>
+ files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|SchemaGenerator|AbstractCoordinator).java"/>
<suppress checks="JavaNCSS"
files="AbstractRequest.java|KerberosLogin.java|WorkerSinkTaskTest.java|TransactionManagerTest.java|SenderTest.java"/>
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index b92cbf9..bd1c984 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -53,6 +53,16 @@ public class ConsumerConfig extends AbstractConfig {
public static final String GROUP_ID_CONFIG = "group.id";
private static final String GROUP_ID_DOC = "A unique string that identifies the consumer group this consumer belongs to. This property is required if the consumer uses either the group management functionality by using <code>subscribe(topic)</code> or the Kafka-based offset management strategy.";
+ /**
+ * <code>group.instance.id</code>
+ */
+ public static final String GROUP_INSTANCE_ID_CONFIG = "group.instance.id";
+ private static final String GROUP_INSTANCE_ID_DOC = "A unique identifier of the consumer instance provided by end user. " +
+ "Only non-empty strings are permitted. If set, the consumer is treated as a static member, " +
+ "which means that only one instance with this ID is allowed in the consumer group at any time. " +
+ "This can be used in combination with a larger session timeout to avoid group rebalances caused by transient unavailability " +
+ "(e.g. process restarts). If not set, the consumer will join the group as a dynamic member, which is the traditional behavior.";
+
/** <code>max.poll.records</code> */
public static final String MAX_POLL_RECORDS_CONFIG = "max.poll.records";
private static final String MAX_POLL_RECORDS_DOC = "The maximum number of records returned in a single call to poll().";
@@ -278,6 +288,11 @@ public class ConsumerConfig extends AbstractConfig {
Importance.MEDIUM,
CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC)
.define(GROUP_ID_CONFIG, Type.STRING, null, Importance.HIGH, GROUP_ID_DOC)
+ .define(GROUP_INSTANCE_ID_CONFIG,
+ Type.STRING,
+ null,
+ Importance.MEDIUM,
+ GROUP_INSTANCE_ID_DOC)
.define(SESSION_TIMEOUT_MS_CONFIG,
Type.INT,
10000,
@@ -455,9 +470,9 @@ public class ConsumerConfig extends AbstractConfig {
Importance.MEDIUM,
EXCLUDE_INTERNAL_TOPICS_DOC)
.defineInternal(LEAVE_GROUP_ON_CLOSE_CONFIG,
- Type.BOOLEAN,
- true,
- Importance.LOW)
+ Type.BOOLEAN,
+ true,
+ Importance.LOW)
.define(ISOLATION_LEVEL_CONFIG,
Type.STRING,
DEFAULT_ISOLATION_LEVEL,
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 839057c..c73a028 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -50,6 +50,7 @@ import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.requests.IsolationLevel;
+import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.AppInfoParser;
@@ -567,6 +568,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
private final Logger log;
private final String clientId;
private String groupId;
+ private Optional<String> groupInstanceId;
private final ConsumerCoordinator coordinator;
private final Deserializer<K> keyDeserializer;
private final Deserializer<V> valueDeserializer;
@@ -671,6 +673,15 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
clientId = "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();
this.clientId = clientId;
this.groupId = config.getString(ConsumerConfig.GROUP_ID_CONFIG);
+
+ String groupInstanceId = config.getString(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG);
+ if (groupInstanceId != null) {
+ JoinGroupRequest.validateGroupInstanceId(groupInstanceId);
+ this.groupInstanceId = Optional.of(groupInstanceId);
+ } else {
+ this.groupInstanceId = Optional.empty();
+ }
+
LogContext logContext = new LogContext("[Consumer clientId=" + clientId + ", groupId=" + groupId + "] ");
this.log = logContext.logger(getClass());
boolean enableAutoCommit = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
@@ -764,6 +775,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
new ConsumerCoordinator(logContext,
this.client,
groupId,
+ this.groupInstanceId,
maxPollIntervalMs,
sessionTimeoutMs,
new Heartbeat(time, sessionTimeoutMs, heartbeatIntervalMs, maxPollIntervalMs, retryBackoffMs),
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 9261966..0124338 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -64,6 +64,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -112,10 +113,12 @@ public abstract class AbstractCoordinator implements Closeable {
private final Heartbeat heartbeat;
protected final int rebalanceTimeoutMs;
protected final String groupId;
+ protected final Optional<String> groupInstanceId;
protected final ConsumerNetworkClient client;
protected final Time time;
protected final long retryBackoffMs;
+
private HeartbeatThread heartbeatThread = null;
private boolean rejoinNeeded = true;
private boolean needsJoinPrepare = true;
@@ -132,6 +135,7 @@ public abstract class AbstractCoordinator implements Closeable {
public AbstractCoordinator(LogContext logContext,
ConsumerNetworkClient client,
String groupId,
+ Optional<String> groupInstanceId,
int rebalanceTimeoutMs,
int sessionTimeoutMs,
Heartbeat heartbeat,
@@ -145,6 +149,7 @@ public abstract class AbstractCoordinator implements Closeable {
this.time = time;
this.groupId = Objects.requireNonNull(groupId,
"Expected a non-null group id for coordinator construction");
+ this.groupInstanceId = groupInstanceId;
this.rebalanceTimeoutMs = rebalanceTimeoutMs;
this.sessionTimeoutMs = sessionTimeoutMs;
this.leaveGroupOnClose = leaveGroupOnClose;
@@ -156,6 +161,7 @@ public abstract class AbstractCoordinator implements Closeable {
public AbstractCoordinator(LogContext logContext,
ConsumerNetworkClient client,
String groupId,
+ Optional<String> groupInstanceId,
int rebalanceTimeoutMs,
int sessionTimeoutMs,
int heartbeatIntervalMs,
@@ -164,7 +170,7 @@ public abstract class AbstractCoordinator implements Closeable {
Time time,
long retryBackoffMs,
boolean leaveGroupOnClose) {
- this(logContext, client, groupId, rebalanceTimeoutMs, sessionTimeoutMs,
+ this(logContext, client, groupId, groupInstanceId, rebalanceTimeoutMs, sessionTimeoutMs,
new Heartbeat(time, sessionTimeoutMs, heartbeatIntervalMs, rebalanceTimeoutMs, retryBackoffMs),
metrics, metricGrpPrefix, time, retryBackoffMs, leaveGroupOnClose);
}
@@ -495,6 +501,7 @@ public abstract class AbstractCoordinator implements Closeable {
.setGroupId(groupId)
.setSessionTimeoutMs(this.sessionTimeoutMs)
.setMemberId(this.generation.memberId)
+ .setGroupInstanceId(this.groupInstanceId.orElse(null))
.setProtocolType(protocolType())
.setProtocols(metadata())
.setRebalanceTimeoutMs(this.rebalanceTimeoutMs)
@@ -552,7 +559,9 @@ public abstract class AbstractCoordinator implements Closeable {
|| error == Errors.INVALID_SESSION_TIMEOUT
|| error == Errors.INVALID_GROUP_ID
|| error == Errors.GROUP_AUTHORIZATION_FAILED
- || error == Errors.GROUP_MAX_SIZE_REACHED) {
+ || error == Errors.GROUP_MAX_SIZE_REACHED
+ || error == Errors.FENCED_INSTANCE_ID) {
+ // log the error and re-throw the exception
log.error("Attempt to join group failed due to fatal error: {}", error.message());
if (error == Errors.GROUP_MAX_SIZE_REACHED) {
future.raise(new GroupMaxSizeReachedException(groupId));
@@ -820,7 +829,11 @@ public abstract class AbstractCoordinator implements Closeable {
* Leave the current group and reset local generation/memberId.
*/
public synchronized void maybeLeaveGroup() {
- if (!coordinatorUnknown() && state != MemberState.UNJOINED && generation.hasMemberId()) {
+ // Starting from 2.3, only dynamic members will send LeaveGroupRequest to the broker,
+ // consumer with valid group.instance.id is viewed as static member that never sends LeaveGroup,
+ // and the membership expiration is only controlled by session timeout.
+ if (isDynamicMember() && !coordinatorUnknown() &&
+ state != MemberState.UNJOINED && generation.hasMemberId()) {
// this is a minimal effort attempt to leave the group. we do not
// attempt any resending if the request fails or times out.
log.info("Member {} sending LeaveGroup request to coordinator {}", generation.memberId, coordinator);
@@ -834,6 +847,10 @@ public abstract class AbstractCoordinator implements Closeable {
resetGeneration();
}
+ protected boolean isDynamicMember() {
+ return !groupInstanceId.isPresent();
+ }
+
private class LeaveGroupResponseHandler extends CoordinatorResponseHandler<LeaveGroupResponse, Void> {
@Override
public void handle(LeaveGroupResponse leaveResponse, RequestFuture<Void> future) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 4d40070..deed257 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -121,6 +121,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
public ConsumerCoordinator(LogContext logContext,
ConsumerNetworkClient client,
String groupId,
+ Optional<String> groupInstanceId,
int rebalanceTimeoutMs,
int sessionTimeoutMs,
Heartbeat heartbeat,
@@ -138,6 +139,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
super(logContext,
client,
groupId,
+ groupInstanceId,
rebalanceTimeoutMs,
sessionTimeoutMs,
heartbeat,
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/FencedInstanceIdException.java b/clients/src/main/java/org/apache/kafka/common/errors/FencedInstanceIdException.java
new file mode 100644
index 0000000..78e4034
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/FencedInstanceIdException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class FencedInstanceIdException extends ApiException {
+ private static final long serialVersionUID = 1L;
+
+ public FencedInstanceIdException(String message) {
+ super(message);
+ }
+
+ public FencedInstanceIdException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index cc493c2..bf168bc 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -60,6 +60,7 @@ import org.apache.kafka.common.errors.InvalidTxnTimeoutException;
import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.LogDirNotFoundException;
+import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.MemberIdRequiredException;
import org.apache.kafka.common.errors.NetworkException;
import org.apache.kafka.common.errors.NotControllerException;
@@ -302,7 +303,9 @@ public enum Errors {
MemberIdRequiredException::new),
PREFERRED_LEADER_NOT_AVAILABLE(80, "The preferred leader was not available",
PreferredLeaderNotAvailableException::new),
- GROUP_MAX_SIZE_REACHED(81, "The consumer group has reached its max size.", GroupMaxSizeReachedException::new);
+ GROUP_MAX_SIZE_REACHED(81, "The consumer group has reached its max size.", GroupMaxSizeReachedException::new),
+ FENCED_INSTANCE_ID(82, "The coordinator reports a more recent member.id associated with the consumer's group.instance.id.",
+ FencedInstanceIdException::new);
private static final Logger log = LoggerFactory.getLogger(Errors.class);
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
index 2f46172..f9244cf 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.requests;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
@@ -52,6 +53,40 @@ public class JoinGroupRequest extends AbstractRequest {
public static final String UNKNOWN_MEMBER_ID = "";
+ private static final int MAX_GROUP_INSTANCE_ID_LENGTH = 249;
+
+ /**
+ * Ported from class Topic in {@link org.apache.kafka.common.internals} to restrict the charset for
+ * static member id.
+ */
+ public static void validateGroupInstanceId(String id) {
+ if (id.equals(""))
+ throw new InvalidConfigurationException("Group instance id must be non-empty string");
+ if (id.equals(".") || id.equals(".."))
+ throw new InvalidConfigurationException("Group instance id cannot be \".\" or \"..\"");
+ if (id.length() > MAX_GROUP_INSTANCE_ID_LENGTH)
+ throw new InvalidConfigurationException("Group instance id can't be longer than " + MAX_GROUP_INSTANCE_ID_LENGTH +
+ " characters: " + id);
+ if (!containsValidPattern(id))
+ throw new InvalidConfigurationException("Group instance id \"" + id + "\" is illegal, it contains a character other than " +
+ "ASCII alphanumerics, '.', '_' and '-'");
+ }
+
+ /**
+ * Valid characters for Consumer group.instance.id are the ASCII alphanumerics, '.', '_', and '-'
+ */
+ static boolean containsValidPattern(String topic) {
+ for (int i = 0; i < topic.length(); ++i) {
+ char c = topic.charAt(i);
+
+ boolean validChar = (c >= 'a' && c <= 'z') || (c >= '0' && c <= '9') || (c >= 'A' && c <= 'Z') || c == '.' ||
+ c == '_' || c == '-';
+ if (!validChar)
+ return false;
+ }
+ return true;
+ }
+
public JoinGroupRequest(JoinGroupRequestData data, short version) {
super(ApiKeys.JOIN_GROUP, version);
this.data = data;
@@ -86,6 +121,7 @@ public class JoinGroupRequest extends AbstractRequest {
case 2:
case 3:
case 4:
+ case 5:
return new JoinGroupResponse(
new JoinGroupResponseData()
.setThrottleTimeMs(throttleTimeMs)
diff --git a/clients/src/main/resources/common/message/JoinGroupRequest.json b/clients/src/main/resources/common/message/JoinGroupRequest.json
index 2a3000c..ec98d4a 100644
--- a/clients/src/main/resources/common/message/JoinGroupRequest.json
+++ b/clients/src/main/resources/common/message/JoinGroupRequest.json
@@ -20,8 +20,9 @@
// Version 1 adds RebalanceTimeoutMs.
// Version 2 and 3 are the same as version 1.
// Starting from version 4, the client needs to issue a second request to join group
+ // Starting from version 5, we add a new field called groupInstanceId to indicate member identity across restarts.
// with assigned id.
- "validVersions": "0-4",
+ "validVersions": "0-5",
"fields": [
{ "name": "GroupId", "type": "string", "versions": "0+",
"about": "The group identifier." },
@@ -33,6 +34,8 @@
"about": "The maximum time in milliseconds that the coordinator will wait for each member to rejoin when rebalancing the group." },
{ "name": "MemberId", "type": "string", "versions": "0+",
"about": "The member id assigned by the group coordinator." },
+ { "name": "GroupInstanceId", "type": "string", "versions": "5+", "nullableVersions": "5+",
+ "about": "The unique identifier of the consumer instance provided by end user." },
{ "name": "ProtocolType", "type": "string", "versions": "0+",
"about": "The unique name the for class of protocols implemented by the group we want to join." },
{ "name": "Protocols", "type": "[]JoinGroupRequestProtocol", "versions": "0+",
diff --git a/clients/src/main/resources/common/message/JoinGroupResponse.json b/clients/src/main/resources/common/message/JoinGroupResponse.json
index a1c6a70..6ffa1bc 100644
--- a/clients/src/main/resources/common/message/JoinGroupResponse.json
+++ b/clients/src/main/resources/common/message/JoinGroupResponse.json
@@ -22,7 +22,8 @@
// Starting in version 3, on quota violation, brokers send out responses before throttling.
// Starting in version 4, the client needs to issue a second request to join group
// with assigned id.
- "validVersions": "0-4",
+ // Version 5 is bumped to apply group.instance.id to identify member across restarts.
+ "validVersions": "0-5",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "2+", "ignorable": true,
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
@@ -39,6 +40,8 @@
{ "name": "Members", "type": "[]JoinGroupResponseMember", "versions": "0+", "fields": [
{ "name": "MemberId", "type": "string", "versions": "0+",
"about": "The group member ID." },
+ { "name": "GroupInstanceId", "type": "string", "versions": "5+", "nullableVersions": "5+",
+ "about": "The unique identifier of the consumer instance provided by end user." },
{ "name": "Metadata", "type": "bytes", "versions": "0+",
"about": "The group member metadata." }
]}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 25c72c6..8f23d41 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -145,6 +145,7 @@ public class KafkaConsumerTest {
private final int autoCommitIntervalMs = 500;
private final String groupId = "mock-group";
+ private final Optional<String> groupInstanceId = Optional.of("mock-instance");
@Test
public void testMetricsReporterAutoGeneratedClientId() {
@@ -384,7 +385,8 @@ public class KafkaConsumerTest {
PartitionAssignor assignor = new RoundRobinAssignor();
- KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true);
+ KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+
consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
Node coordinator = prepareRebalance(client, node, assignor, singletonList(tp0), null);
@@ -417,7 +419,7 @@ public class KafkaConsumerTest {
Node node = metadata.fetch().nodes().get(0);
PartitionAssignor assignor = new RoundRobinAssignor();
- KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true);
+ KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
Node coordinator = prepareRebalance(client, node, assignor, singletonList(tp0), null);
@@ -452,7 +454,7 @@ public class KafkaConsumerTest {
final PartitionAssignor assignor = new RoundRobinAssignor();
- final KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true);
+ final KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
prepareRebalance(client, node, assignor, singletonList(tp0), null);
@@ -476,7 +478,7 @@ public class KafkaConsumerTest {
final PartitionAssignor assignor = new RoundRobinAssignor();
- final KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true);
+ final KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
prepareRebalance(client, node, assignor, singletonList(tp0), null);
@@ -499,7 +501,7 @@ public class KafkaConsumerTest {
initMetadata(client, Collections.singletonMap(topic, 1));
PartitionAssignor assignor = new RoundRobinAssignor();
- KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true);
+ KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
consumer.assign(singleton(tp0));
consumer.seekToBeginning(singleton(tp0));
@@ -575,7 +577,7 @@ public class KafkaConsumerTest {
PartitionAssignor assignor = new RoundRobinAssignor();
KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor,
- true, groupId);
+ true, groupId, groupInstanceId);
consumer.assign(singletonList(tp0));
client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
@@ -599,7 +601,7 @@ public class KafkaConsumerTest {
PartitionAssignor assignor = new RoundRobinAssignor();
KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor,
- true, groupId);
+ true, groupId, groupInstanceId);
consumer.assign(singletonList(tp0));
client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
@@ -624,7 +626,7 @@ public class KafkaConsumerTest {
PartitionAssignor assignor = new RoundRobinAssignor();
KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor,
- true, groupId);
+ true, groupId, groupInstanceId);
consumer.assign(singletonList(tp0));
client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
@@ -651,7 +653,7 @@ public class KafkaConsumerTest {
PartitionAssignor assignor = new RoundRobinAssignor();
KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor,
- true, groupId);
+ true, groupId, Optional.empty());
consumer.assign(singletonList(tp0));
consumer.seek(tp0, 20L);
consumer.poll(Duration.ZERO);
@@ -673,7 +675,7 @@ public class KafkaConsumerTest {
PartitionAssignor assignor = new RoundRobinAssignor();
- KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true);
+ KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
consumer.assign(singletonList(tp0));
// lookup coordinator
@@ -711,7 +713,7 @@ public class KafkaConsumerTest {
PartitionAssignor assignor = new RoundRobinAssignor();
- KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true);
+ KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
Node coordinator = prepareRebalance(client, node, assignor, singletonList(tp0), null);
@@ -751,7 +753,7 @@ public class KafkaConsumerTest {
PartitionAssignor assignor = new RoundRobinAssignor();
- KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true);
+ KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
prepareRebalance(client, node, singleton(topic), assignor, singletonList(tp0), null);
consumer.subscribe(Pattern.compile(topic), getConsumerRebalanceListener(consumer));
@@ -783,7 +785,7 @@ public class KafkaConsumerTest {
initMetadata(client, partitionCounts);
Node node = metadata.fetch().nodes().get(0);
- KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, false);
+ KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
Node coordinator = prepareRebalance(client, node, singleton(topic), assignor, singletonList(tp0), null);
consumer.subscribe(Pattern.compile(topic), getConsumerRebalanceListener(consumer));
@@ -815,7 +817,7 @@ public class KafkaConsumerTest {
PartitionAssignor assignor = new RoundRobinAssignor();
- KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true);
+ KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
prepareRebalance(client, node, assignor, singletonList(tp0), null);
@@ -865,7 +867,7 @@ public class KafkaConsumerTest {
final PartitionAssignor assignor = new RoundRobinAssignor();
- KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, false);
+ KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
prepareRebalance(client, node, assignor, singletonList(tp0), null);
@@ -895,7 +897,7 @@ public class KafkaConsumerTest {
PartitionAssignor assignor = new RangeAssignor();
- KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true);
+ KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
consumer.subscribe(singletonList(topic), getConsumerRebalanceListener(consumer));
prepareRebalance(client, node, assignor, singletonList(tp0), null);
@@ -935,7 +937,7 @@ public class KafkaConsumerTest {
PartitionAssignor assignor = new RangeAssignor();
- KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true);
+ KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
// initial subscription
consumer.subscribe(Arrays.asList(topic, topic2), getConsumerRebalanceListener(consumer));
@@ -1049,7 +1051,7 @@ public class KafkaConsumerTest {
PartitionAssignor assignor = new RangeAssignor();
- KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, false);
+ KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
// initial subscription
consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
@@ -1111,7 +1113,7 @@ public class KafkaConsumerTest {
PartitionAssignor assignor = new RangeAssignor();
- KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true);
+ KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
// lookup coordinator
client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
@@ -1167,7 +1169,7 @@ public class KafkaConsumerTest {
PartitionAssignor assignor = new RangeAssignor();
- KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, false);
+ KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
// lookup coordinator
client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
@@ -1221,7 +1223,7 @@ public class KafkaConsumerTest {
PartitionAssignor assignor = new RangeAssignor();
- KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true);
+ KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
// lookup coordinator
client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
@@ -1416,7 +1418,7 @@ public class KafkaConsumerTest {
PartitionAssignor assignor = new RoundRobinAssignor();
- KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, false);
+ KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
@@ -1497,7 +1499,7 @@ public class KafkaConsumerTest {
PartitionAssignor assignor = new RoundRobinAssignor();
- final KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, false);
+ final KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, false, Optional.empty());
consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
Node coordinator = prepareRebalance(client, node, assignor, singletonList(tp0), null);
@@ -1635,7 +1637,7 @@ public class KafkaConsumerTest {
PartitionAssignor assignor = new RangeAssignor();
client.createPendingAuthenticationError(node, 0);
- return newConsumer(time, client, subscription, metadata, assignor, false);
+ return newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
}
private ConsumerRebalanceListener getConsumerRebalanceListener(final KafkaConsumer<String, String> consumer) {
@@ -1829,15 +1831,16 @@ public class KafkaConsumerTest {
SubscriptionState subscription,
ConsumerMetadata metadata,
PartitionAssignor assignor,
- boolean autoCommitEnabled) {
- return newConsumer(time, client, subscription, metadata, assignor, autoCommitEnabled, groupId);
+ boolean autoCommitEnabled,
+ Optional<String> groupInstanceId) {
+ return newConsumer(time, client, subscription, metadata, assignor, autoCommitEnabled, groupId, groupInstanceId);
}
private KafkaConsumer<String, String> newConsumerNoAutoCommit(Time time,
KafkaClient client,
SubscriptionState subscription,
ConsumerMetadata metadata) {
- return newConsumer(time, client, subscription, metadata, new RangeAssignor(), false, groupId);
+ return newConsumer(time, client, subscription, metadata, new RangeAssignor(), false, groupId, groupInstanceId);
}
private KafkaConsumer<String, String> newConsumer(Time time,
@@ -1846,7 +1849,8 @@ public class KafkaConsumerTest {
ConsumerMetadata metadata,
PartitionAssignor assignor,
boolean autoCommitEnabled,
- String groupId) {
+ String groupId,
+ Optional<String> groupInstanceId) {
String clientId = "mock-consumer";
String metricGroupPrefix = "consumer";
long retryBackoffMs = 100;
@@ -1878,6 +1882,7 @@ public class KafkaConsumerTest {
loggerFactory,
consumerClient,
groupId,
+ groupInstanceId,
rebalanceTimeoutMs,
sessionTimeoutMs,
heartbeat,
@@ -1975,7 +1980,7 @@ public class KafkaConsumerTest {
topicMetadata);
client.prepareMetadataUpdate(updateResponse);
- KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true);
+ KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
consumer.subscribe(singleton(invalidTopicName), getConsumerRebalanceListener(consumer));
consumer.poll(Duration.ZERO);
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index dc8ea46..c3f0ff5 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.common.requests.HeartbeatRequest;
import org.apache.kafka.common.requests.HeartbeatResponse;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.JoinGroupResponse;
+import org.apache.kafka.common.requests.LeaveGroupRequest;
import org.apache.kafka.common.requests.SyncGroupRequest;
import org.apache.kafka.common.requests.SyncGroupResponse;
import org.apache.kafka.common.utils.LogContext;
@@ -47,6 +48,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -81,14 +83,16 @@ public class AbstractCoordinatorTest {
private DummyCoordinator coordinator;
private void setupCoordinator() {
- setupCoordinator(RETRY_BACKOFF_MS, REBALANCE_TIMEOUT_MS);
+ setupCoordinator(RETRY_BACKOFF_MS, REBALANCE_TIMEOUT_MS,
+ Optional.empty());
}
private void setupCoordinator(int retryBackoffMs) {
- setupCoordinator(retryBackoffMs, REBALANCE_TIMEOUT_MS);
+ setupCoordinator(retryBackoffMs, REBALANCE_TIMEOUT_MS,
+ Optional.empty());
}
- private void setupCoordinator(int retryBackoffMs, int rebalanceTimeoutMs) {
+ private void setupCoordinator(int retryBackoffMs, int rebalanceTimeoutMs, Optional<String> groupInstanceId) {
LogContext logContext = new LogContext();
this.mockTime = new MockTime();
ConsumerMetadata metadata = new ConsumerMetadata(retryBackoffMs, 60 * 60 * 1000L,
@@ -103,7 +107,7 @@ public class AbstractCoordinatorTest {
mockClient.updateMetadata(TestUtils.metadataUpdateWith(1, emptyMap()));
this.node = metadata.fetch().nodes().get(0);
this.coordinatorNode = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
- this.coordinator = new DummyCoordinator(consumerClient, metrics, mockTime, rebalanceTimeoutMs, retryBackoffMs);
+ this.coordinator = new DummyCoordinator(consumerClient, metrics, mockTime, rebalanceTimeoutMs, retryBackoffMs, groupInstanceId);
}
@Test
@@ -171,7 +175,8 @@ public class AbstractCoordinatorTest {
@Test
public void testJoinGroupRequestTimeout() {
- setupCoordinator(RETRY_BACKOFF_MS, REBALANCE_TIMEOUT_MS);
+ setupCoordinator(RETRY_BACKOFF_MS, REBALANCE_TIMEOUT_MS,
+ Optional.empty());
mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
coordinator.ensureCoordinatorReady(mockTime.timer(0));
@@ -188,7 +193,8 @@ public class AbstractCoordinatorTest {
public void testJoinGroupRequestMaxTimeout() {
// Ensure we can handle the maximum allowed rebalance timeout
- setupCoordinator(RETRY_BACKOFF_MS, Integer.MAX_VALUE);
+ setupCoordinator(RETRY_BACKOFF_MS, Integer.MAX_VALUE,
+ Optional.empty());
mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
coordinator.ensureCoordinatorReady(mockTime.timer(0));
@@ -235,6 +241,83 @@ public class AbstractCoordinatorTest {
}
@Test
+ public void testJoinGroupRequestWithMemberIdMisMatch() {
+ setupCoordinator();
+ mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+ coordinator.ensureCoordinatorReady(mockTime.timer(0));
+
+ final String memberId = "memberId";
+ final int generation = -1;
+
+ mockClient.prepareResponse(joinGroupFollowerResponse(generation, memberId, JoinGroupResponse.UNKNOWN_MEMBER_ID, Errors.FENCED_INSTANCE_ID));
+
+ RequestFuture<ByteBuffer> future = coordinator.sendJoinGroupRequest();
+ assertTrue(consumerClient.poll(future, mockTime.timer(REQUEST_TIMEOUT_MS)));
+ assertEquals(Errors.FENCED_INSTANCE_ID.message(), future.exception().getMessage());
+ // Make sure the exception is fatal.
+ assertFalse(future.isRetriable());
+ }
+
+ @Test
+ public void testJoinGroupRequestWithGroupInstanceIdNotFound() {
+ setupCoordinator();
+ mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+ coordinator.ensureCoordinatorReady(mockTime.timer(0));
+
+ final String memberId = "memberId";
+ final int generation = -1;
+
+ mockClient.prepareResponse(joinGroupFollowerResponse(generation, memberId, JoinGroupResponse.UNKNOWN_MEMBER_ID, Errors.UNKNOWN_MEMBER_ID));
+
+ RequestFuture<ByteBuffer> future = coordinator.sendJoinGroupRequest();
+
+ assertTrue(consumerClient.poll(future, mockTime.timer(REQUEST_TIMEOUT_MS)));
+ assertEquals(Errors.UNKNOWN_MEMBER_ID.message(), future.exception().getMessage());
+ assertTrue(coordinator.rejoinNeededOrPending());
+ assertTrue(coordinator.hasMatchingGenerationId(generation));
+ }
+
+ @Test
+ public void testLeaveGroupSentWithGroupInstanceIdUnSet() {
+ checkLeaveGroupRequestSent(Optional.empty());
+ checkLeaveGroupRequestSent(Optional.of("groupInstanceId"));
+ }
+
+ private void checkLeaveGroupRequestSent(Optional<String> groupInstanceId) {
+ setupCoordinator(RETRY_BACKOFF_MS, Integer.MAX_VALUE, groupInstanceId);
+
+ mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+ mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE));
+ mockClient.prepareResponse(syncGroupResponse(Errors.NONE));
+
+ final RuntimeException e = new RuntimeException();
+
+ // raise the error when the coordinator tries to send leave group request.
+ mockClient.prepareResponse(new MockClient.RequestMatcher() {
+ @Override
+ public boolean matches(AbstractRequest body) {
+ if (body instanceof LeaveGroupRequest)
+ throw e;
+ return false;
+ }
+ }, heartbeatResponse(Errors.UNKNOWN_SERVER_ERROR));
+
+ try {
+ coordinator.ensureActiveGroup();
+ coordinator.close();
+ if (coordinator.isDynamicMember()) {
+ fail("Expected leavegroup to raise an error.");
+ }
+ } catch (RuntimeException exception) {
+ if (coordinator.isDynamicMember()) {
+ assertEquals(exception, e);
+ } else {
+ fail("Coordinator with group.instance.id set shouldn't send leave group request.");
+ }
+ }
+ }
+
+ @Test
public void testUncaughtExceptionInHeartbeatThread() throws Exception {
setupCoordinator();
@@ -723,9 +806,10 @@ public class AbstractCoordinatorTest {
Metrics metrics,
Time time,
int rebalanceTimeoutMs,
- int retryBackoffMs) {
- super(new LogContext(), client, GROUP_ID, rebalanceTimeoutMs, SESSION_TIMEOUT_MS,
- HEARTBEAT_INTERVAL_MS, metrics, METRIC_GROUP_PREFIX, time, retryBackoffMs, false);
+ int retryBackoffMs,
+ Optional<String> groupInstanceId) {
+ super(new LogContext(), client, GROUP_ID, groupInstanceId, rebalanceTimeoutMs,
+ SESSION_TIMEOUT_MS, HEARTBEAT_INTERVAL_MS, metrics, METRIC_GROUP_PREFIX, time, retryBackoffMs, !groupInstanceId.isPresent());
}
@Override
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 645e6ed..3f9d89f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -105,6 +105,7 @@ public class ConsumerCoordinatorTest {
private final TopicPartition t1p = new TopicPartition(topic1, 0);
private final TopicPartition t2p = new TopicPartition(topic2, 0);
private final String groupId = "test-group";
+ private final Optional<String> groupInstanceId = Optional.of("test-instance");
private final int rebalanceTimeoutMs = 60000;
private final int sessionTimeoutMs = 10000;
private final int heartbeatIntervalMs = 5000;
@@ -148,7 +149,7 @@ public class ConsumerCoordinatorTest {
this.mockOffsetCommitCallback = new MockCommitCallback();
this.partitionAssignor.clear();
- this.coordinator = buildCoordinator(metrics, assignors, false, true);
+ this.coordinator = buildCoordinator(metrics, assignors, false, Optional.empty());
}
@After
@@ -1065,7 +1066,7 @@ public class ConsumerCoordinatorTest {
@Test
public void testWakeupFromAssignmentCallback() {
ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
- false, true);
+ false, Optional.empty());
final String topic = "topic1";
TopicPartition partition = new TopicPartition(topic, 0);
@@ -1182,7 +1183,7 @@ public class ConsumerCoordinatorTest {
metadata = new ConsumerMetadata(0, Long.MAX_VALUE, includeInternalTopics,
subscriptions, new LogContext(), new ClusterResourceListeners());
client = new MockClient(time, metadata);
- coordinator = buildCoordinator(new Metrics(), assignors, false, true);
+ coordinator = buildCoordinator(new Metrics(), assignors, false, Optional.empty());
subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);
@@ -1311,7 +1312,7 @@ public class ConsumerCoordinatorTest {
final String consumerId = "consumer";
ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
- true, true);
+ true, groupInstanceId);
subscriptions.subscribe(singleton(topic1), rebalanceListener);
joinAsFollowerAndReceiveAssignment(consumerId, coordinator, singletonList(t1p));
@@ -1327,7 +1328,7 @@ public class ConsumerCoordinatorTest {
public void testAutoCommitRetryBackoff() {
final String consumerId = "consumer";
ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
- true, true);
+ true, groupInstanceId);
subscriptions.subscribe(singleton(topic1), rebalanceListener);
joinAsFollowerAndReceiveAssignment(consumerId, coordinator, singletonList(t1p));
@@ -1360,8 +1361,7 @@ public class ConsumerCoordinatorTest {
@Test
public void testAutoCommitAwaitsInterval() {
final String consumerId = "consumer";
- ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
- true, true);
+ ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors, true, groupInstanceId);
subscriptions.subscribe(singleton(topic1), rebalanceListener);
joinAsFollowerAndReceiveAssignment(consumerId, coordinator, singletonList(t1p));
@@ -1400,7 +1400,7 @@ public class ConsumerCoordinatorTest {
final String consumerId = "consumer";
ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
- true, true);
+ true, groupInstanceId);
subscriptions.subscribe(singleton(topic1), rebalanceListener);
@@ -1426,7 +1426,7 @@ public class ConsumerCoordinatorTest {
@Test
public void testAutoCommitManualAssignment() {
ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
- true, true);
+ true, groupInstanceId);
subscriptions.assignFromUser(singleton(t1p));
subscriptions.seek(t1p, 100);
@@ -1443,7 +1443,7 @@ public class ConsumerCoordinatorTest {
@Test
public void testAutoCommitManualAssignmentCoordinatorUnknown() {
ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
- true, true);
+ true, groupInstanceId);
subscriptions.assignFromUser(singleton(t1p));
subscriptions.seek(t1p, 100);
@@ -1953,25 +1953,19 @@ public class ConsumerCoordinatorTest {
@Test
public void testCloseDynamicAssignment() throws Exception {
- ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, true);
+ ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, Optional.empty());
gracefulCloseTest(coordinator, true);
}
@Test
public void testCloseManualAssignment() throws Exception {
- ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(false, true, true);
- gracefulCloseTest(coordinator, false);
- }
-
- @Test
- public void shouldNotLeaveGroupWhenLeaveGroupFlagIsFalse() throws Exception {
- final ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, false);
+ ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(false, true, Optional.empty());
gracefulCloseTest(coordinator, false);
}
@Test
public void testCloseCoordinatorNotKnownManualAssignment() throws Exception {
- ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(false, true, true);
+ ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(false, true, Optional.empty());
makeCoordinatorUnknown(coordinator, Errors.NOT_COORDINATOR);
time.sleep(autoCommitIntervalMs);
closeVerifyTimeout(coordinator, 1000, 1000, 1000);
@@ -1979,14 +1973,14 @@ public class ConsumerCoordinatorTest {
@Test
public void testCloseCoordinatorNotKnownNoCommits() throws Exception {
- ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, true);
+ ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, Optional.empty());
makeCoordinatorUnknown(coordinator, Errors.NOT_COORDINATOR);
closeVerifyTimeout(coordinator, 1000, 0, 0);
}
@Test
public void testCloseCoordinatorNotKnownWithCommits() throws Exception {
- ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, true);
+ ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, groupInstanceId);
makeCoordinatorUnknown(coordinator, Errors.NOT_COORDINATOR);
time.sleep(autoCommitIntervalMs);
closeVerifyTimeout(coordinator, 1000, 1000, 1000);
@@ -1994,14 +1988,14 @@ public class ConsumerCoordinatorTest {
@Test
public void testCloseCoordinatorUnavailableNoCommits() throws Exception {
- ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, true);
+ ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, Optional.empty());
makeCoordinatorUnknown(coordinator, Errors.COORDINATOR_NOT_AVAILABLE);
closeVerifyTimeout(coordinator, 1000, 0, 0);
}
@Test
public void testCloseTimeoutCoordinatorUnavailableForCommit() throws Exception {
- ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, true);
+ ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, groupInstanceId);
makeCoordinatorUnknown(coordinator, Errors.COORDINATOR_NOT_AVAILABLE);
time.sleep(autoCommitIntervalMs);
closeVerifyTimeout(coordinator, 1000, 1000, 1000);
@@ -2009,7 +2003,7 @@ public class ConsumerCoordinatorTest {
@Test
public void testCloseMaxWaitCoordinatorUnavailableForCommit() throws Exception {
- ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, true);
+ ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, groupInstanceId);
makeCoordinatorUnknown(coordinator, Errors.COORDINATOR_NOT_AVAILABLE);
time.sleep(autoCommitIntervalMs);
closeVerifyTimeout(coordinator, Long.MAX_VALUE, requestTimeoutMs, requestTimeoutMs);
@@ -2017,27 +2011,27 @@ public class ConsumerCoordinatorTest {
@Test
public void testCloseNoResponseForCommit() throws Exception {
- ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, true);
+ ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, groupInstanceId);
time.sleep(autoCommitIntervalMs);
closeVerifyTimeout(coordinator, Long.MAX_VALUE, requestTimeoutMs, requestTimeoutMs);
}
@Test
public void testCloseNoResponseForLeaveGroup() throws Exception {
- ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, true);
+ ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, Optional.empty());
closeVerifyTimeout(coordinator, Long.MAX_VALUE, requestTimeoutMs, requestTimeoutMs);
}
@Test
public void testCloseNoWait() throws Exception {
- ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, true);
+ ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, groupInstanceId);
time.sleep(autoCommitIntervalMs);
closeVerifyTimeout(coordinator, 0, 0, 0);
}
@Test
public void testHeartbeatThreadClose() throws Exception {
- ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, true);
+ ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, groupInstanceId);
coordinator.ensureActiveGroup();
time.sleep(heartbeatIntervalMs + 100);
Thread.yield(); // Give heartbeat thread a chance to attempt heartbeat
@@ -2051,7 +2045,8 @@ public class ConsumerCoordinatorTest {
@Test
public void testAutoCommitAfterCoordinatorBackToService() {
ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
- true, true);
+ true, groupInstanceId);
+
subscriptions.assignFromUser(Collections.singleton(t1p));
subscriptions.seek(t1p, 100L);
@@ -2069,10 +2064,10 @@ public class ConsumerCoordinatorTest {
private ConsumerCoordinator prepareCoordinatorForCloseTest(final boolean useGroupManagement,
final boolean autoCommit,
- final boolean leaveGroup) {
+ final Optional<String> groupInstanceId) {
final String consumerId = "consumer";
ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
- autoCommit, leaveGroup);
+ autoCommit, groupInstanceId);
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
if (useGroupManagement) {
@@ -2165,11 +2160,12 @@ public class ConsumerCoordinatorTest {
private ConsumerCoordinator buildCoordinator(final Metrics metrics,
final List<PartitionAssignor> assignors,
final boolean autoCommitEnabled,
- final boolean leaveGroup) {
+ final Optional<String> groupInstanceId) {
return new ConsumerCoordinator(
new LogContext(),
consumerClient,
groupId,
+ groupInstanceId,
rebalanceTimeoutMs,
sessionTimeoutMs,
heartbeat,
@@ -2183,7 +2179,8 @@ public class ConsumerCoordinatorTest {
autoCommitEnabled,
autoCommitIntervalMs,
null,
- leaveGroup);
+ true
+ );
}
private FindCoordinatorResponse groupCoordinatorResponse(Node node, Errors error) {
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/JoinGroupRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/JoinGroupRequestTest.java
new file mode 100644
index 0000000..0271aac
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/requests/JoinGroupRequestTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+public class JoinGroupRequestTest {
+
+ @Test
+ public void shouldAcceptValidGroupInstanceIds() {
+ String maxLengthString = TestUtils.randomString(249);
+ String[] validGroupInstanceIds = {"valid", "INSTANCE", "gRoUp", "ar6", "VaL1d", "_0-9_.", "...", maxLengthString};
+
+ for (String instanceId : validGroupInstanceIds) {
+ JoinGroupRequest.validateGroupInstanceId(instanceId);
+ }
+ }
+
+ @Test
+ public void shouldThrowOnInvalidGroupInstanceIds() {
+ char[] longString = new char[250];
+ Arrays.fill(longString, 'a');
+ String[] invalidGroupInstanceIds = {"", "foo bar", "..", "foo:bar", "foo=bar", ".", new String(longString)};
+
+ for (String instanceId : invalidGroupInstanceIds) {
+ try {
+ JoinGroupRequest.validateGroupInstanceId(instanceId);
+ fail("No exception was thrown for invalid instance id: " + instanceId);
+ } catch (InvalidConfigurationException e) {
+ // Good
+ }
+ }
+ }
+
+ @Test
+ public void shouldRecognizeInvalidCharactersInGroupInstanceIds() {
+ char[] invalidChars = {'/', '\\', ',', '\u0000', ':', '"', '\'', ';', '*', '?', ' ', '\t', '\r', '\n', '='};
+
+ for (char c : invalidChars) {
+ String instanceId = "Is " + c + "illegal";
+ assertFalse(JoinGroupRequest.containsValidPattern(instanceId));
+ }
+ }
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
index fa89a9d..a5882e8 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -40,6 +40,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
/**
* This class manages the coordination process with the Kafka group coordinator on the broker for managing assignments
@@ -78,6 +79,7 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
super(logContext,
client,
groupId,
+ Optional.empty(),
rebalanceTimeoutMs,
sessionTimeoutMs,
heartbeatIntervalMs,
@@ -85,7 +87,7 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
metricGrpPrefix,
time,
retryBackoffMs,
- true);
+ true);
this.log = logContext.logger(WorkerCoordinator.class);
this.restUrl = restUrl;
this.configStorage = configStorage;
diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala
index 8f78a96..d8d1edb 100644
--- a/core/src/main/scala/kafka/api/ApiVersion.scala
+++ b/core/src/main/scala/kafka/api/ApiVersion.scala
@@ -86,7 +86,9 @@ object ApiVersion {
// LeaderAndIsrRequest V2, UpdateMetadataRequest V5, StopReplicaRequest V1
KAFKA_2_2_IV0,
// New error code for ListOffsets when a new leader is lagging behind former HW (KIP-207)
- KAFKA_2_2_IV1
+ KAFKA_2_2_IV1,
+ // Introduced static membership.
+ KAFKA_2_3_IV0
)
// Map keys are the union of the short and full versions
@@ -298,6 +300,13 @@ case object KAFKA_2_2_IV1 extends DefaultApiVersion {
val id: Int = 21
}
+case object KAFKA_2_3_IV0 extends DefaultApiVersion {
+ val shortVersion: String = "2.3"
+ val subVersion = "IV0"
+ val recordVersion = RecordVersion.V2
+ val id: Int = 22
+}
+
object ApiVersionValidator extends Validator {
override def ensureValid(name: String, value: Any): Unit = {
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index ead05ba..6a3dbbc 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -27,6 +27,7 @@ import kafka.utils.Logging
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.RecordBatch.{NO_PRODUCER_EPOCH, NO_PRODUCER_ID}
import org.apache.kafka.common.requests._
@@ -101,6 +102,7 @@ class GroupCoordinator(val brokerId: Int,
def handleJoinGroup(groupId: String,
memberId: String,
+ groupInstanceId: Option[String],
requireKnownMemberId: Boolean,
clientId: String,
clientHost: String,
@@ -126,22 +128,22 @@ class GroupCoordinator(val brokerId: Int,
// exist we should reject the request.
if (isUnknownMember) {
val group = groupManager.addGroup(new GroupMetadata(groupId, Empty, time))
- doUnknownJoinGroup(group, requireKnownMemberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
+ doUnknownJoinGroup(group, groupInstanceId, requireKnownMemberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
} else {
responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID))
}
-
case Some(group) =>
group.inLock {
if ((groupIsOverCapacity(group)
&& group.has(memberId) && !group.get(memberId).isAwaitingJoin) // oversized group, need to shed members that haven't joined yet
|| (isUnknownMember && group.size >= groupConfig.groupMaxSize)) {
group.remove(memberId)
+ group.removeStaticMember(groupInstanceId)
responseCallback(joinError(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.GROUP_MAX_SIZE_REACHED))
} else if (isUnknownMember) {
- doUnknownJoinGroup(group, requireKnownMemberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
+ doUnknownJoinGroup(group, groupInstanceId, requireKnownMemberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
} else {
- doJoinGroup(group, memberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
+ doJoinGroup(group, memberId, groupInstanceId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
}
// attempt to complete JoinGroup
@@ -149,11 +151,12 @@ class GroupCoordinator(val brokerId: Int,
joinPurgatory.checkAndComplete(GroupKey(group.groupId))
}
}
+ }
}
}
- }
private def doUnknownJoinGroup(group: GroupMetadata,
+ groupInstanceId: Option[String],
requireKnownMemberId: Boolean,
clientId: String,
clientHost: String,
@@ -174,15 +177,44 @@ class GroupCoordinator(val brokerId: Int,
} else {
val newMemberId = clientId + "-" + group.generateMemberIdSuffix
- if (requireKnownMemberId) {
- // If member id required, register the member in the pending member list
- // and send back a response to call for another join group request with allocated member id.
- group.addPendingMember(newMemberId)
- addPendingMemberExpiration(group, newMemberId, sessionTimeoutMs)
- responseCallback(joinError(newMemberId, Errors.MEMBER_ID_REQUIRED))
+ if (group.hasStaticMember(groupInstanceId)) {
+ val oldMemberId = group.getStaticMemberId(groupInstanceId)
+
+ if (group.is(Stable)) {
+ info(s"Static member $groupInstanceId with unknown member id rejoins, assigning new member id $newMemberId, while" +
+ s"old member $oldMemberId will be removed. No rebalance will be triggered.")
+
+ val oldMember = group.replaceGroupInstance(oldMemberId, newMemberId, groupInstanceId)
+
+ // Heartbeat of old member id will expire without effect since the group no longer contains that member id.
+ // New heartbeat shall be scheduled with new member id.
+ completeAndScheduleNextHeartbeatExpiration(group, oldMember)
+
+ responseCallback(JoinGroupResult(
+ members = if (group.isLeader(newMemberId)) {
+ group.currentMemberMetadata
+ } else {
+ List.empty
+ },
+ memberId = newMemberId,
+ generationId = group.generationId,
+ subProtocol = group.protocolOrNull,
+ leaderId = group.leaderOrNull,
+ error = Errors.NONE))
+ } else {
+ val knownStaticMember = group.get(oldMemberId)
+ updateMemberAndRebalance(group, knownStaticMember, protocols, responseCallback)
+ }
+ } else if (requireKnownMemberId) {
+ // If member id required (dynamic membership), register the member in the pending member list
+ // and send back a response to call for another join group request with allocated member id.
+ group.addPendingMember(newMemberId)
+ addPendingMemberExpiration(group, newMemberId, sessionTimeoutMs)
+ responseCallback(joinError(newMemberId, Errors.MEMBER_ID_REQUIRED))
} else {
- addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, newMemberId, clientId, clientHost, protocolType,
- protocols, group, responseCallback)
+ addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, newMemberId, groupInstanceId,
+ clientId, clientHost, protocolType, protocols, group, responseCallback)
+
}
}
}
@@ -190,6 +222,7 @@ class GroupCoordinator(val brokerId: Int,
private def doJoinGroup(group: GroupMetadata,
memberId: String,
+ groupInstanceId: Option[String],
clientId: String,
clientHost: String,
rebalanceTimeoutMs: Int,
@@ -207,65 +240,79 @@ class GroupCoordinator(val brokerId: Int,
} else if (!group.supportsProtocols(protocolType, MemberMetadata.plainProtocolSet(protocols))) {
responseCallback(joinError(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL))
} else if (group.isPendingMember(memberId)) {
- // A rejoining pending member will be accepted.
- addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, memberId, clientId, clientHost, protocolType,
- protocols, group, responseCallback)
- } else if (!group.has(memberId)) {
- // if the member trying to register with a un-recognized id, send the response to let
- // it reset its member id and retry.
- responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID))
+ // A rejoining pending member will be accepted. Note that pending member will never be a static member.
+ if (groupInstanceId.isDefined) {
+ throw new IllegalStateException(s"the static member $groupInstanceId was unexpectedly to be assigned " +
+ s"into pending member bucket with member id $memberId")
+ } else {
+ addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, memberId, groupInstanceId,
+ clientId, clientHost, protocolType, protocols, group, responseCallback)
+ }
} else {
- group.currentState match {
- case PreparingRebalance =>
- val member = group.get(memberId)
- updateMemberAndRebalance(group, member, protocols, responseCallback)
+ val isKnownGroupInstanceId = group.hasStaticMember(groupInstanceId)
+ val groupInstanceIdNotFound = groupInstanceId.isDefined && !isKnownGroupInstanceId
+
+ if (isKnownGroupInstanceId && group.getStaticMemberId(groupInstanceId) != memberId) {
+ // given member id doesn't match with the groupInstanceId. Inform duplicate instance to shut down immediately.
+ responseCallback(joinError(memberId, Errors.FENCED_INSTANCE_ID))
+ } else if (!group.has(memberId) || groupInstanceIdNotFound) {
+ // If a dynamic member is trying to register with an unrecognized id, or
+ // a static member joins with an unknown group instance id, send the response to let
+ // it reset its member id and retry.
+ responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID))
+ } else {
+ val member = group.get(memberId)
- case CompletingRebalance =>
- val member = group.get(memberId)
- if (member.matches(protocols)) {
- // member is joining with the same metadata (which could be because it failed to
- // receive the initial JoinGroup response), so just return current group information
- // for the current generation.
- responseCallback(JoinGroupResult(
- members = if (group.isLeader(memberId)) {
- group.currentMemberMetadata
- } else {
- Map.empty
- },
- memberId = memberId,
- generationId = group.generationId,
- subProtocol = group.protocolOrNull,
- leaderId = group.leaderOrNull,
- error = Errors.NONE))
- } else {
- // member has changed metadata, so force a rebalance
+ group.currentState match {
+ case PreparingRebalance =>
updateMemberAndRebalance(group, member, protocols, responseCallback)
- }
- case Stable =>
- val member = group.get(memberId)
- if (group.isLeader(memberId) || !member.matches(protocols)) {
- // force a rebalance if a member has changed metadata or if the leader sends JoinGroup.
- // The latter allows the leader to trigger rebalances for changes affecting assignment
- // which do not affect the member metadata (such as topic metadata changes for the consumer)
- updateMemberAndRebalance(group, member, protocols, responseCallback)
- } else {
- // for followers with no actual change to their metadata, just return group information
- // for the current generation which will allow them to issue SyncGroup
- responseCallback(JoinGroupResult(
- members = Map.empty,
- memberId = memberId,
- generationId = group.generationId,
- subProtocol = group.protocolOrNull,
- leaderId = group.leaderOrNull,
- error = Errors.NONE))
- }
+ case CompletingRebalance =>
+ if (member.matches(protocols)) {
+ // member is joining with the same metadata (which could be because it failed to
+ // receive the initial JoinGroup response), so just return current group information
+ // for the current generation.
+ responseCallback(JoinGroupResult(
+ members = if (group.isLeader(memberId)) {
+ group.currentMemberMetadata
+ } else {
+ List.empty
+ },
+ memberId = memberId,
+ generationId = group.generationId,
+ subProtocol = group.protocolOrNull,
+ leaderId = group.leaderOrNull,
+ error = Errors.NONE))
+ } else {
+ // member has changed metadata, so force a rebalance
+ updateMemberAndRebalance(group, member, protocols, responseCallback)
+ }
- case Empty | Dead =>
- // Group reaches unexpected state. Let the joining member reset their generation and rejoin.
- warn(s"Attempt to add rejoining member $memberId of group ${group.groupId} in " +
- s"unexpected group state ${group.currentState}")
- responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID))
+ case Stable =>
+ val member = group.get(memberId)
+ if (group.isLeader(memberId) || !member.matches(protocols)) {
+ // force a rebalance if a member has changed metadata or if the leader sends JoinGroup.
+ // The latter allows the leader to trigger rebalances for changes affecting assignment
+ // which do not affect the member metadata (such as topic metadata changes for the consumer)
+ updateMemberAndRebalance(group, member, protocols, responseCallback)
+ } else {
+ // for followers with no actual change to their metadata, just return group information
+ // for the current generation which will allow them to issue SyncGroup
+ responseCallback(JoinGroupResult(
+ members = List.empty,
+ memberId = memberId,
+ generationId = group.generationId,
+ subProtocol = group.protocolOrNull,
+ leaderId = group.leaderOrNull,
+ error = Errors.NONE))
+ }
+
+ case Empty | Dead =>
+ // Group reaches unexpected state. Let the joining member reset their generation and rejoin.
+ warn(s"Attempt to add rejoining member $memberId of group ${group.groupId} in " +
+ s"unexpected group state ${group.currentState}")
+ responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID))
+ }
}
}
}
@@ -715,7 +762,7 @@ class GroupCoordinator(val brokerId: Int,
private def joinError(memberId: String, error: Errors): JoinGroupResult = {
JoinGroupResult(
- members = Map.empty,
+ members = List.empty,
memberId = memberId,
generationId = GroupCoordinator.NoGeneration,
subProtocol = GroupCoordinator.NoProtocol,
@@ -761,13 +808,15 @@ class GroupCoordinator(val brokerId: Int,
private def addMemberAndRebalance(rebalanceTimeoutMs: Int,
sessionTimeoutMs: Int,
memberId: String,
+ groupInstanceId: Option[String],
clientId: String,
clientHost: String,
protocolType: String,
protocols: List[(String, Array[Byte])],
group: GroupMetadata,
callback: JoinCallback) {
- val member = new MemberMetadata(memberId, group.groupId, clientId, clientHost, rebalanceTimeoutMs,
+ val member = new MemberMetadata(memberId, group.groupId, groupInstanceId,
+ clientId, clientHost, rebalanceTimeoutMs,
sessionTimeoutMs, protocolType, protocols)
member.isNew = true
@@ -786,8 +835,11 @@ class GroupCoordinator(val brokerId: Int,
// for new members. If the new member is still there, we expect it to retry.
completeAndScheduleNextExpiration(group, member, NewMemberJoinTimeoutMs)
- group.removePendingMember(memberId)
- maybePrepareRebalance(group, s"Adding new member $memberId")
+ if (member.isStaticMember)
+ group.addStaticMember(groupInstanceId, memberId)
+ else
+ group.removePendingMember(memberId)
+ maybePrepareRebalance(group, s"Adding new member $memberId with group instanceid $groupInstanceId")
}
private def updateMemberAndRebalance(group: GroupMetadata,
@@ -836,6 +888,7 @@ class GroupCoordinator(val brokerId: Int,
group.maybeInvokeJoinCallback(member, joinError(NoMemberId, Errors.UNKNOWN_MEMBER_ID))
group.remove(member.memberId)
+ group.removeStaticMember(member.groupInstanceId)
group.currentState match {
case Dead | Empty =>
@@ -870,6 +923,7 @@ class GroupCoordinator(val brokerId: Int,
group.notYetRejoinedMembers.foreach { failedMember =>
removeHeartbeatForLeavingMember(group, failedMember)
group.remove(failedMember.memberId)
+ group.removeStaticMember(failedMember.groupInstanceId)
// TODO: cut the socket connection to the client
}
@@ -898,7 +952,7 @@ class GroupCoordinator(val brokerId: Int,
members = if (group.isLeader(member.memberId)) {
group.currentMemberMetadata
} else {
- Map.empty
+ List.empty
},
memberId = member.memberId,
generationId = group.generationId,
@@ -1023,7 +1077,7 @@ case class GroupConfig(groupMinSessionTimeoutMs: Int,
groupMaxSize: Int,
groupInitialRebalanceDelayMs: Int)
-case class JoinGroupResult(members: Map[String, Array[Byte]],
+case class JoinGroupResult(members: List[JoinGroupResponseMember],
memberId: String,
generationId: Int,
subProtocol: String,
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
index 5cfd420..5a3841b 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.locks.ReentrantLock
import kafka.common.OffsetAndMetadata
import kafka.utils.{CoreUtils, Logging, nonthreadsafe}
import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember
import org.apache.kafka.common.utils.Time
import scala.collection.{Seq, immutable, mutable}
@@ -128,7 +129,12 @@ private object GroupMetadata {
group.protocol = Option(protocol)
group.leaderId = Option(leaderId)
group.currentStateTimestamp = currentStateTimestamp
- members.foreach(group.add(_, null))
+ members.foreach(member => {
+ group.add(member, null)
+ if (member.isStaticMember) {
+ group.addStaticMember(member.groupInstanceId, member.memberId)
+ }
+ })
group
}
}
@@ -184,6 +190,8 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
private var protocol: Option[String] = None
private val members = new mutable.HashMap[String, MemberMetadata]
+ // Static membership mapping [key: group.instance.id, value: member.id]
+ private val staticMembers = new mutable.HashMap[String, String]
private val pendingMembers = new mutable.HashSet[String]
private var numMembersAwaitingJoin = 0
private val supportedProtocols = new mutable.HashMap[String, Integer]().withDefaultValue(0)
@@ -236,12 +244,56 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
leaderId = members.keys.headOption
}
+ /**
+ * [For static members only]: Replace the old member id with the new one,
+ * keep everything else unchanged and return the updated member.
+ */
+ def replaceGroupInstance(oldMemberId: String,
+ newMemberId: String,
+ groupInstanceId: Option[String]): MemberMetadata = {
+ if(groupInstanceId.isEmpty) {
+ throw new IllegalArgumentException(s"unexpected null group.instance.id in replaceGroupInstance")
+ }
+ val oldMember = members.remove(oldMemberId)
+ .getOrElse(throw new IllegalArgumentException(s"Cannot replace non-existing member id $oldMemberId"))
+
+ oldMember.memberId = newMemberId
+ members.put(newMemberId, oldMember)
+
+ if (isLeader(oldMemberId))
+ leaderId = Some(newMemberId)
+ addStaticMember(groupInstanceId, newMemberId)
+ oldMember
+ }
+
def isPendingMember(memberId: String): Boolean = pendingMembers.contains(memberId) && !has(memberId)
def addPendingMember(memberId: String) = pendingMembers.add(memberId)
def removePendingMember(memberId: String) = pendingMembers.remove(memberId)
+ def hasStaticMember(groupInstanceId: Option[String]) = groupInstanceId.isDefined && staticMembers.contains(groupInstanceId.get)
+
+ def getStaticMemberId(groupInstanceId: Option[String]) = {
+ if(groupInstanceId.isEmpty) {
+ throw new IllegalArgumentException(s"unexpected null group.instance.id in getStaticMemberId")
+ }
+ staticMembers(groupInstanceId.get)
+ }
+
+ def addStaticMember(groupInstanceId: Option[String], newMemberId: String) = {
+ if(groupInstanceId.isEmpty) {
+ throw new IllegalArgumentException(s"unexpected null group.instance.id in addStaticMember")
+ }
+ staticMembers.put(groupInstanceId.get, newMemberId)
+ }
+
+ def removeStaticMember(groupInstanceId: Option[String]) = {
+ if (groupInstanceId.isDefined) {
+ staticMembers.remove(groupInstanceId.get)
+ }
+ }
+
def currentState = state
def notYetRejoinedMembers = members.values.filter(!_.isAwaitingJoin).toList
@@ -250,6 +302,8 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
def allMembers = members.keySet
+ def allStaticMembers = staticMembers.keySet
+
def numPending = pendingMembers.size
def allMemberMetadata = members.values.toList
@@ -337,10 +391,14 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
receivedTransactionalOffsetCommits = false
}
- def currentMemberMetadata: Map[String, Array[Byte]] = {
+ def currentMemberMetadata: List[JoinGroupResponseMember] = {
if (is(Dead) || is(PreparingRebalance))
throw new IllegalStateException("Cannot obtain member metadata for group in state %s".format(state))
- members.map{ case (memberId, memberMetadata) => (memberId, memberMetadata.metadata(protocol.get))}.toMap
+ members.map{ case (memberId, memberMetadata) => new JoinGroupResponseMember()
+ .setMemberId(memberId)
+ .setGroupInstanceId(memberMetadata.groupInstanceId.orNull)
+ .setMetadata(memberMetadata.metadata(protocol.get))
+ }.toList
}
def summary: GroupSummary = {
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index eecf713..fc7f4e8 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -26,7 +26,7 @@ import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock
import com.yammer.metrics.core.Gauge
-import kafka.api.{ApiVersion, KAFKA_0_10_1_IV0, KAFKA_2_1_IV0, KAFKA_2_1_IV1}
+import kafka.api.{ApiVersion, KAFKA_0_10_1_IV0, KAFKA_2_1_IV0, KAFKA_2_1_IV1, KAFKA_2_2_IV0, KAFKA_2_3_IV0}
import kafka.common.{MessageFormatter, OffsetAndMetadata}
import kafka.metrics.KafkaMetricsGroup
import kafka.server.ReplicaManager
@@ -40,7 +40,7 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.protocol.types.Type._
import org.apache.kafka.common.protocol.types._
import org.apache.kafka.common.record._
-import org.apache.kafka.common.requests.{OffsetCommitRequest, OffsetFetchResponse}
+import org.apache.kafka.common.requests.{JoinGroupRequest, OffsetCommitRequest, OffsetFetchResponse}
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.utils.{Time, Utils}
@@ -977,6 +977,7 @@ object GroupMetadataManager {
private val GROUP_KEY_GROUP_FIELD = GROUP_METADATA_KEY_SCHEMA.get("group")
private val MEMBER_ID_KEY = "member_id"
+ private val GROUP_INSTANCE_ID_KEY = "group_instance_id"
private val CLIENT_ID_KEY = "client_id"
private val CLIENT_HOST_KEY = "client_host"
private val REBALANCE_TIMEOUT_KEY = "rebalance_timeout"
@@ -1003,6 +1004,16 @@ object GroupMetadataManager {
private val MEMBER_METADATA_V2 = MEMBER_METADATA_V1
+ private val MEMBER_METADATA_V3 = new Schema(
+ new Field(MEMBER_ID_KEY, STRING),
+ new Field(GROUP_INSTANCE_ID_KEY, NULLABLE_STRING),
+ new Field(CLIENT_ID_KEY, STRING),
+ new Field(CLIENT_HOST_KEY, STRING),
+ new Field(REBALANCE_TIMEOUT_KEY, INT32),
+ new Field(SESSION_TIMEOUT_KEY, INT32),
+ new Field(SUBSCRIPTION_KEY, BYTES),
+ new Field(ASSIGNMENT_KEY, BYTES))
+
private val PROTOCOL_TYPE_KEY = "protocol_type"
private val GENERATION_KEY = "generation"
private val PROTOCOL_KEY = "protocol"
@@ -1032,6 +1043,14 @@ object GroupMetadataManager {
new Field(CURRENT_STATE_TIMESTAMP_KEY, INT64),
new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V2)))
+ private val GROUP_METADATA_VALUE_SCHEMA_V3 = new Schema(
+ new Field(PROTOCOL_TYPE_KEY, STRING),
+ new Field(GENERATION_KEY, INT32),
+ new Field(PROTOCOL_KEY, NULLABLE_STRING),
+ new Field(LEADER_KEY, NULLABLE_STRING),
+ new Field(CURRENT_STATE_TIMESTAMP_KEY, INT64),
+ new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V3)))
+
// map of versions to key schemas as data types
private val MESSAGE_TYPE_SCHEMAS = Map(
0 -> OFFSET_COMMIT_KEY_SCHEMA,
@@ -1049,7 +1068,8 @@ object GroupMetadataManager {
private val GROUP_VALUE_SCHEMAS = Map(
0 -> GROUP_METADATA_VALUE_SCHEMA_V0,
1 -> GROUP_METADATA_VALUE_SCHEMA_V1,
- 2 -> GROUP_METADATA_VALUE_SCHEMA_V2)
+ 2 -> GROUP_METADATA_VALUE_SCHEMA_V2,
+ 3 -> GROUP_METADATA_VALUE_SCHEMA_V3)
private val CURRENT_OFFSET_KEY_SCHEMA = schemaForKey(CURRENT_OFFSET_KEY_SCHEMA_VERSION)
private val CURRENT_GROUP_KEY_SCHEMA = schemaForKey(CURRENT_GROUP_KEY_SCHEMA_VERSION)
@@ -1172,8 +1192,10 @@ object GroupMetadataManager {
(0.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V0))
else if (apiVersion < KAFKA_2_1_IV0)
(1.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V1))
- else
+ else if (apiVersion < KAFKA_2_3_IV0)
(2.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V2))
+ else
+ (3.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V3))
}
value.set(PROTOCOL_TYPE_KEY, groupMetadata.protocolType.getOrElse(""))
@@ -1194,6 +1216,9 @@ object GroupMetadataManager {
if (version > 0)
memberStruct.set(REBALANCE_TIMEOUT_KEY, memberMetadata.rebalanceTimeoutMs)
+ if (version >= 3)
+ memberStruct.set(GROUP_INSTANCE_ID_KEY, memberMetadata.groupInstanceId.orNull)
+
// The group is non-empty, so the current protocol must be defined
val protocol = groupMetadata.protocolOrNull
if (protocol == null)
@@ -1312,7 +1337,7 @@ object GroupMetadataManager {
val valueSchema = schemaForGroupValue(version)
val value = valueSchema.read(buffer)
- if (version >= 0 && version <= 2) {
+ if (version >= 0 && version <= 3) {
val generationId = value.get(GENERATION_KEY).asInstanceOf[Int]
val protocolType = value.get(PROTOCOL_TYPE_KEY).asInstanceOf[String]
val protocol = value.get(PROTOCOL_KEY).asInstanceOf[String]
@@ -1333,13 +1358,18 @@ object GroupMetadataManager {
val members = memberMetadataArray.map { memberMetadataObj =>
val memberMetadata = memberMetadataObj.asInstanceOf[Struct]
val memberId = memberMetadata.get(MEMBER_ID_KEY).asInstanceOf[String]
+ val groupInstanceId =
+ if (version >= 3)
+ Some(memberMetadata.get(GROUP_INSTANCE_ID_KEY).asInstanceOf[String])
+ else
+ None
val clientId = memberMetadata.get(CLIENT_ID_KEY).asInstanceOf[String]
val clientHost = memberMetadata.get(CLIENT_HOST_KEY).asInstanceOf[String]
val sessionTimeout = memberMetadata.get(SESSION_TIMEOUT_KEY).asInstanceOf[Int]
val rebalanceTimeout = if (version == 0) sessionTimeout else memberMetadata.get(REBALANCE_TIMEOUT_KEY).asInstanceOf[Int]
val subscription = Utils.toArray(memberMetadata.get(SUBSCRIPTION_KEY).asInstanceOf[ByteBuffer])
- val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout,
+ val member = new MemberMetadata(memberId, groupId, groupInstanceId, clientId, clientHost, rebalanceTimeout, sessionTimeout,
protocolType, List((protocol, subscription)))
member.assignment = Utils.toArray(memberMetadata.get(ASSIGNMENT_KEY).asInstanceOf[ByteBuffer])
member
diff --git a/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala b/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala
index 1932f42..844d7e6 100644
--- a/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala
@@ -22,8 +22,8 @@ import java.util
import kafka.utils.nonthreadsafe
import org.apache.kafka.common.protocol.Errors
-
case class MemberSummary(memberId: String,
+ groupInstanceId: Option[String],
clientId: String,
clientHost: String,
metadata: Array[Byte],
@@ -54,8 +54,9 @@ private object MemberMetadata {
* and the group transitions to stable
*/
@nonthreadsafe
-private[group] class MemberMetadata(val memberId: String,
+private[group] class MemberMetadata(var memberId: String,
val groupId: String,
+ val groupInstanceId: Option[String],
val clientId: String,
val clientHost: String,
val rebalanceTimeoutMs: Int,
@@ -69,6 +70,7 @@ private[group] class MemberMetadata(val memberId: String,
var latestHeartbeat: Long = -1
var isLeaving: Boolean = false
var isNew: Boolean = false
+ val isStaticMember: Boolean = groupInstanceId.isDefined
def isAwaitingJoin = awaitingJoinCallback != null
@@ -107,11 +109,11 @@ private[group] class MemberMetadata(val memberId: String,
}
def summary(protocol: String): MemberSummary = {
- MemberSummary(memberId, clientId, clientHost, metadata(protocol), assignment)
+ MemberSummary(memberId, groupInstanceId, clientId, clientHost, metadata(protocol), assignment)
}
def summaryNoMetadata(): MemberSummary = {
- MemberSummary(memberId, clientId, clientHost, Array.empty[Byte], Array.empty[Byte])
+ MemberSummary(memberId, groupInstanceId, clientId, clientHost, Array.empty[Byte], Array.empty[Byte])
}
/**
@@ -129,6 +131,7 @@ private[group] class MemberMetadata(val memberId: String,
override def toString: String = {
"MemberMetadata(" +
s"memberId=$memberId, " +
+ s"groupInstanceId=$groupInstanceId, " +
s"clientId=$clientId, " +
s"clientHost=$clientHost, " +
s"sessionTimeoutMs=$sessionTimeoutMs, " +
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index aa368f8..b0482c8 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -26,7 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger
import java.util.{Collections, Optional, Properties}
import kafka.admin.{AdminUtils, RackAwareMode}
-import kafka.api.{ApiVersion, KAFKA_0_11_0_IV0}
+import kafka.api.{ApiVersion, KAFKA_0_11_0_IV0, KAFKA_2_3_IV0}
import kafka.cluster.Partition
import kafka.common.OffsetAndMetadata
import kafka.controller.KafkaController
@@ -1298,12 +1298,6 @@ class KafkaApis(val requestChannel: RequestChannel,
// the callback for sending a join-group response
def sendResponseCallback(joinResult: JoinGroupResult) {
- val members = joinResult.members map { case (memberId, metadataArray) =>
- new JoinGroupResponseData.JoinGroupResponseMember()
- .setMemberId(memberId)
- .setMetadata(metadataArray)
- }
-
def createResponse(requestThrottleMs: Int): AbstractResponse = {
val responseBody = new JoinGroupResponse(
new JoinGroupResponseData()
@@ -1313,7 +1307,7 @@ class KafkaApis(val requestChannel: RequestChannel,
.setProtocolName(joinResult.subProtocol)
.setLeader(joinResult.leaderId)
.setMemberId(joinResult.memberId)
- .setMembers(members.toSeq.asJava)
+ .setMembers(joinResult.members.asJava)
)
trace("Sending join group response %s for correlation id %d to client %s."
@@ -1337,8 +1331,17 @@ class KafkaApis(val requestChannel: RequestChannel,
)
)
} else {
+ val encodedGroupInstanceId = joinGroupRequest.data().groupInstanceId
+ val groupInstanceId =
+ if (encodedGroupInstanceId == null ||
+ config.interBrokerProtocolVersion < KAFKA_2_3_IV0)
+ None
+ else
+ Some(encodedGroupInstanceId)
+
// Only return MEMBER_ID_REQUIRED error if joinGroupRequest version is >= 4
- val requireKnownMemberId = joinGroupRequest.version >= 4
+ // and groupInstanceId is configured to unknown.
+ val requireKnownMemberId = joinGroupRequest.version >= 4 && groupInstanceId.isEmpty
// let the coordinator handle join-group
val protocols = joinGroupRequest.data().protocols().asScala.map(protocol =>
@@ -1346,6 +1349,7 @@ class KafkaApis(val requestChannel: RequestChannel,
groupCoordinator.handleJoinGroup(
joinGroupRequest.data().groupId,
joinGroupRequest.data().memberId,
+ groupInstanceId,
requireKnownMemberId,
request.header.clientId,
request.session.clientAddress.toString,
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 8f127bc..04f4f3b 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -151,7 +151,7 @@ object Defaults {
/** ********* Group coordinator configuration ***********/
val GroupMinSessionTimeoutMs = 6000
- val GroupMaxSessionTimeoutMs = 300000
+ val GroupMaxSessionTimeoutMs = 1800000
val GroupInitialRebalanceDelayMs = 3000
val GroupMaxSize: Int = Int.MaxValue
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 23a0ad0..1e7c8a8 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -327,6 +327,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
.setGroupId(group)
.setSessionTimeoutMs(10000)
.setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID)
+ .setGroupInstanceId(null)
.setProtocolType("consumer")
.setProtocols(protocolSet)
.setRebalanceTimeoutMs(60000)
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
index ba179a5..25f2ba4 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
@@ -159,7 +159,7 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
callback
}
override def runWithCallback(member: GroupMember, responseCallback: JoinGroupCallback): Unit = {
- groupCoordinator.handleJoinGroup(member.groupId, member.memberId, requireKnownMemberId = false, "clientId", "clientHost",
+ groupCoordinator.handleJoinGroup(member.groupId, member.memberId, None, requireKnownMemberId = false, "clientId", "clientHost",
DefaultRebalanceTimeout, DefaultSessionTimeout,
protocolType, protocols, responseCallback)
}
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index cf76375..478f027 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -72,8 +72,12 @@ class GroupCoordinatorTest extends JUnitSuite {
private val groupId = "groupId"
private val protocolType = "consumer"
private val memberId = "memberId"
+ private val groupInstanceId = Some("groupInstanceId")
+ private val leaderInstanceId = Some("leader")
+ private val followerInstanceId = Some("follower")
private val metadata = Array[Byte]()
private val protocols = List(("range", metadata))
+ private val protocolSuperset = List(("range", metadata), ("roundrobin", metadata))
private var groupPartitionId: Int = -1
// we use this string value since its hashcode % #.partitions is different
@@ -127,9 +131,14 @@ class GroupCoordinatorTest extends JUnitSuite {
groupCoordinator.groupManager.addLoadingPartition(otherGroupPartitionId)
assertTrue(groupCoordinator.groupManager.isGroupLoading(otherGroupId))
- // JoinGroup
+ // Dynamic Member JoinGroup
var joinGroupResponse: Option[JoinGroupResult] = None
- groupCoordinator.handleJoinGroup(otherGroupId, memberId, true, "clientId", "clientHost", 60000, 10000, "consumer",
+ groupCoordinator.handleJoinGroup(otherGroupId, memberId, None, true, "clientId", "clientHost", 60000, 10000, "consumer",
+ List("range" -> new Array[Byte](0)), result => { joinGroupResponse = Some(result)})
+ assertEquals(Some(Errors.COORDINATOR_LOAD_IN_PROGRESS), joinGroupResponse.map(_.error))
+
+ // Static Member JoinGroup
+ groupCoordinator.handleJoinGroup(otherGroupId, memberId, Some("groupInstanceId"), false, "clientId", "clientHost", 60000, 10000, "consumer",
List("range" -> new Array[Byte](0)), result => { joinGroupResponse = Some(result)})
assertEquals(Some(Errors.COORDINATOR_LOAD_IN_PROGRESS), joinGroupResponse.map(_.error))
@@ -188,9 +197,12 @@ class GroupCoordinatorTest extends JUnitSuite {
def testJoinGroupWrongCoordinator() {
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
- val joinGroupResult = joinGroup(otherGroupId, memberId, protocolType, protocols)
- val joinGroupError = joinGroupResult.error
- assertEquals(Errors.NOT_COORDINATOR, joinGroupError)
+ var joinGroupResult = dynamicJoinGroup(otherGroupId, memberId, protocolType, protocols)
+ assertEquals(Errors.NOT_COORDINATOR, joinGroupResult.error)
+
+ EasyMock.reset(replicaManager)
+ joinGroupResult = staticJoinGroup(otherGroupId, memberId, groupInstanceId, protocolType, protocols)
+ assertEquals(Errors.NOT_COORDINATOR, joinGroupResult.error)
}
@Test
@@ -199,7 +211,7 @@ class GroupCoordinatorTest extends JUnitSuite {
val rebalanceTimeout = GroupInitialRebalanceDelay * 2
for (i <- 1.to(GroupMaxSize)) {
- futures += sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, rebalanceTimeout)
+ futures += sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, rebalanceTimeout = rebalanceTimeout)
if (i != 1)
timer.advanceClock(GroupInitialRebalanceDelay)
EasyMock.reset(replicaManager)
@@ -211,7 +223,7 @@ class GroupCoordinatorTest extends JUnitSuite {
}
// Should receive an error since the group is full
- val errorFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, rebalanceTimeout)
+ val errorFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, rebalanceTimeout = rebalanceTimeout)
assertEquals(Errors.GROUP_MAX_SIZE_REACHED, await(errorFuture, 1).error)
}
@@ -219,7 +231,7 @@ class GroupCoordinatorTest extends JUnitSuite {
def testJoinGroupSessionTimeoutTooSmall() {
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
- val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols, sessionTimeout = GroupMinSessionTimeout - 1)
+ val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols, sessionTimeout = GroupMinSessionTimeout - 1)
val joinGroupError = joinGroupResult.error
assertEquals(Errors.INVALID_SESSION_TIMEOUT, joinGroupError)
}
@@ -228,16 +240,19 @@ class GroupCoordinatorTest extends JUnitSuite {
def testJoinGroupSessionTimeoutTooLarge() {
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
- val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols, sessionTimeout = GroupMaxSessionTimeout + 1)
+ val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols, sessionTimeout = GroupMaxSessionTimeout + 1)
val joinGroupError = joinGroupResult.error
assertEquals(Errors.INVALID_SESSION_TIMEOUT, joinGroupError)
}
@Test
def testJoinGroupUnknownConsumerNewGroup() {
- val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
- val joinGroupError = joinGroupResult.error
- assertEquals(Errors.UNKNOWN_MEMBER_ID, joinGroupError)
+ var joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
+ assertEquals(Errors.UNKNOWN_MEMBER_ID, joinGroupResult.error)
+
+ EasyMock.reset(replicaManager)
+ joinGroupResult = staticJoinGroup(groupId, memberId, groupInstanceId, protocolType, protocols)
+ assertEquals(Errors.UNKNOWN_MEMBER_ID, joinGroupResult.error)
}
@Test
@@ -245,7 +260,7 @@ class GroupCoordinatorTest extends JUnitSuite {
val groupId = ""
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
- val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+ val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
assertEquals(Errors.INVALID_GROUP_ID, joinGroupResult.error)
}
@@ -253,7 +268,7 @@ class GroupCoordinatorTest extends JUnitSuite {
def testValidJoinGroup() {
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
- val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+ val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
val joinGroupError = joinGroupResult.error
assertEquals(Errors.NONE, joinGroupError)
}
@@ -263,7 +278,7 @@ class GroupCoordinatorTest extends JUnitSuite {
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
val otherMemberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
- val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+ val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
assertEquals(Errors.NONE, joinGroupResult.error)
EasyMock.reset(replicaManager)
@@ -275,7 +290,11 @@ class GroupCoordinatorTest extends JUnitSuite {
def testJoinGroupWithEmptyProtocolType() {
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
- val joinGroupResult = joinGroup(groupId, memberId, "", protocols)
+ var joinGroupResult = dynamicJoinGroup(groupId, memberId, "", protocols)
+ assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL, joinGroupResult.error)
+
+ EasyMock.reset(replicaManager)
+ joinGroupResult = staticJoinGroup(groupId, memberId, groupInstanceId, "", protocols)
assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL, joinGroupResult.error)
}
@@ -283,7 +302,7 @@ class GroupCoordinatorTest extends JUnitSuite {
def testJoinGroupWithEmptyGroupProtocol() {
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
- val joinGroupResult = joinGroup(groupId, memberId, protocolType, List())
+ val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, List())
assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL, joinGroupResult.error)
}
@@ -297,7 +316,7 @@ class GroupCoordinatorTest extends JUnitSuite {
val sessionTimeout = GroupCoordinator.NewMemberJoinTimeoutMs + 5000
val rebalanceTimeout = GroupCoordinator.NewMemberJoinTimeoutMs * 2
- val firstJoinResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols,
+ val firstJoinResult = dynamicJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols,
sessionTimeout, rebalanceTimeout)
val firstMemberId = firstJoinResult.memberId
assertEquals(firstMemberId, firstJoinResult.leaderId)
@@ -309,8 +328,7 @@ class GroupCoordinatorTest extends JUnitSuite {
assertEquals(0, group.allMemberMetadata.count(_.isNew))
EasyMock.reset(replicaManager)
-
- val responseFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, sessionTimeout, rebalanceTimeout)
+ val responseFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, None, sessionTimeout, rebalanceTimeout)
assertFalse(responseFuture.isCompleted)
assertEquals(2, group.allMembers.size)
@@ -338,7 +356,7 @@ class GroupCoordinatorTest extends JUnitSuite {
val joinGroupFuture = sendJoinGroup(groupId, memberId, protocolType, List(("range", metadata)))
EasyMock.reset(replicaManager)
- val otherJoinGroupResult = joinGroup(groupId, otherMemberId, protocolType, List(("roundrobin", metadata)))
+ val otherJoinGroupResult = dynamicJoinGroup(groupId, otherMemberId, protocolType, List(("roundrobin", metadata)))
timer.advanceClock(GroupInitialRebalanceDelay + 1)
val joinGroupResult = await(joinGroupFuture, 1)
@@ -351,7 +369,7 @@ class GroupCoordinatorTest extends JUnitSuite {
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
val otherMemberId = "memberId"
- val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+ val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
assertEquals(Errors.NONE, joinGroupResult.error)
EasyMock.reset(replicaManager)
@@ -360,13 +378,13 @@ class GroupCoordinatorTest extends JUnitSuite {
}
@Test
- def testJoinGroupUnknownConsumerDeadGroup() {
+ def testJoinGroupUnknownConsumerNewDeadGroup() {
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
val deadGroupId = "deadGroupId"
groupCoordinator.groupManager.addGroup(new GroupMetadata(deadGroupId, Dead, new MockTime()))
- val joinGroupResult = joinGroup(deadGroupId, memberId, protocolType, protocols)
+ val joinGroupResult = dynamicJoinGroup(deadGroupId, memberId, protocolType, protocols)
assertEquals(Errors.UNKNOWN_MEMBER_ID, joinGroupResult.error)
}
@@ -392,6 +410,406 @@ class GroupCoordinatorTest extends JUnitSuite {
}
@Test
+ def staticMemberJoinAsFirstMember() {
+ val joinGroupResult = staticJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, groupInstanceId, protocolType, protocols)
+ assertEquals(Errors.NONE, joinGroupResult.error)
+ }
+
+ @Test
+ def staticMemberReJoinWithExplicitUnknownMemberId() {
+ var joinGroupResult = staticJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, groupInstanceId, protocolType, protocols)
+ assertEquals(Errors.NONE, joinGroupResult.error)
+
+ EasyMock.reset(replicaManager)
+ val unknownMemberId = "unknown_member"
+ joinGroupResult = staticJoinGroup(groupId, unknownMemberId, groupInstanceId, protocolType, protocols)
+ assertEquals(Errors.FENCED_INSTANCE_ID, joinGroupResult.error)
+ }
+
+ @Test
+ def staticMemberRejoinWithKnownMemberId() {
+ var joinGroupResult = staticJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, groupInstanceId, protocolType, protocols)
+ assertEquals(Errors.NONE, joinGroupResult.error)
+
+ EasyMock.reset(replicaManager)
+ val assignedMemberId = joinGroupResult.memberId
+ // The second join group should return immediately since we are using the same metadata during CompletingRebalance.
+ val rejoinResponseFuture = sendJoinGroup(groupId, assignedMemberId, protocolType, protocols, groupInstanceId)
+ timer.advanceClock(1)
+ joinGroupResult = Await.result(rejoinResponseFuture, Duration(1, TimeUnit.MILLISECONDS))
+ assertEquals(Errors.NONE, joinGroupResult.error)
+ assertTrue(getGroup(groupId).is(CompletingRebalance))
+
+ EasyMock.reset(replicaManager)
+ val syncGroupFuture = sendSyncGroupLeader(groupId, joinGroupResult.generationId, assignedMemberId, Map(assignedMemberId -> Array[Byte]()))
+ timer.advanceClock(1)
+ val syncGroupResult = Await.result(syncGroupFuture, Duration(1, TimeUnit.MILLISECONDS))
+ assertEquals(Errors.NONE, syncGroupResult._2)
+ assertTrue(getGroup(groupId).is(Stable))
+ }
+
+ @Test
+ def staticMemberRejoinWithLeaderIdAndUnknownMemberId() {
+ val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
+
+ // A static leader rejoin with unknown id will not trigger rebalance.
+ EasyMock.reset(replicaManager)
+ val joinGroupResult = staticJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, leaderInstanceId, protocolType, protocolSuperset, clockAdvance = 1)
+
+ checkJoinGroupResult(joinGroupResult,
+ Errors.NONE,
+ rebalanceResult.generation, // The group should be at the same generation
+ Set(leaderInstanceId, followerInstanceId),
+ groupId,
+ Stable)
+
+ EasyMock.reset(replicaManager)
+ val oldLeaderJoinGroupResult = staticJoinGroup(groupId, rebalanceResult.leaderId, leaderInstanceId, protocolType, protocolSuperset, clockAdvance = 1)
+ assertEquals(Errors.FENCED_INSTANCE_ID, oldLeaderJoinGroupResult.error)
+
+ EasyMock.reset(replicaManager)
+ assertNotEquals(rebalanceResult.leaderId, joinGroupResult.leaderId)
+ val oldLeaderSyncGroupResult = syncGroupLeader(groupId, rebalanceResult.generation, rebalanceResult.leaderId, Map.empty)
+ assertEquals(Errors.UNKNOWN_MEMBER_ID, oldLeaderSyncGroupResult._2)
+
+ EasyMock.reset(replicaManager)
+ val newLeaderSyncGroupResult = syncGroupLeader(groupId, rebalanceResult.generation, joinGroupResult.leaderId, Map.empty)
+ assertEquals(Errors.NONE, newLeaderSyncGroupResult._2)
+ assertEquals(rebalanceResult.leaderAssignment, newLeaderSyncGroupResult._1)
+ }
+
+ @Test
+ def staticMemberRejoinWithLeaderIdAndKnownMemberId() {
+ val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
+
+ // A static leader with known id rejoin will trigger rebalance.
+ EasyMock.reset(replicaManager)
+ val joinGroupResult = staticJoinGroup(groupId, rebalanceResult.leaderId, leaderInstanceId, protocolType, protocolSuperset, clockAdvance = DefaultSessionTimeout + 1)
+ // Timeout follower in the meantime.
+ assertFalse(getGroup(groupId).hasStaticMember(followerInstanceId))
+ checkJoinGroupResult(joinGroupResult,
+ Errors.NONE,
+ rebalanceResult.generation + 1, // The group has promoted to the new generation.
+ Set(leaderInstanceId),
+ groupId,
+ CompletingRebalance,
+ rebalanceResult.leaderId,
+ rebalanceResult.leaderId)
+ }
+
+ @Test
+ def staticMemberRejoinWithLeaderIdAndUnexpectedDeadGroup() {
+ val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
+
+ getGroup(groupId).transitionTo(Dead)
+
+ EasyMock.reset(replicaManager)
+ val joinGroupResult = staticJoinGroup(groupId, rebalanceResult.leaderId, leaderInstanceId, protocolType, protocols, clockAdvance = 1)
+ assertEquals(Errors.UNKNOWN_MEMBER_ID, joinGroupResult.error)
+ }
+
+ @Test
+ def staticMemberRejoinWithLeaderIdAndUnexpectedEmptyGroup() {
+ val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
+
+ getGroup(groupId).transitionTo(PreparingRebalance)
+ getGroup(groupId).transitionTo(Empty)
+
+ EasyMock.reset(replicaManager)
+ val joinGroupResult = staticJoinGroup(groupId, rebalanceResult.leaderId, leaderInstanceId, protocolType, protocols, clockAdvance = 1)
+ assertEquals(Errors.UNKNOWN_MEMBER_ID, joinGroupResult.error)
+ }
+
+ @Test
+ def staticMemberRejoinWithFollowerIdAndChangeOfProtocol() {
+ val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
+
+ // A static follower rejoin with changed protocol will trigger rebalance.
+ EasyMock.reset(replicaManager)
+ val newProtocols = List(("roundrobin", metadata))
+ // Timeout old leader in the meantime.
+ val joinGroupResult = staticJoinGroup(groupId, rebalanceResult.followerId, followerInstanceId, protocolType, newProtocols, clockAdvance = DefaultSessionTimeout + 1)
+
+ assertFalse(getGroup(groupId).hasStaticMember(leaderInstanceId))
+ checkJoinGroupResult(joinGroupResult,
+ Errors.NONE,
+ rebalanceResult.generation + 1, // The group has promoted to the new generation, and leader has changed because old one times out.
+ Set(followerInstanceId),
+ groupId,
+ CompletingRebalance,
+ rebalanceResult.followerId,
+ rebalanceResult.followerId)
+ }
+
+ @Test
+ def staticMemberRejoinWithUnknownMemberIdAndChangeOfProtocol() {
+ val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
+
+ // A static follower rejoin with protocol changing to leader protocol subset won't trigger rebalance.
+ EasyMock.reset(replicaManager)
+ val newProtocols = List(("roundrobin", metadata))
+ // The 1 ms clock advance is too small to time out the old leader, unlike the known-member-id variant.
+ val joinGroupResult = staticJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, followerInstanceId, protocolType, newProtocols, clockAdvance = 1)
+
+ checkJoinGroupResult(joinGroupResult,
+ Errors.NONE,
+ rebalanceResult.generation,
+ Set.empty,
+ groupId,
+ Stable)
+
+ EasyMock.reset(replicaManager)
+ // Join with old member id will fail because the member id is updated
+ assertNotEquals(rebalanceResult.followerId, joinGroupResult.memberId)
+ val oldFollowerJoinGroupResult = staticJoinGroup(groupId, rebalanceResult.followerId, followerInstanceId, protocolType, newProtocols, clockAdvance = 1)
+ assertEquals(Errors.FENCED_INSTANCE_ID, oldFollowerJoinGroupResult.error)
+
+ EasyMock.reset(replicaManager)
+ // Sync with old member id will fail because the member id is updated
+ val syncGroupWithOldMemberIdResult = syncGroupFollower(groupId, rebalanceResult.generation, rebalanceResult.followerId)
+ assertEquals(Errors.UNKNOWN_MEMBER_ID, syncGroupWithOldMemberIdResult._2)
+
+ EasyMock.reset(replicaManager)
+ val syncGroupWithNewMemberIdResult = syncGroupFollower(groupId, rebalanceResult.generation, joinGroupResult.memberId)
+ assertEquals(Errors.NONE, syncGroupWithNewMemberIdResult._2)
+ assertEquals(rebalanceResult.followerAssignment, syncGroupWithNewMemberIdResult._1)
+ }
+
+ @Test
+ def staticMemberRejoinWithKnownLeaderIdToTriggerRebalanceAndFollowerWithChangeofProtocol() {
+ val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
+
+ // A static leader rejoin with known member id will trigger rebalance.
+ EasyMock.reset(replicaManager)
+ val leaderRejoinGroupFuture = sendJoinGroup(groupId, rebalanceResult.leaderId, protocolType, protocolSuperset, leaderInstanceId)
+ // Rebalance completes immediately after the follower rejoins.
+ EasyMock.reset(replicaManager)
+ val followerRejoinWithFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocolSuperset, followerInstanceId)
+
+ timer.advanceClock(1)
+
+ // Leader should get the same assignment as last round.
+ checkJoinGroupResult(await(leaderRejoinGroupFuture, 1),
+ Errors.NONE,
+ rebalanceResult.generation + 1, // The group has promoted to the new generation.
+ Set(leaderInstanceId, followerInstanceId),
+ groupId,
+ CompletingRebalance,
+ rebalanceResult.leaderId,
+ rebalanceResult.leaderId)
+
+ checkJoinGroupResult(await(followerRejoinWithFuture, 1),
+ Errors.NONE,
+ rebalanceResult.generation + 1, // The group has promoted to the new generation.
+ Set.empty,
+ groupId,
+ CompletingRebalance,
+ rebalanceResult.leaderId,
+ rebalanceResult.followerId)
+
+ EasyMock.reset(replicaManager)
+ // The follower protocol changed from protocolSuperset to general protocols.
+ val followerRejoinWithProtocolChangeFuture = sendJoinGroup(groupId, rebalanceResult.followerId, protocolType, protocols, followerInstanceId)
+ // The group will transit to PreparingRebalance due to protocol change from follower.
+ assertTrue(getGroup(groupId).is(PreparingRebalance))
+
+ timer.advanceClock(DefaultRebalanceTimeout + 1)
+ checkJoinGroupResult(await(followerRejoinWithProtocolChangeFuture, 1),
+ Errors.NONE,
+ rebalanceResult.generation + 2, // The group has promoted to the new generation.
+ Set(followerInstanceId),
+ groupId,
+ CompletingRebalance,
+ rebalanceResult.followerId,
+ rebalanceResult.followerId)
+ }
+
+ @Test
+ def staticMemberRejoinAsFollowerWithUnknownMemberId() {
+ val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
+
+ EasyMock.reset(replicaManager)
+ // A static follower rejoin with no protocol change will not trigger rebalance.
+ val joinGroupResult = staticJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, followerInstanceId, protocolType, protocolSuperset, clockAdvance = 1)
+
+ // Old leader shouldn't be timed out.
+ assertTrue(getGroup(groupId).hasStaticMember(leaderInstanceId))
+ checkJoinGroupResult(joinGroupResult,
+ Errors.NONE,
+ rebalanceResult.generation, // The group has no change.
+ Set.empty,
+ groupId,
+ Stable)
+
+ EasyMock.reset(replicaManager)
+ val syncGroupResult = syncGroupFollower(groupId, rebalanceResult.generation, joinGroupResult.memberId)
+ assertEquals(Errors.NONE, syncGroupResult._2)
+ assertEquals(rebalanceResult.followerAssignment, syncGroupResult._1)
+ }
+
+ @Test
+ def staticMemberRejoinAsFollowerWithKnownMemberIdAndNoProtocolChange() {
+ val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
+
+ // A static follower rejoin with no protocol change will not trigger rebalance.
+ EasyMock.reset(replicaManager)
+ val joinGroupResult = staticJoinGroup(groupId, rebalanceResult.followerId, followerInstanceId, protocolType, protocolSuperset, clockAdvance = 1)
+
+ // Old leader shouldn't be timed out.
+ assertTrue(getGroup(groupId).hasStaticMember(leaderInstanceId))
+ checkJoinGroupResult(joinGroupResult,
+ Errors.NONE,
+ rebalanceResult.generation, // The group has no change.
+ Set.empty,
+ groupId,
+ Stable,
+ rebalanceResult.leaderId,
+ rebalanceResult.followerId)
+ }
+
+ @Test
+ def staticMemberRejoinAsFollowerWithMismatchedMemberId() {
+ val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
+
+ EasyMock.reset(replicaManager)
+ val joinGroupResult = staticJoinGroup(groupId, rebalanceResult.followerId, leaderInstanceId, protocolType, protocolSuperset, clockAdvance = 1)
+
+ assertEquals(Errors.FENCED_INSTANCE_ID, joinGroupResult.error)
+ }
+
+ @Test
+ def staticMemberRejoinAsLeaderWithMismatchedMemberId() {
+ val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
+
+ EasyMock.reset(replicaManager)
+ val joinGroupResult = staticJoinGroup(groupId, rebalanceResult.leaderId, followerInstanceId, protocolType, protocolSuperset, clockAdvance = 1)
+
+ assertEquals(Errors.FENCED_INSTANCE_ID, joinGroupResult.error)
+ }
+
+ @Test
+ def staticMemberJoinWithUnknownInstanceIdAndKnownMemberId() {
+ val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
+
+ EasyMock.reset(replicaManager)
+ val joinGroupResult = staticJoinGroup(groupId, rebalanceResult.leaderId, Some("unknown_instance"), protocolType, protocolSuperset, clockAdvance = 1)
+
+ assertEquals(Errors.UNKNOWN_MEMBER_ID, joinGroupResult.error)
+ }
+
+ @Test
+ def staticMemberJoinWithIllegalStateAsPendingMember() {
+ val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
+ val group = groupCoordinator.groupManager.getGroup(groupId).get
+ group.addPendingMember(rebalanceResult.followerId)
+ group.remove(rebalanceResult.followerId)
+ EasyMock.reset(replicaManager)
+
+ // Illegal state exception shall trigger since follower id resides in pending member bucket.
+ val expectedException = intercept[IllegalStateException] {
+ staticJoinGroup(groupId, rebalanceResult.followerId, followerInstanceId, protocolType, protocolSuperset, clockAdvance = 1)
+ }
+
+ val message = expectedException.getMessage
+ assertTrue(message.contains(rebalanceResult.followerId))
+ assertTrue(message.contains(followerInstanceId.get))
+ }
+
+ @Test
+ def staticMemberReJoinWithIllegalArgumentAsMissingOldMember() {
+ val _ = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
+ val group = groupCoordinator.groupManager.getGroup(groupId).get
+ val invalidMemberId = "invalid_member_id"
+ group.addStaticMember(followerInstanceId, invalidMemberId)
+ EasyMock.reset(replicaManager)
+
+ // Illegal argument exception shall trigger since the follower's corresponding id is not defined in the member list.
+ val expectedException = intercept[IllegalArgumentException] {
+ staticJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, followerInstanceId, protocolType, protocolSuperset, clockAdvance = 1)
+ }
+
+ val message = expectedException.getMessage
+ assertTrue(message.contains(invalidMemberId))
+ }
+
+ private class RebalanceResult(val generation: Int,
+ val leaderId: String,
+ val leaderAssignment: Array[Byte],
+ val followerId: String,
+ val followerAssignment: Array[Byte])
+ /**
+ * Generate static member rebalance results, including:
+ * - generation
+ * - leader id
+ * - leader assignment
+ * - follower id
+ * - follower assignment
+ */
+ private def staticMembersJoinAndRebalance(leaderInstanceId: Option[String],
+ followerInstanceId: Option[String]): RebalanceResult = {
+ EasyMock.reset(replicaManager)
+ val leaderResponseFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocolSuperset, leaderInstanceId)
+
+ EasyMock.reset(replicaManager)
+ val followerResponseFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocolSuperset, followerInstanceId)
+ // The goal of the two timer advances is to let the first group initial join complete and set the newMemberAdded flag to false. The next
+ // advance triggers the rebalance as needed for the follower's delayed join. One large time advance won't help because we could only populate
+ // one delayed join from the purgatory; the new delayed op would be created at that time and never get triggered.
+ timer.advanceClock(GroupInitialRebalanceDelay + 1)
+ timer.advanceClock(DefaultRebalanceTimeout + 1)
+ val newGeneration = 1
+
+ val leaderJoinGroupResult = await(leaderResponseFuture, 1)
+ assertEquals(Errors.NONE, leaderJoinGroupResult.error)
+ assertEquals(newGeneration, leaderJoinGroupResult.generationId)
+
+ val followerJoinGroupResult = await(followerResponseFuture, 1)
+ assertEquals(Errors.NONE, followerJoinGroupResult.error)
+ assertEquals(newGeneration, followerJoinGroupResult.generationId)
+
+ EasyMock.reset(replicaManager)
+ val leaderId = leaderJoinGroupResult.memberId
+ val leaderSyncGroupResult = syncGroupLeader(groupId, leaderJoinGroupResult.generationId, leaderId, Map(leaderId -> Array[Byte]()))
+ assertEquals(Errors.NONE, leaderSyncGroupResult._2)
+ assertTrue(getGroup(groupId).is(Stable))
+
+ EasyMock.reset(replicaManager)
+ val followerId = followerJoinGroupResult.memberId
+ val follwerSyncGroupResult = syncGroupFollower(groupId, leaderJoinGroupResult.generationId, followerId)
+ assertEquals(Errors.NONE, follwerSyncGroupResult._2)
+ assertTrue(getGroup(groupId).is(Stable))
+
+ new RebalanceResult(newGeneration,
+ leaderId,
+ leaderSyncGroupResult._1,
+ followerId,
+ follwerSyncGroupResult._1)
+ }
+
+ private def checkJoinGroupResult(joinGroupResult: JoinGroupResult,
+ expectedError: Errors,
+ expectedGeneration: Int,
+ expectedGroupInstanceIds: Set[Option[String]],
+ groupId: String,
+ expectedGroupState: GroupState,
+ expectedLeaderId: String = JoinGroupRequest.UNKNOWN_MEMBER_ID,
+ expectedMemberId: String = JoinGroupRequest.UNKNOWN_MEMBER_ID) {
+ assertEquals(Errors.NONE, joinGroupResult.error)
+ assertEquals(expectedGeneration, joinGroupResult.generationId)
+ assertEquals(expectedGroupInstanceIds.size, joinGroupResult.members.size)
+ val resultedGroupInstanceIds = joinGroupResult.members.map(member => Some(member.groupInstanceId())).toSet
+ assertEquals(expectedGroupInstanceIds, resultedGroupInstanceIds)
+ assertTrue(getGroup(groupId).is(expectedGroupState))
+
+ if (!expectedLeaderId.equals(JoinGroupRequest.UNKNOWN_MEMBER_ID)) {
+ assertEquals(expectedLeaderId, joinGroupResult.leaderId)
+ }
+ if (!expectedMemberId.equals(JoinGroupRequest.UNKNOWN_MEMBER_ID)) {
+ assertEquals(expectedMemberId, joinGroupResult.memberId)
+ }
+ }
+
+ @Test
def testHeartbeatWrongCoordinator() {
val heartbeatResult = heartbeat(otherGroupId, memberId, -1)
assertEquals(Errors.NOT_COORDINATOR, heartbeatResult)
@@ -409,7 +827,7 @@ class GroupCoordinatorTest extends JUnitSuite {
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
val otherMemberId = "memberId"
- val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+ val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
val assignedMemberId = joinGroupResult.memberId
val joinGroupError = joinGroupResult.error
assertEquals(Errors.NONE, joinGroupError)
@@ -428,7 +846,7 @@ class GroupCoordinatorTest extends JUnitSuite {
def testHeartbeatRebalanceInProgress() {
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
- val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+ val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
val assignedMemberId = joinGroupResult.memberId
val joinGroupError = joinGroupResult.error
assertEquals(Errors.NONE, joinGroupError)
@@ -442,7 +860,7 @@ class GroupCoordinatorTest extends JUnitSuite {
def testHeartbeatIllegalGeneration() {
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
- val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+ val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
val assignedMemberId = joinGroupResult.memberId
val joinGroupError = joinGroupResult.error
assertEquals(Errors.NONE, joinGroupError)
@@ -461,7 +879,7 @@ class GroupCoordinatorTest extends JUnitSuite {
def testValidHeartbeat() {
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
- val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+ val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
val assignedConsumerId = joinGroupResult.memberId
val generationId = joinGroupResult.generationId
val joinGroupError = joinGroupResult.error
@@ -481,7 +899,7 @@ class GroupCoordinatorTest extends JUnitSuite {
def testSessionTimeout() {
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
- val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+ val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
val assignedConsumerId = joinGroupResult.memberId
val generationId = joinGroupResult.generationId
val joinGroupError = joinGroupResult.error
@@ -508,7 +926,7 @@ class GroupCoordinatorTest extends JUnitSuite {
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
val sessionTimeout = 1000
- val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols,
+ val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols,
rebalanceTimeout = sessionTimeout, sessionTimeout = sessionTimeout)
val assignedConsumerId = joinGroupResult.memberId
val generationId = joinGroupResult.generationId
@@ -539,7 +957,7 @@ class GroupCoordinatorTest extends JUnitSuite {
val tp = new TopicPartition("topic", 0)
val offset = offsetAndMetadata(0)
- val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols,
+ val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols,
rebalanceTimeout = sessionTimeout, sessionTimeout = sessionTimeout)
val assignedConsumerId = joinGroupResult.memberId
val generationId = joinGroupResult.generationId
@@ -566,7 +984,7 @@ class GroupCoordinatorTest extends JUnitSuite {
@Test
def testSessionTimeoutDuringRebalance() {
// create a group with a single member
- val firstJoinResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols,
+ val firstJoinResult = dynamicJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols,
rebalanceTimeout = 2000, sessionTimeout = 1000)
val firstMemberId = firstJoinResult.memberId
val firstGenerationId = firstJoinResult.generationId
@@ -602,7 +1020,7 @@ class GroupCoordinatorTest extends JUnitSuite {
@Test
def testRebalanceCompletesBeforeMemberJoins() {
// create a group with a single member
- val firstJoinResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols,
+ val firstJoinResult = dynamicJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols,
rebalanceTimeout = 1200, sessionTimeout = 1000)
val firstMemberId = firstJoinResult.memberId
val firstGenerationId = firstJoinResult.generationId
@@ -657,7 +1075,7 @@ class GroupCoordinatorTest extends JUnitSuite {
def testSyncGroupEmptyAssignment() {
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
- val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+ val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
val assignedConsumerId = joinGroupResult.memberId
val generationId = joinGroupResult.generationId
val joinGroupError = joinGroupResult.error
@@ -694,7 +1112,7 @@ class GroupCoordinatorTest extends JUnitSuite {
def testSyncGroupFromUnknownMember() {
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
- val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+ val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
val assignedConsumerId = joinGroupResult.memberId
val generationId = joinGroupResult.generationId
assertEquals(Errors.NONE, joinGroupResult.error)
@@ -714,7 +1132,7 @@ class GroupCoordinatorTest extends JUnitSuite {
def testSyncGroupFromIllegalGeneration() {
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
- val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+ val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
val assignedConsumerId = joinGroupResult.memberId
val generationId = joinGroupResult.generationId
assertEquals(Errors.NONE, joinGroupResult.error)
@@ -731,7 +1149,7 @@ class GroupCoordinatorTest extends JUnitSuite {
// 1. join and sync with a single member (because we can't immediately join with two members)
// 2. join and sync with the first member and a new member
- val firstJoinResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
+ val firstJoinResult = dynamicJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
val firstMemberId = firstJoinResult.memberId
val firstGenerationId = firstJoinResult.generationId
assertEquals(firstMemberId, firstJoinResult.leaderId)
@@ -768,7 +1186,7 @@ class GroupCoordinatorTest extends JUnitSuite {
@Test
def testJoinGroupFromUnchangedLeaderShouldRebalance() {
- val firstJoinResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
+ val firstJoinResult = dynamicJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
val firstMemberId = firstJoinResult.memberId
val firstGenerationId = firstJoinResult.generationId
assertEquals(firstMemberId, firstJoinResult.leaderId)
@@ -796,7 +1214,7 @@ class GroupCoordinatorTest extends JUnitSuite {
*/
@Test
def testSecondMemberPartiallyJoinAndTimeout() {
- val firstJoinResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
+ val firstJoinResult = dynamicJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
val firstMemberId = firstJoinResult.memberId
val firstGenerationId = firstJoinResult.generationId
assertEquals(firstMemberId, firstJoinResult.leaderId)
@@ -852,7 +1270,7 @@ class GroupCoordinatorTest extends JUnitSuite {
*/
private def setupGroupWithPendingMember(): JoinGroupResult = {
// add the first member
- val joinResult1 = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
+ val joinResult1 = dynamicJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
assertGroupState(groupState = CompletingRebalance)
// now the group is stable, with the one member that joined above
@@ -971,7 +1389,7 @@ class GroupCoordinatorTest extends JUnitSuite {
sessionTimeout: Int = DefaultSessionTimeout,
rebalanceTimeout: Int = DefaultRebalanceTimeout): JoinGroupResult = {
val requireKnownMemberId = true
- val responseFuture = sendJoinGroup(groupId, memberId, protocolType, protocols, sessionTimeout, rebalanceTimeout, requireKnownMemberId)
+ val responseFuture = sendJoinGroup(groupId, memberId, protocolType, protocols, None, sessionTimeout, rebalanceTimeout, requireKnownMemberId)
Await.result(responseFuture, Duration(rebalanceTimeout + 100, TimeUnit.MILLISECONDS))
}
@@ -981,7 +1399,7 @@ class GroupCoordinatorTest extends JUnitSuite {
// 1. join and sync with a single member (because we can't immediately join with two members)
// 2. join and sync with the first member and a new member
- val firstJoinResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
+ val firstJoinResult = dynamicJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
val firstMemberId = firstJoinResult.memberId
val firstGenerationId = firstJoinResult.generationId
assertEquals(firstMemberId, firstJoinResult.leaderId)
@@ -1025,7 +1443,7 @@ class GroupCoordinatorTest extends JUnitSuite {
// 1. join and sync with a single member (because we can't immediately join with two members)
// 2. join and sync with the first member and a new member
- val firstJoinResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
+ val firstJoinResult = dynamicJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
val firstMemberId = firstJoinResult.memberId
val firstGenerationId = firstJoinResult.generationId
assertEquals(firstMemberId, firstJoinResult.leaderId)
@@ -1074,7 +1492,7 @@ class GroupCoordinatorTest extends JUnitSuite {
// 1. join and sync with a single member (because we can't immediately join with two members)
// 2. join and sync with the first member and a new member
- val joinGroupResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
+ val joinGroupResult = dynamicJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
val firstMemberId = joinGroupResult.memberId
val firstGenerationId = joinGroupResult.generationId
assertEquals(firstMemberId, joinGroupResult.leaderId)
@@ -1147,7 +1565,7 @@ class GroupCoordinatorTest extends JUnitSuite {
// A group member joins
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
- val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+ val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
val assignedMemberId = joinGroupResult.memberId
val joinGroupError = joinGroupResult.error
assertEquals(Errors.NONE, joinGroupError)
@@ -1495,7 +1913,7 @@ class GroupCoordinatorTest extends JUnitSuite {
val tp = new TopicPartition("topic", 0)
val offset = offsetAndMetadata(0)
- val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+ val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
val assignedMemberId = joinGroupResult.memberId
val generationId = joinGroupResult.generationId
val joinGroupError = joinGroupResult.error
@@ -1509,7 +1927,7 @@ class GroupCoordinatorTest extends JUnitSuite {
@Test
def testHeartbeatDuringRebalanceCausesRebalanceInProgress() {
// First start up a group (with a slightly larger timeout to give us time to heartbeat when the rebalance starts)
- val joinGroupResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
+ val joinGroupResult = dynamicJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
val assignedConsumerId = joinGroupResult.memberId
val initialGenerationId = joinGroupResult.generationId
val joinGroupError = joinGroupResult.error
@@ -1527,7 +1945,7 @@ class GroupCoordinatorTest extends JUnitSuite {
@Test
def testGenerationIdIncrementsOnRebalance() {
- val joinGroupResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
+ val joinGroupResult = dynamicJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
val initialGenerationId = joinGroupResult.generationId
val joinGroupError = joinGroupResult.error
val memberId = joinGroupResult.memberId
@@ -1569,7 +1987,7 @@ class GroupCoordinatorTest extends JUnitSuite {
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
val otherMemberId = "consumerId"
- val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+ val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
val joinGroupError = joinGroupResult.error
assertEquals(Errors.NONE, joinGroupError)
@@ -1582,7 +2000,7 @@ class GroupCoordinatorTest extends JUnitSuite {
def testValidLeaveGroup() {
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
- val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+ val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
val assignedMemberId = joinGroupResult.memberId
val joinGroupError = joinGroupResult.error
assertEquals(Errors.NONE, joinGroupError)
@@ -1595,7 +2013,7 @@ class GroupCoordinatorTest extends JUnitSuite {
@Test
def testListGroupsIncludesStableGroups() {
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
- val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+ val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
val assignedMemberId = joinGroupResult.memberId
val generationId = joinGroupResult.generationId
assertEquals(Errors.NONE, joinGroupResult.error)
@@ -1614,7 +2032,7 @@ class GroupCoordinatorTest extends JUnitSuite {
@Test
def testListGroupsIncludesRebalancingGroups() {
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
- val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+ val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
assertEquals(Errors.NONE, joinGroupResult.error)
val (error, groups) = groupCoordinator.handleListGroups()
@@ -1641,7 +2059,7 @@ class GroupCoordinatorTest extends JUnitSuite {
@Test
def testDescribeGroupStable() {
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
- val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+ val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
val assignedMemberId = joinGroupResult.memberId
val generationId = joinGroupResult.generationId
val joinGroupError = joinGroupResult.error
@@ -1664,7 +2082,7 @@ class GroupCoordinatorTest extends JUnitSuite {
@Test
def testDescribeGroupRebalancing() {
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
- val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+ val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
val joinGroupError = joinGroupResult.error
assertEquals(Errors.NONE, joinGroupError)
@@ -1682,7 +2100,7 @@ class GroupCoordinatorTest extends JUnitSuite {
@Test
def testDeleteNonEmptyGroup() {
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
- joinGroup(groupId, memberId, protocolType, protocols)
+ dynamicJoinGroup(groupId, memberId, protocolType, protocols)
val result = groupCoordinator.handleDeleteGroups(Set(groupId))
assert(result.size == 1 && result.contains(groupId) && result.get(groupId).contains(Errors.NON_EMPTY_GROUP))
@@ -1704,7 +2122,7 @@ class GroupCoordinatorTest extends JUnitSuite {
@Test
def testDeleteEmptyGroup() {
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
- val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+ val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
EasyMock.reset(replicaManager)
val leaveGroupResult = leaveGroup(groupId, joinGroupResult.memberId)
@@ -1727,7 +2145,7 @@ class GroupCoordinatorTest extends JUnitSuite {
def testDeleteEmptyGroupWithStoredOffsets() {
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
- val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+ val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
val assignedMemberId = joinGroupResult.memberId
val joinGroupError = joinGroupResult.error
assertEquals(Errors.NONE, joinGroupError)
@@ -1836,6 +2254,11 @@ class GroupCoordinatorTest extends JUnitSuite {
assertEquals(Errors.NONE, thirdResult.error)
}
+ private def getGroup(groupId: String): GroupMetadata = {
+ val groupOpt = groupCoordinator.groupManager.getGroup(groupId)
+ assertTrue(groupOpt.isDefined)
+ groupOpt.get
+ }
private def setupJoinGroupCallback: (Future[JoinGroupResult], JoinGroupCallback) = {
val responsePromise = Promise[JoinGroupResult]
val responseFuture = responsePromise.future
@@ -1869,6 +2292,7 @@ class GroupCoordinatorTest extends JUnitSuite {
memberId: String,
protocolType: String,
protocols: List[(String, Array[Byte])],
+ groupInstanceId: Option[String] = None,
sessionTimeout: Int = DefaultSessionTimeout,
rebalanceTimeout: Int = DefaultRebalanceTimeout,
requireKnownMemberId: Boolean = false): Future[JoinGroupResult] = {
@@ -1876,8 +2300,8 @@ class GroupCoordinatorTest extends JUnitSuite {
EasyMock.replay(replicaManager)
- groupCoordinator.handleJoinGroup(groupId, memberId, requireKnownMemberId, "clientId", "clientHost", rebalanceTimeout, sessionTimeout,
- protocolType, protocols, responseCallback)
+ groupCoordinator.handleJoinGroup(groupId, memberId, groupInstanceId,
+ requireKnownMemberId, "clientId", "clientHost", rebalanceTimeout, sessionTimeout, protocolType, protocols, responseCallback)
responseFuture
}
@@ -1921,14 +2345,14 @@ class GroupCoordinatorTest extends JUnitSuite {
responseFuture
}
- private def joinGroup(groupId: String,
- memberId: String,
- protocolType: String,
- protocols: List[(String, Array[Byte])],
- sessionTimeout: Int = DefaultSessionTimeout,
- rebalanceTimeout: Int = DefaultRebalanceTimeout): JoinGroupResult = {
+ private def dynamicJoinGroup(groupId: String,
+ memberId: String,
+ protocolType: String,
+ protocols: List[(String, Array[Byte])],
+ sessionTimeout: Int = DefaultSessionTimeout,
+ rebalanceTimeout: Int = DefaultRebalanceTimeout): JoinGroupResult = {
val requireKnownMemberId = true
- var responseFuture = sendJoinGroup(groupId, memberId, protocolType, protocols, sessionTimeout, rebalanceTimeout, requireKnownMemberId)
+ var responseFuture = sendJoinGroup(groupId, memberId, protocolType, protocols, None, sessionTimeout, rebalanceTimeout, requireKnownMemberId)
// Since member id is required, we need another bounce to get the successful join group result.
if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID && requireKnownMemberId) {
@@ -1938,13 +2362,27 @@ class GroupCoordinatorTest extends JUnitSuite {
return joinGroupResult
}
EasyMock.reset(replicaManager)
- responseFuture = sendJoinGroup(groupId, joinGroupResult.memberId, protocolType, protocols, sessionTimeout, rebalanceTimeout, requireKnownMemberId)
+ responseFuture = sendJoinGroup(groupId, joinGroupResult.memberId, protocolType, protocols, None, sessionTimeout, rebalanceTimeout, requireKnownMemberId)
}
timer.advanceClock(GroupInitialRebalanceDelay + 1)
// should only have to wait as long as session timeout, but allow some extra time in case of an unexpected delay
Await.result(responseFuture, Duration(rebalanceTimeout + 100, TimeUnit.MILLISECONDS))
}
+ private def staticJoinGroup(groupId: String,
+ memberId: String,
+ groupInstanceId: Option[String],
+ protocolType: String,
+ protocols: List[(String, Array[Byte])],
+ clockAdvance: Int = GroupInitialRebalanceDelay + 1,
+ sessionTimeout: Int = DefaultSessionTimeout,
+ rebalanceTimeout: Int = DefaultRebalanceTimeout): JoinGroupResult = {
+ val responseFuture = sendJoinGroup(groupId, memberId, protocolType, protocols, groupInstanceId, sessionTimeout, rebalanceTimeout)
+
+ timer.advanceClock(clockAdvance)
+ // should only have to wait as long as session timeout, but allow some extra time in case of an unexpected delay
+ Await.result(responseFuture, Duration(rebalanceTimeout + 100, TimeUnit.MILLISECONDS))
+ }
private def syncGroupFollower(groupId: String,
generationId: Int,
@@ -2060,5 +2498,4 @@ class GroupCoordinatorTest extends JUnitSuite {
private def offsetAndMetadata(offset: Long): OffsetAndMetadata = {
OffsetAndMetadata(offset, "", timer.time.milliseconds())
}
-
}
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 0287888..6c83da3 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -59,6 +59,7 @@ class GroupMetadataManagerTest {
var defaultOffsetRetentionMs = Long.MaxValue
val groupId = "foo"
+ val groupInstanceId = Some("bar")
val groupPartitionId = 0
val groupTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
val protocolType = "protocolType"
@@ -775,6 +776,53 @@ class GroupMetadataManagerTest {
}
@Test
+ def testloadGroupWithStaticMember() {
+ val generation = 27
+ val protocolType = "consumer"
+ val staticMemberId = "staticMemberId"
+ val dynamicMemberId = "dynamicMemberId"
+
+ val staticMember = new MemberMetadata(staticMemberId, groupId, groupInstanceId, "", "", rebalanceTimeout, sessionTimeout,
+ protocolType, List(("protocol", Array[Byte]())))
+
+ val dynamicMember = new MemberMetadata(dynamicMemberId, groupId, None, "", "", rebalanceTimeout, sessionTimeout,
+ protocolType, List(("protocol", Array[Byte]())))
+
+ val members = Seq(staticMember, dynamicMember)
+
+ val group = GroupMetadata.loadGroup(groupId, Empty, generation, protocolType, null, null, None, members, time)
+
+ assertTrue(group.is(Empty))
+ assertEquals(generation, group.generationId)
+ assertEquals(Some(protocolType), group.protocolType)
+ assertTrue(group.has(staticMemberId))
+ assertTrue(group.has(dynamicMemberId))
+ assertTrue(group.hasStaticMember(groupInstanceId))
+ assertEquals(staticMemberId, group.getStaticMemberId(groupInstanceId))
+ }
+
+ @Test
+ def testReadFromOldGroupMetadata() {
+ val generation = 1
+ val protocol = "range"
+ val memberId = "memberId"
+ val oldApiVersions = Array(KAFKA_0_9_0, KAFKA_0_10_1_IV0, KAFKA_2_1_IV0)
+
+ for (apiVersion <- oldApiVersions) {
+ val groupMetadataRecord = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId, apiVersion = apiVersion)
+
+ val deserializedGroupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, groupMetadataRecord.value(), time)
+ assertEquals(groupId, deserializedGroupMetadata.groupId)
+ assertEquals(generation, deserializedGroupMetadata.generationId)
+ assertEquals(protocolType, deserializedGroupMetadata.protocolType.get)
+ assertEquals(protocol, deserializedGroupMetadata.protocolOrNull)
+ assertEquals(1, deserializedGroupMetadata.allMembers.size)
+ assertTrue(deserializedGroupMetadata.allMembers.contains(memberId))
+ assertTrue(deserializedGroupMetadata.allStaticMembers.isEmpty)
+ }
+ }
+
+ @Test
def testStoreEmptyGroup() {
val generation = 27
val protocolType = "consumer"
@@ -875,7 +923,7 @@ class GroupMetadataManagerTest {
val group = new GroupMetadata(groupId, Empty, time)
groupMetadataManager.addGroup(group)
- val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout,
+ val member = new MemberMetadata(memberId, groupId, groupInstanceId, clientId, clientHost, rebalanceTimeout, sessionTimeout,
protocolType, List(("protocol", Array[Byte]())))
group.add(member, _ => ())
group.transitionTo(PreparingRebalance)
@@ -904,7 +952,7 @@ class GroupMetadataManagerTest {
val group = new GroupMetadata(groupId, Empty, time)
- val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout,
+ val member = new MemberMetadata(memberId, groupId, groupInstanceId, clientId, clientHost, rebalanceTimeout, sessionTimeout,
protocolType, List(("protocol", Array[Byte]())))
group.add(member, _ => ())
group.transitionTo(PreparingRebalance)
@@ -1403,7 +1451,7 @@ class GroupMetadataManagerTest {
groupMetadataManager.addGroup(group)
val subscription = new Subscription(List(topic).asJava)
- val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout,
+ val member = new MemberMetadata(memberId, groupId, groupInstanceId, clientId, clientHost, rebalanceTimeout, sessionTimeout,
protocolType, List(("protocol", ConsumerProtocol.serializeSubscription(subscription).array())))
group.add(member, _ => ())
group.transitionTo(PreparingRebalance)
@@ -1896,7 +1944,7 @@ class GroupMetadataManagerTest {
assignmentSize: Int = 0,
apiVersion: ApiVersion = ApiVersion.latestVersion): SimpleRecord = {
val memberProtocols = List((protocol, Array.emptyByteArray))
- val member = new MemberMetadata(memberId, groupId, "clientId", "clientHost", 30000, 10000, protocolType, memberProtocols)
+ val member = new MemberMetadata(memberId, groupId, groupInstanceId, "clientId", "clientHost", 30000, 10000, protocolType, memberProtocols)
val group = GroupMetadata.loadGroup(groupId, Stable, generation, protocolType, protocol, memberId,
if (apiVersion >= KAFKA_2_1_IV0) Some(time.milliseconds()) else None, Seq(member), time)
val groupMetadataKey = GroupMetadataManager.groupMetadataKey(groupId)
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
index 3108b15..11af899 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
@@ -30,6 +30,7 @@ import org.scalatest.junit.JUnitSuite
class GroupMetadataTest extends JUnitSuite {
private val protocolType = "consumer"
private val groupId = "groupId"
+ private val groupInstanceId = Some("groupInstanceId")
private val clientId = "clientId"
private val clientHost = "clientHost"
private val rebalanceTimeoutMs = 60000
@@ -191,14 +192,14 @@ class GroupMetadataTest extends JUnitSuite {
@Test
def testSelectProtocol() {
val memberId = "memberId"
- val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs,
+ val member = new MemberMetadata(memberId, groupId, groupInstanceId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs,
protocolType, List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte])))
group.add(member)
assertEquals("range", group.selectProtocol)
val otherMemberId = "otherMemberId"
- val otherMember = new MemberMetadata(otherMemberId, groupId, clientId, clientHost, rebalanceTimeoutMs,
+ val otherMember = new MemberMetadata(otherMemberId, groupId, groupInstanceId, clientId, clientHost, rebalanceTimeoutMs,
sessionTimeoutMs, protocolType, List(("roundrobin", Array.empty[Byte]), ("range", Array.empty[Byte])))
group.add(otherMember)
@@ -206,7 +207,7 @@ class GroupMetadataTest extends JUnitSuite {
assertTrue(Set("range", "roundrobin")(group.selectProtocol))
val lastMemberId = "lastMemberId"
- val lastMember = new MemberMetadata(lastMemberId, groupId, clientId, clientHost, rebalanceTimeoutMs,
+ val lastMember = new MemberMetadata(lastMemberId, groupId, groupInstanceId, clientId, clientHost, rebalanceTimeoutMs,
sessionTimeoutMs, protocolType, List(("roundrobin", Array.empty[Byte]), ("range", Array.empty[Byte])))
group.add(lastMember)
@@ -223,11 +224,11 @@ class GroupMetadataTest extends JUnitSuite {
@Test
def testSelectProtocolChoosesCompatibleProtocol() {
val memberId = "memberId"
- val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs,
+ val member = new MemberMetadata(memberId, groupId, groupInstanceId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs,
protocolType, List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte])))
val otherMemberId = "otherMemberId"
- val otherMember = new MemberMetadata(otherMemberId, groupId, clientId, clientHost, rebalanceTimeoutMs,
+ val otherMember = new MemberMetadata(otherMemberId, groupId, groupInstanceId, clientId, clientHost, rebalanceTimeoutMs,
sessionTimeoutMs, protocolType, List(("roundrobin", Array.empty[Byte]), ("blah", Array.empty[Byte])))
group.add(member)
@@ -241,7 +242,7 @@ class GroupMetadataTest extends JUnitSuite {
assertTrue(group.supportsProtocols(protocolType, Set("roundrobin", "range")))
val memberId = "memberId"
- val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeoutMs,
+ val member = new MemberMetadata(memberId, groupId, groupInstanceId, clientId, clientHost, rebalanceTimeoutMs,
sessionTimeoutMs, protocolType, List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte])))
group.add(member)
@@ -251,7 +252,7 @@ class GroupMetadataTest extends JUnitSuite {
assertFalse(group.supportsProtocols(protocolType, Set("foo", "bar")))
val otherMemberId = "otherMemberId"
- val otherMember = new MemberMetadata(otherMemberId, groupId, clientId, clientHost, rebalanceTimeoutMs,
+ val otherMember = new MemberMetadata(otherMemberId, groupId, groupInstanceId, clientId, clientHost, rebalanceTimeoutMs,
sessionTimeoutMs, protocolType, List(("roundrobin", Array.empty[Byte]), ("blah", Array.empty[Byte])))
group.add(otherMember)
@@ -264,7 +265,7 @@ class GroupMetadataTest extends JUnitSuite {
@Test
def testInitNextGeneration() {
val memberId = "memberId"
- val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs,
+ val member = new MemberMetadata(memberId, groupId, groupInstanceId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs,
protocolType, List(("roundrobin", Array.empty[Byte])))
group.transitionTo(PreparingRebalance)
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/MemberMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/MemberMetadataTest.scala
index 48c9270..986d015 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/MemberMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/MemberMetadataTest.scala
@@ -24,6 +24,7 @@ import org.scalatest.junit.JUnitSuite
class MemberMetadataTest extends JUnitSuite {
val groupId = "groupId"
+ val groupInstanceId = Some("groupInstanceId")
val clientId = "clientId"
val clientHost = "clientHost"
val memberId = "memberId"
@@ -36,7 +37,7 @@ class MemberMetadataTest extends JUnitSuite {
def testMatchesSupportedProtocols() {
val protocols = List(("range", Array.empty[Byte]))
- val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs,
+ val member = new MemberMetadata(memberId, groupId, groupInstanceId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs,
protocolType, protocols)
assertTrue(member.matches(protocols))
assertFalse(member.matches(List(("range", Array[Byte](0)))))
@@ -48,7 +49,7 @@ class MemberMetadataTest extends JUnitSuite {
def testVoteForPreferredProtocol() {
val protocols = List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte]))
- val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs,
+ val member = new MemberMetadata(memberId, groupId, groupInstanceId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs,
protocolType, protocols)
assertEquals("range", member.vote(Set("range", "roundrobin")))
assertEquals("roundrobin", member.vote(Set("blah", "roundrobin")))
@@ -58,7 +59,7 @@ class MemberMetadataTest extends JUnitSuite {
def testMetadata() {
val protocols = List(("range", Array[Byte](0)), ("roundrobin", Array[Byte](1)))
- val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs,
+ val member = new MemberMetadata(memberId, groupId, groupInstanceId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs,
protocolType, protocols)
assertTrue(Arrays.equals(Array[Byte](0), member.metadata("range")))
assertTrue(Arrays.equals(Array[Byte](1), member.metadata("roundrobin")))
@@ -68,7 +69,7 @@ class MemberMetadataTest extends JUnitSuite {
def testMetadataRaisesOnUnsupportedProtocol() {
val protocols = List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte]))
- val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs,
+ val member = new MemberMetadata(memberId, groupId, groupInstanceId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs,
protocolType, protocols)
member.metadata("blah")
fail()
@@ -78,11 +79,19 @@ class MemberMetadataTest extends JUnitSuite {
def testVoteRaisesOnNoSupportedProtocols() {
val protocols = List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte]))
- val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs,
+ val member = new MemberMetadata(memberId, groupId, groupInstanceId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs,
protocolType, protocols)
member.vote(Set("blah"))
fail()
}
+ @Test
+ def testHasValidGroupInstanceId() {
+ val protocols = List(("range", Array[Byte](0)), ("roundrobin", Array[Byte](1)))
+ val member = new MemberMetadata(memberId, groupId, groupInstanceId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs,
+ protocolType, protocols)
+ assertTrue(member.isStaticMember)
+ assertEquals(groupInstanceId, member.groupInstanceId)
+ }
}
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index ae74fa1..d9c8bb5 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -282,6 +282,7 @@ class RequestQuotaTest extends BaseRequestTest {
.setGroupId("test-join-group")
.setSessionTimeoutMs(200)
.setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID)
+ .setGroupInstanceId(null)
.setProtocolType("consumer")
.setProtocols(
new JoinGroupRequestData.JoinGroupRequestProtocolSet(
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 1bd09e4..cb4629f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -671,6 +671,7 @@ public class StreamThread extends Thread {
originalReset = (String) consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
}
+
final Consumer<byte[], byte[]> consumer = clientSupplier.getConsumer(consumerConfigs);
taskManager.setConsumer(consumer);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
index a598400..6a724ec 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
@@ -157,7 +157,7 @@ public class FineGrainedAutoResetIntegrationTest {
}
@Test
- public void shouldOnlyReadRecordsWhereEarliestSpecifiedWithNoCommittedOffsetsWithGlobalAutoOffsetResetLatest() throws Exception {
+ public void shouldOnlyReadRecordsWhereEarliestSpecifiedWithNoCommittedOffsetsWithGlobalAutoOffsetResetLatest() throws Exception {
streamsConfiguration.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "latest");
final List<String> expectedReceivedValues = Arrays.asList(topic1TestMessage, topic2TestMessage);
@@ -165,13 +165,13 @@ public class FineGrainedAutoResetIntegrationTest {
}
@Test
- public void shouldOnlyReadRecordsWhereEarliestSpecifiedWithNoCommittedOffsetsWithDefaultGlobalAutoOffsetResetEarliest() throws Exception {
+ public void shouldOnlyReadRecordsWhereEarliestSpecifiedWithNoCommittedOffsetsWithDefaultGlobalAutoOffsetResetEarliest() throws Exception {
final List<String> expectedReceivedValues = Arrays.asList(topic1TestMessage, topic2TestMessage, topicYTestMessage, topicZTestMessage);
shouldOnlyReadForEarliest("_1", TOPIC_1_1, TOPIC_2_1, TOPIC_A_1, TOPIC_C_1, TOPIC_Y_1, TOPIC_Z_1, OUTPUT_TOPIC_1, expectedReceivedValues);
}
@Test
- public void shouldOnlyReadRecordsWhereEarliestSpecifiedWithInvalidCommittedOffsets() throws Exception {
+ public void shouldOnlyReadRecordsWhereEarliestSpecifiedWithInvalidCommittedOffsets() throws Exception {
commitInvalidOffsets();
final List<String> expectedReceivedValues = Arrays.asList(topic1TestMessage, topic2TestMessage, topicYTestMessage, topicZTestMessage);
@@ -230,7 +230,7 @@ public class FineGrainedAutoResetIntegrationTest {
private void commitInvalidOffsets() {
final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(TestUtils.consumerConfig(
CLUSTER.bootstrapServers(),
- streamsConfiguration.getProperty(StreamsConfig.APPLICATION_ID_CONFIG),
+ "commit_invalid_offset_app", // Having a separate application id to avoid waiting for last test poll interval timeout.
StringDeserializer.class,
StringDeserializer.class));
diff --git a/tests/kafkatest/services/verifiable_consumer.py b/tests/kafkatest/services/verifiable_consumer.py
index 532c59f..cec8ea0 100644
--- a/tests/kafkatest/services/verifiable_consumer.py
+++ b/tests/kafkatest/services/verifiable_consumer.py
@@ -33,8 +33,9 @@ class ConsumerState:
class ConsumerEventHandler(object):
- def __init__(self, node, verify_offsets):
+ def __init__(self, node, verify_offsets, idx):
self.node = node
+ self.idx = idx
self.state = ConsumerState.Dead
self.revoked_count = 0
self.assigned_count = 0
@@ -165,7 +166,7 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
}
def __init__(self, context, num_nodes, kafka, topic, group_id,
- max_messages=-1, session_timeout_sec=30, enable_autocommit=False,
+ static_membership=False, max_messages=-1, session_timeout_sec=30, enable_autocommit=False,
assignment_strategy="org.apache.kafka.clients.consumer.RangeAssignor",
version=DEV_BRANCH, stop_timeout_sec=30, log_level="INFO", jaas_override_variables=None,
on_record_consumed=None, reset_policy="earliest", verify_offsets=True):
@@ -179,6 +180,7 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
self.topic = topic
self.group_id = group_id
self.reset_policy = reset_policy
+ self.static_membership = static_membership
self.max_messages = max_messages
self.session_timeout_sec = session_timeout_sec
self.enable_autocommit = enable_autocommit
@@ -202,7 +204,7 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
def _worker(self, idx, node):
with self.lock:
if node not in self.event_handlers:
- self.event_handlers[node] = ConsumerEventHandler(node, self.verify_offsets)
+ self.event_handlers[node] = ConsumerEventHandler(node, self.verify_offsets, idx)
handler = self.event_handlers[node]
node.account.ssh("mkdir -p %s" % VerifiableConsumer.PERSISTENT_ROOT, allow_fail=False)
@@ -220,6 +222,10 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
self.logger.info(self.prop_file)
node.account.create_file(VerifiableConsumer.CONFIG_FILE, self.prop_file)
self.security_config.setup_node(node)
+ # apply group.instance.id to the node for static membership validation
+ node.group_instance_id = None
+ if self.static_membership:
+ node.group_instance_id = self.group_id + "-instance-" + str(idx)
cmd = self.start_cmd(node)
self.logger.debug("VerifiableConsumer %d command: %s" % (idx, cmd))
@@ -286,8 +292,8 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
cmd += self.impl.exec_cmd(node)
if self.on_record_consumed:
cmd += " --verbose"
- cmd += " --reset-policy %s --group-id %s --topic %s --broker-list %s --session-timeout %s --assignment-strategy %s %s" % \
- (self.reset_policy, self.group_id, self.topic, self.kafka.bootstrap_servers(self.security_config.security_protocol),
+ cmd += " --reset-policy %s --group-id %s --topic %s --group-instance-id %s --broker-list %s --session-timeout %s --assignment-strategy %s %s" % \
+ (self.reset_policy, self.group_id, self.topic, node.group_instance_id, self.kafka.bootstrap_servers(self.security_config.security_protocol),
self.session_timeout_sec*1000, self.assignment_strategy, "--enable-autocommit" if self.enable_autocommit else "")
if self.max_messages > 0:
@@ -365,6 +371,11 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
with self.lock:
return max(handler.assigned_count for handler in self.event_handlers.itervalues())
+ def num_revokes_for_alive(self, keep_alive=1):
+ with self.lock:
+ return max([handler.revoked_count for handler in self.event_handlers.itervalues()
+ if handler.idx <= keep_alive])
+
def joined_nodes(self):
with self.lock:
return [handler.node for handler in self.event_handlers.itervalues()
diff --git a/tests/kafkatest/tests/client/consumer_test.py b/tests/kafkatest/tests/client/consumer_test.py
index a7ec9c8..2abc4f1 100644
--- a/tests/kafkatest/tests/client/consumer_test.py
+++ b/tests/kafkatest/tests/client/consumer_test.py
@@ -33,9 +33,9 @@ class OffsetValidationTest(VerifiableConsumerTest):
self.TOPIC : { 'partitions': self.NUM_PARTITIONS, 'replication-factor': 2 }
})
- def rolling_bounce_consumers(self, consumer, num_bounces=5, clean_shutdown=True):
+ def rolling_bounce_consumers(self, consumer, keep_alive=0, num_bounces=5, clean_shutdown=True):
for _ in range(num_bounces):
- for node in consumer.nodes:
+ for node in consumer.nodes[keep_alive:]:
consumer.stop_node(node, clean_shutdown)
wait_until(lambda: len(consumer.dead_nodes()) == 1,
@@ -47,15 +47,15 @@ class OffsetValidationTest(VerifiableConsumerTest):
self.await_all_members(consumer)
self.await_consumed_messages(consumer)
- def bounce_all_consumers(self, consumer, num_bounces=5, clean_shutdown=True):
+ def bounce_all_consumers(self, consumer, keep_alive=0, num_bounces=5, clean_shutdown=True):
for _ in range(num_bounces):
- for node in consumer.nodes:
+ for node in consumer.nodes[keep_alive:]:
consumer.stop_node(node, clean_shutdown)
- wait_until(lambda: len(consumer.dead_nodes()) == self.num_consumers, timeout_sec=10,
+ wait_until(lambda: len(consumer.dead_nodes()) == self.num_consumers - keep_alive, timeout_sec=10,
err_msg="Timed out waiting for the consumers to shutdown")
- for node in consumer.nodes:
+ for node in consumer.nodes[keep_alive:]:
consumer.start_node(node)
self.await_all_members(consumer)
@@ -145,7 +145,69 @@ class OffsetValidationTest(VerifiableConsumerTest):
self.bounce_all_consumers(consumer, clean_shutdown=clean_shutdown)
else:
self.rolling_bounce_consumers(consumer, clean_shutdown=clean_shutdown)
-
+
+ consumer.stop_all()
+ if clean_shutdown:
+ # if the total records consumed matches the current position, we haven't seen any duplicates
+ # this can only be guaranteed with a clean shutdown
+ assert consumer.current_position(partition) == consumer.total_consumed(), \
+ "Total consumed records %d did not match consumed position %d" % \
+ (consumer.total_consumed(), consumer.current_position(partition))
+ else:
+ # we may have duplicates in a hard failure
+ assert consumer.current_position(partition) <= consumer.total_consumed(), \
+ "Current position %d greater than the total number of consumed records %d" % \
+ (consumer.current_position(partition), consumer.total_consumed())
+
+ @cluster(num_nodes=7)
+ @matrix(clean_shutdown=[True], static_membership=[True, False], bounce_mode=["all", "rolling"], num_bounces=[5])
+ def test_static_consumer_bounce(self, clean_shutdown, static_membership, bounce_mode, num_bounces):
+ """
+ Verify correct static consumer behavior when the consumers in the group are restarted. In order to make
+ sure the behavior of static members is distinguishable from that of dynamic ones, we exercise both static
+ and dynamic membership in this test suite.
+
+ Setup: single Kafka cluster with one producer and a set of consumers in one group.
+
+ - Start a producer which continues producing new messages throughout the test.
+ - Start up the consumers as static/dynamic members and wait until they've joined the group.
+ - In a loop, restart each consumer except the first member (note: may not be the leader), and expect no rebalance triggered
+ during this process if the group is in static membership.
+ """
+ partition = TopicPartition(self.TOPIC, 0)
+
+ producer = self.setup_producer(self.TOPIC)
+
+ producer.start()
+ self.await_produced_messages(producer)
+
+ self.session_timeout_sec = 60
+ consumer = self.setup_consumer(self.TOPIC, static_membership=static_membership)
+
+ consumer.start()
+ self.await_all_members(consumer)
+
+ num_revokes_before_bounce = consumer.num_revokes_for_alive()
+
+ num_keep_alive = 1
+
+ if bounce_mode == "all":
+ self.bounce_all_consumers(consumer, keep_alive=num_keep_alive, num_bounces=num_bounces)
+ else:
+ self.rolling_bounce_consumers(consumer, keep_alive=num_keep_alive, num_bounces=num_bounces)
+
+ num_revokes_after_bounce = consumer.num_revokes_for_alive() - num_revokes_before_bounce
+
+ check_condition = num_revokes_after_bounce != 0
+ # under static membership, the consumer that stays alive shall not revoke any currently owned partitions,
+ # since no global rebalance is triggered.
+ if static_membership:
+ check_condition = num_revokes_after_bounce == 0
+
+ assert check_condition, \
+ "Total revoked count %d does not match the expectation of having 0 revokes as %d" % \
+ (num_revokes_after_bounce, check_condition)
+
consumer.stop_all()
if clean_shutdown:
# if the total records consumed matches the current position, we haven't seen any duplicates
diff --git a/tests/kafkatest/tests/verifiable_consumer_test.py b/tests/kafkatest/tests/verifiable_consumer_test.py
index 539a0f3..071439d 100644
--- a/tests/kafkatest/tests/verifiable_consumer_test.py
+++ b/tests/kafkatest/tests/verifiable_consumer_test.py
@@ -53,10 +53,10 @@ class VerifiableConsumerTest(KafkaTest):
"""Override this since we're adding services outside of the constructor"""
return super(VerifiableConsumerTest, self).min_cluster_size() + self.num_consumers + self.num_producers
- def setup_consumer(self, topic, enable_autocommit=False,
+ def setup_consumer(self, topic, static_membership=False, enable_autocommit=False,
assignment_strategy="org.apache.kafka.clients.consumer.RangeAssignor", **kwargs):
return VerifiableConsumer(self.test_context, self.num_consumers, self.kafka,
- topic, self.group_id, session_timeout_sec=self.session_timeout_sec,
+ topic, self.group_id, static_membership=static_membership, session_timeout_sec=self.session_timeout_sec,
assignment_strategy=assignment_strategy, enable_autocommit=enable_autocommit,
log_level="TRACE", **kwargs)
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
index 43d30d0..fe41a21 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
@@ -524,6 +524,14 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons
.dest("groupId")
.help("The groupId shared among members of the consumer group");
+ parser.addArgument("--group-instance-id")
+ .action(store())
+ .required(true)
+ .type(String.class)
+ .metavar("GROUP_INSTANCE_ID")
+ .dest("groupInstanceId")
+ .help("A unique identifier of the consumer instance");
+
parser.addArgument("--max-messages")
.action(store())
.required(false)
@@ -600,6 +608,11 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons
}
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, res.getString("groupId"));
+
+ String groupInstanceId = res.getString("groupInstanceId");
+ if (!groupInstanceId.equals("None")) {
+ consumerProps.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId);
+ }
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, res.getString("brokerList"));
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, useAutoCommit);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, res.getString("resetPolicy"));