You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2019/03/18 20:26:23 UTC
[kafka] branch trunk updated: KAFKA-7858: Automatically generate
JoinGroup request/response
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8406f36 KAFKA-7858: Automatically generate JoinGroup request/response
8406f36 is described below
commit 8406f3624d8f99b614eb7171b71fae8b0e663dcb
Author: Boyang Chen <bc...@outlook.com>
AuthorDate: Mon Mar 18 13:26:09 2019 -0700
KAFKA-7858: Automatically generate JoinGroup request/response
Reviewers: Colin P. McCabe <cm...@apache.org>
---
.../consumer/internals/AbstractCoordinator.java | 34 +--
.../consumer/internals/ConsumerCoordinator.java | 23 +-
.../org/apache/kafka/common/protocol/ApiKeys.java | 6 +-
.../kafka/common/requests/AbstractResponse.java | 2 +-
.../kafka/common/requests/JoinGroupRequest.java | 252 ++++-----------------
.../kafka/common/requests/JoinGroupResponse.java | 197 ++--------------
.../common/message/JoinGroupResponse.json | 2 +-
.../kafka/clients/consumer/KafkaConsumerTest.java | 37 ++-
.../internals/AbstractCoordinatorTest.java | 32 ++-
.../internals/ConsumerCoordinatorTest.java | 79 +++----
.../kafka/common/requests/RequestResponseTest.java | 56 ++++-
.../runtime/distributed/WorkerCoordinator.java | 18 +-
.../runtime/distributed/WorkerCoordinatorTest.java | 85 +++++--
core/src/main/scala/kafka/server/KafkaApis.scala | 63 ++++--
.../kafka/api/AuthorizerIntegrationTest.scala | 20 +-
.../scala/unit/kafka/server/RequestQuotaTest.scala | 20 +-
16 files changed, 393 insertions(+), 533 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index fd7431b..4ff4e19 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -29,6 +29,8 @@ import org.apache.kafka.common.errors.MemberIdRequiredException;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.message.JoinGroupRequestData;
+import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.LeaveGroupRequestData;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
@@ -44,7 +46,6 @@ import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.HeartbeatRequest;
import org.apache.kafka.common.requests.HeartbeatResponse;
import org.apache.kafka.common.requests.JoinGroupRequest;
-import org.apache.kafka.common.requests.JoinGroupRequest.ProtocolMetadata;
import org.apache.kafka.common.requests.JoinGroupResponse;
import org.apache.kafka.common.requests.LeaveGroupRequest;
import org.apache.kafka.common.requests.LeaveGroupResponse;
@@ -86,7 +87,7 @@ import java.util.concurrent.atomic.AtomicReference;
*
* To leverage this protocol, an implementation must define the format of metadata provided by each
* member for group registration in {@link #metadata()} and the format of the state assignment provided
- * by the leader in {@link #performAssignment(String, String, Map)} and becomes available to members in
+ * by the leader in {@link #performAssignment(String, String, List)} and becomes available to members in
* {@link #onJoinComplete(int, String, String, ByteBuffer)}.
*
* Note on locking: this class shares state between the caller and a background thread which is
@@ -183,7 +184,7 @@ public abstract class AbstractCoordinator implements Closeable {
* on the preference).
* @return Non-empty map of supported protocols and metadata
*/
- protected abstract List<ProtocolMetadata> metadata();
+ protected abstract JoinGroupRequestData.JoinGroupRequestProtocolSet metadata();
/**
* Invoked prior to each group join or rejoin. This is typically used to perform any
@@ -202,7 +203,7 @@ public abstract class AbstractCoordinator implements Closeable {
*/
protected abstract Map<String, ByteBuffer> performAssignment(String leaderId,
String protocol,
- Map<String, ByteBuffer> allMemberMetadata);
+ List<JoinGroupResponseData.JoinGroupResponseMember> allMemberMetadata);
/**
* Invoked when a group member has successfully joined a group. If this call fails with an exception,
@@ -476,7 +477,7 @@ public abstract class AbstractCoordinator implements Closeable {
/**
* Join the group and return the assignment for the next generation. This function handles both
- * JoinGroup and SyncGroup, delegating to {@link #performAssignment(String, String, Map)} if
+ * JoinGroup and SyncGroup, delegating to {@link #performAssignment(String, String, List)} if
* elected leader by the coordinator.
*
* NOTE: This is visible only for testing
@@ -490,11 +491,14 @@ public abstract class AbstractCoordinator implements Closeable {
// send a join group request to the coordinator
log.info("(Re-)joining group");
JoinGroupRequest.Builder requestBuilder = new JoinGroupRequest.Builder(
- groupId,
- this.sessionTimeoutMs,
- this.generation.memberId,
- protocolType(),
- metadata()).setRebalanceTimeout(this.rebalanceTimeoutMs);
+ new JoinGroupRequestData()
+ .setGroupId(groupId)
+ .setSessionTimeoutMs(this.sessionTimeoutMs)
+ .setMemberId(this.generation.memberId)
+ .setProtocolType(protocolType())
+ .setProtocols(metadata())
+ .setRebalanceTimeoutMs(this.rebalanceTimeoutMs)
+ );
log.debug("Sending JoinGroup ({}) to coordinator {}", requestBuilder, this.coordinator);
@@ -520,8 +524,8 @@ public abstract class AbstractCoordinator implements Closeable {
// the group. In this case, we do not want to continue with the sync group.
future.raise(new UnjoinedGroupException());
} else {
- AbstractCoordinator.this.generation = new Generation(joinResponse.generationId(),
- joinResponse.memberId(), joinResponse.groupProtocol());
+ AbstractCoordinator.this.generation = new Generation(joinResponse.data().generationId(),
+ joinResponse.data().memberId(), joinResponse.data().protocolName());
if (joinResponse.isLeader()) {
onJoinLeader(joinResponse).chain(future);
} else {
@@ -562,7 +566,7 @@ public abstract class AbstractCoordinator implements Closeable {
// and send another join group request in next cycle.
synchronized (AbstractCoordinator.this) {
AbstractCoordinator.this.generation = new Generation(OffsetCommitRequest.DEFAULT_GENERATION_ID,
- joinResponse.memberId(), null);
+ joinResponse.data().memberId(), null);
AbstractCoordinator.this.rejoinNeeded = true;
AbstractCoordinator.this.state = MemberState.UNJOINED;
}
@@ -587,8 +591,8 @@ public abstract class AbstractCoordinator implements Closeable {
private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {
try {
// perform the leader synchronization and send back the assignment for the group
- Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.leaderId(), joinResponse.groupProtocol(),
- joinResponse.members());
+ Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.data().leader(), joinResponse.data().protocolName(),
+ joinResponse.data().members());
SyncGroupRequest.Builder requestBuilder =
new SyncGroupRequest.Builder(groupId, generation.generationId, generation.memberId, groupAssignment);
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 7990707..2b949a3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -33,6 +33,8 @@ import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.message.JoinGroupRequestData;
+import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
@@ -40,7 +42,6 @@ import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.requests.JoinGroupRequest.ProtocolMetadata;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.OffsetCommitResponse;
import org.apache.kafka.common.requests.OffsetFetchRequest;
@@ -166,16 +167,20 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
}
@Override
- protected List<ProtocolMetadata> metadata() {
+ protected JoinGroupRequestData.JoinGroupRequestProtocolSet metadata() {
log.debug("Joining group with current subscription: {}", subscriptions.subscription());
this.joinedSubscription = subscriptions.subscription();
- List<ProtocolMetadata> metadataList = new ArrayList<>();
+ JoinGroupRequestData.JoinGroupRequestProtocolSet protocolSet = new JoinGroupRequestData.JoinGroupRequestProtocolSet();
+
for (PartitionAssignor assignor : assignors) {
Subscription subscription = assignor.subscription(joinedSubscription);
ByteBuffer metadata = ConsumerProtocol.serializeSubscription(subscription);
- metadataList.add(new ProtocolMetadata(assignor.name(), metadata));
+
+ protocolSet.add(new JoinGroupRequestData.JoinGroupRequestProtocol()
+ .setName(assignor.name())
+ .setMetadata(metadata.array()));
}
- return metadataList;
+ return protocolSet;
}
public void updatePatternSubscription(Cluster cluster) {
@@ -385,16 +390,16 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
@Override
protected Map<String, ByteBuffer> performAssignment(String leaderId,
String assignmentStrategy,
- Map<String, ByteBuffer> allSubscriptions) {
+ List<JoinGroupResponseData.JoinGroupResponseMember> allSubscriptions) {
PartitionAssignor assignor = lookupAssignor(assignmentStrategy);
if (assignor == null)
throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);
Set<String> allSubscribedTopics = new HashSet<>();
Map<String, Subscription> subscriptions = new HashMap<>();
- for (Map.Entry<String, ByteBuffer> subscriptionEntry : allSubscriptions.entrySet()) {
- Subscription subscription = ConsumerProtocol.deserializeSubscription(subscriptionEntry.getValue());
- subscriptions.put(subscriptionEntry.getKey(), subscription);
+ for (JoinGroupResponseData.JoinGroupResponseMember memberSubScription : allSubscriptions) {
+ Subscription subscription = ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(memberSubScription.metadata()));
+ subscriptions.put(memberSubScription.memberId(), subscription);
allSubscribedTopics.addAll(subscription.topics());
}
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index c23aa7e..3f8d80d 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -22,6 +22,8 @@ import org.apache.kafka.common.message.DescribeGroupsRequestData;
import org.apache.kafka.common.message.DescribeGroupsResponseData;
import org.apache.kafka.common.message.ElectPreferredLeadersRequestData;
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData;
+import org.apache.kafka.common.message.JoinGroupRequestData;
+import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.LeaveGroupRequestData;
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.MetadataRequestData;
@@ -81,8 +83,6 @@ import org.apache.kafka.common.requests.HeartbeatRequest;
import org.apache.kafka.common.requests.HeartbeatResponse;
import org.apache.kafka.common.requests.InitProducerIdRequest;
import org.apache.kafka.common.requests.InitProducerIdResponse;
-import org.apache.kafka.common.requests.JoinGroupRequest;
-import org.apache.kafka.common.requests.JoinGroupResponse;
import org.apache.kafka.common.requests.LeaderAndIsrRequest;
import org.apache.kafka.common.requests.LeaderAndIsrResponse;
import org.apache.kafka.common.requests.ListGroupsRequest;
@@ -135,7 +135,7 @@ public enum ApiKeys {
OFFSET_FETCH(9, "OffsetFetch", OffsetFetchRequest.schemaVersions(), OffsetFetchResponse.schemaVersions()),
FIND_COORDINATOR(10, "FindCoordinator", FindCoordinatorRequest.schemaVersions(),
FindCoordinatorResponse.schemaVersions()),
- JOIN_GROUP(11, "JoinGroup", JoinGroupRequest.schemaVersions(), JoinGroupResponse.schemaVersions()),
+ JOIN_GROUP(11, "JoinGroup", JoinGroupRequestData.SCHEMAS, JoinGroupResponseData.SCHEMAS),
HEARTBEAT(12, "Heartbeat", HeartbeatRequest.schemaVersions(), HeartbeatResponse.schemaVersions()),
LEAVE_GROUP(13, "LeaveGroup", LeaveGroupRequestData.SCHEMAS, LeaveGroupResponseData.SCHEMAS),
SYNC_GROUP(14, "SyncGroup", SyncGroupRequest.schemaVersions(), SyncGroupResponse.schemaVersions()),
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 1d3fd77..eadd302 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -85,7 +85,7 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
case FIND_COORDINATOR:
return new FindCoordinatorResponse(struct);
case JOIN_GROUP:
- return new JoinGroupResponse(struct);
+ return new JoinGroupResponse(struct, version);
case HEARTBEAT:
return new HeartbeatResponse(struct);
case LEAVE_GROUP:
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
index 236d684..2f46172 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
@@ -16,188 +16,56 @@
*/
package org.apache.kafka.common.requests;
+import org.apache.kafka.common.message.JoinGroupRequestData;
+import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.utils.Utils;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.Collections;
-import java.util.List;
-
-import static org.apache.kafka.common.protocol.CommonFields.GROUP_ID;
-import static org.apache.kafka.common.protocol.CommonFields.MEMBER_ID;
-import static org.apache.kafka.common.protocol.types.Type.BYTES;
-import static org.apache.kafka.common.protocol.types.Type.INT32;
-import static org.apache.kafka.common.protocol.types.Type.STRING;
public class JoinGroupRequest extends AbstractRequest {
- private static final String SESSION_TIMEOUT_KEY_NAME = "session_timeout";
- private static final String REBALANCE_TIMEOUT_KEY_NAME = "rebalance_timeout";
- private static final String PROTOCOL_TYPE_KEY_NAME = "protocol_type";
- private static final String GROUP_PROTOCOLS_KEY_NAME = "group_protocols";
- private static final String PROTOCOL_NAME_KEY_NAME = "protocol_name";
- private static final String PROTOCOL_METADATA_KEY_NAME = "protocol_metadata";
-
- /* Join group api */
- private static final Schema JOIN_GROUP_REQUEST_PROTOCOL_V0 = new Schema(
- new Field(PROTOCOL_NAME_KEY_NAME, STRING),
- new Field(PROTOCOL_METADATA_KEY_NAME, BYTES));
-
- private static final Schema JOIN_GROUP_REQUEST_V0 = new Schema(
- GROUP_ID,
- new Field(SESSION_TIMEOUT_KEY_NAME, INT32, "The coordinator considers the consumer dead if it receives " +
- "no heartbeat after this timeout in ms."),
- MEMBER_ID,
- new Field(PROTOCOL_TYPE_KEY_NAME, STRING, "Unique name for class of protocols implemented by group"),
- new Field(GROUP_PROTOCOLS_KEY_NAME, new ArrayOf(JOIN_GROUP_REQUEST_PROTOCOL_V0), "List of protocols " +
- "that the member supports"));
-
- private static final Schema JOIN_GROUP_REQUEST_V1 = new Schema(
- GROUP_ID,
- new Field(SESSION_TIMEOUT_KEY_NAME, INT32, "The coordinator considers the consumer dead if it receives no " +
- "heartbeat after this timeout in ms."),
- new Field(REBALANCE_TIMEOUT_KEY_NAME, INT32, "The maximum time that the coordinator will wait for each " +
- "member to rejoin when rebalancing the group"),
- MEMBER_ID,
- new Field(PROTOCOL_TYPE_KEY_NAME, STRING, "Unique name for class of protocols implemented by group"),
- new Field(GROUP_PROTOCOLS_KEY_NAME, new ArrayOf(JOIN_GROUP_REQUEST_PROTOCOL_V0), "List of protocols " +
- "that the member supports"));
-
- /* v2 request is the same as v1. Throttle time has been added to response */
- private static final Schema JOIN_GROUP_REQUEST_V2 = JOIN_GROUP_REQUEST_V1;
-
- /**
- * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
- */
- private static final Schema JOIN_GROUP_REQUEST_V3 = JOIN_GROUP_REQUEST_V2;
-
- /**
- * The version number is bumped to indicate that client needs to issue a second join group request under first try
- * with UNKNOWN_MEMBER_ID.
- */
- private static final Schema JOIN_GROUP_REQUEST_V4 = JOIN_GROUP_REQUEST_V3;
-
- public static Schema[] schemaVersions() {
- return new Schema[] {JOIN_GROUP_REQUEST_V0, JOIN_GROUP_REQUEST_V1, JOIN_GROUP_REQUEST_V2,
- JOIN_GROUP_REQUEST_V3, JOIN_GROUP_REQUEST_V4};
- }
-
- public static final String UNKNOWN_MEMBER_ID = "";
-
- private final String groupId;
- private final int sessionTimeout;
- private final int rebalanceTimeout;
- private final String memberId;
- private final String protocolType;
- private final List<ProtocolMetadata> groupProtocols;
-
- public static class ProtocolMetadata {
- private final String name;
- private final ByteBuffer metadata;
-
- public ProtocolMetadata(String name, ByteBuffer metadata) {
- this.name = name;
- this.metadata = metadata;
- }
-
- public String name() {
- return name;
- }
-
- public ByteBuffer metadata() {
- return metadata;
- }
- }
public static class Builder extends AbstractRequest.Builder<JoinGroupRequest> {
- private final String groupId;
- private final int sessionTimeout;
- private final String memberId;
- private final String protocolType;
- private final List<ProtocolMetadata> groupProtocols;
- private int rebalanceTimeout = 0;
- public Builder(String groupId, int sessionTimeout, String memberId,
- String protocolType, List<ProtocolMetadata> groupProtocols) {
- super(ApiKeys.JOIN_GROUP);
- this.groupId = groupId;
- this.sessionTimeout = sessionTimeout;
- this.rebalanceTimeout = sessionTimeout;
- this.memberId = memberId;
- this.protocolType = protocolType;
- this.groupProtocols = groupProtocols;
- }
+ private final JoinGroupRequestData data;
- public Builder setRebalanceTimeout(int rebalanceTimeout) {
- this.rebalanceTimeout = rebalanceTimeout;
- return this;
+ public Builder(JoinGroupRequestData data) {
+ super(ApiKeys.JOIN_GROUP);
+ this.data = data;
}
@Override
public JoinGroupRequest build(short version) {
- if (version < 1) {
- // v0 had no rebalance timeout but used session timeout implicitly
- rebalanceTimeout = sessionTimeout;
- }
- return new JoinGroupRequest(version, groupId, sessionTimeout,
- rebalanceTimeout, memberId, protocolType, groupProtocols);
+ return new JoinGroupRequest(data, version);
}
@Override
public String toString() {
- StringBuilder bld = new StringBuilder();
- bld.append("(type: JoinGroupRequest").
- append(", groupId=").append(groupId).
- append(", sessionTimeout=").append(sessionTimeout).
- append(", rebalanceTimeout=").append(rebalanceTimeout).
- append(", memberId=").append(memberId).
- append(", protocolType=").append(protocolType).
- append(", groupProtocols=").append(Utils.join(groupProtocols, ", ")).
- append(")");
- return bld.toString();
+ return data.toString();
}
}
- private JoinGroupRequest(short version, String groupId, int sessionTimeout,
- int rebalanceTimeout, String memberId, String protocolType,
- List<ProtocolMetadata> groupProtocols) {
- super(ApiKeys.JOIN_GROUP, version);
- this.groupId = groupId;
- this.sessionTimeout = sessionTimeout;
- this.rebalanceTimeout = rebalanceTimeout;
- this.memberId = memberId;
- this.protocolType = protocolType;
- this.groupProtocols = groupProtocols;
- }
-
- public JoinGroupRequest(Struct struct, short versionId) {
- super(ApiKeys.JOIN_GROUP, versionId);
+ private final JoinGroupRequestData data;
+ private final short version;
- groupId = struct.get(GROUP_ID);
- sessionTimeout = struct.getInt(SESSION_TIMEOUT_KEY_NAME);
+ public static final String UNKNOWN_MEMBER_ID = "";
- if (struct.hasField(REBALANCE_TIMEOUT_KEY_NAME))
- // rebalance timeout is added in v1
- rebalanceTimeout = struct.getInt(REBALANCE_TIMEOUT_KEY_NAME);
- else
- // v0 had no rebalance timeout but used session timeout implicitly
- rebalanceTimeout = sessionTimeout;
+ public JoinGroupRequest(JoinGroupRequestData data, short version) {
+ super(ApiKeys.JOIN_GROUP, version);
+ this.data = data;
+ this.version = version;
+ }
- memberId = struct.get(MEMBER_ID);
- protocolType = struct.getString(PROTOCOL_TYPE_KEY_NAME);
+ public JoinGroupRequest(Struct struct, short version) {
+ super(ApiKeys.JOIN_GROUP, version);
+ this.data = new JoinGroupRequestData(struct, version);
+ this.version = version;
+ }
- groupProtocols = new ArrayList<>();
- for (Object groupProtocolObj : struct.getArray(GROUP_PROTOCOLS_KEY_NAME)) {
- Struct groupProtocolStruct = (Struct) groupProtocolObj;
- String name = groupProtocolStruct.getString(PROTOCOL_NAME_KEY_NAME);
- ByteBuffer metadata = groupProtocolStruct.getBytes(PROTOCOL_METADATA_KEY_NAME);
- groupProtocols.add(new ProtocolMetadata(name, metadata));
- }
+ public JoinGroupRequestData data() {
+ return data;
}
@Override
@@ -207,77 +75,39 @@ public class JoinGroupRequest extends AbstractRequest {
case 0:
case 1:
return new JoinGroupResponse(
- Errors.forException(e),
- JoinGroupResponse.UNKNOWN_GENERATION_ID,
- JoinGroupResponse.UNKNOWN_PROTOCOL,
- JoinGroupResponse.UNKNOWN_MEMBER_ID, // memberId
- JoinGroupResponse.UNKNOWN_MEMBER_ID, // leaderId
- Collections.emptyMap());
+ new JoinGroupResponseData()
+ .setErrorCode(Errors.forException(e).code())
+ .setGenerationId(JoinGroupResponse.UNKNOWN_GENERATION_ID)
+ .setProtocolName(JoinGroupResponse.UNKNOWN_PROTOCOL)
+ .setLeader(JoinGroupResponse.UNKNOWN_MEMBER_ID)
+ .setMemberId(JoinGroupResponse.UNKNOWN_MEMBER_ID)
+ .setMembers(Collections.emptyList())
+ );
case 2:
case 3:
case 4:
return new JoinGroupResponse(
- throttleTimeMs,
- Errors.forException(e),
- JoinGroupResponse.UNKNOWN_GENERATION_ID,
- JoinGroupResponse.UNKNOWN_PROTOCOL,
- JoinGroupResponse.UNKNOWN_MEMBER_ID, // memberId
- JoinGroupResponse.UNKNOWN_MEMBER_ID, // leaderId
- Collections.emptyMap());
-
+ new JoinGroupResponseData()
+ .setThrottleTimeMs(throttleTimeMs)
+ .setErrorCode(Errors.forException(e).code())
+ .setGenerationId(JoinGroupResponse.UNKNOWN_GENERATION_ID)
+ .setProtocolName(JoinGroupResponse.UNKNOWN_PROTOCOL)
+ .setLeader(JoinGroupResponse.UNKNOWN_MEMBER_ID)
+ .setMemberId(JoinGroupResponse.UNKNOWN_MEMBER_ID)
+ .setMembers(Collections.emptyList())
+ );
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
versionId, this.getClass().getSimpleName(), ApiKeys.JOIN_GROUP.latestVersion()));
}
}
- public String groupId() {
- return groupId;
- }
-
- public int sessionTimeout() {
- return sessionTimeout;
- }
-
- public int rebalanceTimeout() {
- return rebalanceTimeout;
- }
-
- public String memberId() {
- return memberId;
- }
-
- public List<ProtocolMetadata> groupProtocols() {
- return groupProtocols;
- }
-
- public String protocolType() {
- return protocolType;
- }
-
public static JoinGroupRequest parse(ByteBuffer buffer, short version) {
return new JoinGroupRequest(ApiKeys.JOIN_GROUP.parseRequest(version, buffer), version);
}
@Override
protected Struct toStruct() {
- short version = version();
- Struct struct = new Struct(ApiKeys.JOIN_GROUP.requestSchema(version));
- struct.set(GROUP_ID, groupId);
- struct.set(SESSION_TIMEOUT_KEY_NAME, sessionTimeout);
- if (version >= 1) {
- struct.set(REBALANCE_TIMEOUT_KEY_NAME, rebalanceTimeout);
- }
- struct.set(MEMBER_ID, memberId);
- struct.set(PROTOCOL_TYPE_KEY_NAME, protocolType);
- List<Struct> groupProtocolsList = new ArrayList<>(groupProtocols.size());
- for (ProtocolMetadata protocol : groupProtocols) {
- Struct protocolStruct = struct.instance(GROUP_PROTOCOLS_KEY_NAME);
- protocolStruct.set(PROTOCOL_NAME_KEY_NAME, protocol.name);
- protocolStruct.set(PROTOCOL_METADATA_KEY_NAME, protocol.metadata);
- groupProtocolsList.add(protocolStruct);
- }
- struct.set(GROUP_PROTOCOLS_KEY_NAME, groupProtocolsList.toArray());
- return struct;
+ return data.toStruct(version);
}
}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
index 55cb97f..0e1644e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
@@ -16,217 +16,70 @@
*/
package org.apache.kafka.common.requests;
+import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.utils.Utils;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
+import java.util.Collections;
import java.util.Map;
-import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
-import static org.apache.kafka.common.protocol.CommonFields.GENERATION_ID;
-import static org.apache.kafka.common.protocol.CommonFields.MEMBER_ID;
-import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
-import static org.apache.kafka.common.protocol.types.Type.BYTES;
-import static org.apache.kafka.common.protocol.types.Type.STRING;
-
public class JoinGroupResponse extends AbstractResponse {
- private static final String GROUP_PROTOCOL_KEY_NAME = "group_protocol";
- private static final String LEADER_ID_KEY_NAME = "leader_id";
- private static final String MEMBERS_KEY_NAME = "members";
-
- private static final String MEMBER_METADATA_KEY_NAME = "member_metadata";
-
- private static final Schema JOIN_GROUP_RESPONSE_MEMBER_V0 = new Schema(
- MEMBER_ID,
- new Field(MEMBER_METADATA_KEY_NAME, BYTES));
-
- private static final Schema JOIN_GROUP_RESPONSE_V0 = new Schema(
- ERROR_CODE,
- GENERATION_ID,
- new Field(GROUP_PROTOCOL_KEY_NAME, STRING, "The group protocol selected by the coordinator"),
- new Field(LEADER_ID_KEY_NAME, STRING, "The leader of the group"),
- MEMBER_ID,
- new Field(MEMBERS_KEY_NAME, new ArrayOf(JOIN_GROUP_RESPONSE_MEMBER_V0)));
-
- private static final Schema JOIN_GROUP_RESPONSE_V1 = JOIN_GROUP_RESPONSE_V0;
-
- private static final Schema JOIN_GROUP_RESPONSE_V2 = new Schema(
- THROTTLE_TIME_MS,
- ERROR_CODE,
- GENERATION_ID,
- new Field(GROUP_PROTOCOL_KEY_NAME, STRING, "The group protocol selected by the coordinator"),
- new Field(LEADER_ID_KEY_NAME, STRING, "The leader of the group"),
- MEMBER_ID,
- new Field(MEMBERS_KEY_NAME, new ArrayOf(JOIN_GROUP_RESPONSE_MEMBER_V0)));
-
- /**
- * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
- */
- private static final Schema JOIN_GROUP_RESPONSE_V3 = JOIN_GROUP_RESPONSE_V2;
-
- /**
- * The version number is bumped to indicate that client needs to issue a second join group request under first try
- * with UNKNOWN_MEMBER_ID.
- */
- private static final Schema JOIN_GROUP_RESPONSE_V4 = JOIN_GROUP_RESPONSE_V3;
-
- public static Schema[] schemaVersions() {
- return new Schema[] {JOIN_GROUP_RESPONSE_V0, JOIN_GROUP_RESPONSE_V1, JOIN_GROUP_RESPONSE_V2,
- JOIN_GROUP_RESPONSE_V3, JOIN_GROUP_RESPONSE_V4};
- }
+ private final JoinGroupResponseData data;
public static final String UNKNOWN_PROTOCOL = "";
public static final int UNKNOWN_GENERATION_ID = -1;
public static final String UNKNOWN_MEMBER_ID = "";
- /**
- * Possible error codes:
- *
- * COORDINATOR_LOAD_IN_PROGRESS (14)
- * GROUP_COORDINATOR_NOT_AVAILABLE (15)
- * NOT_COORDINATOR (16)
- * INCONSISTENT_GROUP_PROTOCOL (23)
- * UNKNOWN_MEMBER_ID (25)
- * INVALID_SESSION_TIMEOUT (26)
- * GROUP_AUTHORIZATION_FAILED (30)
- * MEMBER_ID_REQUIRED (79)
- */
-
- private final int throttleTimeMs;
- private final Errors error;
- private final int generationId;
- private final String groupProtocol;
- private final String memberId;
- private final String leaderId;
- private final Map<String, ByteBuffer> members;
-
- public JoinGroupResponse(Errors error,
- int generationId,
- String groupProtocol,
- String memberId,
- String leaderId,
- Map<String, ByteBuffer> groupMembers) {
- this(DEFAULT_THROTTLE_TIME, error, generationId, groupProtocol, memberId, leaderId, groupMembers);
- }
-
- public JoinGroupResponse(int throttleTimeMs,
- Errors error,
- int generationId,
- String groupProtocol,
- String memberId,
- String leaderId,
- Map<String, ByteBuffer> groupMembers) {
- this.throttleTimeMs = throttleTimeMs;
- this.error = error;
- this.generationId = generationId;
- this.groupProtocol = groupProtocol;
- this.memberId = memberId;
- this.leaderId = leaderId;
- this.members = groupMembers;
+ public JoinGroupResponse(JoinGroupResponseData data) {
+ this.data = data;
}
public JoinGroupResponse(Struct struct) {
- this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
- members = new HashMap<>();
-
- for (Object memberDataObj : struct.getArray(MEMBERS_KEY_NAME)) {
- Struct memberData = (Struct) memberDataObj;
- String memberId = memberData.get(MEMBER_ID);
- ByteBuffer memberMetadata = memberData.getBytes(MEMBER_METADATA_KEY_NAME);
- members.put(memberId, memberMetadata);
- }
- error = Errors.forCode(struct.get(ERROR_CODE));
- generationId = struct.get(GENERATION_ID);
- groupProtocol = struct.getString(GROUP_PROTOCOL_KEY_NAME);
- memberId = struct.get(MEMBER_ID);
- leaderId = struct.getString(LEADER_ID_KEY_NAME);
- }
-
- @Override
- public int throttleTimeMs() {
- return throttleTimeMs;
- }
-
- public Errors error() {
- return error;
+ short latestVersion = (short) (JoinGroupResponseData.SCHEMAS.length - 1);
+ this.data = new JoinGroupResponseData(struct, latestVersion);
}
- @Override
- public Map<Errors, Integer> errorCounts() {
- return errorCounts(error);
- }
-
- public int generationId() {
- return generationId;
+ public JoinGroupResponse(Struct struct, short version) {
+ this.data = new JoinGroupResponseData(struct, version);
}
- public String groupProtocol() {
- return groupProtocol;
+ public JoinGroupResponseData data() {
+ return data;
}
- public String memberId() {
- return memberId;
+ public boolean isLeader() {
+ return data.memberId().equals(data.leader());
}
- public String leaderId() {
- return leaderId;
+ @Override
+ public int throttleTimeMs() {
+ return data.throttleTimeMs();
}
- public boolean isLeader() {
- return memberId.equals(leaderId);
+ public Errors error() {
+ return Errors.forCode(data.errorCode());
}
- public Map<String, ByteBuffer> members() {
- return members;
+ @Override
+ public Map<Errors, Integer> errorCounts() {
+ return Collections.singletonMap(Errors.forCode(data.errorCode()), 1);
}
- public static JoinGroupResponse parse(ByteBuffer buffer, short version) {
- return new JoinGroupResponse(ApiKeys.JOIN_GROUP.parseResponse(version, buffer));
+ public static JoinGroupResponse parse(ByteBuffer buffer, short versionId) {
+ return new JoinGroupResponse(ApiKeys.JOIN_GROUP.parseResponse(versionId, buffer), versionId);
}
@Override
protected Struct toStruct(short version) {
- Struct struct = new Struct(ApiKeys.JOIN_GROUP.responseSchema(version));
- struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
-
- struct.set(ERROR_CODE, error.code());
- struct.set(GENERATION_ID, generationId);
- struct.set(GROUP_PROTOCOL_KEY_NAME, groupProtocol);
- struct.set(MEMBER_ID, memberId);
- struct.set(LEADER_ID_KEY_NAME, leaderId);
-
- List<Struct> memberArray = new ArrayList<>();
- for (Map.Entry<String, ByteBuffer> entries : members.entrySet()) {
- Struct memberData = struct.instance(MEMBERS_KEY_NAME);
- memberData.set(MEMBER_ID, entries.getKey());
- memberData.set(MEMBER_METADATA_KEY_NAME, entries.getValue());
- memberArray.add(memberData);
- }
- struct.set(MEMBERS_KEY_NAME, memberArray.toArray());
-
- return struct;
+ return data.toStruct(version);
}
@Override
public String toString() {
- return "JoinGroupResponse" +
- "(throttleTimeMs=" + throttleTimeMs +
- ", error=" + error +
- ", generationId=" + generationId +
- ", groupProtocol=" + groupProtocol +
- ", memberId=" + memberId +
- ", leaderId=" + leaderId +
- ", members=" + ((members == null) ? "null" :
- Utils.join(members.keySet(), ",")) + ")";
+ return data.toString();
}
@Override
diff --git a/clients/src/main/resources/common/message/JoinGroupResponse.json b/clients/src/main/resources/common/message/JoinGroupResponse.json
index fb36bb0..a1c6a70 100644
--- a/clients/src/main/resources/common/message/JoinGroupResponse.json
+++ b/clients/src/main/resources/common/message/JoinGroupResponse.json
@@ -28,7 +28,7 @@
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The error code, or 0 if there was no error." },
- { "name": "GenerationId", "type": "int32", "versions": "0+",
+ { "name": "GenerationId", "type": "int32", "versions": "0+", "default": "-1",
"about": "The generation ID of the group." },
{ "name": "ProtocolName", "type": "string", "versions": "0+",
"about": "The group protocol selected by the coordinator." },
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index cd0a76f..e17a6ef 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -39,6 +39,8 @@ import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.message.JoinGroupRequestData;
+import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.Metrics;
@@ -89,6 +91,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -1420,8 +1423,20 @@ public class KafkaConsumerTest {
final ByteBuffer byteBuffer = ConsumerProtocol.serializeSubscription(new PartitionAssignor.Subscription(singletonList(topic)));
// This member becomes the leader
- final JoinGroupResponse leaderResponse = new JoinGroupResponse(Errors.NONE, 1, assignor.name(), "memberId", "memberId",
- Collections.singletonMap("memberId", byteBuffer));
+ String memberId = "memberId";
+ final JoinGroupResponse leaderResponse = new JoinGroupResponse(
+ new JoinGroupResponseData()
+ .setErrorCode(Errors.NONE.code())
+ .setGenerationId(1).setProtocolName(assignor.name())
+ .setLeader(memberId).setMemberId(memberId)
+ .setMembers(Collections.singletonList(
+ new JoinGroupResponseData.JoinGroupResponseMember()
+ .setMemberId(memberId)
+ .setMetadata(byteBuffer.array())
+ )
+ )
+ );
+
client.prepareResponseFrom(leaderResponse, coordinator);
// sync group fails due to disconnect
@@ -1635,7 +1650,12 @@ public class KafkaConsumerTest {
@Override
public boolean matches(AbstractRequest body) {
JoinGroupRequest joinGroupRequest = (JoinGroupRequest) body;
- PartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(joinGroupRequest.groupProtocols().get(0).metadata());
+ Iterator<JoinGroupRequestData.JoinGroupRequestProtocol> protocolIterator =
+ joinGroupRequest.data().protocols().iterator();
+ assertTrue(protocolIterator.hasNext());
+
+ ByteBuffer protocolMetadata = ByteBuffer.wrap(protocolIterator.next().metadata());
+ PartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(protocolMetadata);
return subscribedTopics.equals(new HashSet<>(subscription.topics()));
}
}, joinGroupFollowerResponse(assignor, 1, "memberId", "leaderId", Errors.NONE), coordinator);
@@ -1707,8 +1727,15 @@ public class KafkaConsumerTest {
}
private JoinGroupResponse joinGroupFollowerResponse(PartitionAssignor assignor, int generationId, String memberId, String leaderId, Errors error) {
- return new JoinGroupResponse(error, generationId, assignor.name(), memberId, leaderId,
- Collections.emptyMap());
+ return new JoinGroupResponse(
+ new JoinGroupResponseData()
+ .setErrorCode(error.code())
+ .setGenerationId(generationId)
+ .setProtocolName(assignor.name())
+ .setLeader(leaderId)
+ .setMemberId(memberId)
+ .setMembers(Collections.emptyList())
+ );
}
private SyncGroupResponse syncGroupResponse(List<TopicPartition> partitions, Errors error) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index 9a68db7..dc8ea46 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -22,6 +22,8 @@ import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.message.JoinGroupRequestData;
+import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
@@ -215,7 +217,7 @@ public class AbstractCoordinatorTest {
return false;
}
JoinGroupRequest joinGroupRequest = (JoinGroupRequest) body;
- if (!joinGroupRequest.memberId().equals(memberId)) {
+ if (!joinGroupRequest.data().memberId().equals(memberId)) {
return false;
}
return true;
@@ -691,8 +693,15 @@ public class AbstractCoordinatorTest {
}
private JoinGroupResponse joinGroupFollowerResponse(int generationId, String memberId, String leaderId, Errors error) {
- return new JoinGroupResponse(error, generationId, "dummy-subprotocol", memberId, leaderId,
- Collections.<String, ByteBuffer>emptyMap());
+ return new JoinGroupResponse(
+ new JoinGroupResponseData()
+ .setErrorCode(error.code())
+ .setGenerationId(generationId)
+ .setProtocolName("dummy-subprotocol")
+ .setMemberId(memberId)
+ .setLeader(leaderId)
+ .setMembers(Collections.emptyList())
+ );
}
private JoinGroupResponse joinGroupResponse(Errors error) {
@@ -725,15 +734,22 @@ public class AbstractCoordinatorTest {
}
@Override
- protected List<JoinGroupRequest.ProtocolMetadata> metadata() {
- return Collections.singletonList(new JoinGroupRequest.ProtocolMetadata("dummy-subprotocol", EMPTY_DATA));
+ protected JoinGroupRequestData.JoinGroupRequestProtocolSet metadata() {
+ return new JoinGroupRequestData.JoinGroupRequestProtocolSet(
+ Collections.singleton(new JoinGroupRequestData.JoinGroupRequestProtocol()
+ .setName("dummy-subprotocol")
+ .setMetadata(EMPTY_DATA.array())).iterator()
+ );
}
@Override
- protected Map<String, ByteBuffer> performAssignment(String leaderId, String protocol, Map<String, ByteBuffer> allMemberMetadata) {
+ protected Map<String, ByteBuffer> performAssignment(String leaderId,
+ String protocol,
+ List<JoinGroupResponseData.JoinGroupResponseMember> allMemberMetadata) {
Map<String, ByteBuffer> assignment = new HashMap<>();
- for (Map.Entry<String, ByteBuffer> metadata : allMemberMetadata.entrySet())
- assignment.put(metadata.getKey(), EMPTY_DATA);
+ for (JoinGroupResponseData.JoinGroupResponseMember member : allMemberMetadata) {
+ assignment.put(member.memberId(), EMPTY_DATA);
+ }
return assignment;
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index b079963..49bbcec 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -23,9 +23,7 @@ import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
-import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
-import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
@@ -39,6 +37,8 @@ import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.JoinGroupRequestData;
+import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
@@ -46,7 +46,6 @@ import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.HeartbeatResponse;
import org.apache.kafka.common.requests.JoinGroupRequest;
-import org.apache.kafka.common.requests.JoinGroupRequest.ProtocolMetadata;
import org.apache.kafka.common.requests.JoinGroupResponse;
import org.apache.kafka.common.requests.LeaveGroupRequest;
import org.apache.kafka.common.requests.LeaveGroupResponse;
@@ -71,6 +70,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -584,9 +584,14 @@ public class ConsumerCoordinatorTest {
@Override
public boolean matches(AbstractRequest body) {
JoinGroupRequest join = (JoinGroupRequest) body;
- ProtocolMetadata protocolMetadata = join.groupProtocols().iterator().next();
- PartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(protocolMetadata.metadata());
- protocolMetadata.metadata().rewind();
+ Iterator<JoinGroupRequestData.JoinGroupRequestProtocol> protocolIterator =
+ join.data().protocols().iterator();
+ assertTrue(protocolIterator.hasNext());
+ JoinGroupRequestData.JoinGroupRequestProtocol protocolMetadata = protocolIterator.next();
+
+ ByteBuffer metadata = ByteBuffer.wrap(protocolMetadata.metadata());
+ PartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(metadata);
+ metadata.rewind();
return subscription.topics().containsAll(updatedSubscriptionSet);
}
}, joinGroupLeaderResponse(2, consumerId, updatedSubscriptions, Errors.NONE));
@@ -895,7 +900,7 @@ public class ConsumerCoordinatorTest {
@Override
public boolean matches(AbstractRequest body) {
JoinGroupRequest joinRequest = (JoinGroupRequest) body;
- return joinRequest.memberId().equals(JoinGroupRequest.UNKNOWN_MEMBER_ID);
+ return joinRequest.data().memberId().equals(JoinGroupRequest.UNKNOWN_MEMBER_ID);
}
}, joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE));
client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
@@ -947,7 +952,7 @@ public class ConsumerCoordinatorTest {
@Override
public boolean matches(AbstractRequest body) {
JoinGroupRequest joinRequest = (JoinGroupRequest) body;
- return joinRequest.memberId().equals(JoinGroupRequest.UNKNOWN_MEMBER_ID);
+ return joinRequest.data().memberId().equals(JoinGroupRequest.UNKNOWN_MEMBER_ID);
}
}, joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE));
client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
@@ -1479,7 +1484,8 @@ public class ConsumerCoordinatorTest {
joinAsFollowerAndReceiveAssignment("consumer", coordinator, singletonList(t1p));
// now switch to manual assignment
- client.prepareResponse(new LeaveGroupResponse(new LeaveGroupResponseData().setErrorCode(Errors.NONE.code())));
+ client.prepareResponse(new LeaveGroupResponse(new LeaveGroupResponseData()
+ .setErrorCode(Errors.NONE.code())));
subscriptions.unsubscribe();
coordinator.maybeLeaveGroup();
subscriptions.assignFromUser(singleton(t1p));
@@ -1851,30 +1857,6 @@ public class ConsumerCoordinatorTest {
}
@Test
- public void testProtocolMetadataOrder() {
- RoundRobinAssignor roundRobin = new RoundRobinAssignor();
- RangeAssignor range = new RangeAssignor();
-
- try (Metrics metrics = new Metrics(time)) {
- ConsumerCoordinator coordinator = buildCoordinator(metrics, Arrays.<PartitionAssignor>asList(roundRobin, range),
- false, true);
- List<ProtocolMetadata> metadata = coordinator.metadata();
- assertEquals(2, metadata.size());
- assertEquals(roundRobin.name(), metadata.get(0).name());
- assertEquals(range.name(), metadata.get(1).name());
- }
-
- try (Metrics metrics = new Metrics(time)) {
- ConsumerCoordinator coordinator = buildCoordinator(metrics, Arrays.<PartitionAssignor>asList(range, roundRobin),
- false, true);
- List<ProtocolMetadata> metadata = coordinator.metadata();
- assertEquals(2, metadata.size());
- assertEquals(range.name(), metadata.get(0).name());
- assertEquals(roundRobin.name(), metadata.get(1).name());
- }
- }
-
- @Test
public void testThreadSafeAssignedPartitionsMetric() throws Exception {
// Get the assigned-partitions metric
final Metric metric = metrics.metric(new MetricName("assigned-partitions", "consumer" + groupId + "-coordinator-metrics",
@@ -2131,7 +2113,8 @@ public class ConsumerCoordinatorTest {
LeaveGroupRequest leaveRequest = (LeaveGroupRequest) body;
return leaveRequest.data().groupId().equals(groupId);
}
- }, new LeaveGroupResponse(new LeaveGroupResponseData().setErrorCode(Errors.NONE.code())));
+ }, new LeaveGroupResponse(new LeaveGroupResponseData()
+ .setErrorCode(Errors.NONE.code())));
coordinator.close();
assertTrue("Commit not requested", commitRequested.get());
@@ -2174,18 +2157,36 @@ public class ConsumerCoordinatorTest {
String memberId,
Map<String, List<String>> subscriptions,
Errors error) {
- Map<String, ByteBuffer> metadata = new HashMap<>();
+ List<JoinGroupResponseData.JoinGroupResponseMember> metadata = new ArrayList<>();
for (Map.Entry<String, List<String>> subscriptionEntry : subscriptions.entrySet()) {
PartitionAssignor.Subscription subscription = new PartitionAssignor.Subscription(subscriptionEntry.getValue());
ByteBuffer buf = ConsumerProtocol.serializeSubscription(subscription);
- metadata.put(subscriptionEntry.getKey(), buf);
+ metadata.add(new JoinGroupResponseData.JoinGroupResponseMember()
+ .setMemberId(subscriptionEntry.getKey())
+ .setMetadata(buf.array()));
}
- return new JoinGroupResponse(error, generationId, partitionAssignor.name(), memberId, memberId, metadata);
+
+ return new JoinGroupResponse(
+ new JoinGroupResponseData()
+ .setErrorCode(error.code())
+ .setGenerationId(generationId)
+ .setProtocolName(partitionAssignor.name())
+ .setLeader(memberId)
+ .setMemberId(memberId)
+ .setMembers(Collections.emptyList())
+ );
}
private JoinGroupResponse joinGroupFollowerResponse(int generationId, String memberId, String leaderId, Errors error) {
- return new JoinGroupResponse(error, generationId, partitionAssignor.name(), memberId, leaderId,
- Collections.emptyMap());
+ return new JoinGroupResponse(
+ new JoinGroupResponseData()
+ .setErrorCode(error.code())
+ .setGenerationId(generationId)
+ .setProtocolName(partitionAssignor.name())
+ .setLeader(leaderId)
+ .setMemberId(memberId)
+ .setMembers(Collections.emptyList())
+ );
}
private SyncGroupResponse syncGroupResponse(List<TopicPartition> partitions, Errors error) {
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index a483500..5690743 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -45,6 +45,8 @@ import org.apache.kafka.common.message.ElectPreferredLeadersRequestData.TopicPar
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData;
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.PartitionResult;
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.ReplicaElectionResult;
+import org.apache.kafka.common.message.JoinGroupRequestData;
+import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.LeaveGroupRequestData;
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.SaslAuthenticateRequestData;
@@ -648,7 +650,7 @@ public class RequestResponseTest {
final short version = 0;
JoinGroupRequest jgr = createJoinGroupRequest(version);
JoinGroupRequest jgr2 = new JoinGroupRequest(jgr.toStruct(), version);
- assertEquals(jgr2.rebalanceTimeout(), jgr.rebalanceTimeout());
+ assertEquals(jgr2.data().rebalanceTimeoutMs(), jgr.data().rebalanceTimeoutMs());
}
@Test
@@ -742,23 +744,53 @@ public class RequestResponseTest {
}
private JoinGroupRequest createJoinGroupRequest(int version) {
- ByteBuffer metadata = ByteBuffer.wrap(new byte[] {});
- List<JoinGroupRequest.ProtocolMetadata> protocols = new ArrayList<>();
- protocols.add(new JoinGroupRequest.ProtocolMetadata("consumer-range", metadata));
+ JoinGroupRequestData.JoinGroupRequestProtocolSet protocols = new JoinGroupRequestData.JoinGroupRequestProtocolSet(
+ Collections.singleton(
+ new JoinGroupRequestData.JoinGroupRequestProtocol()
+ .setName("consumer-range")
+ .setMetadata(new byte[0])).iterator()
+ );
if (version == 0) {
- return new JoinGroupRequest.Builder("group1", 30000, "consumer1", "consumer", protocols).
- build((short) version);
+ return new JoinGroupRequest.Builder(
+ new JoinGroupRequestData()
+ .setGroupId("group1")
+ .setSessionTimeoutMs(30000)
+ .setMemberId("consumer1")
+ .setProtocolType("consumer")
+ .setProtocols(protocols))
+ .build((short) version);
} else {
- return new JoinGroupRequest.Builder("group1", 10000, "consumer1", "consumer", protocols).
- setRebalanceTimeout(60000).build();
+ return new JoinGroupRequest.Builder(
+ new JoinGroupRequestData()
+ .setGroupId("group1")
+ .setSessionTimeoutMs(30000)
+ .setMemberId("consumer1")
+ .setProtocolType("consumer")
+ .setProtocols(protocols)
+ .setRebalanceTimeoutMs(60000)) // v1 and above contains rebalance timeout
+ .build((short) version);
}
}
private JoinGroupResponse createJoinGroupResponse() {
- Map<String, ByteBuffer> members = new HashMap<>();
- members.put("consumer1", ByteBuffer.wrap(new byte[]{}));
- members.put("consumer2", ByteBuffer.wrap(new byte[]{}));
- return new JoinGroupResponse(Errors.NONE, 1, "range", "consumer1", "leader", members);
+ List<JoinGroupResponseData.JoinGroupResponseMember> members = Arrays.asList(
+ new JoinGroupResponseData.JoinGroupResponseMember()
+ .setMemberId("consumer1")
+ .setMetadata(new byte[0]),
+ new JoinGroupResponseData.JoinGroupResponseMember()
+ .setMemberId("consumer2")
+ .setMetadata(new byte[0])
+ );
+
+ return new JoinGroupResponse(
+ new JoinGroupResponseData()
+ .setErrorCode(Errors.NONE.code())
+ .setGenerationId(1)
+ .setProtocolName("range")
+ .setLeader("leader")
+ .setMemberId("consumer1")
+ .setMembers(members)
+ );
}
private ListGroupsRequest createListGroupsRequest() {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
index 103a323..fa89a9d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -18,11 +18,12 @@ package org.apache.kafka.connect.runtime.distributed;
import org.apache.kafka.clients.consumer.internals.AbstractCoordinator;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
+import org.apache.kafka.common.message.JoinGroupRequestData;
+import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.requests.JoinGroupRequest;
-import org.apache.kafka.common.requests.JoinGroupRequest.ProtocolMetadata;
import org.apache.kafka.common.utils.CircularIterator;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
@@ -144,11 +145,16 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
}
@Override
- public List<ProtocolMetadata> metadata() {
+ public JoinGroupRequestData.JoinGroupRequestProtocolSet metadata() {
configSnapshot = configStorage.snapshot();
ConnectProtocol.WorkerState workerState = new ConnectProtocol.WorkerState(restUrl, configSnapshot.offset());
ByteBuffer metadata = ConnectProtocol.serializeMetadata(workerState);
- return Collections.singletonList(new ProtocolMetadata(DEFAULT_SUBPROTOCOL, metadata));
+ return new JoinGroupRequestData.JoinGroupRequestProtocolSet(
+ Collections.singleton(new JoinGroupRequestData.JoinGroupRequestProtocol()
+ .setName(DEFAULT_SUBPROTOCOL)
+ .setMetadata(metadata.array()))
+ .iterator()
+ );
}
@Override
@@ -163,12 +169,12 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
}
@Override
- protected Map<String, ByteBuffer> performAssignment(String leaderId, String protocol, Map<String, ByteBuffer> allMemberMetadata) {
+ protected Map<String, ByteBuffer> performAssignment(String leaderId, String protocol, List<JoinGroupResponseData.JoinGroupResponseMember> allMemberMetadata) {
log.debug("Performing task assignment");
Map<String, ConnectProtocol.WorkerState> memberConfigs = new HashMap<>();
- for (Map.Entry<String, ByteBuffer> entry : allMemberMetadata.entrySet())
- memberConfigs.put(entry.getKey(), ConnectProtocol.deserializeMetadata(entry.getValue()));
+ for (JoinGroupResponseData.JoinGroupResponseMember memberMetadata : allMemberMetadata)
+ memberConfigs.put(memberMetadata.memberId(), ConnectProtocol.deserializeMetadata(ByteBuffer.wrap(memberMetadata.metadata())));
long maxOffset = findMaxMemberConfigOffset(memberConfigs);
Long leaderOffset = ensureLeaderConfig(maxOffset);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
index 68d236f..c78c7a4 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
@@ -21,11 +21,12 @@ import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.message.JoinGroupRequestData;
+import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
-import org.apache.kafka.common.requests.JoinGroupRequest.ProtocolMetadata;
import org.apache.kafka.common.requests.JoinGroupResponse;
import org.apache.kafka.common.requests.SyncGroupRequest;
import org.apache.kafka.common.requests.SyncGroupResponse;
@@ -44,15 +45,18 @@ import org.powermock.api.easymock.PowerMock;
import org.powermock.reflect.Whitebox;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
public class WorkerCoordinatorTest {
@@ -186,12 +190,15 @@ public class WorkerCoordinatorTest {
PowerMock.replayAll();
- List<ProtocolMetadata> serialized = coordinator.metadata();
+ JoinGroupRequestData.JoinGroupRequestProtocolSet serialized = coordinator.metadata();
assertEquals(1, serialized.size());
- ProtocolMetadata defaultMetadata = serialized.get(0);
+ Iterator<JoinGroupRequestData.JoinGroupRequestProtocol> protocolIterator = serialized.iterator();
+ assertTrue(protocolIterator.hasNext());
+ JoinGroupRequestData.JoinGroupRequestProtocol defaultMetadata = protocolIterator.next();
assertEquals(WorkerCoordinator.DEFAULT_SUBPROTOCOL, defaultMetadata.name());
- ConnectProtocol.WorkerState state = ConnectProtocol.deserializeMetadata(defaultMetadata.metadata());
+ ConnectProtocol.WorkerState state = ConnectProtocol.deserializeMetadata(
+ ByteBuffer.wrap(defaultMetadata.metadata()));
assertEquals(1, state.offset());
PowerMock.verifyAll();
@@ -364,11 +371,17 @@ public class WorkerCoordinatorTest {
// Prime the current configuration state
coordinator.metadata();
- Map<String, ByteBuffer> configs = new HashMap<>();
// Mark everyone as in sync with configState1
- configs.put("leader", ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(LEADER_URL, 1L)));
- configs.put("member", ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(MEMBER_URL, 1L)));
- Map<String, ByteBuffer> result = Whitebox.invokeMethod(coordinator, "performAssignment", "leader", WorkerCoordinator.DEFAULT_SUBPROTOCOL, configs);
+ List<JoinGroupResponseData.JoinGroupResponseMember> responseMembers = new ArrayList<>();
+ responseMembers.add(new JoinGroupResponseData.JoinGroupResponseMember()
+ .setMemberId("leader")
+ .setMetadata(ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(LEADER_URL, 1L)).array())
+ );
+ responseMembers.add(new JoinGroupResponseData.JoinGroupResponseMember()
+ .setMemberId("member")
+ .setMetadata(ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(MEMBER_URL, 1L)).array())
+ );
+ Map<String, ByteBuffer> result = Whitebox.invokeMethod(coordinator, "performAssignment", "leader", WorkerCoordinator.DEFAULT_SUBPROTOCOL, responseMembers);
// configState1 has 1 connector, 1 task
ConnectProtocol.Assignment leaderAssignment = ConnectProtocol.deserializeAssignment(result.get("leader"));
@@ -400,11 +413,18 @@ public class WorkerCoordinatorTest {
// Prime the current configuration state
coordinator.metadata();
- Map<String, ByteBuffer> configs = new HashMap<>();
// Mark everyone as in sync with configState1
- configs.put("leader", ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(LEADER_URL, 1L)));
- configs.put("member", ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(MEMBER_URL, 1L)));
- Map<String, ByteBuffer> result = Whitebox.invokeMethod(coordinator, "performAssignment", "leader", WorkerCoordinator.DEFAULT_SUBPROTOCOL, configs);
+ List<JoinGroupResponseData.JoinGroupResponseMember> responseMembers = new ArrayList<>();
+ responseMembers.add(new JoinGroupResponseData.JoinGroupResponseMember()
+ .setMemberId("leader")
+ .setMetadata(ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(LEADER_URL, 1L)).array())
+ );
+ responseMembers.add(new JoinGroupResponseData.JoinGroupResponseMember()
+ .setMemberId("member")
+ .setMetadata(ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(MEMBER_URL, 1L)).array())
+ );
+
+ Map<String, ByteBuffer> result = Whitebox.invokeMethod(coordinator, "performAssignment", "leader", WorkerCoordinator.DEFAULT_SUBPROTOCOL, responseMembers);
// configState2 has 2 connector, 3 tasks and should trigger round robin assignment
ConnectProtocol.Assignment leaderAssignment = ConnectProtocol.deserializeAssignment(result.get("leader"));
@@ -436,11 +456,18 @@ public class WorkerCoordinatorTest {
// Prime the current configuration state
coordinator.metadata();
- Map<String, ByteBuffer> configs = new HashMap<>();
// Mark everyone as in sync with configState1
- configs.put("leader", ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(LEADER_URL, 1L)));
- configs.put("member", ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(MEMBER_URL, 1L)));
- Map<String, ByteBuffer> result = Whitebox.invokeMethod(coordinator, "performAssignment", "leader", WorkerCoordinator.DEFAULT_SUBPROTOCOL, configs);
+ List<JoinGroupResponseData.JoinGroupResponseMember> responseMembers = new ArrayList<>();
+ responseMembers.add(new JoinGroupResponseData.JoinGroupResponseMember()
+ .setMemberId("leader")
+ .setMetadata(ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(LEADER_URL, 1L)).array())
+ );
+ responseMembers.add(new JoinGroupResponseData.JoinGroupResponseMember()
+ .setMemberId("member")
+ .setMetadata(ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(MEMBER_URL, 1L)).array())
+ );
+
+ Map<String, ByteBuffer> result = Whitebox.invokeMethod(coordinator, "performAssignment", "leader", WorkerCoordinator.DEFAULT_SUBPROTOCOL, responseMembers);
// Round robin assignment when there are the same number of connectors and tasks should result in each being
// evenly distributed across the workers, i.e. round robin assignment of connectors first, then followed by tasks
@@ -468,20 +495,36 @@ public class WorkerCoordinatorTest {
private JoinGroupResponse joinGroupLeaderResponse(int generationId, String memberId,
Map<String, Long> configOffsets, Errors error) {
- Map<String, ByteBuffer> metadata = new HashMap<>();
+ List<JoinGroupResponseData.JoinGroupResponseMember> metadata = new ArrayList<>();
for (Map.Entry<String, Long> configStateEntry : configOffsets.entrySet()) {
// We need a member URL, but it doesn't matter for the purposes of this test. Just set it to the member ID
String memberUrl = configStateEntry.getKey();
long configOffset = configStateEntry.getValue();
ByteBuffer buf = ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(memberUrl, configOffset));
- metadata.put(configStateEntry.getKey(), buf);
+ metadata.add(new JoinGroupResponseData.JoinGroupResponseMember()
+ .setMemberId(configStateEntry.getKey())
+ .setMetadata(buf.array())
+ );
}
- return new JoinGroupResponse(error, generationId, WorkerCoordinator.DEFAULT_SUBPROTOCOL, memberId, memberId, metadata);
+ return new JoinGroupResponse(
+ new JoinGroupResponseData().setErrorCode(error.code())
+ .setGenerationId(generationId)
+ .setProtocolName(WorkerCoordinator.DEFAULT_SUBPROTOCOL)
+ .setLeader(memberId)
+ .setMemberId(memberId)
+ .setMembers(metadata)
+ );
}
private JoinGroupResponse joinGroupFollowerResponse(int generationId, String memberId, String leaderId, Errors error) {
- return new JoinGroupResponse(error, generationId, WorkerCoordinator.DEFAULT_SUBPROTOCOL, memberId, leaderId,
- Collections.<String, ByteBuffer>emptyMap());
+ return new JoinGroupResponse(
+ new JoinGroupResponseData().setErrorCode(error.code())
+ .setGenerationId(generationId)
+ .setProtocolName(WorkerCoordinator.DEFAULT_SUBPROTOCOL)
+ .setLeader(leaderId)
+ .setMemberId(memberId)
+ .setMembers(Collections.emptyList())
+ );
}
private SyncGroupResponse syncGroupResponse(short assignmentError, String leader, long configOffset, List<String> connectorIds,
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 0b73341..68d0823 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -45,9 +45,11 @@ import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal}
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
-import org.apache.kafka.common.message.{CreateTopicsResponseData, DescribeGroupsResponseData}
+import org.apache.kafka.common.message.CreateTopicsResponseData
import org.apache.kafka.common.message.CreateTopicsResponseData.{CreatableTopicResult, CreatableTopicResultSet}
+import org.apache.kafka.common.message.DescribeGroupsResponseData
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData
+import org.apache.kafka.common.message.JoinGroupResponseData
import org.apache.kafka.common.message.LeaveGroupResponseData
import org.apache.kafka.common.message.SaslAuthenticateResponseData
import org.apache.kafka.common.message.SaslHandshakeResponseData
@@ -1282,10 +1284,23 @@ class KafkaApis(val requestChannel: RequestChannel,
// the callback for sending a join-group response
def sendResponseCallback(joinResult: JoinGroupResult) {
- val members = joinResult.members map { case (memberId, metadataArray) => (memberId, ByteBuffer.wrap(metadataArray)) }
+ val members = joinResult.members map { case (memberId, metadataArray) =>
+ new JoinGroupResponseData.JoinGroupResponseMember()
+ .setMemberId(memberId)
+ .setMetadata(metadataArray)
+ }
+
def createResponse(requestThrottleMs: Int): AbstractResponse = {
- val responseBody = new JoinGroupResponse(requestThrottleMs, joinResult.error, joinResult.generationId,
- joinResult.subProtocol, joinResult.memberId, joinResult.leaderId, members.asJava)
+ val responseBody = new JoinGroupResponse(
+ new JoinGroupResponseData()
+ .setThrottleTimeMs(requestThrottleMs)
+ .setErrorCode(joinResult.error.code())
+ .setGenerationId(joinResult.generationId)
+ .setProtocolName(joinResult.subProtocol)
+ .setLeader(joinResult.leaderId)
+ .setMemberId(joinResult.memberId)
+ .setMembers(members.toSeq.asJava)
+ )
trace("Sending join group response %s for correlation id %d to client %s."
.format(responseBody, request.header.correlationId, request.header.clientId))
@@ -1294,33 +1309,35 @@ class KafkaApis(val requestChannel: RequestChannel,
sendResponseMaybeThrottle(request, createResponse)
}
- if (!authorize(request.session, Read, Resource(Group, joinGroupRequest.groupId(), LITERAL))) {
+ if (!authorize(request.session, Read, Resource(Group, joinGroupRequest.data().groupId(), LITERAL))) {
sendResponseMaybeThrottle(request, requestThrottleMs =>
new JoinGroupResponse(
- requestThrottleMs,
- Errors.GROUP_AUTHORIZATION_FAILED,
- JoinGroupResponse.UNKNOWN_GENERATION_ID,
- JoinGroupResponse.UNKNOWN_PROTOCOL,
- JoinGroupResponse.UNKNOWN_MEMBER_ID, // memberId
- JoinGroupResponse.UNKNOWN_MEMBER_ID, // leaderId
- Collections.emptyMap())
+ new JoinGroupResponseData()
+ .setThrottleTimeMs(requestThrottleMs)
+ .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code())
+ .setGenerationId(JoinGroupResponse.UNKNOWN_GENERATION_ID)
+ .setProtocolName(JoinGroupResponse.UNKNOWN_PROTOCOL)
+ .setLeader(JoinGroupResponse.UNKNOWN_MEMBER_ID)
+ .setMemberId(JoinGroupResponse.UNKNOWN_MEMBER_ID)
+ .setMembers(Collections.emptyList())
+ )
)
} else {
// Only return MEMBER_ID_REQUIRED error if joinGroupRequest version is >= 4
val requireKnownMemberId = joinGroupRequest.version >= 4
// let the coordinator handle join-group
- val protocols = joinGroupRequest.groupProtocols().asScala.map(protocol =>
- (protocol.name, Utils.toArray(protocol.metadata))).toList
+ val protocols = joinGroupRequest.data().protocols().asScala.map(protocol =>
+ (protocol.name, protocol.metadata)).toList
groupCoordinator.handleJoinGroup(
- joinGroupRequest.groupId,
- joinGroupRequest.memberId,
+ joinGroupRequest.data().groupId,
+ joinGroupRequest.data().memberId,
requireKnownMemberId,
request.header.clientId,
request.session.clientAddress.toString,
- joinGroupRequest.rebalanceTimeout,
- joinGroupRequest.sessionTimeout,
- joinGroupRequest.protocolType,
+ joinGroupRequest.data().rebalanceTimeoutMs,
+ joinGroupRequest.data().sessionTimeoutMs,
+ joinGroupRequest.data().protocolType,
protocols,
sendResponseCallback)
}
@@ -1396,9 +1413,10 @@ class KafkaApis(val requestChannel: RequestChannel,
def sendResponseCallback(error: Errors) {
def createResponse(requestThrottleMs: Int): AbstractResponse = {
val response = new LeaveGroupResponse(new LeaveGroupResponseData()
- .setThrottleTimeMs(requestThrottleMs).setErrorCode(error.code()))
+ .setThrottleTimeMs(requestThrottleMs)
+ .setErrorCode(error.code()))
trace("Sending leave group response %s for correlation id %d to client %s."
- .format(response, request.header.correlationId, request.header.clientId))
+ .format(response, request.header.correlationId, request.header.clientId))
response
}
sendResponseMaybeThrottle(request, createResponse)
@@ -1407,7 +1425,8 @@ class KafkaApis(val requestChannel: RequestChannel,
if (!authorize(request.session, Read, Resource(Group, leaveGroupRequest.data().groupId(), LITERAL))) {
sendResponseMaybeThrottle(request, requestThrottleMs =>
new LeaveGroupResponse(new LeaveGroupResponseData()
- .setThrottleTimeMs(requestThrottleMs).setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code())))
+ .setThrottleTimeMs(requestThrottleMs)
+ .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code())))
} else {
// let the coordinator to handle leave-group
groupCoordinator.handleLeaveGroup(
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 044f595..30cc161 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -33,7 +33,7 @@ import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME
-import org.apache.kafka.common.message.{CreateTopicsRequestData, DescribeGroupsRequestData, LeaveGroupRequestData}
+import org.apache.kafka.common.message.{CreateTopicsRequestData, DescribeGroupsRequestData, JoinGroupRequestData, LeaveGroupRequestData}
import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicSet}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@@ -310,9 +310,21 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
}
private def createJoinGroupRequest = {
- new JoinGroupRequest.Builder(group, 10000, "", "consumer",
- List( new JoinGroupRequest.ProtocolMetadata("consumer-range",ByteBuffer.wrap("test".getBytes()))).asJava)
- .setRebalanceTimeout(60000).build()
+ val protocolSet = new JoinGroupRequestData.JoinGroupRequestProtocolSet(
+ Collections.singletonList(new JoinGroupRequestData.JoinGroupRequestProtocol()
+ .setName("consumer-range")
+ .setMetadata("test".getBytes())
+ ).iterator())
+
+ new JoinGroupRequest.Builder(
+ new JoinGroupRequestData()
+ .setGroupId(group)
+ .setSessionTimeoutMs(10000)
+ .setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID)
+ .setProtocolType("consumer")
+ .setProtocols(protocolSet)
+ .setRebalanceTimeoutMs(60000)
+ ).build()
}
private def createSyncGroupRequest = {
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index d070e46..32541d3 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -24,7 +24,7 @@ import kafka.security.auth._
import kafka.utils.TestUtils
import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
import org.apache.kafka.common.config.ConfigResource
-import org.apache.kafka.common.message.{CreateTopicsRequestData, DescribeGroupsRequestData, ElectPreferredLeadersRequestData, LeaveGroupRequestData}
+import org.apache.kafka.common.message.{CreateTopicsRequestData, DescribeGroupsRequestData, ElectPreferredLeadersRequestData, LeaveGroupRequestData, JoinGroupRequestData}
import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, ResourceType => AdminResourceType}
import org.apache.kafka.common.{Node, TopicPartition}
import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicSet}
@@ -254,9 +254,21 @@ class RequestQuotaTest extends BaseRequestTest {
new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, "test-group")
case ApiKeys.JOIN_GROUP =>
- new JoinGroupRequest.Builder("test-join-group", 200, "", "consumer",
- List(new JoinGroupRequest.ProtocolMetadata("consumer-range", ByteBuffer.wrap("test".getBytes()))).asJava)
- .setRebalanceTimeout(100)
+ new JoinGroupRequest.Builder(
+ new JoinGroupRequestData()
+ .setGroupId("test-join-group")
+ .setSessionTimeoutMs(200)
+ .setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID)
+ .setProtocolType("consumer")
+ .setProtocols(
+ new JoinGroupRequestData.JoinGroupRequestProtocolSet(
+ Collections.singletonList(new JoinGroupRequestData.JoinGroupRequestProtocol()
+ .setName("consumer-range")
+ .setMetadata("test".getBytes())).iterator()
+ )
+ )
+ .setRebalanceTimeoutMs(100)
+ )
case ApiKeys.HEARTBEAT =>
new HeartbeatRequest.Builder("test-group", 1, "")