Posted to commits@kafka.apache.org by mi...@apache.org on 2020/03/19 14:27:38 UTC
[kafka] branch trunk updated: KAFKA-8618: Replace Txn marker with automated protocol (#7039)
This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c7164a3 KAFKA-8618: Replace Txn marker with automated protocol (#7039)
c7164a3 is described below
commit c7164a3866ab6a28eb5ae158ff29ac806f4f3d54
Author: Boyang Chen <bo...@confluent.io>
AuthorDate: Thu Mar 19 07:26:41 2020 -0700
KAFKA-8618: Replace Txn marker with automated protocol (#7039)
Reviewers: Mickael Maison <mi...@gmail.com>
---
.../org/apache/kafka/common/protocol/ApiKeys.java | 8 +-
.../kafka/common/requests/AbstractResponse.java | 2 +-
.../common/requests/WriteTxnMarkersRequest.java | 192 ++++++++-------------
.../common/requests/WriteTxnMarkersResponse.java | 164 +++++++-----------
.../requests/WriteTxnMarkersRequestTest.java | 81 +++++++++
.../requests/WriteTxnMarkersResponseTest.java | 60 +++++++
6 files changed, 277 insertions(+), 230 deletions(-)
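For orientation before the diff: this commit replaces the hand-maintained Schema/Struct definitions in WriteTxnMarkersRequest/Response with the generated WriteTxnMarkersRequestData and WriteTxnMarkersResponseData classes, while keeping the public Builder and accessor signatures so existing callers are unchanged. A minimal usage sketch (not part of the commit) using only constructors and accessors visible in the diff below; the class name WriteTxnMarkersExample is illustrative:

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.requests.TransactionResult;
    import org.apache.kafka.common.requests.WriteTxnMarkersRequest;
    import org.apache.kafka.common.requests.WriteTxnMarkersRequest.TxnMarkerEntry;

    import java.util.Collections;

    public class WriteTxnMarkersExample {
        public static void main(String[] args) {
            // One COMMIT marker: producerId=10, producerEpoch=2, coordinatorEpoch=1
            TxnMarkerEntry entry = new TxnMarkerEntry(10L, (short) 2, 1,
                    TransactionResult.COMMIT,
                    Collections.singletonList(new TopicPartition("topic", 0)));

            // The Builder converts entries into generated WriteTxnMarkersRequestData
            WriteTxnMarkersRequest request =
                    new WriteTxnMarkersRequest.Builder(Collections.singletonList(entry))
                            .build((short) 0);

            // markers() converts back out of the generated data classes
            System.out.println(request.markers());
        }
    }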
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 6549d8f..e969170 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -95,6 +95,8 @@ import org.apache.kafka.common.message.UpdateMetadataRequestData;
import org.apache.kafka.common.message.UpdateMetadataResponseData;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
+import org.apache.kafka.common.message.WriteTxnMarkersRequestData;
+import org.apache.kafka.common.message.WriteTxnMarkersResponseData;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.SchemaException;
import org.apache.kafka.common.protocol.types.Struct;
@@ -120,8 +122,6 @@ import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.ProduceResponse;
-import org.apache.kafka.common.requests.WriteTxnMarkersRequest;
-import org.apache.kafka.common.requests.WriteTxnMarkersResponse;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -177,8 +177,8 @@ public enum ApiKeys {
ADD_OFFSETS_TO_TXN(25, "AddOffsetsToTxn", false, RecordBatch.MAGIC_VALUE_V2, AddOffsetsToTxnRequest.schemaVersions(),
AddOffsetsToTxnResponse.schemaVersions()),
END_TXN(26, "EndTxn", false, RecordBatch.MAGIC_VALUE_V2, EndTxnRequestData.SCHEMAS, EndTxnResponseData.SCHEMAS),
- WRITE_TXN_MARKERS(27, "WriteTxnMarkers", true, RecordBatch.MAGIC_VALUE_V2, WriteTxnMarkersRequest.schemaVersions(),
- WriteTxnMarkersResponse.schemaVersions()),
+ WRITE_TXN_MARKERS(27, "WriteTxnMarkers", true, RecordBatch.MAGIC_VALUE_V2, WriteTxnMarkersRequestData.SCHEMAS,
+ WriteTxnMarkersResponseData.SCHEMAS),
TXN_OFFSET_COMMIT(28, "TxnOffsetCommit", false, RecordBatch.MAGIC_VALUE_V2, TxnOffsetCommitRequestData.SCHEMAS,
TxnOffsetCommitResponseData.SCHEMAS),
DESCRIBE_ACLS(29, "DescribeAcls", DescribeAclsRequestData.SCHEMAS, DescribeAclsResponseData.SCHEMAS),
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 9d3f39d..301a4fb 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -135,7 +135,7 @@ public abstract class AbstractResponse implements AbstractRequestResponse {
case END_TXN:
return new EndTxnResponse(struct, version);
case WRITE_TXN_MARKERS:
- return new WriteTxnMarkersResponse(struct);
+ return new WriteTxnMarkersResponse(struct, version);
case TXN_OFFSET_COMMIT:
return new TxnOffsetCommitResponse(struct, version);
case DESCRIBE_ACLS:
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
index 33f9bb5..c272633 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
@@ -17,13 +17,12 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.WriteTxnMarkersRequestData;
+import org.apache.kafka.common.message.WriteTxnMarkersRequestData.WritableTxnMarker;
+import org.apache.kafka.common.message.WriteTxnMarkersRequestData.WritableTxnMarkerTopic;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.utils.CollectionUtils;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -32,40 +31,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
-import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
-import static org.apache.kafka.common.protocol.types.Type.BOOLEAN;
-import static org.apache.kafka.common.protocol.types.Type.INT16;
-import static org.apache.kafka.common.protocol.types.Type.INT32;
-import static org.apache.kafka.common.protocol.types.Type.INT64;
-
public class WriteTxnMarkersRequest extends AbstractRequest {
- private static final String COORDINATOR_EPOCH_KEY_NAME = "coordinator_epoch";
- private static final String TXN_MARKERS_KEY_NAME = "transaction_markers";
-
- private static final String PRODUCER_ID_KEY_NAME = "producer_id";
- private static final String PRODUCER_EPOCH_KEY_NAME = "producer_epoch";
- private static final String TRANSACTION_RESULT_KEY_NAME = "transaction_result";
- private static final String TOPICS_KEY_NAME = "topics";
- private static final String PARTITIONS_KEY_NAME = "partitions";
-
- private static final Schema WRITE_TXN_MARKERS_ENTRY_V0 = new Schema(
- new Field(PRODUCER_ID_KEY_NAME, INT64, "Current producer id in use by the transactional id."),
- new Field(PRODUCER_EPOCH_KEY_NAME, INT16, "Current epoch associated with the producer id."),
- new Field(TRANSACTION_RESULT_KEY_NAME, BOOLEAN, "The result of the transaction to write to the " +
- "partitions (false = ABORT, true = COMMIT)."),
- new Field(TOPICS_KEY_NAME, new ArrayOf(new Schema(
- TOPIC_NAME,
- new Field(PARTITIONS_KEY_NAME, new ArrayOf(INT32)))), "The partitions to write markers for."),
- new Field(COORDINATOR_EPOCH_KEY_NAME, INT32, "Epoch associated with the transaction state partition " +
- "hosted by this transaction coordinator"));
-
- private static final Schema WRITE_TXN_MARKERS_REQUEST_V0 = new Schema(
- new Field(TXN_MARKERS_KEY_NAME, new ArrayOf(WRITE_TXN_MARKERS_ENTRY_V0), "The transaction markers to " +
- "be written."));
-
- public static Schema[] schemaVersions() {
- return new Schema[]{WRITE_TXN_MARKERS_REQUEST_V0};
- }
public static class TxnMarkerEntry {
private final long producerId;
@@ -106,16 +72,15 @@ public class WriteTxnMarkersRequest extends AbstractRequest {
return partitions;
}
-
@Override
public String toString() {
return "TxnMarkerEntry{" +
- "producerId=" + producerId +
- ", producerEpoch=" + producerEpoch +
- ", coordinatorEpoch=" + coordinatorEpoch +
- ", result=" + result +
- ", partitions=" + partitions +
- '}';
+ "producerId=" + producerId +
+ ", producerEpoch=" + producerEpoch +
+ ", coordinatorEpoch=" + coordinatorEpoch +
+ ", result=" + result +
+ ", partitions=" + partitions +
+ '}';
}
@Override
@@ -124,10 +89,10 @@ public class WriteTxnMarkersRequest extends AbstractRequest {
if (o == null || getClass() != o.getClass()) return false;
final TxnMarkerEntry that = (TxnMarkerEntry) o;
return producerId == that.producerId &&
- producerEpoch == that.producerEpoch &&
- coordinatorEpoch == that.coordinatorEpoch &&
- result == that.result &&
- Objects.equals(partitions, that.partitions);
+ producerEpoch == that.producerEpoch &&
+ coordinatorEpoch == that.coordinatorEpoch &&
+ result == that.result &&
+ Objects.equals(partitions, that.partitions);
}
@Override
@@ -137,106 +102,93 @@ public class WriteTxnMarkersRequest extends AbstractRequest {
}
public static class Builder extends AbstractRequest.Builder<WriteTxnMarkersRequest> {
- private final List<TxnMarkerEntry> markers;
- public Builder(List<TxnMarkerEntry> markers) {
+ public final WriteTxnMarkersRequestData data;
+
+ public Builder(final List<TxnMarkerEntry> markers) {
super(ApiKeys.WRITE_TXN_MARKERS);
- this.markers = markers;
+ List<WritableTxnMarker> dataMarkers = new ArrayList<>();
+ for (TxnMarkerEntry marker : markers) {
+ final Map<String, WritableTxnMarkerTopic> topicMap = new HashMap<>();
+ for (TopicPartition topicPartition : marker.partitions) {
+ WritableTxnMarkerTopic topic = topicMap.getOrDefault(topicPartition.topic(),
+ new WritableTxnMarkerTopic()
+ .setName(topicPartition.topic()));
+ topic.partitionIndexes().add(topicPartition.partition());
+ topicMap.put(topicPartition.topic(), topic);
+ }
+
+ dataMarkers.add(new WritableTxnMarker()
+ .setProducerId(marker.producerId)
+ .setProducerEpoch(marker.producerEpoch)
+ .setCoordinatorEpoch(marker.coordinatorEpoch)
+ .setTransactionResult(marker.transactionResult().id)
+ .setTopics(new ArrayList<>(topicMap.values())));
+ }
+ this.data = new WriteTxnMarkersRequestData().setMarkers(dataMarkers);
}
@Override
public WriteTxnMarkersRequest build(short version) {
- return new WriteTxnMarkersRequest(version, markers);
+ return new WriteTxnMarkersRequest(data, version);
}
}
- private final List<TxnMarkerEntry> markers;
+ public final WriteTxnMarkersRequestData data;
- private WriteTxnMarkersRequest(short version, List<TxnMarkerEntry> markers) {
+ private WriteTxnMarkersRequest(WriteTxnMarkersRequestData data, short version) {
super(ApiKeys.WRITE_TXN_MARKERS, version);
-
- this.markers = markers;
+ this.data = data;
}
public WriteTxnMarkersRequest(Struct struct, short version) {
super(ApiKeys.WRITE_TXN_MARKERS, version);
- List<TxnMarkerEntry> markers = new ArrayList<>();
- Object[] markersArray = struct.getArray(TXN_MARKERS_KEY_NAME);
- for (Object markerObj : markersArray) {
- Struct markerStruct = (Struct) markerObj;
-
- long producerId = markerStruct.getLong(PRODUCER_ID_KEY_NAME);
- short producerEpoch = markerStruct.getShort(PRODUCER_EPOCH_KEY_NAME);
- int coordinatorEpoch = markerStruct.getInt(COORDINATOR_EPOCH_KEY_NAME);
- TransactionResult result = TransactionResult.forId(markerStruct.getBoolean(TRANSACTION_RESULT_KEY_NAME));
-
- List<TopicPartition> partitions = new ArrayList<>();
- Object[] topicPartitionsArray = markerStruct.getArray(TOPICS_KEY_NAME);
- for (Object topicPartitionObj : topicPartitionsArray) {
- Struct topicPartitionStruct = (Struct) topicPartitionObj;
- String topic = topicPartitionStruct.get(TOPIC_NAME);
- for (Object partitionObj : topicPartitionStruct.getArray(PARTITIONS_KEY_NAME)) {
- partitions.add(new TopicPartition(topic, (Integer) partitionObj));
- }
- }
-
- markers.add(new TxnMarkerEntry(producerId, producerEpoch, coordinatorEpoch, result, partitions));
- }
-
- this.markers = markers;
- }
-
-
- public List<TxnMarkerEntry> markers() {
- return markers;
+ this.data = new WriteTxnMarkersRequestData(struct, version);
}
@Override
protected Struct toStruct() {
- Struct struct = new Struct(ApiKeys.WRITE_TXN_MARKERS.requestSchema(version()));
-
- Object[] markersArray = new Object[markers.size()];
- int i = 0;
- for (TxnMarkerEntry entry : markers) {
- Struct markerStruct = struct.instance(TXN_MARKERS_KEY_NAME);
- markerStruct.set(PRODUCER_ID_KEY_NAME, entry.producerId);
- markerStruct.set(PRODUCER_EPOCH_KEY_NAME, entry.producerEpoch);
- markerStruct.set(COORDINATOR_EPOCH_KEY_NAME, entry.coordinatorEpoch);
- markerStruct.set(TRANSACTION_RESULT_KEY_NAME, entry.result.id);
-
- Map<String, List<Integer>> mappedPartitions = CollectionUtils.groupPartitionsByTopic(entry.partitions);
- Object[] partitionsArray = new Object[mappedPartitions.size()];
- int j = 0;
- for (Map.Entry<String, List<Integer>> topicAndPartitions : mappedPartitions.entrySet()) {
- Struct topicPartitionsStruct = markerStruct.instance(TOPICS_KEY_NAME);
- topicPartitionsStruct.set(TOPIC_NAME, topicAndPartitions.getKey());
- topicPartitionsStruct.set(PARTITIONS_KEY_NAME, topicAndPartitions.getValue().toArray());
- partitionsArray[j++] = topicPartitionsStruct;
- }
- markerStruct.set(TOPICS_KEY_NAME, partitionsArray);
- markersArray[i++] = markerStruct;
- }
- struct.set(TXN_MARKERS_KEY_NAME, markersArray);
-
- return struct;
+ return data.toStruct(version());
}
@Override
public WriteTxnMarkersResponse getErrorResponse(int throttleTimeMs, Throwable e) {
Errors error = Errors.forException(e);
- Map<Long, Map<TopicPartition, Errors>> errors = new HashMap<>(markers.size());
- for (TxnMarkerEntry entry : markers) {
- Map<TopicPartition, Errors> errorsPerPartition = new HashMap<>(entry.partitions.size());
- for (TopicPartition partition : entry.partitions)
- errorsPerPartition.put(partition, error);
-
- errors.put(entry.producerId, errorsPerPartition);
+ final Map<Long, Map<TopicPartition, Errors>> errors = new HashMap<>(data.markers().size());
+ for (WritableTxnMarker markerEntry : data.markers()) {
+ Map<TopicPartition, Errors> errorsPerPartition = new HashMap<>();
+ for (WritableTxnMarkerTopic topic : markerEntry.topics()) {
+ for (Integer partitionIdx : topic.partitionIndexes()) {
+ errorsPerPartition.put(new TopicPartition(topic.name(), partitionIdx), error);
+ }
+ }
+ errors.put(markerEntry.producerId(), errorsPerPartition);
}
return new WriteTxnMarkersResponse(errors);
}
+ public List<TxnMarkerEntry> markers() {
+ List<TxnMarkerEntry> markers = new ArrayList<>();
+ for (WritableTxnMarker markerEntry : data.markers()) {
+ List<TopicPartition> topicPartitions = new ArrayList<>();
+ for (WritableTxnMarkerTopic topic : markerEntry.topics()) {
+ for (Integer partitionIdx : topic.partitionIndexes()) {
+ topicPartitions.add(new TopicPartition(topic.name(), partitionIdx));
+ }
+ }
+ markers.add(new TxnMarkerEntry(
+ markerEntry.producerId(),
+ markerEntry.producerEpoch(),
+ markerEntry.coordinatorEpoch(),
+ TransactionResult.forId(markerEntry.transactionResult()),
+ topicPartitions)
+ );
+ }
+ return markers;
+ }
+
public static WriteTxnMarkersRequest parse(ByteBuffer buffer, short version) {
return new WriteTxnMarkersRequest(ApiKeys.WRITE_TXN_MARKERS.parseRequest(version, buffer), version);
}
@@ -246,11 +198,11 @@ public class WriteTxnMarkersRequest extends AbstractRequest {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
final WriteTxnMarkersRequest that = (WriteTxnMarkersRequest) o;
- return Objects.equals(markers, that.markers);
+ return Objects.equals(this.data, that.data);
}
@Override
public int hashCode() {
- return Objects.hash(markers);
+ return Objects.hash(this.data);
}
}
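The Builder above flattens each marker's TopicPartition list into per-topic groups with a getOrDefault-then-put accumulator. A self-contained sketch of that idiom with plain collections (the helper below is hypothetical, not from the commit; computeIfAbsent would be the more idiomatic equivalent):

    import org.apache.kafka.common.TopicPartition;

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class GroupByTopicSketch {
        // Same shape as the accumulation inside Builder: one list of
        // partition indexes per topic name.
        static Map<String, List<Integer>> groupByTopic(List<TopicPartition> partitions) {
            Map<String, List<Integer>> byTopic = new HashMap<>();
            for (TopicPartition tp : partitions) {
                List<Integer> indexes = byTopic.getOrDefault(tp.topic(), new ArrayList<>());
                indexes.add(tp.partition());
                byTopic.put(tp.topic(), indexes);
            }
            return byTopic;
        }
    }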
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
index 92b5fc0..3fde41a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
@@ -17,133 +17,87 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.WriteTxnMarkersResponseData;
+import org.apache.kafka.common.message.WriteTxnMarkersResponseData.WritableTxnMarkerPartitionResult;
+import org.apache.kafka.common.message.WriteTxnMarkersResponseData.WritableTxnMarkerResult;
+import org.apache.kafka.common.message.WriteTxnMarkersResponseData.WritableTxnMarkerTopicResult;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.utils.CollectionUtils;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
-import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
-import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
-import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
-import static org.apache.kafka.common.protocol.types.Type.INT64;
-
+/**
+ * Possible error codes:
+ *
+ * - {@link Errors#CORRUPT_MESSAGE}
+ * - {@link Errors#INVALID_PRODUCER_EPOCH}
+ * - {@link Errors#UNKNOWN_TOPIC_OR_PARTITION}
+ * - {@link Errors#NOT_LEADER_FOR_PARTITION}
+ * - {@link Errors#MESSAGE_TOO_LARGE}
+ * - {@link Errors#RECORD_LIST_TOO_LARGE}
+ * - {@link Errors#NOT_ENOUGH_REPLICAS}
+ * - {@link Errors#NOT_ENOUGH_REPLICAS_AFTER_APPEND}
+ * - {@link Errors#INVALID_REQUIRED_ACKS}
+ * - {@link Errors#TRANSACTION_COORDINATOR_FENCED}
+ * - {@link Errors#REQUEST_TIMED_OUT}
+ * - {@link Errors#CLUSTER_AUTHORIZATION_FAILED}
+ */
public class WriteTxnMarkersResponse extends AbstractResponse {
- private static final String TXN_MARKERS_KEY_NAME = "transaction_markers";
-
- private static final String PRODUCER_ID_KEY_NAME = "producer_id";
- private static final String TOPICS_KEY_NAME = "topics";
- private static final String PARTITIONS_KEY_NAME = "partitions";
-
- private static final Schema WRITE_TXN_MARKERS_PARTITION_ERROR_RESPONSE_V0 = new Schema(
- PARTITION_ID,
- ERROR_CODE);
-
- private static final Schema WRITE_TXN_MARKERS_ENTRY_V0 = new Schema(
- new Field(PRODUCER_ID_KEY_NAME, INT64, "Current producer id in use by the transactional id."),
- new Field(TOPICS_KEY_NAME, new ArrayOf(new Schema(
- TOPIC_NAME,
- new Field(PARTITIONS_KEY_NAME, new ArrayOf(WRITE_TXN_MARKERS_PARTITION_ERROR_RESPONSE_V0)))),
- "Errors per partition from writing markers."));
-
- private static final Schema WRITE_TXN_MARKERS_RESPONSE_V0 = new Schema(
- new Field(TXN_MARKERS_KEY_NAME, new ArrayOf(WRITE_TXN_MARKERS_ENTRY_V0), "Errors per partition from " +
- "writing markers."));
-
- public static Schema[] schemaVersions() {
- return new Schema[]{WRITE_TXN_MARKERS_RESPONSE_V0};
- }
-
- // Possible error codes:
- // CorruptRecord
- // InvalidProducerEpoch
- // UnknownTopicOrPartition
- // NotLeaderForPartition
- // MessageTooLarge
- // RecordListTooLarge
- // NotEnoughReplicas
- // NotEnoughReplicasAfterAppend
- // InvalidRequiredAcks
- // TransactionCoordinatorFenced
- // RequestTimeout
- // ClusterAuthorizationFailed
private final Map<Long, Map<TopicPartition, Errors>> errors;
+ public final WriteTxnMarkersResponseData data;
public WriteTxnMarkersResponse(Map<Long, Map<TopicPartition, Errors>> errors) {
- this.errors = errors;
- }
-
- public WriteTxnMarkersResponse(Struct struct) {
- Map<Long, Map<TopicPartition, Errors>> errors = new HashMap<>();
-
- Object[] responseArray = struct.getArray(TXN_MARKERS_KEY_NAME);
- for (Object responseObj : responseArray) {
- Struct responseStruct = (Struct) responseObj;
-
- long producerId = responseStruct.getLong(PRODUCER_ID_KEY_NAME);
-
- Map<TopicPartition, Errors> errorPerPartition = new HashMap<>();
- Object[] topicPartitionsArray = responseStruct.getArray(TOPICS_KEY_NAME);
- for (Object topicPartitionObj : topicPartitionsArray) {
- Struct topicPartitionStruct = (Struct) topicPartitionObj;
- String topic = topicPartitionStruct.get(TOPIC_NAME);
- for (Object partitionObj : topicPartitionStruct.getArray(PARTITIONS_KEY_NAME)) {
- Struct partitionStruct = (Struct) partitionObj;
- Integer partition = partitionStruct.get(PARTITION_ID);
- Errors error = Errors.forCode(partitionStruct.get(ERROR_CODE));
- errorPerPartition.put(new TopicPartition(topic, partition), error);
- }
+ List<WritableTxnMarkerResult> markers = new ArrayList<>();
+ for (Map.Entry<Long, Map<TopicPartition, Errors>> markerEntry : errors.entrySet()) {
+ Map<String, WritableTxnMarkerTopicResult> responseTopicDataMap = new HashMap<>();
+ for (Map.Entry<TopicPartition, Errors> topicEntry : markerEntry.getValue().entrySet()) {
+ TopicPartition topicPartition = topicEntry.getKey();
+ String topicName = topicPartition.topic();
+
+ WritableTxnMarkerTopicResult topic =
+ responseTopicDataMap.getOrDefault(topicName, new WritableTxnMarkerTopicResult().setName(topicName));
+ topic.partitions().add(new WritableTxnMarkerPartitionResult()
+ .setErrorCode(topicEntry.getValue().code())
+ .setPartitionIndex(topicPartition.partition())
+ );
+ responseTopicDataMap.put(topicName, topic);
}
- errors.put(producerId, errorPerPartition);
+
+ markers.add(new WritableTxnMarkerResult()
+ .setProducerId(markerEntry.getKey())
+ .setTopics(new ArrayList<>(responseTopicDataMap.values()))
+ );
}
this.errors = errors;
+ this.data = new WriteTxnMarkersResponseData()
+ .setMarkers(markers);
}
- @Override
- protected Struct toStruct(short version) {
- Struct struct = new Struct(ApiKeys.WRITE_TXN_MARKERS.responseSchema(version));
-
- Object[] responsesArray = new Object[errors.size()];
- int k = 0;
- for (Map.Entry<Long, Map<TopicPartition, Errors>> responseEntry : errors.entrySet()) {
- Struct responseStruct = struct.instance(TXN_MARKERS_KEY_NAME);
- responseStruct.set(PRODUCER_ID_KEY_NAME, responseEntry.getKey());
-
- Map<TopicPartition, Errors> partitionAndErrors = responseEntry.getValue();
- Map<String, Map<Integer, Errors>> mappedPartitions = CollectionUtils.groupPartitionDataByTopic(partitionAndErrors);
- Object[] partitionsArray = new Object[mappedPartitions.size()];
- int i = 0;
- for (Map.Entry<String, Map<Integer, Errors>> topicAndPartitions : mappedPartitions.entrySet()) {
- Struct topicPartitionsStruct = responseStruct.instance(TOPICS_KEY_NAME);
- topicPartitionsStruct.set(TOPIC_NAME, topicAndPartitions.getKey());
- Map<Integer, Errors> partitionIdAndErrors = topicAndPartitions.getValue();
-
- Object[] partitionAndErrorsArray = new Object[partitionIdAndErrors.size()];
- int j = 0;
- for (Map.Entry<Integer, Errors> partitionAndError : partitionIdAndErrors.entrySet()) {
- Struct partitionAndErrorStruct = topicPartitionsStruct.instance(PARTITIONS_KEY_NAME);
- partitionAndErrorStruct.set(PARTITION_ID, partitionAndError.getKey());
- partitionAndErrorStruct.set(ERROR_CODE, partitionAndError.getValue().code());
- partitionAndErrorsArray[j++] = partitionAndErrorStruct;
+ public WriteTxnMarkersResponse(Struct struct, short version) {
+ this.data = new WriteTxnMarkersResponseData(struct, version);
+ this.errors = new HashMap<>();
+ for (WritableTxnMarkerResult marker : data.markers()) {
+ Map<TopicPartition, Errors> topicPartitionErrorsMap = new HashMap<>();
+ for (WritableTxnMarkerTopicResult topic: marker.topics()) {
+ for (WritableTxnMarkerPartitionResult partitionResult: topic.partitions()) {
+ topicPartitionErrorsMap.put(new TopicPartition(topic.name(), partitionResult.partitionIndex()),
+ Errors.forCode(partitionResult.errorCode()));
}
- topicPartitionsStruct.set(PARTITIONS_KEY_NAME, partitionAndErrorsArray);
- partitionsArray[i++] = topicPartitionsStruct;
}
- responseStruct.set(TOPICS_KEY_NAME, partitionsArray);
-
- responsesArray[k++] = responseStruct;
+ errors.put(marker.producerId(), topicPartitionErrorsMap);
}
+ }
- struct.set(TXN_MARKERS_KEY_NAME, responsesArray);
- return struct;
+ @Override
+ protected Struct toStruct(short version) {
+ return data.toStruct(version);
}
public Map<TopicPartition, Errors> errors(long producerId) {
@@ -161,6 +115,6 @@ public class WriteTxnMarkersResponse extends AbstractResponse {
}
public static WriteTxnMarkersResponse parse(ByteBuffer buffer, short version) {
- return new WriteTxnMarkersResponse(ApiKeys.WRITE_TXN_MARKERS.parseResponse(version, buffer));
+ return new WriteTxnMarkersResponse(ApiKeys.WRITE_TXN_MARKERS.parseResponse(version, buffer), version);
}
}
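Similarly on the response side, a short sketch (not from the commit) of the map-based constructor this change keeps, which now also populates the generated WriteTxnMarkersResponseData, and the errors(producerId) and errorCounts() accessors that read the results back; the class name is illustrative:

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.protocol.Errors;
    import org.apache.kafka.common.requests.WriteTxnMarkersResponse;

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    public class WriteTxnMarkersResponseExample {
        public static void main(String[] args) {
            // Per-producer, per-partition error results
            Map<Long, Map<TopicPartition, Errors>> errors = new HashMap<>();
            errors.put(10L, Collections.singletonMap(
                    new TopicPartition("topic", 0), Errors.UNKNOWN_PRODUCER_ID));

            // The constructor builds the generated data from the map
            WriteTxnMarkersResponse response = new WriteTxnMarkersResponse(errors);
            System.out.println(response.errors(10L));
            System.out.println(response.errorCounts());
        }
    }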
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/WriteTxnMarkersRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/WriteTxnMarkersRequestTest.java
new file mode 100644
index 0000000..7273e47
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/requests/WriteTxnMarkersRequestTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class WriteTxnMarkersRequestTest {
+
+ private static long producerId = 10L;
+ private static short producerEpoch = 2;
+ private static int coordinatorEpoch = 1;
+ private static TransactionResult result = TransactionResult.COMMIT;
+ private static TopicPartition topicPartition = new TopicPartition("topic", 73);
+
+ protected static int throttleTimeMs = 10;
+
+ private static List<WriteTxnMarkersRequest.TxnMarkerEntry> markers;
+
+ @Before
+ public void setUp() {
+ markers = Collections.singletonList(
+ new WriteTxnMarkersRequest.TxnMarkerEntry(
+ producerId, producerEpoch, coordinatorEpoch,
+ result, Collections.singletonList(topicPartition))
+ );
+ }
+
+ @Test
+ public void testConstructor() {
+ WriteTxnMarkersRequest.Builder builder = new WriteTxnMarkersRequest.Builder(markers);
+ for (short version = 0; version <= ApiKeys.WRITE_TXN_MARKERS.latestVersion(); version++) {
+ WriteTxnMarkersRequest request = builder.build(version);
+ assertEquals(1, request.markers().size());
+ WriteTxnMarkersRequest.TxnMarkerEntry marker = request.markers().get(0);
+ assertEquals(producerId, marker.producerId());
+ assertEquals(producerEpoch, marker.producerEpoch());
+ assertEquals(coordinatorEpoch, marker.coordinatorEpoch());
+ assertEquals(result, marker.transactionResult());
+ assertEquals(Collections.singletonList(topicPartition), marker.partitions());
+ }
+ }
+
+ @Test
+ public void testGetErrorResponse() {
+ WriteTxnMarkersRequest.Builder builder = new WriteTxnMarkersRequest.Builder(markers);
+ for (short version = 0; version <= ApiKeys.WRITE_TXN_MARKERS.latestVersion(); version++) {
+ WriteTxnMarkersRequest request = builder.build(version);
+ WriteTxnMarkersResponse errorResponse =
+ request.getErrorResponse(throttleTimeMs, Errors.UNKNOWN_PRODUCER_ID.exception());
+
+ assertEquals(Collections.singletonMap(
+ topicPartition, Errors.UNKNOWN_PRODUCER_ID), errorResponse.errors(producerId));
+ assertEquals(Collections.singletonMap(Errors.UNKNOWN_PRODUCER_ID, 1), errorResponse.errorCounts());
+ // Write txn marker has no throttle time defined in response.
+ assertEquals(0, errorResponse.throttleTimeMs());
+ }
+ }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/WriteTxnMarkersResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/WriteTxnMarkersResponseTest.java
new file mode 100644
index 0000000..4cec88c
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/requests/WriteTxnMarkersResponseTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.Errors;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class WriteTxnMarkersResponseTest {
+
+ private static long producerIdOne = 1L;
+ private static long producerIdTwo = 2L;
+
+ private static TopicPartition tp1 = new TopicPartition("topic", 1);
+ private static TopicPartition tp2 = new TopicPartition("topic", 2);
+
+ private static Errors pidOneError = Errors.UNKNOWN_PRODUCER_ID;
+ private static Errors pidTwoError = Errors.INVALID_PRODUCER_EPOCH;
+
+ private static Map<Long, Map<TopicPartition, Errors>> errorMap;
+
+ @Before
+ public void setUp() {
+ errorMap = new HashMap<>();
+ errorMap.put(producerIdOne, Collections.singletonMap(tp1, pidOneError));
+ errorMap.put(producerIdTwo, Collections.singletonMap(tp2, pidTwoError));
+ }
+ @Test
+ public void testConstructorWithStruct() {
+
+ Map<Errors, Integer> expectedErrorCounts = new HashMap<>();
+ expectedErrorCounts.put(Errors.UNKNOWN_PRODUCER_ID, 1);
+ expectedErrorCounts.put(Errors.INVALID_PRODUCER_EPOCH, 1);
+ WriteTxnMarkersResponse response = new WriteTxnMarkersResponse(errorMap);
+ assertEquals(expectedErrorCounts, response.errorCounts());
+ assertEquals(Collections.singletonMap(tp1, pidOneError), response.errors(producerIdOne));
+ assertEquals(Collections.singletonMap(tp2, pidTwoError), response.errors(producerIdTwo));
+ }
+}