You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/04/11 15:42:54 UTC
[kafka] branch trunk updated: MINOR: Use generated InitProducerId RPC (#6538)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 53e95ff MINOR: Use generated InitProducerId RPC (#6538)
53e95ff is described below
commit 53e95ffcdb1cecbba67eb726aa2abcab3ae49c66
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Thu Apr 11 08:27:08 2019 -0700
MINOR: Use generated InitProducerId RPC (#6538)
This patch updates the InitProducerId request API to use the generated sources. It also fixes a small bug in the DescribeAclsRequest class where we were using the wrong api key.
Reviewers: Mickael Maison <mi...@gmail.com>, Colin McCabe <cm...@apache.org>
---
.../kafka/clients/producer/internals/Sender.java | 8 +-
.../producer/internals/TransactionManager.java | 10 ++-
.../org/apache/kafka/common/protocol/ApiKeys.java | 7 +-
.../kafka/common/requests/AbstractRequest.java | 2 +
.../kafka/common/requests/AbstractResponse.java | 2 +-
.../kafka/common/requests/CreateTopicsRequest.java | 9 +-
.../kafka/common/requests/DescribeAclsRequest.java | 4 +-
.../common/requests/InitProducerIdRequest.java | 91 +++++++-------------
.../common/requests/InitProducerIdResponse.java | 97 +++++-----------------
.../common/message/InitProducerIdRequest.json | 2 +-
.../common/message/InitProducerIdResponse.json | 2 +-
.../clients/producer/internals/SenderTest.java | 25 ++++--
.../producer/internals/TransactionManagerTest.java | 26 +++---
.../kafka/common/requests/RequestResponseTest.java | 46 ++++++----
core/src/main/scala/kafka/server/KafkaApis.scala | 20 ++---
.../scala/unit/kafka/server/RequestQuotaTest.scala | 19 +++--
16 files changed, 160 insertions(+), 210 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 6189aae..33bc496 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -40,6 +40,7 @@ import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.InitProducerIdRequestData;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
@@ -480,7 +481,10 @@ public class Sender implements Runnable {
private ClientResponse sendAndAwaitInitProducerIdRequest(Node node) throws IOException {
String nodeId = node.idString();
- InitProducerIdRequest.Builder builder = new InitProducerIdRequest.Builder(null);
+ InitProducerIdRequestData requestData = new InitProducerIdRequestData()
+ .setTransactionalId(null)
+ .setTransactionTimeoutMs(Integer.MAX_VALUE);
+ InitProducerIdRequest.Builder builder = new InitProducerIdRequest.Builder(requestData);
ClientRequest request = client.newClientRequest(nodeId, builder, time.milliseconds(), true, requestTimeoutMs, null);
return NetworkClientUtils.sendAndReceive(client, request, time);
}
@@ -504,7 +508,7 @@ public class Sender implements Runnable {
Errors error = initProducerIdResponse.error();
if (error == Errors.NONE) {
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(
- initProducerIdResponse.producerId(), initProducerIdResponse.epoch());
+ initProducerIdResponse.data.producerId(), initProducerIdResponse.data.producerEpoch());
transactionManager.setProducerIdAndEpoch(producerIdAndEpoch);
return;
} else if (error.exception() instanceof RetriableException) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index b619093..b34cc98 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.message.InitProducerIdRequestData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.DefaultRecordBatch;
import org.apache.kafka.common.record.RecordBatch;
@@ -254,8 +255,10 @@ public class TransactionManager {
return handleCachedTransactionRequestResult(() -> {
transitionTo(State.INITIALIZING);
setProducerIdAndEpoch(ProducerIdAndEpoch.NONE);
- InitProducerIdRequest.Builder builder = new InitProducerIdRequest.Builder(transactionalId, transactionTimeoutMs);
- InitProducerIdHandler handler = new InitProducerIdHandler(builder);
+ InitProducerIdRequestData requestData = new InitProducerIdRequestData()
+ .setTransactionalId(transactionalId)
+ .setTransactionTimeoutMs(transactionTimeoutMs);
+ InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData));
enqueueRequest(handler);
return handler.result;
}, State.INITIALIZING);
@@ -1020,7 +1023,8 @@ public class TransactionManager {
Errors error = initProducerIdResponse.error();
if (error == Errors.NONE) {
- ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(initProducerIdResponse.producerId(), initProducerIdResponse.epoch());
+ ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(initProducerIdResponse.data.producerId(),
+ initProducerIdResponse.data.producerEpoch());
setProducerIdAndEpoch(producerIdAndEpoch);
transitionTo(State.READY);
lastError = null;
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index f49c99a..33d6736 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -26,6 +26,8 @@ import org.apache.kafka.common.message.DescribeGroupsRequestData;
import org.apache.kafka.common.message.DescribeGroupsResponseData;
import org.apache.kafka.common.message.ElectPreferredLeadersRequestData;
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData;
+import org.apache.kafka.common.message.InitProducerIdRequestData;
+import org.apache.kafka.common.message.InitProducerIdResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.LeaveGroupRequestData;
@@ -81,8 +83,6 @@ import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.HeartbeatRequest;
import org.apache.kafka.common.requests.HeartbeatResponse;
-import org.apache.kafka.common.requests.InitProducerIdRequest;
-import org.apache.kafka.common.requests.InitProducerIdResponse;
import org.apache.kafka.common.requests.LeaderAndIsrRequest;
import org.apache.kafka.common.requests.LeaderAndIsrResponse;
import org.apache.kafka.common.requests.ListGroupsRequest;
@@ -155,8 +155,7 @@ public enum ApiKeys {
CREATE_TOPICS(19, "CreateTopics", CreateTopicsRequestData.SCHEMAS, CreateTopicsResponseData.SCHEMAS),
DELETE_TOPICS(20, "DeleteTopics", DeleteTopicsRequestData.SCHEMAS, DeleteTopicsResponseData.SCHEMAS),
DELETE_RECORDS(21, "DeleteRecords", DeleteRecordsRequest.schemaVersions(), DeleteRecordsResponse.schemaVersions()),
- INIT_PRODUCER_ID(22, "InitProducerId", InitProducerIdRequest.schemaVersions(),
- InitProducerIdResponse.schemaVersions()),
+ INIT_PRODUCER_ID(22, "InitProducerId", InitProducerIdRequestData.SCHEMAS, InitProducerIdResponseData.SCHEMAS),
OFFSET_FOR_LEADER_EPOCH(23, "OffsetForLeaderEpoch", false, OffsetsForLeaderEpochRequest.schemaVersions(),
OffsetsForLeaderEpochResponse.schemaVersions()),
ADD_PARTITIONS_TO_TXN(24, "AddPartitionsToTxn", false, RecordBatch.MAGIC_VALUE_V2,
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 239024f..c069bc9 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -76,11 +76,13 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
}
private final short version;
+ public final ApiKeys api;
public AbstractRequest(ApiKeys api, short version) {
if (!api.isVersionSupported(version))
throw new UnsupportedVersionException("The " + api + " protocol does not support version " + version);
this.version = version;
+ this.api = api;
}
/**
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index f594f20..50ae0b5 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -115,7 +115,7 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
case DELETE_RECORDS:
return new DeleteRecordsResponse(struct);
case INIT_PRODUCER_ID:
- return new InitProducerIdResponse(struct);
+ return new InitProducerIdResponse(struct, version);
case OFFSET_FOR_LEADER_EPOCH:
return new OffsetsForLeaderEpochResponse(struct);
case ADD_PARTITIONS_TO_TXN:
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
index 93f7ab2..a2cd17d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
@@ -17,12 +17,12 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.errors.UnsupportedVersionException;
-import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
import org.apache.kafka.common.message.CreateTopicsResponseData;
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
@@ -50,7 +50,6 @@ public class CreateTopicsRequest extends AbstractRequest {
}
private final CreateTopicsRequestData data;
- private final short version;
public static final int NO_NUM_PARTITIONS = -1;
public static final short NO_REPLICATION_FACTOR = -1;
@@ -58,13 +57,11 @@ public class CreateTopicsRequest extends AbstractRequest {
private CreateTopicsRequest(CreateTopicsRequestData data, short version) {
super(ApiKeys.CREATE_TOPICS, version);
this.data = data;
- this.version = version;
}
public CreateTopicsRequest(Struct struct, short version) {
super(ApiKeys.CREATE_TOPICS, version);
this.data = new CreateTopicsRequestData(struct, version);
- this.version = version;
}
public CreateTopicsRequestData data() {
@@ -96,6 +93,6 @@ public class CreateTopicsRequest extends AbstractRequest {
*/
@Override
public Struct toStruct() {
- return data.toStruct(version);
+ return data.toStruct(version());
}
}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java
index 04bfee8..ed417c5 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java
@@ -86,14 +86,14 @@ public class DescribeAclsRequest extends AbstractRequest {
private final AclBindingFilter filter;
DescribeAclsRequest(AclBindingFilter filter, short version) {
- super(ApiKeys.DELETE_ACLS, version);
+ super(ApiKeys.DESCRIBE_ACLS, version);
this.filter = filter;
validate(filter, version);
}
public DescribeAclsRequest(Struct struct, short version) {
- super(ApiKeys.DELETE_ACLS, version);
+ super(ApiKeys.DESCRIBE_ACLS, version);
ResourcePatternFilter resourceFilter = RequestUtils.resourcePatternFilterFromStructFields(struct);
AccessControlEntryFilter entryFilter = RequestUtils.aceFilterFromStructFields(struct);
this.filter = new AclBindingFilter(resourceFilter, entryFilter);
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdRequest.java
index aab7c72..8351c21 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdRequest.java
@@ -16,106 +16,71 @@
*/
package org.apache.kafka.common.requests;
+import org.apache.kafka.common.message.InitProducerIdRequestData;
+import org.apache.kafka.common.message.InitProducerIdResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.record.RecordBatch;
import java.nio.ByteBuffer;
-import static org.apache.kafka.common.protocol.CommonFields.NULLABLE_TRANSACTIONAL_ID;
-import static org.apache.kafka.common.protocol.types.Type.INT32;
-
public class InitProducerIdRequest extends AbstractRequest {
- public static final int NO_TRANSACTION_TIMEOUT_MS = Integer.MAX_VALUE;
-
- private static final String TRANSACTION_TIMEOUT_KEY_NAME = "transaction_timeout_ms";
-
- private static final Schema INIT_PRODUCER_ID_REQUEST_V0 = new Schema(
- NULLABLE_TRANSACTIONAL_ID,
- new Field(TRANSACTION_TIMEOUT_KEY_NAME, INT32, "The time in ms to wait for before aborting idle transactions sent by this producer."));
-
- /**
- * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
- */
- private static final Schema INIT_PRODUCER_ID_REQUEST_V1 = INIT_PRODUCER_ID_REQUEST_V0;
-
- public static Schema[] schemaVersions() {
- return new Schema[]{INIT_PRODUCER_ID_REQUEST_V0, INIT_PRODUCER_ID_REQUEST_V1};
- }
-
- private final String transactionalId;
- private final int transactionTimeoutMs;
-
public static class Builder extends AbstractRequest.Builder<InitProducerIdRequest> {
- private final String transactionalId;
- private final int transactionTimeoutMs;
-
- public Builder(String transactionalId) {
- this(transactionalId, NO_TRANSACTION_TIMEOUT_MS);
- }
+ private final InitProducerIdRequestData data;
- public Builder(String transactionalId, int transactionTimeoutMs) {
+ public Builder(InitProducerIdRequestData data) {
super(ApiKeys.INIT_PRODUCER_ID);
-
- if (transactionTimeoutMs <= 0)
- throw new IllegalArgumentException("transaction timeout value is not positive: " + transactionTimeoutMs);
-
- if (transactionalId != null && transactionalId.isEmpty())
- throw new IllegalArgumentException("Must set either a null or a non-empty transactional id.");
-
- this.transactionalId = transactionalId;
- this.transactionTimeoutMs = transactionTimeoutMs;
+ this.data = data;
}
@Override
public InitProducerIdRequest build(short version) {
- return new InitProducerIdRequest(version, transactionalId, transactionTimeoutMs);
+ if (data.transactionTimeoutMs() <= 0)
+ throw new IllegalArgumentException("transaction timeout value is not positive: " + data.transactionTimeoutMs());
+
+ if (data.transactionalId() != null && data.transactionalId().isEmpty())
+ throw new IllegalArgumentException("Must set either a null or a non-empty transactional id.");
+
+ return new InitProducerIdRequest(data, version);
}
@Override
public String toString() {
- return "(type=InitProducerIdRequest, transactionalId=" + transactionalId + ", transactionTimeoutMs=" +
- transactionTimeoutMs + ")";
+ return data.toString();
}
}
- public InitProducerIdRequest(Struct struct, short version) {
+ public final InitProducerIdRequestData data;
+
+ private InitProducerIdRequest(InitProducerIdRequestData data, short version) {
super(ApiKeys.INIT_PRODUCER_ID, version);
- this.transactionalId = struct.get(NULLABLE_TRANSACTIONAL_ID);
- this.transactionTimeoutMs = struct.getInt(TRANSACTION_TIMEOUT_KEY_NAME);
+ this.data = data;
}
- private InitProducerIdRequest(short version, String transactionalId, int transactionTimeoutMs) {
+ public InitProducerIdRequest(Struct struct, short version) {
super(ApiKeys.INIT_PRODUCER_ID, version);
- this.transactionalId = transactionalId;
- this.transactionTimeoutMs = transactionTimeoutMs;
+ this.data = new InitProducerIdRequestData(struct, version);
}
+
@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
- return new InitProducerIdResponse(throttleTimeMs, Errors.forException(e));
+ InitProducerIdResponseData response = new InitProducerIdResponseData()
+ .setErrorCode(Errors.forException(e).code())
+ .setProducerId(RecordBatch.NO_PRODUCER_ID)
+ .setProducerEpoch(RecordBatch.NO_PRODUCER_EPOCH)
+ .setThrottleTimeMs(0);
+ return new InitProducerIdResponse(response);
}
public static InitProducerIdRequest parse(ByteBuffer buffer, short version) {
return new InitProducerIdRequest(ApiKeys.INIT_PRODUCER_ID.parseRequest(version, buffer), version);
}
- public String transactionalId() {
- return transactionalId;
- }
-
- public int transactionTimeoutMs() {
- return transactionTimeoutMs;
- }
-
@Override
protected Struct toStruct() {
- Struct struct = new Struct(ApiKeys.INIT_PRODUCER_ID.requestSchema(version()));
- struct.set(NULLABLE_TRANSACTIONAL_ID, transactionalId);
- struct.set(TRANSACTION_TIMEOUT_KEY_NAME, transactionTimeoutMs);
- return struct;
+ return data.toStruct(version());
}
}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
index 9a1e0f7..a33daf3 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
@@ -16,110 +16,59 @@
*/
package org.apache.kafka.common.requests;
+import org.apache.kafka.common.message.InitProducerIdResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.record.RecordBatch;
import java.nio.ByteBuffer;
import java.util.Map;
-import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
-import static org.apache.kafka.common.protocol.CommonFields.PRODUCER_EPOCH;
-import static org.apache.kafka.common.protocol.CommonFields.PRODUCER_ID;
-import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
-
+/**
+ * Possible error codes:
+ * - {@link Errors#NOT_COORDINATOR}
+ * - {@link Errors#COORDINATOR_NOT_AVAILABLE}
+ * - {@link Errors#COORDINATOR_LOAD_IN_PROGRESS}
+ * - {@link Errors#TRANSACTIONAL_ID_AUTHORIZATION_FAILED}
+ * - {@link Errors#CLUSTER_AUTHORIZATION_FAILED}
+ */
public class InitProducerIdResponse extends AbstractResponse {
- // Possible error codes:
- // NotCoordinator
- // CoordinatorNotAvailable
- // CoordinatorLoadInProgress
- // TransactionalIdAuthorizationFailed
- // ClusterAuthorizationFailed
-
- private static final Schema INIT_PRODUCER_ID_RESPONSE_V0 = new Schema(
- THROTTLE_TIME_MS,
- ERROR_CODE,
- PRODUCER_ID,
- PRODUCER_EPOCH);
-
- /**
- * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
- */
- private static final Schema INIT_PRODUCER_ID_RESPONSE_V1 = INIT_PRODUCER_ID_RESPONSE_V0;
-
- public static Schema[] schemaVersions() {
- return new Schema[]{INIT_PRODUCER_ID_RESPONSE_V0, INIT_PRODUCER_ID_RESPONSE_V1};
- }
-
- private final int throttleTimeMs;
- private final Errors error;
- private final long producerId;
- private final short epoch;
+ public final InitProducerIdResponseData data;
- public InitProducerIdResponse(int throttleTimeMs, Errors error, long producerId, short epoch) {
- this.throttleTimeMs = throttleTimeMs;
- this.error = error;
- this.producerId = producerId;
- this.epoch = epoch;
+ public InitProducerIdResponse(InitProducerIdResponseData data) {
+ this.data = data;
}
- public InitProducerIdResponse(Struct struct) {
- this.throttleTimeMs = struct.get(THROTTLE_TIME_MS);
- this.error = Errors.forCode(struct.get(ERROR_CODE));
- this.producerId = struct.get(PRODUCER_ID);
- this.epoch = struct.get(PRODUCER_EPOCH);
- }
-
- public InitProducerIdResponse(int throttleTimeMs, Errors errors) {
- this(throttleTimeMs, errors, RecordBatch.NO_PRODUCER_ID, (short) 0);
+ public InitProducerIdResponse(Struct struct, short version) {
+ this.data = new InitProducerIdResponseData(struct, version);
}
@Override
public int throttleTimeMs() {
- return throttleTimeMs;
- }
-
- public long producerId() {
- return producerId;
- }
-
- public Errors error() {
- return error;
+ return data.throttleTimeMs();
}
@Override
public Map<Errors, Integer> errorCounts() {
- return errorCounts(error);
- }
-
- public short epoch() {
- return epoch;
+ return errorCounts(Errors.forCode(data.errorCode()));
}
@Override
protected Struct toStruct(short version) {
- Struct struct = new Struct(ApiKeys.INIT_PRODUCER_ID.responseSchema(version));
- struct.set(THROTTLE_TIME_MS, throttleTimeMs);
- struct.set(PRODUCER_ID, producerId);
- struct.set(PRODUCER_EPOCH, epoch);
- struct.set(ERROR_CODE, error.code());
- return struct;
+ return data.toStruct(version);
}
public static InitProducerIdResponse parse(ByteBuffer buffer, short version) {
- return new InitProducerIdResponse(ApiKeys.INIT_PRODUCER_ID.parseResponse(version, buffer));
+ return new InitProducerIdResponse(ApiKeys.INIT_PRODUCER_ID.parseResponse(version, buffer), version);
}
@Override
public String toString() {
- return "InitProducerIdResponse(" +
- "error=" + error +
- ", producerId=" + producerId +
- ", producerEpoch=" + epoch +
- ", throttleTimeMs=" + throttleTimeMs +
- ')';
+ return data.toString();
+ }
+
+ public Errors error() {
+ return Errors.forCode(data.errorCode());
}
@Override
diff --git a/clients/src/main/resources/common/message/InitProducerIdRequest.json b/clients/src/main/resources/common/message/InitProducerIdRequest.json
index 8bf2ce3..c8ca110 100644
--- a/clients/src/main/resources/common/message/InitProducerIdRequest.json
+++ b/clients/src/main/resources/common/message/InitProducerIdRequest.json
@@ -23,6 +23,6 @@
{ "name": "TransactionalId", "type": "string", "versions": "0+", "nullableVersions": "0+",
"about": "The transactional id, or null if the producer is not transactional." },
{ "name": "TransactionTimeoutMs", "type": "int32", "versions": "0+",
- "about": "The time in ms to wait for before aborting idle transactions sent by this producer." }
+ "about": "The time in ms to wait for before aborting idle transactions sent by this producer. This is only relevant if a TransactionalId has been defined." }
]
}
diff --git a/clients/src/main/resources/common/message/InitProducerIdResponse.json b/clients/src/main/resources/common/message/InitProducerIdResponse.json
index b251051..a52fc81 100644
--- a/clients/src/main/resources/common/message/InitProducerIdResponse.json
+++ b/clients/src/main/resources/common/message/InitProducerIdResponse.json
@@ -25,7 +25,7 @@
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The error code, or 0 if there was no error." },
{ "name": "ProducerId", "type": "int64", "versions": "0+",
- "about": "The current producer id." },
+ "default": -1, "about": "The current producer id." },
{ "name": "ProducerEpoch", "type": "int16", "versions": "0+",
"about": "The current epoch associated with the producer id." }
]
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 4cbdaa2..d397fd4 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -38,6 +38,7 @@ import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.message.InitProducerIdResponseData;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
@@ -108,7 +109,6 @@ import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.spy;
public class SenderTest {
-
private static final int MAX_REQUEST_SIZE = 1024 * 1024;
private static final short ACKS_ALL = -1;
private static final String CLIENT_ID = "clientId";
@@ -2321,15 +2321,22 @@ public class SenderTest {
if (error != Errors.NONE)
producerEpoch = RecordBatch.NO_PRODUCER_EPOCH;
- client.prepareResponse(new MockClient.RequestMatcher() {
- @Override
- public boolean matches(AbstractRequest body) {
- return body instanceof InitProducerIdRequest && ((InitProducerIdRequest) body).transactionalId() == null;
- }
- }, new InitProducerIdResponse(0, error, producerId, producerEpoch));
+ client.prepareResponse(body -> {
+ return body instanceof InitProducerIdRequest &&
+ ((InitProducerIdRequest) body).data.transactionalId() == null;
+ }, initProducerIdResponse(producerId, producerEpoch, error));
sender.run(time.milliseconds());
}
+ private InitProducerIdResponse initProducerIdResponse(long producerId, short producerEpoch, Errors error) {
+ InitProducerIdResponseData responseData = new InitProducerIdResponseData()
+ .setErrorCode(error.code())
+ .setProducerEpoch(producerEpoch)
+ .setProducerId(producerId)
+ .setThrottleTimeMs(0);
+ return new InitProducerIdResponse(responseData);
+ }
+
private void doInitTransactions(TransactionManager transactionManager, ProducerIdAndEpoch producerIdAndEpoch) {
transactionManager.initializeTransactions();
prepareFindCoordinatorResponse(Errors.NONE);
@@ -2345,8 +2352,8 @@ public class SenderTest {
client.prepareResponse(new FindCoordinatorResponse(error, metadata.fetch().nodes().get(0)));
}
- private void prepareInitPidResponse(Errors error, long pid, short epoch) {
- client.prepareResponse(new InitProducerIdResponse(0, error, pid, epoch));
+ private void prepareInitPidResponse(Errors error, long producerId, short producerEpoch) {
+ client.prepareResponse(initProducerIdResponse(producerId, producerEpoch, error));
}
private void assertFutureFailure(Future<?> future, Class<? extends Exception> expectedExceptionType)
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 97f7f5d..1c47b9d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.message.InitProducerIdResponseData;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
@@ -703,8 +704,8 @@ public class TransactionManagerTest {
client.prepareUnsupportedVersionResponse(body -> {
InitProducerIdRequest initProducerIdRequest = (InitProducerIdRequest) body;
- assertEquals(initProducerIdRequest.transactionalId(), transactionalId);
- assertEquals(initProducerIdRequest.transactionTimeoutMs(), transactionTimeoutMs);
+ assertEquals(initProducerIdRequest.data.transactionalId(), transactionalId);
+ assertEquals(initProducerIdRequest.data.transactionTimeoutMs(), transactionTimeoutMs);
return true;
});
@@ -2381,21 +2382,26 @@ public class TransactionManagerTest {
}, new FindCoordinatorResponse(error, brokerNode), shouldDisconnect);
}
- private void prepareInitPidResponse(Errors error, boolean shouldDisconnect, long pid, short epoch) {
+ private void prepareInitPidResponse(Errors error, boolean shouldDisconnect, long producerId, short producerEpoch) {
+ InitProducerIdResponseData responseData = new InitProducerIdResponseData()
+ .setErrorCode(error.code())
+ .setProducerEpoch(producerEpoch)
+ .setProducerId(producerId)
+ .setThrottleTimeMs(0);
client.prepareResponse(body -> {
InitProducerIdRequest initProducerIdRequest = (InitProducerIdRequest) body;
- assertEquals(initProducerIdRequest.transactionalId(), transactionalId);
- assertEquals(initProducerIdRequest.transactionTimeoutMs(), transactionTimeoutMs);
+ assertEquals(initProducerIdRequest.data.transactionalId(), transactionalId);
+ assertEquals(initProducerIdRequest.data.transactionTimeoutMs(), transactionTimeoutMs);
return true;
- }, new InitProducerIdResponse(0, error, pid, epoch), shouldDisconnect);
+ }, new InitProducerIdResponse(responseData), shouldDisconnect);
}
- private void sendProduceResponse(Errors error, final long pid, final short epoch) {
- client.respond(produceRequestMatcher(pid, epoch), produceResponse(tp0, 0, error, 0));
+ private void sendProduceResponse(Errors error, final long producerId, final short producerEpoch) {
+ client.respond(produceRequestMatcher(producerId, producerEpoch), produceResponse(tp0, 0, error, 0));
}
- private void prepareProduceResponse(Errors error, final long pid, final short epoch) {
- client.prepareResponse(produceRequestMatcher(pid, epoch), produceResponse(tp0, 0, error, 0));
+ private void prepareProduceResponse(Errors error, final long producerId, final short producerEpoch) {
+ client.prepareResponse(produceRequestMatcher(producerId, producerEpoch), produceResponse(tp0, 0, error, 0));
}
private MockClient.RequestMatcher produceRequestMatcher(final long pid, final short epoch) {
return body -> {
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index dfdc323..ca695c7 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -52,6 +52,8 @@ import org.apache.kafka.common.message.ElectPreferredLeadersRequestData.TopicPar
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData;
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.PartitionResult;
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.ReplicaElectionResult;
+import org.apache.kafka.common.message.InitProducerIdRequestData;
+import org.apache.kafka.common.message.InitProducerIdResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.LeaveGroupRequestData;
@@ -389,30 +391,39 @@ public class RequestResponseTest {
checkResponse(req.getErrorResponse(e), req.version());
}
- private void checkRequest(AbstractRequest req) throws Exception {
+ private void checkRequest(AbstractRequest req) {
// Check that we can serialize, deserialize and serialize again
// We don't check for equality or hashCode because it is likely to fail for any request containing a HashMap
checkRequest(req, false);
}
- private void checkRequest(AbstractRequest req, boolean checkEqualityAndHashCode) throws Exception {
+ private void checkRequest(AbstractRequest req, boolean checkEqualityAndHashCode) {
// Check that we can serialize, deserialize and serialize again
// Check for equality and hashCode only if indicated
- Struct struct = req.toStruct();
- AbstractRequest deserialized = (AbstractRequest) deserialize(req, struct, req.version());
- Struct struct2 = deserialized.toStruct();
- if (checkEqualityAndHashCode) {
- assertEquals(struct, struct2);
- assertEquals(struct.hashCode(), struct2.hashCode());
+ try {
+ Struct struct = req.toStruct();
+ AbstractRequest deserialized = AbstractRequest.parseRequest(req.api, req.version(), struct);
+ Struct struct2 = deserialized.toStruct();
+ if (checkEqualityAndHashCode) {
+ assertEquals(struct, struct2);
+ assertEquals(struct.hashCode(), struct2.hashCode());
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to deserialize request " + req + " with type " + req.getClass(), e);
}
}
private void checkResponse(AbstractResponse response, int version) throws Exception {
// Check that we can serialize, deserialize and serialize again
// We don't check for equality or hashCode because it is likely to fail for any response containing a HashMap
- Struct struct = response.toStruct((short) version);
- AbstractResponse deserialized = (AbstractResponse) deserialize(response, struct, (short) version);
- Struct struct2 = deserialized.toStruct((short) version);
+ try {
+ Struct struct = response.toStruct((short) version);
+ AbstractResponse deserialized = (AbstractResponse) deserialize(response, struct, (short) version);
+ Struct struct2 = deserialized.toStruct((short) version);
+ assertEquals(struct2, struct);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to deserialize response " + response + " with type " + response.getClass(), e);
+ }
}
private AbstractRequestResponse deserialize(AbstractRequestResponse req, Struct struct, short version) throws NoSuchMethodException, IllegalAccessException, InvocationTargetException {
@@ -1167,14 +1178,21 @@ public class RequestResponseTest {
}
private InitProducerIdRequest createInitPidRequest() {
- return new InitProducerIdRequest.Builder(null, 100).build();
+ InitProducerIdRequestData requestData = new InitProducerIdRequestData()
+ .setTransactionalId(null)
+ .setTransactionTimeoutMs(100);
+ return new InitProducerIdRequest.Builder(requestData).build();
}
private InitProducerIdResponse createInitPidResponse() {
- return new InitProducerIdResponse(0, Errors.NONE, 3332, (short) 3);
+ InitProducerIdResponseData responseData = new InitProducerIdResponseData()
+ .setErrorCode(Errors.NONE.code())
+ .setProducerEpoch((short) 3)
+ .setProducerId(3332)
+ .setThrottleTimeMs(0);
+ return new InitProducerIdResponse(responseData);
}
-
private OffsetsForLeaderEpochRequest createLeaderEpochRequest() {
Map<TopicPartition, OffsetsForLeaderEpochRequest.PartitionData> epochs = new HashMap<>();
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 7d12fe3..140fdd4 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -45,16 +45,9 @@ import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal}
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
-import org.apache.kafka.common.message.CreateTopicsResponseData
import org.apache.kafka.common.message.CreateTopicsResponseData.{CreatableTopicResult, CreatableTopicResultSet}
-import org.apache.kafka.common.message.DeleteTopicsResponseData
+import org.apache.kafka.common.message.{CreateTopicsResponseData, DeleteTopicsResponseData, DescribeGroupsResponseData, ElectPreferredLeadersResponseData, InitProducerIdResponseData, JoinGroupResponseData, LeaveGroupResponseData, SaslAuthenticateResponseData, SaslHandshakeResponseData}
import org.apache.kafka.common.message.DeleteTopicsResponseData.{DeletableTopicResult, DeletableTopicResultSet}
-import org.apache.kafka.common.message.DescribeGroupsResponseData
-import org.apache.kafka.common.message.ElectPreferredLeadersResponseData
-import org.apache.kafka.common.message.JoinGroupResponseData
-import org.apache.kafka.common.message.LeaveGroupResponseData
-import org.apache.kafka.common.message.SaslAuthenticateResponseData
-import org.apache.kafka.common.message.SaslHandshakeResponseData
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.{ListenerName, Send}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@@ -1680,7 +1673,7 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleInitProducerIdRequest(request: RequestChannel.Request): Unit = {
val initProducerIdRequest = request.body[InitProducerIdRequest]
- val transactionalId = initProducerIdRequest.transactionalId
+ val transactionalId = initProducerIdRequest.data.transactionalId
if (transactionalId != null) {
if (!authorize(request.session, Write, Resource(TransactionalId, transactionalId, LITERAL))) {
@@ -1694,13 +1687,18 @@ class KafkaApis(val requestChannel: RequestChannel,
def sendResponseCallback(result: InitProducerIdResult): Unit = {
def createResponse(requestThrottleMs: Int): AbstractResponse = {
- val responseBody = new InitProducerIdResponse(requestThrottleMs, result.error, result.producerId, result.producerEpoch)
+ val responseData = new InitProducerIdResponseData()
+ .setProducerId(result.producerId)
+ .setProducerEpoch(result.producerEpoch)
+ .setThrottleTimeMs(requestThrottleMs)
+ .setErrorCode(result.error.code)
+ val responseBody = new InitProducerIdResponse(responseData)
trace(s"Completed $transactionalId's InitProducerIdRequest with result $result from client ${request.header.clientId}.")
responseBody
}
sendResponseMaybeThrottle(request, createResponse)
}
- txnCoordinator.handleInitProducerId(transactionalId, initProducerIdRequest.transactionTimeoutMs, sendResponseCallback)
+ txnCoordinator.handleInitProducerId(transactionalId, initProducerIdRequest.data.transactionTimeoutMs, sendResponseCallback)
}
def handleEndTxnRequest(request: RequestChannel.Request): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index d04f39f..05f4bcd 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -24,13 +24,11 @@ import kafka.security.auth._
import kafka.utils.TestUtils
import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
import org.apache.kafka.common.config.ConfigResource
-import org.apache.kafka.common.message.{CreateTopicsRequestData, DeleteTopicsRequestData, DescribeGroupsRequestData, ElectPreferredLeadersRequestData, LeaveGroupRequestData, JoinGroupRequestData}
+import org.apache.kafka.common.message.{CreateTopicsRequestData, DeleteTopicsRequestData, DescribeGroupsRequestData, ElectPreferredLeadersRequestData, InitProducerIdRequestData, JoinGroupRequestData, LeaveGroupRequestData, SaslAuthenticateRequestData, SaslHandshakeRequestData}
import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, ResourceType => AdminResourceType}
import org.apache.kafka.common.{Node, TopicPartition}
import org.apache.kafka.common.message.ControlledShutdownRequestData
import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicSet}
-import org.apache.kafka.common.message.SaslAuthenticateRequestData
-import org.apache.kafka.common.message.SaslHandshakeRequestData
import org.apache.kafka.common.metrics.{KafkaMetric, Quota, Sensor}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.ApiKeys
@@ -321,7 +319,10 @@ class RequestQuotaTest extends BaseRequestTest {
new DeleteRecordsRequest.Builder(5000, Map(tp -> (0L: java.lang.Long)).asJava)
case ApiKeys.INIT_PRODUCER_ID =>
- new InitProducerIdRequest.Builder("abc")
+ val requestData = new InitProducerIdRequestData()
+ .setTransactionalId("test-transactional-id")
+ .setTransactionTimeoutMs(5000)
+ new InitProducerIdRequest.Builder(requestData)
case ApiKeys.OFFSET_FOR_LEADER_EPOCH =>
new OffsetsForLeaderEpochRequest.Builder(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion,
@@ -463,7 +464,7 @@ class RequestQuotaTest extends BaseRequestTest {
case ApiKeys.FETCH => FetchResponse.parse(response).throttleTimeMs
case ApiKeys.LIST_OFFSETS => new ListOffsetResponse(response).throttleTimeMs
case ApiKeys.METADATA =>
- new MetadataResponse(response, ApiKeys.DESCRIBE_GROUPS.latestVersion()).throttleTimeMs
+ new MetadataResponse(response, ApiKeys.DESCRIBE_GROUPS.latestVersion).throttleTimeMs
case ApiKeys.OFFSET_COMMIT => new OffsetCommitResponse(response).throttleTimeMs
case ApiKeys.OFFSET_FETCH => new OffsetFetchResponse(response).throttleTimeMs
case ApiKeys.FIND_COORDINATOR => new FindCoordinatorResponse(response).throttleTimeMs
@@ -472,15 +473,15 @@ class RequestQuotaTest extends BaseRequestTest {
case ApiKeys.LEAVE_GROUP => new LeaveGroupResponse(response).throttleTimeMs
case ApiKeys.SYNC_GROUP => new SyncGroupResponse(response).throttleTimeMs
case ApiKeys.DESCRIBE_GROUPS =>
- new DescribeGroupsResponse(response, ApiKeys.DESCRIBE_GROUPS.latestVersion()).throttleTimeMs
+ new DescribeGroupsResponse(response, ApiKeys.DESCRIBE_GROUPS.latestVersion).throttleTimeMs
case ApiKeys.LIST_GROUPS => new ListGroupsResponse(response).throttleTimeMs
case ApiKeys.API_VERSIONS => new ApiVersionsResponse(response).throttleTimeMs
case ApiKeys.CREATE_TOPICS =>
- new CreateTopicsResponse(response, ApiKeys.CREATE_TOPICS.latestVersion()).throttleTimeMs
+ new CreateTopicsResponse(response, ApiKeys.CREATE_TOPICS.latestVersion).throttleTimeMs
case ApiKeys.DELETE_TOPICS =>
- new DeleteTopicsResponse(response, ApiKeys.DELETE_TOPICS.latestVersion()).throttleTimeMs
+ new DeleteTopicsResponse(response, ApiKeys.DELETE_TOPICS.latestVersion).throttleTimeMs
case ApiKeys.DELETE_RECORDS => new DeleteRecordsResponse(response).throttleTimeMs
- case ApiKeys.INIT_PRODUCER_ID => new InitProducerIdResponse(response).throttleTimeMs
+ case ApiKeys.INIT_PRODUCER_ID => new InitProducerIdResponse(response, ApiKeys.INIT_PRODUCER_ID.latestVersion).throttleTimeMs
case ApiKeys.ADD_PARTITIONS_TO_TXN => new AddPartitionsToTxnResponse(response).throttleTimeMs
case ApiKeys.ADD_OFFSETS_TO_TXN => new AddOffsetsToTxnResponse(response).throttleTimeMs
case ApiKeys.END_TXN => new EndTxnResponse(response).throttleTimeMs