You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2019/04/25 23:43:09 UTC
[kafka] branch trunk updated: KAFKA-7903: automatically generate OffsetCommitRequest (#6583)
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0c2d829 KAFKA-7903: automatically generate OffsetCommitRequest (#6583)
0c2d829 is described below
commit 0c2d829249567c6a87140ae25f631a2abad11d00
Author: Boyang Chen <bo...@confluent.io>
AuthorDate: Thu Apr 25 16:42:54 2019 -0700
KAFKA-7903: automatically generate OffsetCommitRequest (#6583)
Reviewers: Colin P. McCabe <cm...@apache.org>
---
.../consumer/internals/ConsumerCoordinator.java | 121 +++++---
.../org/apache/kafka/common/protocol/ApiKeys.java | 6 +-
.../kafka/common/requests/OffsetCommitRequest.java | 339 ++++-----------------
.../common/requests/OffsetCommitResponse.java | 143 ++++-----
.../kafka/clients/consumer/KafkaConsumerTest.java | 5 +-
.../internals/ConsumerCoordinatorTest.java | 48 ++-
.../kafka/common/requests/RequestResponseTest.java | 44 ++-
core/src/main/scala/kafka/server/KafkaApis.scala | 66 ++--
.../kafka/api/AuthorizerIntegrationTest.scala | 25 +-
.../scala/unit/kafka/server/KafkaApisTest.scala | 25 +-
.../scala/unit/kafka/server/RequestQuotaTest.scala | 34 ++-
11 files changed, 366 insertions(+), 490 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 68e98ba..4d40070 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -35,6 +35,8 @@ import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
@@ -42,6 +44,7 @@ import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.OffsetCommitResponse;
import org.apache.kafka.common.requests.OffsetFetchRequest;
@@ -770,14 +773,27 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
return RequestFuture.coordinatorNotAvailable();
// create the offset commit request
- Map<TopicPartition, OffsetCommitRequest.PartitionData> offsetData = new HashMap<>(offsets.size());
+ Map<String, OffsetCommitRequestData.OffsetCommitRequestTopic> requestTopicDataMap = new HashMap<>();
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
+ TopicPartition topicPartition = entry.getKey();
OffsetAndMetadata offsetAndMetadata = entry.getValue();
if (offsetAndMetadata.offset() < 0) {
return RequestFuture.failure(new IllegalArgumentException("Invalid offset: " + offsetAndMetadata.offset()));
}
- offsetData.put(entry.getKey(), new OffsetCommitRequest.PartitionData(offsetAndMetadata.offset(),
- offsetAndMetadata.leaderEpoch(), offsetAndMetadata.metadata()));
+
+ OffsetCommitRequestData.OffsetCommitRequestTopic topic = requestTopicDataMap
+ .getOrDefault(topicPartition.topic(),
+ new OffsetCommitRequestData.OffsetCommitRequestTopic()
+ .setName(topicPartition.topic())
+ );
+
+ topic.partitions().add(new OffsetCommitRequestData.OffsetCommitRequestPartition()
+ .setPartitionIndex(topicPartition.partition())
+ .setCommittedOffset(offsetAndMetadata.offset())
+ .setCommittedLeaderEpoch(offsetAndMetadata.leaderEpoch().orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
+ .setCommittedMetadata(offsetAndMetadata.metadata())
+ );
+ requestTopicDataMap.put(topicPartition.topic(), topic);
}
final Generation generation;
@@ -791,9 +807,13 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
if (generation == null)
return RequestFuture.failure(new CommitFailedException());
- OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(this.groupId, offsetData).
- setGenerationId(generation.generationId).
- setMemberId(generation.memberId);
+ OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(
+ new OffsetCommitRequestData()
+ .setGroupId(this.groupId)
+ .setGenerationId(generation.generationId)
+ .setMemberId(generation.memberId)
+ .setTopics(new ArrayList<>(requestTopicDataMap.values()))
+ );
log.trace("Sending OffsetCommit request with {} to coordinator {}", offsets, coordinator);
@@ -814,52 +834,55 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
sensors.commitLatency.record(response.requestLatencyMs());
Set<String> unauthorizedTopics = new HashSet<>();
- for (Map.Entry<TopicPartition, Errors> entry : commitResponse.responseData().entrySet()) {
- TopicPartition tp = entry.getKey();
- OffsetAndMetadata offsetAndMetadata = this.offsets.get(tp);
- long offset = offsetAndMetadata.offset();
+ for (OffsetCommitResponseData.OffsetCommitResponseTopic topic : commitResponse.data().topics()) {
+ for (OffsetCommitResponseData.OffsetCommitResponsePartition partition : topic.partitions()) {
+ TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex());
+ OffsetAndMetadata offsetAndMetadata = this.offsets.get(tp);
- Errors error = entry.getValue();
- if (error == Errors.NONE) {
- log.debug("Committed offset {} for partition {}", offset, tp);
- } else {
- if (error.exception() instanceof RetriableException) {
- log.warn("Offset commit failed on partition {} at offset {}: {}", tp, offset, error.message());
- } else {
- log.error("Offset commit failed on partition {} at offset {}: {}", tp, offset, error.message());
- }
+ long offset = offsetAndMetadata.offset();
- if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
- future.raise(new GroupAuthorizationException(groupId));
- return;
- } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
- unauthorizedTopics.add(tp.topic());
- } else if (error == Errors.OFFSET_METADATA_TOO_LARGE
- || error == Errors.INVALID_COMMIT_OFFSET_SIZE) {
- // raise the error to the user
- future.raise(error);
- return;
- } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS
- || error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
- // just retry
- future.raise(error);
- return;
- } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
- || error == Errors.NOT_COORDINATOR
- || error == Errors.REQUEST_TIMED_OUT) {
- markCoordinatorUnknown();
- future.raise(error);
- return;
- } else if (error == Errors.UNKNOWN_MEMBER_ID
- || error == Errors.ILLEGAL_GENERATION
- || error == Errors.REBALANCE_IN_PROGRESS) {
- // need to re-join group
- resetGeneration();
- future.raise(new CommitFailedException());
- return;
+ Errors error = Errors.forCode(partition.errorCode());
+ if (error == Errors.NONE) {
+ log.debug("Committed offset {} for partition {}", offset, tp);
} else {
- future.raise(new KafkaException("Unexpected error in commit: " + error.message()));
- return;
+ if (error.exception() instanceof RetriableException) {
+ log.warn("Offset commit failed on partition {} at offset {}: {}", tp, offset, error.message());
+ } else {
+ log.error("Offset commit failed on partition {} at offset {}: {}", tp, offset, error.message());
+ }
+
+ if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
+ future.raise(new GroupAuthorizationException(groupId));
+ return;
+ } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
+ unauthorizedTopics.add(tp.topic());
+ } else if (error == Errors.OFFSET_METADATA_TOO_LARGE
+ || error == Errors.INVALID_COMMIT_OFFSET_SIZE) {
+ // raise the error to the user
+ future.raise(error);
+ return;
+ } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS
+ || error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
+ // just retry
+ future.raise(error);
+ return;
+ } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
+ || error == Errors.NOT_COORDINATOR
+ || error == Errors.REQUEST_TIMED_OUT) {
+ markCoordinatorUnknown();
+ future.raise(error);
+ return;
+ } else if (error == Errors.UNKNOWN_MEMBER_ID
+ || error == Errors.ILLEGAL_GENERATION
+ || error == Errors.REBALANCE_IN_PROGRESS) {
+ // need to re-join group
+ resetGeneration();
+ future.raise(new CommitFailedException());
+ return;
+ } else {
+ future.raise(new KafkaException("Unexpected error in commit: " + error.message()));
+ return;
+ }
}
}
}
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 5391135..8109182 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -34,6 +34,8 @@ import org.apache.kafka.common.message.LeaveGroupRequestData;
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.MetadataRequestData;
import org.apache.kafka.common.message.MetadataResponseData;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
import org.apache.kafka.common.message.SaslAuthenticateRequestData;
import org.apache.kafka.common.message.SaslAuthenticateResponseData;
import org.apache.kafka.common.message.SaslHandshakeRequestData;
@@ -91,8 +93,6 @@ import org.apache.kafka.common.requests.ListGroupsRequest;
import org.apache.kafka.common.requests.ListGroupsResponse;
import org.apache.kafka.common.requests.ListOffsetRequest;
import org.apache.kafka.common.requests.ListOffsetResponse;
-import org.apache.kafka.common.requests.OffsetCommitRequest;
-import org.apache.kafka.common.requests.OffsetCommitResponse;
import org.apache.kafka.common.requests.OffsetFetchRequest;
import org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
@@ -133,7 +133,7 @@ public enum ApiKeys {
UpdateMetadataResponse.schemaVersions()),
CONTROLLED_SHUTDOWN(7, "ControlledShutdown", true, ControlledShutdownRequestData.SCHEMAS,
ControlledShutdownResponseData.SCHEMAS),
- OFFSET_COMMIT(8, "OffsetCommit", OffsetCommitRequest.schemaVersions(), OffsetCommitResponse.schemaVersions()),
+ OFFSET_COMMIT(8, "OffsetCommit", OffsetCommitRequestData.SCHEMAS, OffsetCommitResponseData.SCHEMAS),
OFFSET_FETCH(9, "OffsetFetch", OffsetFetchRequest.schemaVersions(), OffsetFetchResponse.schemaVersions()),
FIND_COORDINATOR(10, "FindCoordinator", FindCoordinatorRequest.schemaVersions(),
FindCoordinatorResponse.schemaVersions()),
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
index fdb5b10..dd4cf78 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
@@ -17,126 +17,19 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.utils.CollectionUtils;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
-
-import static org.apache.kafka.common.protocol.CommonFields.COMMITTED_LEADER_EPOCH;
-import static org.apache.kafka.common.protocol.CommonFields.COMMITTED_METADATA;
-import static org.apache.kafka.common.protocol.CommonFields.COMMITTED_OFFSET;
-import static org.apache.kafka.common.protocol.CommonFields.GENERATION_ID;
-import static org.apache.kafka.common.protocol.CommonFields.GROUP_ID;
-import static org.apache.kafka.common.protocol.CommonFields.MEMBER_ID;
-import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
-import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
public class OffsetCommitRequest extends AbstractRequest {
- // top level fields
- private static final Field.ComplexArray TOPICS = new Field.ComplexArray("topics",
- "Topics to commit offsets");
-
- // topic level fields
- private static final Field.ComplexArray PARTITIONS = new Field.ComplexArray("partitions",
- "Partitions to commit offsets");
-
- // partition level fields
- @Deprecated
- private static final Field.Int64 COMMIT_TIMESTAMP = new Field.Int64("timestamp", "Timestamp of the commit");
- private static final Field.Int64 RETENTION_TIME = new Field.Int64("retention_time",
- "Time period in ms to retain the offset.");
-
- private static final Field PARTITIONS_V0 = PARTITIONS.withFields(
- PARTITION_ID,
- COMMITTED_OFFSET,
- COMMITTED_METADATA);
-
- private static final Field TOPICS_V0 = TOPICS.withFields(
- TOPIC_NAME,
- PARTITIONS_V0);
-
- private static final Schema OFFSET_COMMIT_REQUEST_V0 = new Schema(
- GROUP_ID,
- TOPICS_V0);
-
- // V1 adds timestamp and group membership information (generation and memberId)
- private static final Field PARTITIONS_V1 = PARTITIONS.withFields(
- PARTITION_ID,
- COMMITTED_OFFSET,
- COMMIT_TIMESTAMP,
- COMMITTED_METADATA);
-
- private static final Field TOPICS_V1 = TOPICS.withFields(
- TOPIC_NAME,
- PARTITIONS_V1);
-
- private static final Schema OFFSET_COMMIT_REQUEST_V1 = new Schema(
- GROUP_ID,
- GENERATION_ID,
- MEMBER_ID,
- TOPICS_V1);
-
- // V2 adds retention time
- private static final Field PARTITIONS_V2 = PARTITIONS.withFields(
- PARTITION_ID,
- COMMITTED_OFFSET,
- COMMITTED_METADATA);
-
- private static final Field TOPICS_V2 = TOPICS.withFields(
- TOPIC_NAME,
- PARTITIONS_V2);
-
- private static final Schema OFFSET_COMMIT_REQUEST_V2 = new Schema(
- GROUP_ID,
- GENERATION_ID,
- MEMBER_ID,
- RETENTION_TIME,
- TOPICS_V2);
-
- // V3 adds throttle time
- private static final Schema OFFSET_COMMIT_REQUEST_V3 = OFFSET_COMMIT_REQUEST_V2;
-
- // V4 bump used to indicate that on quota violation brokers send out responses before throttling.
- private static final Schema OFFSET_COMMIT_REQUEST_V4 = OFFSET_COMMIT_REQUEST_V3;
-
- // V5 removes the retention time which is now controlled only by a broker configuration
- private static final Schema OFFSET_COMMIT_REQUEST_V5 = new Schema(
- GROUP_ID,
- GENERATION_ID,
- MEMBER_ID,
- TOPICS_V2);
-
- // V6 adds the leader epoch to the partition data
- private static final Field PARTITIONS_V6 = PARTITIONS.withFields(
- PARTITION_ID,
- COMMITTED_OFFSET,
- COMMITTED_LEADER_EPOCH,
- COMMITTED_METADATA);
-
- private static final Field TOPICS_V6 = TOPICS.withFields(
- TOPIC_NAME,
- PARTITIONS_V6);
-
- private static final Schema OFFSET_COMMIT_REQUEST_V6 = new Schema(
- GROUP_ID,
- GENERATION_ID,
- MEMBER_ID,
- TOPICS_V6);
-
- public static Schema[] schemaVersions() {
- return new Schema[] {OFFSET_COMMIT_REQUEST_V0, OFFSET_COMMIT_REQUEST_V1, OFFSET_COMMIT_REQUEST_V2,
- OFFSET_COMMIT_REQUEST_V3, OFFSET_COMMIT_REQUEST_V4, OFFSET_COMMIT_REQUEST_V5, OFFSET_COMMIT_REQUEST_V6};
- }
-
// default values for the current version
public static final int DEFAULT_GENERATION_ID = -1;
public static final String DEFAULT_MEMBER_ID = "";
@@ -147,221 +40,115 @@ public class OffsetCommitRequest extends AbstractRequest {
@Deprecated
public static final long DEFAULT_TIMESTAMP = -1L; // for V0, V1
- private final String groupId;
- private final String memberId;
- private final int generationId;
- private final long retentionTime;
- private final Map<TopicPartition, PartitionData> offsetData;
-
- public static final class PartitionData {
- @Deprecated
- public final long timestamp; // for V1
-
- public final long offset;
- public final String metadata;
- public final Optional<Integer> leaderEpoch;
-
- private PartitionData(long offset, Optional<Integer> leaderEpoch, long timestamp, String metadata) {
- this.offset = offset;
- this.leaderEpoch = leaderEpoch;
- this.timestamp = timestamp;
- this.metadata = metadata;
- }
-
- @Deprecated
- public PartitionData(long offset, long timestamp, String metadata) {
- this(offset, Optional.empty(), timestamp, metadata);
- }
-
- public PartitionData(long offset, Optional<Integer> leaderEpoch, String metadata) {
- this(offset, leaderEpoch, DEFAULT_TIMESTAMP, metadata);
- }
-
- @Override
- public String toString() {
- StringBuilder bld = new StringBuilder();
- bld.append("(timestamp=").append(timestamp).
- append(", offset=").append(offset).
- append(", leaderEpoch=").append(leaderEpoch).
- append(", metadata=").append(metadata).
- append(")");
- return bld.toString();
- }
- }
+ private final OffsetCommitRequestData data;
public static class Builder extends AbstractRequest.Builder<OffsetCommitRequest> {
- private final String groupId;
- private final Map<TopicPartition, PartitionData> offsetData;
- private String memberId = DEFAULT_MEMBER_ID;
- private int generationId = DEFAULT_GENERATION_ID;
- public Builder(String groupId, Map<TopicPartition, PartitionData> offsetData) {
- super(ApiKeys.OFFSET_COMMIT);
- this.groupId = groupId;
- this.offsetData = offsetData;
- }
-
- public Builder setMemberId(String memberId) {
- this.memberId = memberId;
- return this;
- }
+ private final OffsetCommitRequestData data;
- public Builder setGenerationId(int generationId) {
- this.generationId = generationId;
- return this;
+ public Builder(OffsetCommitRequestData data) {
+ super(ApiKeys.OFFSET_COMMIT);
+ this.data = data;
}
@Override
public OffsetCommitRequest build(short version) {
- if (version == 0) {
- return new OffsetCommitRequest(groupId, DEFAULT_GENERATION_ID, DEFAULT_MEMBER_ID,
- DEFAULT_RETENTION_TIME, offsetData, version);
- } else {
- return new OffsetCommitRequest(groupId, generationId, memberId, DEFAULT_RETENTION_TIME,
- offsetData, version);
- }
+ return new OffsetCommitRequest(data, version);
}
@Override
public String toString() {
- StringBuilder bld = new StringBuilder();
- bld.append("(type=OffsetCommitRequest").
- append(", groupId=").append(groupId).
- append(", memberId=").append(memberId).
- append(", generationId=").append(generationId).
- append(", offsetData=").append(offsetData).
- append(")");
- return bld.toString();
+ return data.toString();
}
}
- private OffsetCommitRequest(String groupId, int generationId, String memberId, long retentionTime,
- Map<TopicPartition, PartitionData> offsetData, short version) {
+ private final short version;
+
+ public OffsetCommitRequest(OffsetCommitRequestData data, short version) {
super(ApiKeys.OFFSET_COMMIT, version);
- this.groupId = groupId;
- this.generationId = generationId;
- this.memberId = memberId;
- this.retentionTime = retentionTime;
- this.offsetData = offsetData;
+ this.data = data;
+ this.version = version;
}
- public OffsetCommitRequest(Struct struct, short versionId) {
- super(ApiKeys.OFFSET_COMMIT, versionId);
-
- groupId = struct.get(GROUP_ID);
- // These fields only exists in v1.
- generationId = struct.getOrElse(GENERATION_ID, DEFAULT_GENERATION_ID);
- memberId = struct.getOrElse(MEMBER_ID, DEFAULT_MEMBER_ID);
+ public OffsetCommitRequest(Struct struct, short version) {
+ super(ApiKeys.OFFSET_COMMIT, version);
+ this.data = new OffsetCommitRequestData(struct, version);
+ this.version = version;
+ }
- // This field only exists in v2
- retentionTime = struct.getOrElse(RETENTION_TIME, DEFAULT_RETENTION_TIME);
+ public OffsetCommitRequestData data() {
+ return data;
+ }
- offsetData = new HashMap<>();
- for (Object topicDataObj : struct.get(TOPICS)) {
- Struct topicData = (Struct) topicDataObj;
- String topic = topicData.get(TOPIC_NAME);
- for (Object partitionDataObj : topicData.get(PARTITIONS)) {
- Struct partitionDataStruct = (Struct) partitionDataObj;
- int partition = partitionDataStruct.get(PARTITION_ID);
- long offset = partitionDataStruct.get(COMMITTED_OFFSET);
- String metadata = partitionDataStruct.get(COMMITTED_METADATA);
- PartitionData partitionOffset;
- // This field only exists in v1
- if (partitionDataStruct.hasField(COMMIT_TIMESTAMP)) {
- long timestamp = partitionDataStruct.get(COMMIT_TIMESTAMP);
- partitionOffset = new PartitionData(offset, timestamp, metadata);
- } else {
- Optional<Integer> leaderEpochOpt = RequestUtils.getLeaderEpoch(partitionDataStruct,
- COMMITTED_LEADER_EPOCH);
- partitionOffset = new PartitionData(offset, leaderEpochOpt, metadata);
- }
- offsetData.put(new TopicPartition(topic, partition), partitionOffset);
+ public Map<TopicPartition, Long> offsets() {
+ Map<TopicPartition, Long> offsets = new HashMap<>();
+ for (OffsetCommitRequestData.OffsetCommitRequestTopic topic : data.topics()) {
+ for (OffsetCommitRequestData.OffsetCommitRequestPartition partition : topic.partitions()) {
+ offsets.put(new TopicPartition(topic.name(), partition.partitionIndex()),
+ partition.committedOffset());
}
}
+ return offsets;
}
- @Override
- public Struct toStruct() {
- short version = version();
- Struct struct = new Struct(ApiKeys.OFFSET_COMMIT.requestSchema(version));
- struct.set(GROUP_ID, groupId);
-
- Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupPartitionDataByTopic(offsetData);
- List<Struct> topicArray = new ArrayList<>();
- for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry: topicsData.entrySet()) {
- Struct topicData = struct.instance(TOPICS);
- topicData.set(TOPIC_NAME, topicEntry.getKey());
- List<Struct> partitionArray = new ArrayList<>();
- for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.getValue().entrySet()) {
- PartitionData fetchPartitionData = partitionEntry.getValue();
- Struct partitionData = topicData.instance(PARTITIONS);
- partitionData.set(PARTITION_ID, partitionEntry.getKey());
- partitionData.set(COMMITTED_OFFSET, fetchPartitionData.offset);
- // Only for v1
- partitionData.setIfExists(COMMIT_TIMESTAMP, fetchPartitionData.timestamp);
- // Only for v6
- RequestUtils.setLeaderEpochIfExists(partitionData, COMMITTED_LEADER_EPOCH, fetchPartitionData.leaderEpoch);
- partitionData.set(COMMITTED_METADATA, fetchPartitionData.metadata);
- partitionArray.add(partitionData);
+ public static List<OffsetCommitResponseData.OffsetCommitResponseTopic> getErrorResponseTopics(
+ List<OffsetCommitRequestData.OffsetCommitRequestTopic> requestTopics,
+ Errors e) {
+ List<OffsetCommitResponseData.OffsetCommitResponseTopic>
+ responseTopicData = new ArrayList<>();
+ for (OffsetCommitRequestData.OffsetCommitRequestTopic entry : requestTopics) {
+ List<OffsetCommitResponseData.OffsetCommitResponsePartition> responsePartitions =
+ new ArrayList<>();
+ for (OffsetCommitRequestData.OffsetCommitRequestPartition requestPartition : entry.partitions()) {
+ responsePartitions.add(new OffsetCommitResponseData.OffsetCommitResponsePartition()
+ .setPartitionIndex(requestPartition.partitionIndex())
+ .setErrorCode(e.code()));
}
- topicData.set(PARTITIONS, partitionArray.toArray());
- topicArray.add(topicData);
+ responseTopicData.add(new OffsetCommitResponseData.OffsetCommitResponseTopic()
+ .setName(entry.name())
+ .setPartitions(responsePartitions)
+ );
}
- struct.set(TOPICS, topicArray.toArray());
- struct.setIfExists(GENERATION_ID, generationId);
- struct.setIfExists(MEMBER_ID, memberId);
- struct.setIfExists(RETENTION_TIME, retentionTime);
- return struct;
+ return responseTopicData;
}
@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
- Map<TopicPartition, Errors> responseData = new HashMap<>();
- for (Map.Entry<TopicPartition, PartitionData> entry: offsetData.entrySet()) {
- responseData.put(entry.getKey(), Errors.forException(e));
- }
+ List<OffsetCommitResponseData.OffsetCommitResponseTopic>
+ responseTopicData = getErrorResponseTopics(data.topics(), Errors.forException(e));
short versionId = version();
switch (versionId) {
case 0:
case 1:
case 2:
- return new OffsetCommitResponse(responseData);
+ return new OffsetCommitResponse(
+ new OffsetCommitResponseData()
+ .setTopics(responseTopicData)
+ );
case 3:
case 4:
case 5:
case 6:
- return new OffsetCommitResponse(throttleTimeMs, responseData);
+ return new OffsetCommitResponse(
+ new OffsetCommitResponseData()
+ .setTopics(responseTopicData)
+ .setThrottleTimeMs(throttleTimeMs)
+
+ );
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
versionId, this.getClass().getSimpleName(), ApiKeys.OFFSET_COMMIT.latestVersion()));
}
}
- public String groupId() {
- return groupId;
- }
-
- public int generationId() {
- return generationId;
- }
-
- public String memberId() {
- return memberId;
- }
-
- @Deprecated
- public long retentionTime() {
- return retentionTime;
- }
-
- public Map<TopicPartition, PartitionData> offsetData() {
- return offsetData;
+ public static OffsetCommitRequest parse(ByteBuffer buffer, short version) {
+ return new OffsetCommitRequest(ApiKeys.OFFSET_COMMIT.parseRequest(version, buffer), version);
}
- public static OffsetCommitRequest parse(ByteBuffer buffer, short version) {
- Schema schema = ApiKeys.OFFSET_COMMIT.requestSchema(version);
- return new OffsetCommitRequest(schema.read(buffer), version);
+ @Override
+ protected Struct toStruct() {
+ return data.toStruct(version);
}
}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
index 4d724a3..7f2603e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
@@ -17,24 +17,16 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.utils.CollectionUtils;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
-import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
-import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
-import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
-import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
-
/**
* Possible error codes:
*
@@ -52,117 +44,86 @@ import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
* GROUP_AUTHORIZATION_FAILED (30)
*/
public class OffsetCommitResponse extends AbstractResponse {
- private static final Field.ComplexArray TOPICS = new Field.ComplexArray("responses",
- "Responses by topic for committed partitions");
-
- // topic level fields
- private static final Field.ComplexArray PARTITIONS = new Field.ComplexArray("partition_responses",
- "Responses for committed partitions");
-
- private static final Field PARTITIONS_V0 = PARTITIONS.withFields(
- PARTITION_ID,
- ERROR_CODE);
-
- private static final Field TOPICS_V0 = TOPICS.withFields(
- TOPIC_NAME,
- PARTITIONS_V0);
- private static final Schema OFFSET_COMMIT_RESPONSE_V0 = new Schema(
- TOPICS_V0);
+ private final OffsetCommitResponseData data;
- // V1 adds timestamp and group membership information (generation and memberId) to the request
- private static final Schema OFFSET_COMMIT_RESPONSE_V1 = OFFSET_COMMIT_RESPONSE_V0;
-
- // V2 adds retention time to the request
- private static final Schema OFFSET_COMMIT_RESPONSE_V2 = OFFSET_COMMIT_RESPONSE_V0;
+ public OffsetCommitResponse(OffsetCommitResponseData data) {
+ this.data = data;
+ }
- // V3 adds throttle time
- private static final Schema OFFSET_COMMIT_RESPONSE_V3 = new Schema(
- THROTTLE_TIME_MS,
- TOPICS_V0);
+ public OffsetCommitResponse(int requestThrottleMs, Map<TopicPartition, Errors> responseData) {
+ Map<String, OffsetCommitResponseData.OffsetCommitResponseTopic>
+ responseTopicDataMap = new HashMap<>();
- // V4 bump used to indicate that on quota violation brokers send out responses before throttling.
- private static final Schema OFFSET_COMMIT_RESPONSE_V4 = OFFSET_COMMIT_RESPONSE_V3;
+ for (Map.Entry<TopicPartition, Errors> entry : responseData.entrySet()) {
+ TopicPartition topicPartition = entry.getKey();
+ String topicName = topicPartition.topic();
- // V5 removes retention time from the request
- private static final Schema OFFSET_COMMIT_RESPONSE_V5 = OFFSET_COMMIT_RESPONSE_V4;
+ OffsetCommitResponseData.OffsetCommitResponseTopic topic = responseTopicDataMap
+ .getOrDefault(topicName, new OffsetCommitResponseData.OffsetCommitResponseTopic());
- // V6 adds leader epoch to the request
- private static final Schema OFFSET_COMMIT_RESPONSE_V6 = OFFSET_COMMIT_RESPONSE_V5;
+ if (topic.name().equals("")) {
+ topic.setName(topicName);
+ }
+ topic.partitions().add(new OffsetCommitResponseData.OffsetCommitResponsePartition()
+ .setErrorCode(entry.getValue().code())
+ .setPartitionIndex(topicPartition.partition())
+ );
+ responseTopicDataMap.put(topicName, topic);
+ }
- public static Schema[] schemaVersions() {
- return new Schema[] {OFFSET_COMMIT_RESPONSE_V0, OFFSET_COMMIT_RESPONSE_V1, OFFSET_COMMIT_RESPONSE_V2,
- OFFSET_COMMIT_RESPONSE_V3, OFFSET_COMMIT_RESPONSE_V4, OFFSET_COMMIT_RESPONSE_V5, OFFSET_COMMIT_RESPONSE_V6};
+ data = new OffsetCommitResponseData()
+ .setTopics(new ArrayList<>(responseTopicDataMap.values()))
+ .setThrottleTimeMs(requestThrottleMs);
}
- private final Map<TopicPartition, Errors> responseData;
- private final int throttleTimeMs;
-
public OffsetCommitResponse(Map<TopicPartition, Errors> responseData) {
this(DEFAULT_THROTTLE_TIME, responseData);
}
- public OffsetCommitResponse(int throttleTimeMs, Map<TopicPartition, Errors> responseData) {
- this.throttleTimeMs = throttleTimeMs;
- this.responseData = responseData;
+ public OffsetCommitResponse(Struct struct) {
+ short latestVersion = (short) (OffsetCommitResponseData.SCHEMAS.length - 1);
+ this.data = new OffsetCommitResponseData(struct, latestVersion);
}
- public OffsetCommitResponse(Struct struct) {
- this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
- responseData = new HashMap<>();
- for (Object topicResponseObj : struct.get(TOPICS)) {
- Struct topicResponse = (Struct) topicResponseObj;
- String topic = topicResponse.get(TOPIC_NAME);
- for (Object partitionResponseObj : topicResponse.get(PARTITIONS)) {
- Struct partitionResponse = (Struct) partitionResponseObj;
- int partition = partitionResponse.get(PARTITION_ID);
- Errors error = Errors.forCode(partitionResponse.get(ERROR_CODE));
- responseData.put(new TopicPartition(topic, partition), error);
- }
- }
+ public OffsetCommitResponse(Struct struct, short version) {
+ this.data = new OffsetCommitResponseData(struct, version);
+ }
+
+ public OffsetCommitResponseData data() {
+ return data;
}
@Override
- public Struct toStruct(short version) {
- Struct struct = new Struct(ApiKeys.OFFSET_COMMIT.responseSchema(version));
- struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
-
- Map<String, Map<Integer, Errors>> topicsData = CollectionUtils.groupPartitionDataByTopic(responseData);
- List<Struct> topicArray = new ArrayList<>();
- for (Map.Entry<String, Map<Integer, Errors>> entries: topicsData.entrySet()) {
- Struct topicData = struct.instance(TOPICS);
- topicData.set(TOPIC_NAME, entries.getKey());
- List<Struct> partitionArray = new ArrayList<>();
- for (Map.Entry<Integer, Errors> partitionEntry : entries.getValue().entrySet()) {
- Struct partitionData = topicData.instance(PARTITIONS);
- partitionData.set(PARTITION_ID, partitionEntry.getKey());
- partitionData.set(ERROR_CODE, partitionEntry.getValue().code());
- partitionArray.add(partitionData);
+ public Map<Errors, Integer> errorCounts() {
+ Map<TopicPartition, Errors> errorMap = new HashMap<>();
+ for (OffsetCommitResponseData.OffsetCommitResponseTopic topic : data.topics()) {
+ for (OffsetCommitResponseData.OffsetCommitResponsePartition partition : topic.partitions()) {
+ errorMap.put(new TopicPartition(topic.name(), partition.partitionIndex()),
+ Errors.forCode(partition.errorCode()));
}
- topicData.set(PARTITIONS, partitionArray.toArray());
- topicArray.add(topicData);
- }
- struct.set(TOPICS, topicArray.toArray());
- return struct;
+ }
+ return errorCounts(errorMap);
}
- @Override
- public int throttleTimeMs() {
- return throttleTimeMs;
+ public static OffsetCommitResponse parse(ByteBuffer buffer, short version) {
+ return new OffsetCommitResponse(ApiKeys.OFFSET_COMMIT.parseResponse(version, buffer), version);
}
- public Map<TopicPartition, Errors> responseData() {
- return responseData;
+ @Override
+ public Struct toStruct(short version) {
+ return data.toStruct(version);
}
@Override
- public Map<Errors, Integer> errorCounts() {
- return errorCounts(responseData);
+ public String toString() {
+ return data.toString();
}
- public static OffsetCommitResponse parse(ByteBuffer buffer, short version) {
- return new OffsetCommitResponse(ApiKeys.OFFSET_COMMIT.parseResponse(version, buffer));
+ @Override
+ public int throttleTimeMs() {
+ return data.throttleTimeMs();
}
@Override
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index f31ca52..25c72c6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -1724,10 +1724,11 @@ public class KafkaConsumerTest {
@Override
public boolean matches(AbstractRequest body) {
OffsetCommitRequest commitRequest = (OffsetCommitRequest) body;
+ Map<TopicPartition, Long> commitErrors = commitRequest.offsets();
+
for (Map.Entry<TopicPartition, Long> partitionOffset : partitionOffsets.entrySet()) {
- OffsetCommitRequest.PartitionData partitionData = commitRequest.offsetData().get(partitionOffset.getKey());
// verify that the expected offset has been committed
- if (partitionData.offset != partitionOffset.getValue()) {
+ if (!commitErrors.get(partitionOffset.getKey()).equals(partitionOffset.getValue())) {
commitReceived.set(false);
return false;
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 3cdbbfa..645e6ed 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -40,8 +40,11 @@ import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.LeaveGroupResponseData;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.HeartbeatResponse;
@@ -249,10 +252,24 @@ public class ConsumerCoordinatorTest {
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
final AtomicBoolean asyncCallbackInvoked = new AtomicBoolean(false);
- Map<TopicPartition, OffsetCommitRequest.PartitionData> offsets = singletonMap(
- new TopicPartition("foo", 0), new OffsetCommitRequest.PartitionData(13L,
- Optional.empty(), ""));
- consumerClient.send(coordinator.checkAndGetCoordinator(), new OffsetCommitRequest.Builder(groupId, offsets))
+
+ OffsetCommitRequestData offsetCommitRequestData = new OffsetCommitRequestData()
+ .setGroupId(groupId)
+ .setTopics(Collections.singletonList(new
+ OffsetCommitRequestData.OffsetCommitRequestTopic()
+ .setName("foo")
+ .setPartitions(Collections.singletonList(
+ new OffsetCommitRequestData.OffsetCommitRequestPartition()
+ .setPartitionIndex(0)
+ .setCommittedLeaderEpoch(RecordBatch.NO_PARTITION_LEADER_EPOCH)
+ .setCommittedMetadata("")
+ .setCommittedOffset(13L)
+ .setCommitTimestamp(0)
+ ))
+ )
+ );
+
+ consumerClient.send(coordinator.checkAndGetCoordinator(), new OffsetCommitRequest.Builder(offsetCommitRequestData))
.compose(new RequestFutureAdapter<ClientResponse, Object>() {
@Override
public void onSuccess(ClientResponse value, RequestFuture<Object> future) {}
@@ -1495,8 +1512,8 @@ public class ConsumerCoordinatorTest {
@Override
public boolean matches(AbstractRequest body) {
OffsetCommitRequest commitRequest = (OffsetCommitRequest) body;
- return commitRequest.memberId().equals(OffsetCommitRequest.DEFAULT_MEMBER_ID) &&
- commitRequest.generationId() == OffsetCommitRequest.DEFAULT_GENERATION_ID;
+ return commitRequest.data().memberId().equals(OffsetCommitRequest.DEFAULT_MEMBER_ID) &&
+ commitRequest.data().generationId() == OffsetCommitRequest.DEFAULT_GENERATION_ID;
}
}, offsetCommitResponse(singletonMap(t1p, Errors.NONE)));
@@ -2127,9 +2144,9 @@ public class ConsumerCoordinatorTest {
public boolean matches(AbstractRequest body) {
commitRequested.set(true);
OffsetCommitRequest commitRequest = (OffsetCommitRequest) body;
- return commitRequest.groupId().equals(groupId);
+ return commitRequest.data().groupId().equals(groupId);
}
- }, new OffsetCommitResponse(new HashMap<TopicPartition, Errors>()));
+ }, new OffsetCommitResponse(new OffsetCommitResponseData()));
client.prepareResponse(new MockClient.RequestMatcher() {
@Override
public boolean matches(AbstractRequest body) {
@@ -2291,19 +2308,20 @@ public class ConsumerCoordinatorTest {
@Override
public boolean matches(AbstractRequest body) {
OffsetCommitRequest req = (OffsetCommitRequest) body;
- Map<TopicPartition, OffsetCommitRequest.PartitionData> offsets = req.offsetData();
+ Map<TopicPartition, Long> offsets = req.offsets();
if (offsets.size() != expectedOffsets.size())
return false;
for (Map.Entry<TopicPartition, Long> expectedOffset : expectedOffsets.entrySet()) {
- if (!offsets.containsKey(expectedOffset.getKey()))
- return false;
-
- OffsetCommitRequest.PartitionData offsetCommitData = offsets.get(expectedOffset.getKey());
- if (offsetCommitData.offset != expectedOffset.getValue())
+ if (!offsets.containsKey(expectedOffset.getKey())) {
return false;
+ } else {
+ Long actualOffset = offsets.get(expectedOffset.getKey());
+ if (!actualOffset.equals(expectedOffset.getValue())) {
+ return false;
+ }
+ }
}
-
return true;
}
};
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index e595195..b6a2dad 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -58,6 +58,8 @@ import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.LeaveGroupRequestData;
import org.apache.kafka.common.message.LeaveGroupResponseData;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
import org.apache.kafka.common.message.SaslAuthenticateRequestData;
import org.apache.kafka.common.message.SaslAuthenticateResponseData;
import org.apache.kafka.common.message.SaslHandshakeRequestData;
@@ -938,21 +940,41 @@ public class RequestResponseTest {
@SuppressWarnings("deprecation")
private OffsetCommitRequest createOffsetCommitRequest(int version) {
- Map<TopicPartition, OffsetCommitRequest.PartitionData> commitData = new HashMap<>();
- commitData.put(new TopicPartition("test", 0), new OffsetCommitRequest.PartitionData(100,
- RecordBatch.NO_PARTITION_LEADER_EPOCH, ""));
- commitData.put(new TopicPartition("test", 1), new OffsetCommitRequest.PartitionData(200,
- RecordBatch.NO_PARTITION_LEADER_EPOCH, null));
- return new OffsetCommitRequest.Builder("group1", commitData)
- .setGenerationId(100)
+ return new OffsetCommitRequest.Builder(new OffsetCommitRequestData()
+ .setGroupId("group1")
.setMemberId("consumer1")
- .build((short) version);
+ .setGenerationId(100)
+ .setTopics(Collections.singletonList(
+ new OffsetCommitRequestData.OffsetCommitRequestTopic()
+ .setName("test")
+ .setPartitions(Arrays.asList(
+ new OffsetCommitRequestData.OffsetCommitRequestPartition()
+ .setPartitionIndex(0)
+ .setCommittedOffset(100)
+ .setCommittedLeaderEpoch(RecordBatch.NO_PARTITION_LEADER_EPOCH)
+ .setCommittedMetadata(""),
+ new OffsetCommitRequestData.OffsetCommitRequestPartition()
+ .setPartitionIndex(1)
+ .setCommittedOffset(200)
+ .setCommittedLeaderEpoch(RecordBatch.NO_PARTITION_LEADER_EPOCH)
+ .setCommittedMetadata(null)
+ ))
+ ))
+ ).build((short) version);
}
private OffsetCommitResponse createOffsetCommitResponse() {
- Map<TopicPartition, Errors> responseData = new HashMap<>();
- responseData.put(new TopicPartition("test", 0), Errors.NONE);
- return new OffsetCommitResponse(responseData);
+ return new OffsetCommitResponse(new OffsetCommitResponseData()
+ .setTopics(Collections.singletonList(
+ new OffsetCommitResponseData.OffsetCommitResponseTopic()
+ .setName("test")
+ .setPartitions(Collections.singletonList(
+ new OffsetCommitResponseData.OffsetCommitResponsePartition()
+ .setPartitionIndex(0)
+ .setErrorCode(Errors.NONE.code())
+ ))
+ ))
+ );
}
private OffsetFetchRequest createOffsetFetchRequest(int version) {
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 56adc9b..aa368f8 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -48,7 +48,7 @@ import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal}
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
import org.apache.kafka.common.message.CreateTopicsResponseData.{CreatableTopicResult, CreatableTopicResultSet}
-import org.apache.kafka.common.message.{CreateTopicsResponseData, DeleteTopicsResponseData, DescribeGroupsResponseData, ElectPreferredLeadersResponseData, InitProducerIdResponseData, JoinGroupResponseData, LeaveGroupResponseData, SaslAuthenticateResponseData, SaslHandshakeResponseData}
+import org.apache.kafka.common.message._
import org.apache.kafka.common.message.DeleteTopicsResponseData.{DeletableTopicResult, DeletableTopicResultSet}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.{ListenerName, Send}
@@ -298,25 +298,33 @@ class KafkaApis(val requestChannel: RequestChannel,
val offsetCommitRequest = request.body[OffsetCommitRequest]
// reject the request if not authorized to the group
- if (!authorize(request.session, Read, Resource(Group, offsetCommitRequest.groupId, LITERAL))) {
+ if (!authorize(request.session, Read, Resource(Group, offsetCommitRequest.data().groupId, LITERAL))) {
val error = Errors.GROUP_AUTHORIZATION_FAILED
- val results = offsetCommitRequest.offsetData.keySet.asScala.map { topicPartition =>
- (topicPartition, error)
- }.toMap
- sendResponseMaybeThrottle(request, requestThrottleMs => new OffsetCommitResponse(requestThrottleMs, results.asJava))
+ val responseTopicList = OffsetCommitRequest.getErrorResponseTopics(
+ offsetCommitRequest.data().topics(),
+ error)
+
+ sendResponseMaybeThrottle(request, requestThrottleMs => new OffsetCommitResponse(
+ new OffsetCommitResponseData()
+ .setTopics(responseTopicList)
+ .setThrottleTimeMs(requestThrottleMs)
+ ))
} else {
val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
- val authorizedTopicRequestInfoBldr = immutable.Map.newBuilder[TopicPartition, OffsetCommitRequest.PartitionData]
-
- for ((topicPartition, partitionData) <- offsetCommitRequest.offsetData.asScala) {
- if (!authorize(request.session, Read, Resource(Topic, topicPartition.topic, LITERAL)))
- unauthorizedTopicErrors += (topicPartition -> Errors.TOPIC_AUTHORIZATION_FAILED)
- else if (!metadataCache.contains(topicPartition))
- nonExistingTopicErrors += (topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION)
- else
- authorizedTopicRequestInfoBldr += (topicPartition -> partitionData)
+ val authorizedTopicRequestInfoBldr = immutable.Map.newBuilder[TopicPartition, OffsetCommitRequestData.OffsetCommitRequestPartition]
+
+ for (topicData <- offsetCommitRequest.data().topics().asScala) {
+ for (partitionData <- topicData.partitions().asScala) {
+ val topicPartition = new TopicPartition(topicData.name(), partitionData.partitionIndex())
+ if (!authorize(request.session, Read, Resource(Topic, topicData.name(), LITERAL)))
+ unauthorizedTopicErrors += (topicPartition -> Errors.TOPIC_AUTHORIZATION_FAILED)
+ else if (!metadataCache.contains(topicPartition))
+ nonExistingTopicErrors += (topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION)
+ else
+ authorizedTopicRequestInfoBldr += (topicPartition -> partitionData)
+ }
}
val authorizedTopicRequestInfo = authorizedTopicRequestInfoBldr.result()
@@ -342,10 +350,14 @@ class KafkaApis(val requestChannel: RequestChannel,
val responseInfo = authorizedTopicRequestInfo.map {
case (topicPartition, partitionData) =>
try {
- if (partitionData.metadata != null && partitionData.metadata.length > config.offsetMetadataMaxSize)
+ if (partitionData.committedMetadata() != null
+ && partitionData.committedMetadata().length > config.offsetMetadataMaxSize)
(topicPartition, Errors.OFFSET_METADATA_TOO_LARGE)
else {
- zkClient.setOrCreateConsumerOffset(offsetCommitRequest.groupId, topicPartition, partitionData.offset)
+ zkClient.setOrCreateConsumerOffset(
+ offsetCommitRequest.data().groupId(),
+ topicPartition,
+ partitionData.committedOffset())
(topicPartition, Errors.NONE)
}
} catch {
@@ -364,16 +376,20 @@ class KafkaApis(val requestChannel: RequestChannel,
// - For v5 and beyond there is no per partition expiration timestamp, so this field is no longer in effect
val currentTimestamp = time.milliseconds
val partitionData = authorizedTopicRequestInfo.mapValues { partitionData =>
- val metadata = if (partitionData.metadata == null) OffsetAndMetadata.NoMetadata else partitionData.metadata
+ val metadata = if (partitionData.committedMetadata() == null)
+ OffsetAndMetadata.NoMetadata
+ else
+ partitionData.committedMetadata()
+
new OffsetAndMetadata(
- offset = partitionData.offset,
- leaderEpoch = partitionData.leaderEpoch,
+ offset = partitionData.committedOffset(),
+ leaderEpoch = Optional.ofNullable(new Integer(partitionData.committedLeaderEpoch())),
metadata = metadata,
- commitTimestamp = partitionData.timestamp match {
+ commitTimestamp = partitionData.commitTimestamp() match {
case OffsetCommitRequest.DEFAULT_TIMESTAMP => currentTimestamp
case customTimestamp => customTimestamp
},
- expireTimestamp = offsetCommitRequest.retentionTime match {
+ expireTimestamp = offsetCommitRequest.data().retentionTimeMs() match {
case OffsetCommitRequest.DEFAULT_RETENTION_TIME => None
case retentionTime => Some(currentTimestamp + retentionTime)
}
@@ -382,9 +398,9 @@ class KafkaApis(val requestChannel: RequestChannel,
// call coordinator to handle commit offset
groupCoordinator.handleCommitOffsets(
- offsetCommitRequest.groupId,
- offsetCommitRequest.memberId,
- offsetCommitRequest.generationId,
+ offsetCommitRequest.data().groupId(),
+ offsetCommitRequest.data().memberId(),
+ offsetCommitRequest.data().generationId(),
partitionData,
sendResponseCallback)
}
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 0bc1045..23a0ad0 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -38,7 +38,7 @@ import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic,
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource, AlterableConfig, AlterableConfigSet}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Records, SimpleRecord}
+import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Records, RecordBatch, SimpleRecord}
import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation
import org.apache.kafka.common.requests._
import org.apache.kafka.common.resource.PatternType.LITERAL
@@ -158,7 +158,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
ApiKeys.PRODUCE -> ((resp: requests.ProduceResponse) => resp.responses.asScala.find(_._1 == tp).get._2.error),
ApiKeys.FETCH -> ((resp: requests.FetchResponse[Records]) => resp.responseData.asScala.find(_._1 == tp).get._2.error),
ApiKeys.LIST_OFFSETS -> ((resp: requests.ListOffsetResponse) => resp.responseData.asScala.find(_._1 == tp).get._2.error),
- ApiKeys.OFFSET_COMMIT -> ((resp: requests.OffsetCommitResponse) => resp.responseData.asScala.find(_._1 == tp).get._2),
+ ApiKeys.OFFSET_COMMIT -> ((resp: requests.OffsetCommitResponse) => Errors.forCode(
+ resp.data().topics().get(0).partitions().get(0).errorCode())),
ApiKeys.OFFSET_FETCH -> ((resp: requests.OffsetFetchResponse) => resp.error),
ApiKeys.FIND_COORDINATOR -> ((resp: FindCoordinatorResponse) => resp.error),
ApiKeys.UPDATE_METADATA -> ((resp: requests.UpdateMetadataResponse) => resp.error),
@@ -342,9 +343,23 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
private def createOffsetCommitRequest = {
new requests.OffsetCommitRequest.Builder(
- group, Map(tp -> new requests.OffsetCommitRequest.PartitionData(0L, Optional.empty[Integer](), "metadata")).asJava).
- setMemberId("").setGenerationId(1).
- build()
+ new OffsetCommitRequestData()
+ .setGroupId(group)
+ .setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID)
+ .setGenerationId(1)
+ .setTopics(Collections.singletonList(
+ new OffsetCommitRequestData.OffsetCommitRequestTopic()
+ .setName(topic)
+ .setPartitions(Collections.singletonList(
+ new OffsetCommitRequestData.OffsetCommitRequestPartition()
+ .setPartitionIndex(part)
+ .setCommittedOffset(0)
+ .setCommittedLeaderEpoch(RecordBatch.NO_PARTITION_LEADER_EPOCH)
+ .setCommitTimestamp(OffsetCommitRequest.DEFAULT_TIMESTAMP)
+ .setCommittedMetadata("metadata")
+ )))
+ )
+ ).build()
}
private def createPartitionsRequest = {
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index f3eae1e..54316f3 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -48,6 +48,7 @@ import org.apache.kafka.common.requests.{FetchMetadata => JFetchMetadata, _}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.easymock.{Capture, EasyMock, IAnswer}
import EasyMock._
+import org.apache.kafka.common.message.OffsetCommitRequestData
import org.junit.Assert.{assertEquals, assertNull, assertTrue}
import org.junit.{After, Test}
@@ -117,10 +118,21 @@ class KafkaApisTest {
def checkInvalidPartition(invalidPartitionId: Int): Unit = {
EasyMock.reset(replicaManager, clientRequestQuotaManager, requestChannel)
- val invalidTopicPartition = new TopicPartition(topic, invalidPartitionId)
- val partitionOffsetCommitData = new OffsetCommitRequest.PartitionData(15L, Optional.empty[Integer](), "")
- val (offsetCommitRequest, request) = buildRequest(new OffsetCommitRequest.Builder("groupId",
- Map(invalidTopicPartition -> partitionOffsetCommitData).asJava))
+ val (offsetCommitRequest, request) = buildRequest(new OffsetCommitRequest.Builder(
+ new OffsetCommitRequestData()
+ .setGroupId("groupId")
+ .setTopics(Collections.singletonList(
+ new OffsetCommitRequestData.OffsetCommitRequestTopic()
+ .setName(topic)
+ .setPartitions(Collections.singletonList(
+ new OffsetCommitRequestData.OffsetCommitRequestPartition()
+ .setPartitionIndex(invalidPartitionId)
+ .setCommittedOffset(15)
+ .setCommittedLeaderEpoch(RecordBatch.NO_PARTITION_LEADER_EPOCH)
+ .setCommittedMetadata(""))
+ )
+ ))
+ ))
val capturedResponse = expectNoThrottling()
EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel)
@@ -128,7 +140,8 @@ class KafkaApisTest {
val response = readResponse(ApiKeys.OFFSET_COMMIT, offsetCommitRequest, capturedResponse)
.asInstanceOf[OffsetCommitResponse]
- assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, response.responseData().get(invalidTopicPartition))
+ assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION,
+ Errors.forCode(response.data().topics().get(0).partitions().get(0).errorCode()))
}
checkInvalidPartition(-1)
@@ -571,7 +584,7 @@ class KafkaApisTest {
val header = RequestHeader.parse(buffer)
val context = new RequestContext(header, "1", InetAddress.getLocalHost, KafkaPrincipal.ANONYMOUS,
listenerName, SecurityProtocol.PLAINTEXT)
- (request, new RequestChannel.Request(processor = 1, context = context, startTimeNanos = 0, MemoryPool.NONE, buffer,
+ (request, new RequestChannel.Request(processor = 1, context = context, startTimeNanos = 0, MemoryPool.NONE, buffer,
requestChannelMetrics))
}
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 710ed57..ae74fa1 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -27,7 +27,6 @@ import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.message._
import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, ResourceType => AdminResourceType}
import org.apache.kafka.common.{Node, TopicPartition}
-import org.apache.kafka.common.message.ControlledShutdownRequestData
import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicSet}
import org.apache.kafka.common.metrics.{KafkaMetric, Quota, Sensor}
import org.apache.kafka.common.network.ListenerName
@@ -250,10 +249,27 @@ class RequestQuotaTest extends BaseRequestTest {
ApiKeys.CONTROLLED_SHUTDOWN.latestVersion)
case ApiKeys.OFFSET_COMMIT =>
- new OffsetCommitRequest.Builder("test-group",
- Map(tp -> new OffsetCommitRequest.PartitionData(0, Optional.empty[Integer](), "metadata")).asJava).
- setMemberId("").setGenerationId(1)
-
+ new OffsetCommitRequest.Builder(
+ new OffsetCommitRequestData()
+ .setGroupId("test-group")
+ .setGenerationId(1)
+ .setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID)
+ .setTopics(
+ Collections.singletonList(
+ new OffsetCommitRequestData.OffsetCommitRequestTopic()
+ .setName(topic)
+ .setPartitions(
+ Collections.singletonList(
+ new OffsetCommitRequestData.OffsetCommitRequestPartition()
+ .setPartitionIndex(0)
+ .setCommittedLeaderEpoch(RecordBatch.NO_PARTITION_LEADER_EPOCH)
+ .setCommittedOffset(0)
+ .setCommittedMetadata("metadata")
+ )
+ )
+ )
+ )
+ )
case ApiKeys.OFFSET_FETCH =>
new OffsetFetchRequest.Builder("test-group", List(tp).asJava)
@@ -469,7 +485,8 @@ class RequestQuotaTest extends BaseRequestTest {
case ApiKeys.LIST_OFFSETS => new ListOffsetResponse(response).throttleTimeMs
case ApiKeys.METADATA =>
new MetadataResponse(response, ApiKeys.DESCRIBE_GROUPS.latestVersion).throttleTimeMs
- case ApiKeys.OFFSET_COMMIT => new OffsetCommitResponse(response).throttleTimeMs
+ case ApiKeys.OFFSET_COMMIT =>
+ new OffsetCommitResponse(response, ApiKeys.OFFSET_COMMIT.latestVersion).throttleTimeMs
case ApiKeys.OFFSET_FETCH => new OffsetFetchResponse(response).throttleTimeMs
case ApiKeys.FIND_COORDINATOR => new FindCoordinatorResponse(response).throttleTimeMs
case ApiKeys.JOIN_GROUP => new JoinGroupResponse(response).throttleTimeMs
@@ -516,7 +533,10 @@ class RequestQuotaTest extends BaseRequestTest {
// Request until throttled using client-id with default small quota
val clientId = apiKey.toString
val client = Client(clientId, apiKey)
- val throttled = client.runUntil(response => responseThrottleTime(apiKey, response) > 0)
+
+ val throttled = client.runUntil(response =>
+ responseThrottleTime(apiKey, response) > 0
+ )
assertTrue(s"Response not throttled: $client", throttled)
assertTrue(s"Throttle time metrics not updated: $client" , throttleTimeMetricValue(clientId) > 0)