You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mi...@apache.org on 2020/04/11 11:55:35 UTC
[kafka] branch trunk updated: KAFKA-8436: use automated protocol
for AddOffsetsToTxn (#7015)
This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 430e00e KAFKA-8436: use automated protocol for AddOffsetsToTxn (#7015)
430e00e is described below
commit 430e00ea95da959d6d8308dd49c4ed1cdffa7914
Author: Boyang Chen <bo...@confluent.io>
AuthorDate: Sat Apr 11 04:54:53 2020 -0700
KAFKA-8436: use automated protocol for AddOffsetsToTxn (#7015)
Reviewers: Mickael Maison <mi...@gmail.com>
---
.../producer/internals/TransactionManager.java | 18 +++--
.../org/apache/kafka/common/protocol/ApiKeys.java | 8 +-
.../kafka/common/requests/AbstractResponse.java | 2 +-
.../common/requests/AddOffsetsToTxnRequest.java | 94 ++++------------------
.../common/requests/AddOffsetsToTxnResponse.java | 78 ++++++------------
.../kafka/clients/producer/KafkaProducerTest.java | 5 +-
.../producer/internals/TransactionManagerTest.java | 14 ++--
.../kafka/common/requests/RequestResponseTest.java | 14 +++-
core/src/main/scala/kafka/server/KafkaApis.scala | 24 ++++--
.../kafka/api/AuthorizerIntegrationTest.scala | 11 ++-
.../scala/unit/kafka/server/RequestQuotaTest.scala | 8 +-
11 files changed, 116 insertions(+), 160 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index c432e50..fa473be 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -40,6 +40,7 @@ import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.AddOffsetsToTxnRequestData;
import org.apache.kafka.common.message.EndTxnRequestData;
import org.apache.kafka.common.message.FindCoordinatorRequestData;
import org.apache.kafka.common.message.InitProducerIdRequestData;
@@ -395,9 +396,15 @@ public class TransactionManager {
"active transaction");
log.debug("Begin adding offsets {} for consumer group {} to transaction", offsets, groupMetadata);
- AddOffsetsToTxnRequest.Builder builder = new AddOffsetsToTxnRequest.Builder(transactionalId,
- producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, groupMetadata.groupId());
+ AddOffsetsToTxnRequest.Builder builder = new AddOffsetsToTxnRequest.Builder(
+ new AddOffsetsToTxnRequestData()
+ .setTransactionalId(transactionalId)
+ .setProducerId(producerIdAndEpoch.producerId)
+ .setProducerEpoch(producerIdAndEpoch.epoch)
+ .setGroupId(groupMetadata.groupId())
+ );
AddOffsetsToTxnHandler handler = new AddOffsetsToTxnHandler(builder, offsets, groupMetadata);
+
enqueueRequest(handler);
return handler.result;
}
@@ -1610,13 +1617,14 @@ public class TransactionManager {
@Override
public void handleResponse(AbstractResponse response) {
AddOffsetsToTxnResponse addOffsetsToTxnResponse = (AddOffsetsToTxnResponse) response;
- Errors error = addOffsetsToTxnResponse.error();
+ Errors error = Errors.forCode(addOffsetsToTxnResponse.data.errorCode());
if (error == Errors.NONE) {
- log.debug("Successfully added partition for consumer group {} to transaction", builder.consumerGroupId());
+ log.debug("Successfully added partition for consumer group {} to transaction", builder.data.groupId());
// note the result is not completed until the TxnOffsetCommit returns
pendingRequests.add(txnOffsetCommitHandler(result, offsets, groupMetadata));
+
transactionStarted = true;
} else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) {
lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
@@ -1630,7 +1638,7 @@ public class TransactionManager {
} else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) {
fatalError(error.exception());
} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
- abortableError(GroupAuthorizationException.forGroupId(builder.consumerGroupId()));
+ abortableError(GroupAuthorizationException.forGroupId(builder.data.groupId()));
} else {
fatalError(new KafkaException("Unexpected error in AddOffsetsToTxnResponse: " + error.message()));
}
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 80d9383..411f337 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -19,6 +19,8 @@ package org.apache.kafka.common.protocol;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.ApiMessageType;
+import org.apache.kafka.common.message.AddOffsetsToTxnRequestData;
+import org.apache.kafka.common.message.AddOffsetsToTxnResponseData;
import org.apache.kafka.common.message.ApiVersionsRequestData;
import org.apache.kafka.common.message.ApiVersionsResponseData;
import org.apache.kafka.common.message.AlterClientQuotasRequestData;
@@ -106,8 +108,6 @@ import org.apache.kafka.common.protocol.types.SchemaException;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.protocol.types.Type;
import org.apache.kafka.common.record.RecordBatch;
-import org.apache.kafka.common.requests.AddOffsetsToTxnRequest;
-import org.apache.kafka.common.requests.AddOffsetsToTxnResponse;
import org.apache.kafka.common.requests.AddPartitionsToTxnRequest;
import org.apache.kafka.common.requests.AddPartitionsToTxnResponse;
import org.apache.kafka.common.requests.AlterReplicaLogDirsRequest;
@@ -174,8 +174,8 @@ public enum ApiKeys {
OffsetsForLeaderEpochResponse.schemaVersions()),
ADD_PARTITIONS_TO_TXN(24, "AddPartitionsToTxn", false, RecordBatch.MAGIC_VALUE_V2,
AddPartitionsToTxnRequest.schemaVersions(), AddPartitionsToTxnResponse.schemaVersions()),
- ADD_OFFSETS_TO_TXN(25, "AddOffsetsToTxn", false, RecordBatch.MAGIC_VALUE_V2, AddOffsetsToTxnRequest.schemaVersions(),
- AddOffsetsToTxnResponse.schemaVersions()),
+ ADD_OFFSETS_TO_TXN(25, "AddOffsetsToTxn", false, RecordBatch.MAGIC_VALUE_V2, AddOffsetsToTxnRequestData.SCHEMAS,
+ AddOffsetsToTxnResponseData.SCHEMAS),
END_TXN(26, "EndTxn", false, RecordBatch.MAGIC_VALUE_V2, EndTxnRequestData.SCHEMAS, EndTxnResponseData.SCHEMAS),
WRITE_TXN_MARKERS(27, "WriteTxnMarkers", true, RecordBatch.MAGIC_VALUE_V2, WriteTxnMarkersRequestData.SCHEMAS,
WriteTxnMarkersResponseData.SCHEMAS),
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 7f2f4bc..dcd5439 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -131,7 +131,7 @@ public abstract class AbstractResponse implements AbstractRequestResponse {
case ADD_PARTITIONS_TO_TXN:
return new AddPartitionsToTxnResponse(struct);
case ADD_OFFSETS_TO_TXN:
- return new AddOffsetsToTxnResponse(struct);
+ return new AddOffsetsToTxnResponse(struct, version);
case END_TXN:
return new EndTxnResponse(struct, version);
case WRITE_TXN_MARKERS:
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
index 2668ae1..3b1c746 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
@@ -16,124 +16,60 @@
*/
package org.apache.kafka.common.requests;
+import org.apache.kafka.common.message.AddOffsetsToTxnRequestData;
+import org.apache.kafka.common.message.AddOffsetsToTxnResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
-import static org.apache.kafka.common.protocol.CommonFields.GROUP_ID;
-import static org.apache.kafka.common.protocol.CommonFields.PRODUCER_EPOCH;
-import static org.apache.kafka.common.protocol.CommonFields.PRODUCER_ID;
-import static org.apache.kafka.common.protocol.CommonFields.TRANSACTIONAL_ID;
-
public class AddOffsetsToTxnRequest extends AbstractRequest {
- private static final Schema ADD_OFFSETS_TO_TXN_REQUEST_V0 = new Schema(
- TRANSACTIONAL_ID,
- PRODUCER_ID,
- PRODUCER_EPOCH,
- GROUP_ID);
-
- /**
- * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
- */
- private static final Schema ADD_OFFSETS_TO_TXN_REQUEST_V1 = ADD_OFFSETS_TO_TXN_REQUEST_V0;
- public static Schema[] schemaVersions() {
- return new Schema[]{ADD_OFFSETS_TO_TXN_REQUEST_V0, ADD_OFFSETS_TO_TXN_REQUEST_V1};
- }
+ public AddOffsetsToTxnRequestData data;
public static class Builder extends AbstractRequest.Builder<AddOffsetsToTxnRequest> {
- private final String transactionalId;
- private final long producerId;
- private final short producerEpoch;
- private final String consumerGroupId;
+ public AddOffsetsToTxnRequestData data;
- public Builder(String transactionalId, long producerId, short producerEpoch, String consumerGroupId) {
+ public Builder(AddOffsetsToTxnRequestData data) {
super(ApiKeys.ADD_OFFSETS_TO_TXN);
- this.transactionalId = transactionalId;
- this.producerId = producerId;
- this.producerEpoch = producerEpoch;
- this.consumerGroupId = consumerGroupId;
- }
-
- public String consumerGroupId() {
- return consumerGroupId;
+ this.data = data;
}
@Override
public AddOffsetsToTxnRequest build(short version) {
- return new AddOffsetsToTxnRequest(version, transactionalId, producerId, producerEpoch, consumerGroupId);
+ return new AddOffsetsToTxnRequest(data, version);
}
@Override
public String toString() {
- StringBuilder bld = new StringBuilder();
- bld.append("(type=AddOffsetsToTxnRequest").
- append(", transactionalId=").append(transactionalId).
- append(", producerId=").append(producerId).
- append(", producerEpoch=").append(producerEpoch).
- append(", consumerGroupId=").append(consumerGroupId).
- append(")");
- return bld.toString();
+ return data.toString();
}
}
- private final String transactionalId;
- private final long producerId;
- private final short producerEpoch;
- private final String consumerGroupId;
-
- private AddOffsetsToTxnRequest(short version, String transactionalId, long producerId, short producerEpoch, String consumerGroupId) {
+ public AddOffsetsToTxnRequest(AddOffsetsToTxnRequestData data, short version) {
super(ApiKeys.ADD_OFFSETS_TO_TXN, version);
- this.transactionalId = transactionalId;
- this.producerId = producerId;
- this.producerEpoch = producerEpoch;
- this.consumerGroupId = consumerGroupId;
+ this.data = data;
}
public AddOffsetsToTxnRequest(Struct struct, short version) {
super(ApiKeys.ADD_OFFSETS_TO_TXN, version);
- this.transactionalId = struct.get(TRANSACTIONAL_ID);
- this.producerId = struct.get(PRODUCER_ID);
- this.producerEpoch = struct.get(PRODUCER_EPOCH);
- this.consumerGroupId = struct.get(GROUP_ID);
- }
-
- public String transactionalId() {
- return transactionalId;
- }
-
- public long producerId() {
- return producerId;
- }
-
- public short producerEpoch() {
- return producerEpoch;
- }
-
- public String consumerGroupId() {
- return consumerGroupId;
+ this.data = new AddOffsetsToTxnRequestData(struct, version);
}
@Override
protected Struct toStruct() {
- Struct struct = new Struct(ApiKeys.ADD_OFFSETS_TO_TXN.requestSchema(version()));
- struct.set(TRANSACTIONAL_ID, transactionalId);
- struct.set(PRODUCER_ID, producerId);
- struct.set(PRODUCER_EPOCH, producerEpoch);
- struct.set(GROUP_ID, consumerGroupId);
- return struct;
+ return data.toStruct(version());
}
@Override
public AddOffsetsToTxnResponse getErrorResponse(int throttleTimeMs, Throwable e) {
- return new AddOffsetsToTxnResponse(throttleTimeMs, Errors.forException(e));
+ return new AddOffsetsToTxnResponse(new AddOffsetsToTxnResponseData()
+ .setErrorCode(Errors.forException(e).code())
+ .setThrottleTimeMs(throttleTimeMs));
}
public static AddOffsetsToTxnRequest parse(ByteBuffer buffer, short version) {
return new AddOffsetsToTxnRequest(ApiKeys.ADD_OFFSETS_TO_TXN.parseRequest(version, buffer), version);
}
-
}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
index 867ca6a..3747bb9 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
@@ -16,86 +16,60 @@
*/
package org.apache.kafka.common.requests;
+import org.apache.kafka.common.message.AddOffsetsToTxnResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
import java.util.Map;
-import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
-import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
-
+/**
+ * Possible error codes:
+ *
+ * - {@link Errors#NOT_COORDINATOR}
+ * - {@link Errors#COORDINATOR_NOT_AVAILABLE}
+ * - {@link Errors#COORDINATOR_LOAD_IN_PROGRESS}
+ * - {@link Errors#INVALID_PRODUCER_ID_MAPPING}
+ * - {@link Errors#INVALID_PRODUCER_EPOCH}
+ * - {@link Errors#INVALID_TXN_STATE}
+ * - {@link Errors#GROUP_AUTHORIZATION_FAILED}
+ * - {@link Errors#TRANSACTIONAL_ID_AUTHORIZATION_FAILED}
+ */
public class AddOffsetsToTxnResponse extends AbstractResponse {
- private static final Schema ADD_OFFSETS_TO_TXN_RESPONSE_V0 = new Schema(
- THROTTLE_TIME_MS,
- ERROR_CODE);
-
- /**
- * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
- */
- private static final Schema ADD_OFFSETS_TO_TXN_RESPONSE_V1 = ADD_OFFSETS_TO_TXN_RESPONSE_V0;
- public static Schema[] schemaVersions() {
- return new Schema[]{ADD_OFFSETS_TO_TXN_RESPONSE_V0, ADD_OFFSETS_TO_TXN_RESPONSE_V1};
- }
-
- // Possible error codes:
- // NotCoordinator
- // CoordinatorNotAvailable
- // CoordinatorLoadInProgress
- // InvalidProducerIdMapping
- // InvalidProducerEpoch
- // InvalidTxnState
- // GroupAuthorizationFailed
- // TransactionalIdAuthorizationFailed
+ public AddOffsetsToTxnResponseData data;
- private final Errors error;
- private final int throttleTimeMs;
-
- public AddOffsetsToTxnResponse(int throttleTimeMs, Errors error) {
- this.throttleTimeMs = throttleTimeMs;
- this.error = error;
+ public AddOffsetsToTxnResponse(AddOffsetsToTxnResponseData data) {
+ this.data = data;
}
- public AddOffsetsToTxnResponse(Struct struct) {
- this.throttleTimeMs = struct.get(THROTTLE_TIME_MS);
- this.error = Errors.forCode(struct.get(ERROR_CODE));
+ public AddOffsetsToTxnResponse(Struct struct, short version) {
+ this.data = new AddOffsetsToTxnResponseData(struct, version);
}
@Override
- public int throttleTimeMs() {
- return throttleTimeMs;
- }
-
- public Errors error() {
- return error;
+ public Map<Errors, Integer> errorCounts() {
+ return errorCounts(Errors.forCode(data.errorCode()));
}
@Override
- public Map<Errors, Integer> errorCounts() {
- return errorCounts(error);
+ protected Struct toStruct(short version) {
+ return data.toStruct(version);
}
@Override
- protected Struct toStruct(short version) {
- Struct struct = new Struct(ApiKeys.ADD_OFFSETS_TO_TXN.responseSchema(version));
- struct.set(THROTTLE_TIME_MS, throttleTimeMs);
- struct.set(ERROR_CODE, error.code());
- return struct;
+ public int throttleTimeMs() {
+ return data.throttleTimeMs();
}
public static AddOffsetsToTxnResponse parse(ByteBuffer buffer, short version) {
- return new AddOffsetsToTxnResponse(ApiKeys.ADD_OFFSETS_TO_TXN.parseResponse(version, buffer));
+ return new AddOffsetsToTxnResponse(ApiKeys.ADD_OFFSETS_TO_TXN.parseResponse(version, buffer), version);
}
@Override
public String toString() {
- return "AddOffsetsToTxnResponse(" +
- "error=" + error +
- ", throttleTimeMs=" + throttleTimeMs +
- ')';
+ return data.toString();
}
@Override
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 8fec21c..280faa1 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -35,6 +35,7 @@ import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.message.AddOffsetsToTxnResponseData;
import org.apache.kafka.common.message.EndTxnResponseData;
import org.apache.kafka.common.message.InitProducerIdResponseData;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
@@ -959,7 +960,9 @@ public class KafkaProducerTest {
}
private AddOffsetsToTxnResponse addOffsetsToTxnResponse(Errors error) {
- return new AddOffsetsToTxnResponse(10, error);
+ return new AddOffsetsToTxnResponse(new AddOffsetsToTxnResponseData()
+ .setErrorCode(error.code())
+ .setThrottleTimeMs(10));
}
private TxnOffsetCommitResponse txnOffsetsCommitResponse(Map<TopicPartition, Errors> errorMap) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index c700605..5650110 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -42,6 +42,7 @@ import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.message.AddOffsetsToTxnResponseData;
import org.apache.kafka.common.message.EndTxnResponseData;
import org.apache.kafka.common.message.InitProducerIdResponseData;
import org.apache.kafka.common.metrics.Metrics;
@@ -3258,12 +3259,15 @@ public class TransactionManagerTest {
final short producerEpoch) {
client.prepareResponse(body -> {
AddOffsetsToTxnRequest addOffsetsToTxnRequest = (AddOffsetsToTxnRequest) body;
- assertEquals(consumerGroupId, addOffsetsToTxnRequest.consumerGroupId());
- assertEquals(transactionalId, addOffsetsToTxnRequest.transactionalId());
- assertEquals(producerId, addOffsetsToTxnRequest.producerId());
- assertEquals(producerEpoch, addOffsetsToTxnRequest.producerEpoch());
+ assertEquals(consumerGroupId, addOffsetsToTxnRequest.data.groupId());
+ assertEquals(transactionalId, addOffsetsToTxnRequest.data.transactionalId());
+ assertEquals(producerId, addOffsetsToTxnRequest.data.producerId());
+ assertEquals(producerEpoch, addOffsetsToTxnRequest.data.producerEpoch());
return true;
- }, new AddOffsetsToTxnResponse(0, error));
+ }, new AddOffsetsToTxnResponse(
+ new AddOffsetsToTxnResponseData()
+ .setErrorCode(error.code()))
+ );
}
private void prepareTxnOffsetCommitResponse(final String consumerGroupId,
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 7a6ce7f..0f9ab8c 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -33,6 +33,8 @@ import org.apache.kafka.common.errors.NotEnoughReplicasException;
import org.apache.kafka.common.errors.SecurityDisabledException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.AddOffsetsToTxnRequestData;
+import org.apache.kafka.common.message.AddOffsetsToTxnResponseData;
import org.apache.kafka.common.message.AlterConfigsResponseData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
@@ -1632,11 +1634,19 @@ public class RequestResponseTest {
}
private AddOffsetsToTxnRequest createAddOffsetsToTxnRequest() {
- return new AddOffsetsToTxnRequest.Builder("tid", 21L, (short) 42, "gid").build();
+ return new AddOffsetsToTxnRequest.Builder(
+ new AddOffsetsToTxnRequestData()
+ .setTransactionalId("tid")
+ .setProducerId(21L)
+ .setProducerEpoch((short) 42)
+ .setGroupId("gid")
+ ).build();
}
private AddOffsetsToTxnResponse createAddOffsetsToTxnResponse() {
- return new AddOffsetsToTxnResponse(0, Errors.NONE);
+ return new AddOffsetsToTxnResponse(new AddOffsetsToTxnResponseData()
+ .setErrorCode(Errors.NONE.code())
+ .setThrottleTimeMs(0));
}
private EndTxnRequest createEndTxnRequest() {
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 43ab2c5..57ce87c 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -51,7 +51,7 @@ import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANS
import org.apache.kafka.common.message.AlterConfigsResponseData.AlterConfigsResourceResponse
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult
-import org.apache.kafka.common.message.{AlterConfigsResponseData, AlterPartitionReassignmentsResponseData, CreateAclsResponseData, CreatePartitionsResponseData, CreateTopicsResponseData, DeleteAclsResponseData, DeleteGroupsResponseData, DeleteRecordsResponseData, DeleteTopicsResponseData, DescribeAclsResponseData, DescribeGroupsResponseData, DescribeLogDirsResponseData, EndTxnResponseData, ExpireDelegationTokenResponseData, FindCoordinatorResponseData, HeartbeatResponseData, InitProducer [...]
+import org.apache.kafka.common.message.{AddOffsetsToTxnResponseData, AlterConfigsResponseData, AlterPartitionReassignmentsResponseData, CreateAclsResponseData, CreatePartitionsResponseData, CreateTopicsResponseData, DeleteAclsResponseData, DeleteGroupsResponseData, DeleteRecordsResponseData, DeleteTopicsResponseData, DescribeAclsResponseData, DescribeGroupsResponseData, DescribeLogDirsResponseData, EndTxnResponseData, ExpireDelegationTokenResponseData, FindCoordinatorResponseData, Heartb [...]
import org.apache.kafka.common.message.CreateTopicsResponseData.{CreatableTopicResult, CreatableTopicResultCollection}
import org.apache.kafka.common.message.DeleteGroupsResponseData.{DeletableGroupResult, DeletableGroupResultCollection}
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.{ReassignablePartitionResponse, ReassignableTopicResponse}
@@ -2154,20 +2154,28 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleAddOffsetsToTxnRequest(request: RequestChannel.Request): Unit = {
ensureInterBrokerVersion(KAFKA_0_11_0_IV0)
val addOffsetsToTxnRequest = request.body[AddOffsetsToTxnRequest]
- val transactionalId = addOffsetsToTxnRequest.transactionalId
- val groupId = addOffsetsToTxnRequest.consumerGroupId
+ val transactionalId = addOffsetsToTxnRequest.data.transactionalId
+ val groupId = addOffsetsToTxnRequest.data.groupId
val offsetTopicPartition = new TopicPartition(GROUP_METADATA_TOPIC_NAME, groupCoordinator.partitionFor(groupId))
if (!authorize(request.context, WRITE, TRANSACTIONAL_ID, transactionalId))
sendResponseMaybeThrottle(request, requestThrottleMs =>
- new AddOffsetsToTxnResponse(requestThrottleMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED))
+ new AddOffsetsToTxnResponse(new AddOffsetsToTxnResponseData()
+ .setErrorCode(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.code)
+ .setThrottleTimeMs(requestThrottleMs)))
else if (!authorize(request.context, READ, GROUP, groupId))
sendResponseMaybeThrottle(request, requestThrottleMs =>
- new AddOffsetsToTxnResponse(requestThrottleMs, Errors.GROUP_AUTHORIZATION_FAILED))
+ new AddOffsetsToTxnResponse(new AddOffsetsToTxnResponseData()
+ .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code)
+ .setThrottleTimeMs(requestThrottleMs))
+ )
else {
def sendResponseCallback(error: Errors): Unit = {
def createResponse(requestThrottleMs: Int): AbstractResponse = {
- val responseBody: AddOffsetsToTxnResponse = new AddOffsetsToTxnResponse(requestThrottleMs, error)
+ val responseBody: AddOffsetsToTxnResponse = new AddOffsetsToTxnResponse(
+ new AddOffsetsToTxnResponseData()
+ .setErrorCode(error.code)
+ .setThrottleTimeMs(requestThrottleMs))
trace(s"Completed $transactionalId's AddOffsetsToTxnRequest for group $groupId on partition " +
s"$offsetTopicPartition: errors: $error from client ${request.header.clientId}")
responseBody
@@ -2176,8 +2184,8 @@ class KafkaApis(val requestChannel: RequestChannel,
}
txnCoordinator.handleAddPartitionsToTransaction(transactionalId,
- addOffsetsToTxnRequest.producerId,
- addOffsetsToTxnRequest.producerEpoch,
+ addOffsetsToTxnRequest.data.producerId,
+ addOffsetsToTxnRequest.data.producerEpoch,
Set(offsetTopicPartition),
sendResponseCallback)
}
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 1b57cff..9a7ec14 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -35,6 +35,7 @@ import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.common.config.{ConfigResource, LogLevelConfig}
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME
+import org.apache.kafka.common.message.AddOffsetsToTxnRequestData
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic
import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicCollection}
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource, AlterableConfig, AlterableConfigCollection}
@@ -184,7 +185,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
ApiKeys.INIT_PRODUCER_ID -> ((resp: InitProducerIdResponse) => resp.error),
ApiKeys.WRITE_TXN_MARKERS -> ((resp: WriteTxnMarkersResponse) => resp.errors(producerId).get(tp)),
ApiKeys.ADD_PARTITIONS_TO_TXN -> ((resp: AddPartitionsToTxnResponse) => resp.errors.get(tp)),
- ApiKeys.ADD_OFFSETS_TO_TXN -> ((resp: AddOffsetsToTxnResponse) => resp.error),
+ ApiKeys.ADD_OFFSETS_TO_TXN -> ((resp: AddOffsetsToTxnResponse) => Errors.forCode(resp.data.errorCode)),
ApiKeys.END_TXN -> ((resp: EndTxnResponse) => resp.error),
ApiKeys.TXN_OFFSET_COMMIT -> ((resp: TxnOffsetCommitResponse) => resp.errors.get(tp)),
ApiKeys.CREATE_ACLS -> ((resp: CreateAclsResponse) => Errors.forCode(resp.results.asScala.head.errorCode)),
@@ -533,7 +534,13 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
private def addPartitionsToTxnRequest = new AddPartitionsToTxnRequest.Builder(transactionalId, 1, 1, Collections.singletonList(tp)).build()
- private def addOffsetsToTxnRequest = new AddOffsetsToTxnRequest.Builder(transactionalId, 1, 1, group).build()
+ private def addOffsetsToTxnRequest = new AddOffsetsToTxnRequest.Builder(
+ new AddOffsetsToTxnRequestData()
+ .setTransactionalId(transactionalId)
+ .setProducerId(1)
+ .setProducerEpoch(1)
+ .setGroupId(group)
+ ).build()
private def electLeadersRequest = new ElectLeadersRequest.Builder(
ElectionType.PREFERRED,
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 42220a8..c47d511 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -24,6 +24,7 @@ import kafka.security.authorizer.AclAuthorizer
import kafka.utils.TestUtils
import org.apache.kafka.common.acl._
import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.common.message.AddOffsetsToTxnRequestData
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic
import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicCollection}
import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection
@@ -395,7 +396,12 @@ class RequestQuotaTest extends BaseRequestTest {
new AddPartitionsToTxnRequest.Builder("test-transactional-id", 1, 0, List(tp).asJava)
case ApiKeys.ADD_OFFSETS_TO_TXN =>
- new AddOffsetsToTxnRequest.Builder("test-transactional-id", 1, 0, "test-txn-group")
+ new AddOffsetsToTxnRequest.Builder(new AddOffsetsToTxnRequestData()
+ .setTransactionalId("test-transactional-id")
+ .setProducerId(1)
+ .setProducerEpoch(0)
+ .setGroupId("test-txn-group")
+ )
case ApiKeys.END_TXN =>
new EndTxnRequest.Builder(new EndTxnRequestData()