You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/05/15 22:09:14 UTC
[kafka] branch trunk updated: KAFKA-8354;
Replace Sync group request/response with automated protocol (#6729)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2208f99 KAFKA-8354; Replace Sync group request/response with automated protocol (#6729)
2208f99 is described below
commit 2208f9966d31a3f52bf9b63938bd6f3729378d9b
Author: Boyang Chen <bo...@confluent.io>
AuthorDate: Wed May 15 15:09:00 2019 -0700
KAFKA-8354; Replace Sync group request/response with automated protocol (#6729)
Update SyncGroup API to use the generated protocol classes.
Reviewers: Jason Gustafson <ja...@confluent.io>
---
.../consumer/internals/AbstractCoordinator.java | 30 ++++-
.../consumer/internals/ConsumerCoordinator.java | 2 +-
.../org/apache/kafka/common/protocol/ApiKeys.java | 10 +-
.../kafka/common/requests/AbstractResponse.java | 2 +-
.../kafka/common/requests/SyncGroupRequest.java | 139 +++++----------------
.../kafka/common/requests/SyncGroupResponse.java | 79 +++---------
.../kafka/clients/consumer/KafkaConsumerTest.java | 7 +-
.../internals/AbstractCoordinatorTest.java | 7 +-
.../internals/ConsumerCoordinatorTest.java | 62 ++++-----
.../runtime/distributed/WorkerCoordinatorTest.java | 26 ++--
core/src/main/scala/kafka/server/KafkaApis.scala | 22 +++-
.../kafka/api/AuthorizerIntegrationTest.scala | 10 +-
.../scala/unit/kafka/server/RequestQuotaTest.scala | 8 +-
13 files changed, 169 insertions(+), 235 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index ebe8231..cbfd76e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.message.FindCoordinatorRequestData;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.LeaveGroupRequestData;
+import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
@@ -58,10 +59,12 @@ import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
+import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import java.io.Closeable;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -589,8 +592,13 @@ public abstract class AbstractCoordinator implements Closeable {
private RequestFuture<ByteBuffer> onJoinFollower() {
// send follower's sync group with an empty assignment
SyncGroupRequest.Builder requestBuilder =
- new SyncGroupRequest.Builder(groupId, generation.generationId, generation.memberId,
- Collections.<String, ByteBuffer>emptyMap());
+ new SyncGroupRequest.Builder(
+ new SyncGroupRequestData()
+ .setGroupId(groupId)
+ .setMemberId(generation.memberId)
+ .setGenerationId(generation.generationId)
+ .setAssignments(Collections.emptyList())
+ );
log.debug("Sending follower SyncGroup to coordinator {}: {}", this.coordinator, requestBuilder);
return sendSyncGroupRequest(requestBuilder);
}
@@ -601,8 +609,22 @@ public abstract class AbstractCoordinator implements Closeable {
Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.data().leader(), joinResponse.data().protocolName(),
joinResponse.data().members());
+ List<SyncGroupRequestData.SyncGroupRequestAssignment> groupAssignmentList = new ArrayList<>();
+ for (Map.Entry<String, ByteBuffer> assignment : groupAssignment.entrySet()) {
+ groupAssignmentList.add(new SyncGroupRequestData.SyncGroupRequestAssignment()
+ .setMemberId(assignment.getKey())
+ .setAssignment(Utils.toArray(assignment.getValue()))
+ );
+ }
+
SyncGroupRequest.Builder requestBuilder =
- new SyncGroupRequest.Builder(groupId, generation.generationId, generation.memberId, groupAssignment);
+ new SyncGroupRequest.Builder(
+ new SyncGroupRequestData()
+ .setGroupId(groupId)
+ .setMemberId(generation.memberId)
+ .setGenerationId(generation.generationId)
+ .setAssignments(groupAssignmentList)
+ );
log.debug("Sending leader SyncGroup to coordinator {}: {}", this.coordinator, requestBuilder);
return sendSyncGroupRequest(requestBuilder);
} catch (RuntimeException e) {
@@ -624,7 +646,7 @@ public abstract class AbstractCoordinator implements Closeable {
Errors error = syncResponse.error();
if (error == Errors.NONE) {
sensors.syncLatency.record(response.requestLatencyMs());
- future.complete(syncResponse.memberAssignment());
+ future.complete(ByteBuffer.wrap(syncResponse.data.assignment()));
} else {
requestRejoin();
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 95cff78..6833579 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -182,7 +182,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
protocolSet.add(new JoinGroupRequestData.JoinGroupRequestProtocol()
.setName(assignor.name())
- .setMetadata(metadata.array()));
+ .setMetadata(Utils.toArray(metadata)));
}
return protocolSet;
}
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 3e2a87a..a7e3757 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -28,6 +28,8 @@ import org.apache.kafka.common.message.ElectPreferredLeadersRequestData;
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData;
import org.apache.kafka.common.message.FindCoordinatorRequestData;
import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
+import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData;
import org.apache.kafka.common.message.InitProducerIdRequestData;
import org.apache.kafka.common.message.InitProducerIdResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
@@ -42,8 +44,8 @@ import org.apache.kafka.common.message.SaslAuthenticateRequestData;
import org.apache.kafka.common.message.SaslAuthenticateResponseData;
import org.apache.kafka.common.message.SaslHandshakeRequestData;
import org.apache.kafka.common.message.SaslHandshakeResponseData;
-import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
-import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData;
+import org.apache.kafka.common.message.SyncGroupRequestData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.SchemaException;
import org.apache.kafka.common.protocol.types.Struct;
@@ -103,8 +105,6 @@ import org.apache.kafka.common.requests.RenewDelegationTokenRequest;
import org.apache.kafka.common.requests.RenewDelegationTokenResponse;
import org.apache.kafka.common.requests.StopReplicaRequest;
import org.apache.kafka.common.requests.StopReplicaResponse;
-import org.apache.kafka.common.requests.SyncGroupRequest;
-import org.apache.kafka.common.requests.SyncGroupResponse;
import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
import org.apache.kafka.common.requests.TxnOffsetCommitResponse;
import org.apache.kafka.common.requests.UpdateMetadataRequest;
@@ -140,7 +140,7 @@ public enum ApiKeys {
JOIN_GROUP(11, "JoinGroup", JoinGroupRequestData.SCHEMAS, JoinGroupResponseData.SCHEMAS),
HEARTBEAT(12, "Heartbeat", HeartbeatRequest.schemaVersions(), HeartbeatResponse.schemaVersions()),
LEAVE_GROUP(13, "LeaveGroup", LeaveGroupRequestData.SCHEMAS, LeaveGroupResponseData.SCHEMAS),
- SYNC_GROUP(14, "SyncGroup", SyncGroupRequest.schemaVersions(), SyncGroupResponse.schemaVersions()),
+ SYNC_GROUP(14, "SyncGroup", SyncGroupRequestData.SCHEMAS, SyncGroupResponseData.SCHEMAS),
DESCRIBE_GROUPS(15, "DescribeGroups", DescribeGroupsRequestData.SCHEMAS,
DescribeGroupsResponseData.SCHEMAS),
LIST_GROUPS(16, "ListGroups", ListGroupsRequest.schemaVersions(), ListGroupsResponse.schemaVersions()),
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 7ba9b7a..a7f5a38 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -91,7 +91,7 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
case LEAVE_GROUP:
return new LeaveGroupResponse(struct, version);
case SYNC_GROUP:
- return new SyncGroupResponse(struct);
+ return new SyncGroupResponse(struct, version);
case STOP_REPLICA:
return new StopReplicaResponse(struct);
case CONTROLLED_SHUTDOWN:
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java
index 237320f..37a2451 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java
@@ -16,112 +16,48 @@
*/
package org.apache.kafka.common.requests;
+import org.apache.kafka.common.message.SyncGroupRequestData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.utils.Utils;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
-import static org.apache.kafka.common.protocol.CommonFields.GENERATION_ID;
-import static org.apache.kafka.common.protocol.CommonFields.GROUP_ID;
-import static org.apache.kafka.common.protocol.CommonFields.MEMBER_ID;
-import static org.apache.kafka.common.protocol.types.Type.BYTES;
-
public class SyncGroupRequest extends AbstractRequest {
- private static final String MEMBER_ASSIGNMENT_KEY_NAME = "member_assignment";
- private static final String GROUP_ASSIGNMENT_KEY_NAME = "group_assignment";
-
- private static final Schema SYNC_GROUP_REQUEST_MEMBER_V0 = new Schema(
- MEMBER_ID,
- new Field(MEMBER_ASSIGNMENT_KEY_NAME, BYTES));
- private static final Schema SYNC_GROUP_REQUEST_V0 = new Schema(
- GROUP_ID,
- GENERATION_ID,
- MEMBER_ID,
- new Field(GROUP_ASSIGNMENT_KEY_NAME, new ArrayOf(SYNC_GROUP_REQUEST_MEMBER_V0)));
-
- /* v1 request is the same as v0. Throttle time has been added to response */
- private static final Schema SYNC_GROUP_REQUEST_V1 = SYNC_GROUP_REQUEST_V0;
-
- /**
- * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
- */
- private static final Schema SYNC_GROUP_REQUEST_V2 = SYNC_GROUP_REQUEST_V1;
-
- public static Schema[] schemaVersions() {
- return new Schema[] {SYNC_GROUP_REQUEST_V0, SYNC_GROUP_REQUEST_V1,
- SYNC_GROUP_REQUEST_V2};
- }
public static class Builder extends AbstractRequest.Builder<SyncGroupRequest> {
- private final String groupId;
- private final int generationId;
- private final String memberId;
- private final Map<String, ByteBuffer> groupAssignment;
- public Builder(String groupId, int generationId, String memberId,
- Map<String, ByteBuffer> groupAssignment) {
+ private final SyncGroupRequestData data;
+
+ public Builder(SyncGroupRequestData data) {
super(ApiKeys.SYNC_GROUP);
- this.groupId = groupId;
- this.generationId = generationId;
- this.memberId = memberId;
- this.groupAssignment = groupAssignment;
+ this.data = data;
}
@Override
public SyncGroupRequest build(short version) {
- return new SyncGroupRequest(groupId, generationId, memberId, groupAssignment, version);
+ return new SyncGroupRequest(data, version);
}
@Override
public String toString() {
- StringBuilder bld = new StringBuilder();
- bld.append("(type=SyncGroupRequest").
- append(", groupId=").append(groupId).
- append(", generationId=").append(generationId).
- append(", memberId=").append(memberId).
- append(", groupAssignment=").
- append(Utils.join(groupAssignment.keySet(), ",")).
- append(")");
- return bld.toString();
+ return data.toString();
}
}
- private final String groupId;
- private final int generationId;
- private final String memberId;
- private final Map<String, ByteBuffer> groupAssignment;
- private SyncGroupRequest(String groupId, int generationId, String memberId,
- Map<String, ByteBuffer> groupAssignment, short version) {
+ public final SyncGroupRequestData data;
+
+ public SyncGroupRequest(SyncGroupRequestData data, short version) {
super(ApiKeys.SYNC_GROUP, version);
- this.groupId = groupId;
- this.generationId = generationId;
- this.memberId = memberId;
- this.groupAssignment = groupAssignment;
+ this.data = data;
}
public SyncGroupRequest(Struct struct, short version) {
super(ApiKeys.SYNC_GROUP, version);
- this.groupId = struct.get(GROUP_ID);
- this.generationId = struct.get(GENERATION_ID);
- this.memberId = struct.get(MEMBER_ID);
-
- groupAssignment = new HashMap<>();
-
- for (Object memberDataObj : struct.getArray(GROUP_ASSIGNMENT_KEY_NAME)) {
- Struct memberData = (Struct) memberDataObj;
- String memberId = memberData.get(MEMBER_ID);
- ByteBuffer memberMetadata = memberData.getBytes(MEMBER_ASSIGNMENT_KEY_NAME);
- groupAssignment.put(memberId, memberMetadata);
- }
+ this.data = new SyncGroupRequestData(struct, version);
}
@Override
@@ -130,34 +66,30 @@ public class SyncGroupRequest extends AbstractRequest {
switch (versionId) {
case 0:
return new SyncGroupResponse(
- Errors.forException(e),
- ByteBuffer.wrap(new byte[]{}));
+ new SyncGroupResponseData()
+ .setErrorCode(Errors.forException(e).code())
+ .setAssignment(new byte[0])
+ );
case 1:
case 2:
return new SyncGroupResponse(
- throttleTimeMs,
- Errors.forException(e),
- ByteBuffer.wrap(new byte[]{}));
+ new SyncGroupResponseData()
+ .setErrorCode(Errors.forException(e).code())
+ .setAssignment(new byte[0])
+ .setThrottleTimeMs(throttleTimeMs)
+ );
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
versionId, this.getClass().getSimpleName(), ApiKeys.SYNC_GROUP.latestVersion()));
}
}
- public String groupId() {
- return groupId;
- }
-
- public int generationId() {
- return generationId;
- }
-
- public Map<String, ByteBuffer> groupAssignment() {
- return groupAssignment;
- }
-
- public String memberId() {
- return memberId;
+ public Map<String, ByteBuffer> groupAssignments() {
+ Map<String, ByteBuffer> groupAssignments = new HashMap<>();
+ for (SyncGroupRequestData.SyncGroupRequestAssignment assignment : data.assignments()) {
+ groupAssignments.put(assignment.memberId(), ByteBuffer.wrap(assignment.assignment()));
+ }
+ return groupAssignments;
}
public static SyncGroupRequest parse(ByteBuffer buffer, short version) {
@@ -166,19 +98,6 @@ public class SyncGroupRequest extends AbstractRequest {
@Override
protected Struct toStruct() {
- Struct struct = new Struct(ApiKeys.SYNC_GROUP.requestSchema(version()));
- struct.set(GROUP_ID, groupId);
- struct.set(GENERATION_ID, generationId);
- struct.set(MEMBER_ID, memberId);
-
- List<Struct> memberArray = new ArrayList<>();
- for (Map.Entry<String, ByteBuffer> entries: groupAssignment.entrySet()) {
- Struct memberData = struct.instance(GROUP_ASSIGNMENT_KEY_NAME);
- memberData.set(MEMBER_ID, entries.getKey());
- memberData.set(MEMBER_ASSIGNMENT_KEY_NAME, entries.getValue());
- memberArray.add(memberData);
- }
- struct.set(GROUP_ASSIGNMENT_KEY_NAME, memberArray.toArray());
- return struct;
+ return data.toStruct(version());
}
}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
index 2b2fc6f..8c9bad1 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
@@ -16,100 +16,49 @@
*/
package org.apache.kafka.common.requests;
+import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
+import java.util.Collections;
import java.util.Map;
-import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
-import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
-import static org.apache.kafka.common.protocol.types.Type.BYTES;
-
public class SyncGroupResponse extends AbstractResponse {
- private static final String MEMBER_ASSIGNMENT_KEY_NAME = "member_assignment";
-
- private static final Schema SYNC_GROUP_RESPONSE_V0 = new Schema(
- ERROR_CODE,
- new Field(MEMBER_ASSIGNMENT_KEY_NAME, BYTES));
- private static final Schema SYNC_GROUP_RESPONSE_V1 = new Schema(
- THROTTLE_TIME_MS,
- ERROR_CODE,
- new Field(MEMBER_ASSIGNMENT_KEY_NAME, BYTES));
-
- /**
- * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
- */
- private static final Schema SYNC_GROUP_RESPONSE_V2 = SYNC_GROUP_RESPONSE_V1;
-
- public static Schema[] schemaVersions() {
- return new Schema[] {SYNC_GROUP_RESPONSE_V0, SYNC_GROUP_RESPONSE_V1,
- SYNC_GROUP_RESPONSE_V2};
- }
- /**
- * Possible error codes:
- *
- * COORDINATOR_NOT_AVAILABLE (15)
- * NOT_COORDINATOR (16)
- * ILLEGAL_GENERATION (22)
- * UNKNOWN_MEMBER_ID (25)
- * REBALANCE_IN_PROGRESS (27)
- * GROUP_AUTHORIZATION_FAILED (30)
- *
- * NOTE: Currently the coordinator returns REBALANCE_IN_PROGRESS while the coordinator is
- * loading. On the next protocol bump, we should consider using COORDINATOR_LOAD_IN_PROGRESS
- * to be consistent with the other APIs.
- */
+ public final SyncGroupResponseData data;
- private final Errors error;
- private final int throttleTimeMs;
- private final ByteBuffer memberState;
-
- public SyncGroupResponse(Errors error, ByteBuffer memberState) {
- this(DEFAULT_THROTTLE_TIME, error, memberState);
+ public SyncGroupResponse(SyncGroupResponseData data) {
+ this.data = data;
}
- public SyncGroupResponse(int throttleTimeMs, Errors error, ByteBuffer memberState) {
- this.throttleTimeMs = throttleTimeMs;
- this.error = error;
- this.memberState = memberState;
+ public SyncGroupResponse(Struct struct) {
+ short latestVersion = (short) (SyncGroupResponseData.SCHEMAS.length - 1);
+ this.data = new SyncGroupResponseData(struct, latestVersion);
}
- public SyncGroupResponse(Struct struct) {
- this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
- this.error = Errors.forCode(struct.get(ERROR_CODE));
- this.memberState = struct.getBytes(MEMBER_ASSIGNMENT_KEY_NAME);
+ public SyncGroupResponse(Struct struct, short version) {
+ this.data = new SyncGroupResponseData(struct, version);
}
@Override
public int throttleTimeMs() {
- return throttleTimeMs;
+ return data.throttleTimeMs();
}
public Errors error() {
- return error;
+ return Errors.forCode(data.errorCode());
}
@Override
public Map<Errors, Integer> errorCounts() {
- return errorCounts(error);
- }
-
- public ByteBuffer memberAssignment() {
- return memberState;
+ return Collections.singletonMap(Errors.forCode(data.errorCode()), 1);
}
@Override
protected Struct toStruct(short version) {
- Struct struct = new Struct(ApiKeys.SYNC_GROUP.responseSchema(version));
- struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
- struct.set(ERROR_CODE, error.code());
- struct.set(MEMBER_ASSIGNMENT_KEY_NAME, memberState);
- return struct;
+ return data.toStruct(version);
}
public static SyncGroupResponse parse(ByteBuffer buffer, short version) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 3a4076b..09eb94a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -43,6 +43,7 @@ import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.network.Selectable;
@@ -1763,7 +1764,11 @@ public class KafkaConsumerTest {
private SyncGroupResponse syncGroupResponse(List<TopicPartition> partitions, Errors error) {
ByteBuffer buf = ConsumerProtocol.serializeAssignment(new PartitionAssignor.Assignment(partitions));
- return new SyncGroupResponse(error, buf);
+ return new SyncGroupResponse(
+ new SyncGroupResponseData()
+ .setErrorCode(error.code())
+ .setAssignment(Utils.toArray(buf))
+ );
}
private OffsetFetchResponse offsetResponse(Map<TopicPartition, Long> offsets, Errors error) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index 8d8afd2..449c58f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
@@ -793,7 +794,11 @@ public class AbstractCoordinatorTest {
}
private SyncGroupResponse syncGroupResponse(Errors error) {
- return new SyncGroupResponse(error, ByteBuffer.allocate(0));
+ return new SyncGroupResponse(
+ new SyncGroupResponseData()
+ .setErrorCode(error.code())
+ .setAssignment(new byte[0])
+ );
}
public static class DummyCoordinator extends AbstractCoordinator {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 4f6e0f1..d8b1252 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -42,6 +42,7 @@ import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.RecordBatch;
@@ -60,6 +61,7 @@ import org.apache.kafka.common.requests.SyncGroupRequest;
import org.apache.kafka.common.requests.SyncGroupResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
@@ -416,9 +418,9 @@ public class ConsumerCoordinatorTest {
@Override
public boolean matches(AbstractRequest body) {
SyncGroupRequest sync = (SyncGroupRequest) body;
- return sync.memberId().equals(consumerId) &&
- sync.generationId() == 1 &&
- sync.groupAssignment().containsKey(consumerId);
+ return sync.data.memberId().equals(consumerId) &&
+ sync.data.generationId() == 1 &&
+ sync.groupAssignments().containsKey(consumerId);
}
}, syncGroupResponse(singletonList(t1p), Errors.NONE));
coordinator.poll(time.timer(Long.MAX_VALUE));
@@ -452,9 +454,9 @@ public class ConsumerCoordinatorTest {
@Override
public boolean matches(AbstractRequest body) {
SyncGroupRequest sync = (SyncGroupRequest) body;
- return sync.memberId().equals(consumerId) &&
- sync.generationId() == 1 &&
- sync.groupAssignment().containsKey(consumerId);
+ return sync.data.memberId().equals(consumerId) &&
+ sync.data.generationId() == 1 &&
+ sync.groupAssignments().containsKey(consumerId);
}
}, syncGroupResponse(Arrays.asList(t2p), Errors.NONE));
@@ -466,9 +468,9 @@ public class ConsumerCoordinatorTest {
@Override
public boolean matches(AbstractRequest body) {
SyncGroupRequest sync = (SyncGroupRequest) body;
- return sync.memberId().equals(consumerId) &&
- sync.generationId() == 1 &&
- sync.groupAssignment().containsKey(consumerId);
+ return sync.data.memberId().equals(consumerId) &&
+ sync.data.generationId() == 1 &&
+ sync.groupAssignments().containsKey(consumerId);
}
}, syncGroupResponse(singletonList(t1p), Errors.NONE));
@@ -508,9 +510,9 @@ public class ConsumerCoordinatorTest {
@Override
public boolean matches(AbstractRequest body) {
SyncGroupRequest sync = (SyncGroupRequest) body;
- return sync.memberId().equals(consumerId) &&
- sync.generationId() == 1 &&
- sync.groupAssignment().containsKey(consumerId);
+ return sync.data.memberId().equals(consumerId) &&
+ sync.data.generationId() == 1 &&
+ sync.groupAssignments().containsKey(consumerId);
}
}, syncGroupResponse(singletonList(t2p), Errors.NONE));
assertThrows(IllegalStateException.class, () -> coordinator.poll(time.timer(Long.MAX_VALUE)));
@@ -538,9 +540,9 @@ public class ConsumerCoordinatorTest {
@Override
public boolean matches(AbstractRequest body) {
SyncGroupRequest sync = (SyncGroupRequest) body;
- return sync.memberId().equals(consumerId) &&
- sync.generationId() == 1 &&
- sync.groupAssignment().containsKey(consumerId);
+ return sync.data.memberId().equals(consumerId) &&
+ sync.data.generationId() == 1 &&
+ sync.groupAssignments().containsKey(consumerId);
}
}, syncGroupResponse(Arrays.asList(t1p, t2p), Errors.NONE));
// expect client to force updating the metadata, if yes gives it both topics
@@ -648,9 +650,9 @@ public class ConsumerCoordinatorTest {
@Override
public boolean matches(AbstractRequest body) {
SyncGroupRequest sync = (SyncGroupRequest) body;
- return sync.memberId().equals(consumerId) &&
- sync.generationId() == 1 &&
- sync.groupAssignment().isEmpty();
+ return sync.data.memberId().equals(consumerId) &&
+ sync.data.generationId() == 1 &&
+ sync.groupAssignments().isEmpty();
}
}, syncGroupResponse(singletonList(t1p), Errors.NONE));
@@ -722,9 +724,9 @@ public class ConsumerCoordinatorTest {
@Override
public boolean matches(AbstractRequest body) {
SyncGroupRequest sync = (SyncGroupRequest) body;
- return sync.memberId().equals(consumerId) &&
- sync.generationId() == 1 &&
- sync.groupAssignment().isEmpty();
+ return sync.data.memberId().equals(consumerId) &&
+ sync.data.generationId() == 1 &&
+ sync.groupAssignments().isEmpty();
}
}, syncGroupResponse(singletonList(t1p), Errors.NONE));
@@ -787,9 +789,9 @@ public class ConsumerCoordinatorTest {
@Override
public boolean matches(AbstractRequest body) {
SyncGroupRequest sync = (SyncGroupRequest) body;
- return sync.memberId().equals(consumerId) &&
- sync.generationId() == 1 &&
- sync.groupAssignment().isEmpty();
+ return sync.data.memberId().equals(consumerId) &&
+ sync.data.generationId() == 1 &&
+ sync.groupAssignments().isEmpty();
}
}, syncGroupResponse(Arrays.asList(t1p, t2p), Errors.NONE));
// expect client to force updating the metadata, if yes gives it both topics
@@ -1038,9 +1040,9 @@ public class ConsumerCoordinatorTest {
@Override
public boolean matches(AbstractRequest body) {
SyncGroupRequest sync = (SyncGroupRequest) body;
- if (sync.memberId().equals(consumerId) &&
- sync.generationId() == 1 &&
- sync.groupAssignment().containsKey(consumerId)) {
+ if (sync.data.memberId().equals(consumerId) &&
+ sync.data.generationId() == 1 &&
+ sync.groupAssignments().containsKey(consumerId)) {
// trigger the metadata update including both topics after the sync group request has been sent
Map<String, Integer> topicPartitionCounts = new HashMap<>();
topicPartitionCounts.put(topic1, 1);
@@ -2228,7 +2230,11 @@ public class ConsumerCoordinatorTest {
private SyncGroupResponse syncGroupResponse(List<TopicPartition> partitions, Errors error) {
ByteBuffer buf = ConsumerProtocol.serializeAssignment(new PartitionAssignor.Assignment(partitions));
- return new SyncGroupResponse(error, buf);
+ return new SyncGroupResponse(
+ new SyncGroupResponseData()
+ .setErrorCode(error.code())
+ .setAssignment(Utils.toArray(buf))
+ );
}
private OffsetCommitResponse offsetCommitResponse(Map<TopicPartition, Errors> responseData) {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
index 3684025..9f980a6 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.Node;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
@@ -32,6 +33,7 @@ import org.apache.kafka.common.requests.SyncGroupRequest;
import org.apache.kafka.common.requests.SyncGroupResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.storage.KafkaConfigBackingStore;
import org.apache.kafka.connect.util.ConnectorTaskId;
@@ -224,9 +226,9 @@ public class WorkerCoordinatorTest {
@Override
public boolean matches(AbstractRequest body) {
SyncGroupRequest sync = (SyncGroupRequest) body;
- return sync.memberId().equals(consumerId) &&
- sync.generationId() == 1 &&
- sync.groupAssignment().containsKey(consumerId);
+ return sync.data.memberId().equals(consumerId) &&
+ sync.data.generationId() == 1 &&
+ sync.groupAssignments().containsKey(consumerId);
}
}, syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", 1L, Collections.singletonList(connectorId1),
Collections.<ConnectorTaskId>emptyList(), Errors.NONE));
@@ -261,9 +263,9 @@ public class WorkerCoordinatorTest {
@Override
public boolean matches(AbstractRequest body) {
SyncGroupRequest sync = (SyncGroupRequest) body;
- return sync.memberId().equals(memberId) &&
- sync.generationId() == 1 &&
- sync.groupAssignment().isEmpty();
+ return sync.data.memberId().equals(memberId) &&
+ sync.data.generationId() == 1 &&
+ sync.data.assignments().isEmpty();
}
}, syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", 1L, Collections.<String>emptyList(),
Collections.singletonList(taskId1x0), Errors.NONE));
@@ -302,9 +304,9 @@ public class WorkerCoordinatorTest {
@Override
public boolean matches(AbstractRequest body) {
SyncGroupRequest sync = (SyncGroupRequest) body;
- return sync.memberId().equals(memberId) &&
- sync.generationId() == 1 &&
- sync.groupAssignment().isEmpty();
+ return sync.data.memberId().equals(memberId) &&
+ sync.data.generationId() == 1 &&
+ sync.data.assignments().isEmpty();
}
};
client.prepareResponse(matcher, syncGroupResponse(ConnectProtocol.Assignment.CONFIG_MISMATCH, "leader", 10L,
@@ -526,7 +528,11 @@ public class WorkerCoordinatorTest {
List<ConnectorTaskId> taskIds, Errors error) {
ConnectProtocol.Assignment assignment = new ConnectProtocol.Assignment(assignmentError, leader, LEADER_URL, configOffset, connectorIds, taskIds);
ByteBuffer buf = ConnectProtocol.serializeAssignment(assignment);
- return new SyncGroupResponse(error, buf);
+ return new SyncGroupResponse(
+ new SyncGroupResponseData()
+ .setErrorCode(error.code())
+ .setAssignment(Utils.toArray(buf))
+ );
}
private static class MockRebalanceListener implements WorkerRebalanceListener {
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 302718d..6bd7174 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1377,17 +1377,27 @@ class KafkaApis(val requestChannel: RequestChannel,
def sendResponseCallback(memberState: Array[Byte], error: Errors) {
sendResponseMaybeThrottle(request, requestThrottleMs =>
- new SyncGroupResponse(requestThrottleMs, error, ByteBuffer.wrap(memberState)))
+ new SyncGroupResponse(
+ new SyncGroupResponseData()
+ .setErrorCode(error.code)
+ .setAssignment(memberState)
+ .setThrottleTimeMs(requestThrottleMs)
+ ))
}
- if (!authorize(request.session, Read, Resource(Group, syncGroupRequest.groupId(), LITERAL))) {
+ if (!authorize(request.session, Read, Resource(Group, syncGroupRequest.data.groupId, LITERAL))) {
sendResponseCallback(Array[Byte](), Errors.GROUP_AUTHORIZATION_FAILED)
} else {
+ val assignmentMap = immutable.Map.newBuilder[String, Array[Byte]]
+ syncGroupRequest.data.assignments.asScala.foreach { assignment =>
+ assignmentMap += (assignment.memberId -> assignment.assignment)
+ }
+
groupCoordinator.handleSyncGroup(
- syncGroupRequest.groupId,
- syncGroupRequest.generationId,
- syncGroupRequest.memberId,
- syncGroupRequest.groupAssignment().asScala.mapValues(Utils.toArray),
+ syncGroupRequest.data.groupId,
+ syncGroupRequest.data.generationId,
+ syncGroupRequest.data.memberId,
+ assignmentMap.result,
sendResponseCallback
)
}
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index ac8153c..aa6a0dd 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -166,7 +166,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
ApiKeys.FIND_COORDINATOR -> ((resp: FindCoordinatorResponse) => resp.error),
ApiKeys.UPDATE_METADATA -> ((resp: requests.UpdateMetadataResponse) => resp.error),
ApiKeys.JOIN_GROUP -> ((resp: JoinGroupResponse) => resp.error),
- ApiKeys.SYNC_GROUP -> ((resp: SyncGroupResponse) => resp.error),
+ ApiKeys.SYNC_GROUP -> ((resp: SyncGroupResponse) => Errors.forCode(resp.data.errorCode())),
ApiKeys.DESCRIBE_GROUPS -> ((resp: DescribeGroupsResponse) => {
val errorCode = resp.data().groups().asScala.find(g => group.equals(g.groupId())).head.errorCode()
Errors.forCode(errorCode)
@@ -340,7 +340,13 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
}
private def createSyncGroupRequest = {
- new SyncGroupRequest.Builder(group, 1, "", Map[String, ByteBuffer]().asJava).build()
+ new SyncGroupRequest.Builder(
+ new SyncGroupRequestData()
+ .setGroupId(group)
+ .setGenerationId(1)
+ .setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID)
+ .setAssignments(Collections.emptyList())
+ ).build()
}
private def createDescribeGroupsRequest = {
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 52ac700..eadcffb 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -306,7 +306,13 @@ class RequestQuotaTest extends BaseRequestTest {
new LeaveGroupRequest.Builder(new LeaveGroupRequestData().setGroupId("test-leave-group").setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID))
case ApiKeys.SYNC_GROUP =>
- new SyncGroupRequest.Builder("test-sync-group", 1, "", Map[String, ByteBuffer]().asJava)
+ new SyncGroupRequest.Builder(
+ new SyncGroupRequestData()
+ .setGroupId("test-sync-group")
+ .setGenerationId(1)
+ .setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID)
+ .setAssignments(Collections.emptyList())
+ )
case ApiKeys.DESCRIBE_GROUPS =>
new DescribeGroupsRequest.Builder(new DescribeGroupsRequestData().setGroups(List("test-group").asJava))