You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/04/07 11:08:46 UTC
[2/2] kafka git commit: KAFKA-4990;
Request/response classes for transactions (KIP-98)
KAFKA-4990; Request/response classes for transactions (KIP-98)
Author: Matthias J. Sax <ma...@confluent.io>
Author: Guozhang Wang <wa...@gmail.com>
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Apurva Mehta <ap...@confluent.io>, Jun Rao <ju...@gmail.com>, Ismael Juma <is...@juma.me.uk>
Closes #2799 from mjsax/kafka-4990-add-api-stub-config-parameters-request-types
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/865d82af
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/865d82af
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/865d82af
Branch: refs/heads/trunk
Commit: 865d82af2cc050d10544d70b95468da90c1d800b
Parents: 2f4f3b9
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Fri Apr 7 11:22:09 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri Apr 7 12:07:25 2017 +0100
----------------------------------------------------------------------
checkstyle/suppressions.xml | 4 +
.../apache/kafka/clients/producer/Producer.java | 12 +-
.../errors/InvalidPidMappingException.java | 23 +++
.../common/errors/InvalidTxnStateException.java | 23 +++
.../apache/kafka/common/protocol/ApiKeys.java | 7 +-
.../apache/kafka/common/protocol/Errors.java | 36 ++--
.../apache/kafka/common/protocol/Protocol.java | 185 +++++++++++++++++-
.../kafka/common/record/ControlRecordType.java | 8 +-
.../kafka/common/requests/AbstractRequest.java | 15 ++
.../kafka/common/requests/AbstractResponse.java | 10 +
.../common/requests/AddOffsetsToTxnRequest.java | 107 ++++++++++
.../requests/AddOffsetsToTxnResponse.java | 60 ++++++
.../requests/AddPartitionsToTxnRequest.java | 136 +++++++++++++
.../requests/AddPartitionsToTxnResponse.java | 60 ++++++
.../kafka/common/requests/EndTxnRequest.java | 107 ++++++++++
.../kafka/common/requests/EndTxnResponse.java | 59 ++++++
.../kafka/common/requests/FetchResponse.java | 20 +-
.../kafka/common/requests/InitPidResponse.java | 4 +-
.../common/requests/TransactionResult.java | 34 ++++
.../common/requests/TxnOffsetCommitRequest.java | 195 +++++++++++++++++++
.../requests/TxnOffsetCommitResponse.java | 102 ++++++++++
.../common/requests/WriteTxnMarkersRequest.java | 186 ++++++++++++++++++
.../requests/WriteTxnMarkersResponse.java | 130 +++++++++++++
.../common/requests/RequestResponseTest.java | 89 ++++++++-
.../src/main/scala/kafka/server/KafkaApis.scala | 32 ++-
25 files changed, 1597 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/865d82af/checkstyle/suppressions.xml
----------------------------------------------------------------------
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 39d61e2..607ba69 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -13,6 +13,10 @@
files=".*/protocol/Errors.java"/>
<suppress checks="ClassFanOutComplexity"
files=".*/common/utils/Utils.java"/>
+ <suppress checks="ClassFanOutComplexity"
+ files=".*/requests/AbstractRequest.java"/>
+ <suppress checks="ClassFanOutComplexity"
+ files=".*/requests/AbstractResponse.java"/>
<suppress checks="MethodLength"
files="KerberosLogin.java"/>
http://git-wip-us.apache.org/repos/asf/kafka/blob/865d82af/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
index a77ecd0..4da8681 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
@@ -16,16 +16,16 @@
*/
package org.apache.kafka.clients.producer;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+
import java.io.Closeable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.MetricName;
-
/**
* The interface for the {@link KafkaProducer}
@@ -36,7 +36,7 @@ public interface Producer<K, V> extends Closeable {
/**
* Send the given record asynchronously and return a future which will eventually contain the response information.
- *
+ *
* @param record The record to send
* @return A future which will eventually contain the response information
*/
@@ -46,7 +46,7 @@ public interface Producer<K, V> extends Closeable {
* Send a record and invoke the given callback when the record has been acknowledged by the server
*/
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
-
+
/**
* Flush any accumulated records from the producer. Blocks until all sends are complete.
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/865d82af/clients/src/main/java/org/apache/kafka/common/errors/InvalidPidMappingException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/InvalidPidMappingException.java b/clients/src/main/java/org/apache/kafka/common/errors/InvalidPidMappingException.java
new file mode 100644
index 0000000..69fb71e
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/InvalidPidMappingException.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class InvalidPidMappingException extends ApiException {
+ public InvalidPidMappingException(String message) {
+ super(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/865d82af/clients/src/main/java/org/apache/kafka/common/errors/InvalidTxnStateException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/InvalidTxnStateException.java b/clients/src/main/java/org/apache/kafka/common/errors/InvalidTxnStateException.java
new file mode 100644
index 0000000..ff06904
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/InvalidTxnStateException.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class InvalidTxnStateException extends ApiException {
+ public InvalidTxnStateException(String message) {
+ super(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/865d82af/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index b65defb..63bcfec 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -48,7 +48,12 @@ public enum ApiKeys {
DELETE_TOPICS(20, "DeleteTopics"),
DELETE_RECORDS(21, "DeleteRecords"),
INIT_PRODUCER_ID(22, "InitProducerId"),
- OFFSET_FOR_LEADER_EPOCH(23, "OffsetForLeaderEpoch");
+ OFFSET_FOR_LEADER_EPOCH(23, "OffsetForLeaderEpoch"),
+ ADD_PARTITIONS_TO_TXN(24, "AddPartitionsToTxn"),
+ ADD_OFFSETS_TO_TXN(25, "AddOffsetsToTxn"),
+ END_TXN(26, "EndTxn"),
+ WRITE_TXN_MARKERS(27, "WriteTxnMarkers"),
+ TXN_OFFSET_COMMIT(28, "TxnOffsetCommit");
private static final ApiKeys[] ID_TO_TYPE;
private static final int MIN_API_KEY = 0;
http://git-wip-us.apache.org/repos/asf/kafka/blob/865d82af/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 519e52c..ccebd93 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -16,9 +16,6 @@
*/
package org.apache.kafka.common.protocol;
-import java.util.HashMap;
-import java.util.Map;
-
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.BrokerNotAvailableException;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
@@ -36,17 +33,16 @@ import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.InvalidFetchSizeException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.InvalidPartitionsException;
+import org.apache.kafka.common.errors.InvalidPidMappingException;
import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
import org.apache.kafka.common.errors.InvalidReplicationFactorException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.InvalidRequiredAcksException;
-import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.InvalidSessionTimeoutException;
import org.apache.kafka.common.errors.InvalidTimestampException;
import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.InvalidTxnStateException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
-import org.apache.kafka.common.errors.ProducerFencedException;
-import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
import org.apache.kafka.common.errors.NetworkException;
import org.apache.kafka.common.errors.NotControllerException;
import org.apache.kafka.common.errors.NotCoordinatorForGroupException;
@@ -55,23 +51,29 @@ import org.apache.kafka.common.errors.NotEnoughReplicasException;
import org.apache.kafka.common.errors.NotLeaderForPartitionException;
import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
import org.apache.kafka.common.errors.OffsetOutOfRangeException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.PolicyViolationException;
+import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.RecordBatchTooLargeException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.ReplicaNotAvailableException;
import org.apache.kafka.common.errors.RetriableException;
-import org.apache.kafka.common.errors.TopicExistsException;
-import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
-import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
+import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.HashMap;
+import java.util.Map;
+
/**
* This class contains all the client-server errors--those errors that must be sent from the server to the client. These
* are thus part of the protocol. The names can be changed but the error code cannot.
@@ -169,10 +171,18 @@ public enum Errors {
" the message was sent to an incompatible broker. See the broker logs for more details.")),
UNSUPPORTED_FOR_MESSAGE_FORMAT(43,
new UnsupportedForMessageFormatException("The message format version on the broker does not support the request.")),
- POLICY_VIOLATION(44, new PolicyViolationException("Request parameters do not satisfy the configured policy.")),
- OUT_OF_ORDER_SEQUENCE_NUMBER(45, new OutOfOrderSequenceException("The broker received an out of order sequence number")),
- DUPLICATE_SEQUENCE_NUMBER(46, new DuplicateSequenceNumberException("The broker received a duplicate sequence number")),
- PRODUCER_FENCED(47, new ProducerFencedException("Producer attempted an operation with an old epoch"));
+ POLICY_VIOLATION(44,
+ new PolicyViolationException("Request parameters do not satisfy the configured policy.")),
+ OUT_OF_ORDER_SEQUENCE_NUMBER(45,
+ new OutOfOrderSequenceException("The broker received an out of order sequence number")),
+ DUPLICATE_SEQUENCE_NUMBER(46,
+ new DuplicateSequenceNumberException("The broker received a duplicate sequence number")),
+ INVALID_PRODUCER_EPOCH(47,
+ new ProducerFencedException("Producer attempted an operation with an old epoch")),
+ INVALID_TXN_STATE(48,
+ new InvalidTxnStateException("The producer attempted a transactional operation in an invalid state")),
+ INVALID_PID_MAPPING(49,
+ new InvalidPidMappingException("The PID mapping is invalid"));
private static final Logger log = LoggerFactory.getLogger(Errors.class);
http://git-wip-us.apache.org/repos/asf/kafka/blob/865d82af/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index cc228c5..4c58bb8 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -647,7 +647,7 @@ public class Protocol {
// The v4 Fetch Response adds features for transactional consumption (the aborted transaction list and the
// last stable offset). It also exposes messages with magic v2 (along with older formats).
private static final Schema FETCH_RESPONSE_ABORTED_TRANSACTION_V4 = new Schema(
- new Field("pid", INT64, "The producer ID (PID) associated with the aborted transactions"),
+ new Field("producer_id", INT64, "The producer id associated with the aborted transactions"),
new Field("first_offset", INT64, "The first offset in the aborted transaction"));
public static final Schema FETCH_RESPONSE_ABORTED_TRANSACTION_V5 = FETCH_RESPONSE_ABORTED_TRANSACTION_V4;
@@ -1180,19 +1180,19 @@ public class Protocol {
public static final Schema INIT_PRODUCER_ID_REQUEST_V0 = new Schema(
new Field("transactional_id",
NULLABLE_STRING,
- "The transactional id whose pid we want to retrieve or generate.")
+ "The transactional id whose producer id we want to retrieve or generate.")
);
public static final Schema INIT_PRODUCER_ID_RESPONSE_V0 = new Schema(
new Field("error_code",
INT16,
"An integer error code."),
- new Field("pid",
+ new Field("producer_id",
INT64,
- "The pid for the input transactional id. If the input id was empty, then this is used only for ensuring idempotence of messages"),
- new Field("epoch",
+ "The producer id for the input transactional id. If the input id was empty, then this is used only for ensuring idempotence of messages."),
+ new Field("producer_epoch",
INT16,
- "The epoch for the pid. Will always be 0 if no transactional id was specified in the request.")
+ "The epoch for the producer id. Will always be 0 if no transactional id was specified in the request.")
);
public static final Schema[] INIT_PRODUCER_ID_REQUEST = new Schema[] {INIT_PRODUCER_ID_REQUEST_V0};
@@ -1249,6 +1249,169 @@ public class Protocol {
public static final Schema[] OFFSET_FOR_LEADER_EPOCH_REQUEST = new Schema[] {OFFSET_FOR_LEADER_EPOCH_REQUEST_V0};
public static final Schema[] OFFSET_FOR_LEADER_EPOCH_RESPONSE = new Schema[] {OFFSET_FOR_LEADER_EPOCH_RESPONSE_V0};
+ public static final Schema ADD_PARTITIONS_TO_TXN_REQUEST_V0 = new Schema(
+ new Field("transactional_id",
+ STRING,
+ "The transactional id corresponding to the transaction."),
+ new Field("producer_id",
+ INT64,
+ "Current producer id in use by the transactional id."),
+ new Field("producer_epoch",
+ INT16,
+ "Current epoch associated with the producer id."),
+ new Field("topics",
+ new ArrayOf(new Schema(
+ new Field("topic", STRING),
+ new Field("partitions", new ArrayOf(INT32)))),
+ "The partitions to add to the transaction.")
+ );
+ public static final Schema ADD_PARTITIONS_TO_TXN_RESPONSE_V0 = new Schema(
+ new Field("error_code",
+ INT16,
+ "An integer error code.")
+ );
+
+ public static final Schema[] ADD_PARTITIONS_TO_TXN_REQUEST = new Schema[] {ADD_PARTITIONS_TO_TXN_REQUEST_V0};
+ public static final Schema[] ADD_PARTITIONS_TO_TXN_RESPONSE = new Schema[] {ADD_PARTITIONS_TO_TXN_RESPONSE_V0};
+
+ public static final Schema ADD_OFFSETS_TO_TXN_REQUEST_V0 = new Schema(
+ new Field("transactional_id",
+ STRING,
+ "The transactional id corresponding to the transaction."),
+ new Field("producer_id",
+ INT64,
+ "Current producer id in use by the transactional id."),
+ new Field("producer_epoch",
+ INT16,
+ "Current epoch associated with the producer id."),
+ new Field("consumer_group_id",
+ STRING,
+ "Consumer group id whose offsets should be included in the transaction.")
+ );
+ public static final Schema ADD_OFFSETS_TO_TXN_RESPONSE_V0 = new Schema(
+ new Field("error_code",
+ INT16,
+ "An integer error code.")
+ );
+
+ public static final Schema[] ADD_OFFSETS_TO_TXN_REQUEST = new Schema[] {ADD_OFFSETS_TO_TXN_REQUEST_V0};
+ public static final Schema[] ADD_OFFSETS_TO_TXN_RESPONSE = new Schema[] {ADD_OFFSETS_TO_TXN_RESPONSE_V0};
+
+ public static final Schema END_TXN_REQUEST_V0 = new Schema(
+ new Field("transactional_id",
+ STRING,
+ "The transactional id corresponding to the transaction."),
+ new Field("producer_id",
+ INT64,
+ "Current producer id in use by the transactional id."),
+ new Field("producer_epoch",
+ INT16,
+ "Current epoch associated with the producer id."),
+ new Field("transaction_result",
+ BOOLEAN,
+ "The result of the transaction (0 = ABORT, 1 = COMMIT)")
+ );
+
+ public static final Schema END_TXN_RESPONSE_V0 = new Schema(
+ new Field("error_code",
+ INT16,
+ "An integer error code.")
+ );
+
+ public static final Schema[] END_TXN_REQUEST = new Schema[] {END_TXN_REQUEST_V0};
+ public static final Schema[] END_TXN_RESPONSE = new Schema[] {END_TXN_RESPONSE_V0};
+
+ public static final Schema WRITE_TXN_MARKERS_ENTRY_V0 = new Schema(
+ new Field("producer_id",
+ INT64,
+ "Current producer id in use by the transactional id."),
+ new Field("producer_epoch",
+ INT16,
+ "Current epoch associated with the producer id."),
+ new Field("transaction_result",
+ BOOLEAN,
+ "The result of the transaction to write to the partitions (false = ABORT, true = COMMIT)."),
+ new Field("topics",
+ new ArrayOf(new Schema(
+ new Field("topic", STRING),
+ new Field("partitions", new ArrayOf(INT32)))),
+ "The partitions to write markers for.")
+ );
+
+ public static final Schema WRITE_TXN_MARKERS_REQUEST_V0 = new Schema(
+ new Field("coordinator_epoch",
+ INT32,
+ "Epoch associated with the transaction state partition hosted by this transaction coordinator."),
+ new Field("transaction_markers",
+ new ArrayOf(WRITE_TXN_MARKERS_ENTRY_V0),
+ "The transaction markers to be written.")
+ );
+
+ public static final Schema WRITE_TXN_MARKERS_PARTITION_ERROR_RESPONSE_V0 = new Schema(
+ new Field("partition", INT32),
+ new Field("error_code", INT16)
+ );
+
+ public static final Schema WRITE_TXN_MARKERS_ENTRY_RESPONSE_V0 = new Schema(
+ new Field("producer_id",
+ INT64,
+ "Current producer id in use by the transactional id."),
+ new Field("topics",
+ new ArrayOf(new Schema(
+ new Field("topic", STRING),
+ new Field("partitions", new ArrayOf(WRITE_TXN_MARKERS_PARTITION_ERROR_RESPONSE_V0)))),
+ "Errors per partition from writing markers.")
+ );
+
+ public static final Schema WRITE_TXN_MARKERS_RESPONSE_V0 = new Schema(
+ new Field("transaction_markers", new ArrayOf(WRITE_TXN_MARKERS_ENTRY_RESPONSE_V0), "Errors per partition from writing markers.")
+ );
+
+ public static final Schema[] WRITE_TXN_REQUEST = new Schema[] {WRITE_TXN_MARKERS_REQUEST_V0};
+ public static final Schema[] WRITE_TXN_RESPONSE = new Schema[] {WRITE_TXN_MARKERS_RESPONSE_V0};
+
+ public static final Schema TXN_OFFSET_COMMIT_PARTITION_OFFSET_METADATA_REQUEST_V0 = new Schema(
+ new Field("partition", INT32),
+ new Field("offset", INT64),
+ new Field("metadata", NULLABLE_STRING)
+ );
+
+ public static final Schema TXN_OFFSET_COMMIT_REQUEST_V0 = new Schema(
+ new Field("consumer_group_id",
+ STRING,
+ "Id of the associated consumer group to commit offsets for."),
+ new Field("producer_id",
+ INT64,
+ "Current producer id in use by the transactional id."),
+ new Field("producer_epoch",
+ INT16,
+ "Current epoch associated with the producer id."),
+ new Field("retention_time",
+ INT64,
+ "The time in ms to retain the offset."),
+ new Field("topics",
+ new ArrayOf(new Schema(
+ new Field("topic", STRING),
+ new Field("partitions", new ArrayOf(TXN_OFFSET_COMMIT_PARTITION_OFFSET_METADATA_REQUEST_V0)))),
+ "The partitions to write markers for.")
+ );
+
+ public static final Schema TXN_OFFSET_COMMIT_PARTITION_ERROR_RESPONSE_V0 = new Schema(
+ new Field("partition", INT32),
+ new Field("error_code", INT16)
+ );
+
+ public static final Schema TXN_OFFSET_COMMIT_RESPONSE_V0 = new Schema(
+ new Field("topics",
+ new ArrayOf(new Schema(
+ new Field("topic", STRING),
+ new Field("partitions", new ArrayOf(TXN_OFFSET_COMMIT_PARTITION_ERROR_RESPONSE_V0)))),
+ "Errors per partition from writing markers.")
+ );
+
+ public static final Schema[] TXN_OFFSET_COMMIT_REQUEST = new Schema[] {TXN_OFFSET_COMMIT_REQUEST_V0};
+ public static final Schema[] TXN_OFFSET_COMMIT_RESPONSE = new Schema[] {TXN_OFFSET_COMMIT_RESPONSE_V0};
+
/* an array of all requests and responses with all schema versions; a null value in the inner array means that the
* particular version is not supported */
public static final Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 1][];
@@ -1283,6 +1446,11 @@ public class Protocol {
REQUESTS[ApiKeys.DELETE_RECORDS.id] = DELETE_RECORDS_REQUEST;
REQUESTS[ApiKeys.INIT_PRODUCER_ID.id] = INIT_PRODUCER_ID_REQUEST;
REQUESTS[ApiKeys.OFFSET_FOR_LEADER_EPOCH.id] = OFFSET_FOR_LEADER_EPOCH_REQUEST;
+ REQUESTS[ApiKeys.ADD_PARTITIONS_TO_TXN.id] = ADD_PARTITIONS_TO_TXN_REQUEST;
+ REQUESTS[ApiKeys.ADD_OFFSETS_TO_TXN.id] = ADD_OFFSETS_TO_TXN_REQUEST;
+ REQUESTS[ApiKeys.END_TXN.id] = END_TXN_REQUEST;
+ REQUESTS[ApiKeys.WRITE_TXN_MARKERS.id] = WRITE_TXN_REQUEST;
+ REQUESTS[ApiKeys.TXN_OFFSET_COMMIT.id] = TXN_OFFSET_COMMIT_REQUEST;
RESPONSES[ApiKeys.PRODUCE.id] = PRODUCE_RESPONSE;
RESPONSES[ApiKeys.FETCH.id] = FETCH_RESPONSE;
@@ -1308,6 +1476,11 @@ public class Protocol {
RESPONSES[ApiKeys.DELETE_RECORDS.id] = DELETE_RECORDS_RESPONSE;
RESPONSES[ApiKeys.INIT_PRODUCER_ID.id] = INIT_PRODUCER_ID_RESPONSE;
RESPONSES[ApiKeys.OFFSET_FOR_LEADER_EPOCH.id] = OFFSET_FOR_LEADER_EPOCH_RESPONSE;
+ RESPONSES[ApiKeys.ADD_PARTITIONS_TO_TXN.id] = ADD_PARTITIONS_TO_TXN_RESPONSE;
+ RESPONSES[ApiKeys.ADD_OFFSETS_TO_TXN.id] = ADD_OFFSETS_TO_TXN_RESPONSE;
+ RESPONSES[ApiKeys.END_TXN.id] = END_TXN_RESPONSE;
+ RESPONSES[ApiKeys.WRITE_TXN_MARKERS.id] = WRITE_TXN_RESPONSE;
+ RESPONSES[ApiKeys.TXN_OFFSET_COMMIT.id] = TXN_OFFSET_COMMIT_RESPONSE;
/* set the minimum and maximum version of each api */
for (ApiKeys api : ApiKeys.values()) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/865d82af/clients/src/main/java/org/apache/kafka/common/record/ControlRecordType.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/ControlRecordType.java b/clients/src/main/java/org/apache/kafka/common/record/ControlRecordType.java
index 6bd614a..723af66 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/ControlRecordType.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/ControlRecordType.java
@@ -40,8 +40,8 @@ import java.nio.ByteBuffer;
* The schema for the value field is left to the control record type to specify.
*/
public enum ControlRecordType {
- COMMIT((short) 0),
- ABORT((short) 1),
+ ABORT((short) 0),
+ COMMIT((short) 1),
// UNKNOWN is used to indicate a control type which the client is not aware of and should be ignored
UNKNOWN((short) -1);
@@ -77,9 +77,9 @@ public enum ControlRecordType {
short type = key.getShort(2);
switch (type) {
case 0:
- return COMMIT;
- case 1:
return ABORT;
+ case 1:
+ return COMMIT;
default:
return UNKNOWN;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/865d82af/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 7ce3518..bd4bc49 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -177,6 +177,21 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
case OFFSET_FOR_LEADER_EPOCH:
request = new OffsetsForLeaderEpochRequest(struct, version);
break;
+ case ADD_PARTITIONS_TO_TXN:
+ request = new AddPartitionsToTxnRequest(struct, version);
+ break;
+ case ADD_OFFSETS_TO_TXN:
+ request = new AddOffsetsToTxnRequest(struct, version);
+ break;
+ case END_TXN:
+ request = new EndTxnRequest(struct, version);
+ break;
+ case WRITE_TXN_MARKERS:
+ request = new WriteTxnMarkersRequest(struct, version);
+ break;
+ case TXN_OFFSET_COMMIT:
+ request = new TxnOffsetCommitRequest(struct, version);
+ break;
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `getRequest`, the " +
"code should be updated to do so.", apiKey));
http://git-wip-us.apache.org/repos/asf/kafka/blob/865d82af/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 1ae30d1..433539c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -97,6 +97,16 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
return new InitPidResponse(struct);
case OFFSET_FOR_LEADER_EPOCH:
return new OffsetsForLeaderEpochResponse(struct);
+ case ADD_PARTITIONS_TO_TXN:
+ return new AddPartitionsToTxnResponse(struct);
+ case ADD_OFFSETS_TO_TXN:
+ return new AddOffsetsToTxnResponse(struct);
+ case END_TXN:
+ return new EndTxnResponse(struct);
+ case WRITE_TXN_MARKERS:
+ return new WriteTxnMarkersResponse(struct);
+ case TXN_OFFSET_COMMIT:
+ return new TxnOffsetCommitResponse(struct);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `getResponse`, the " +
"code should be updated to do so.", apiKey));
http://git-wip-us.apache.org/repos/asf/kafka/blob/865d82af/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
new file mode 100644
index 0000000..4245e82
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+public class AddOffsetsToTxnRequest extends AbstractRequest {
+ private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id";
+ private static final String PID_KEY_NAME = "producer_id";
+ private static final String EPOCH_KEY_NAME = "producer_epoch";
+ private static final String CONSUMER_GROUP_ID_KEY_NAME = "consumer_group_id";
+
+ public static class Builder extends AbstractRequest.Builder<AddOffsetsToTxnRequest> {
+ private final String transactionalId;
+ private final long producerId;
+ private final short producerEpoch;
+ private final String consumerGroupId;
+
+ public Builder(String transactionalId, long producerId, short producerEpoch, String consumerGroupId) {
+ super(ApiKeys.ADD_OFFSETS_TO_TXN);
+ this.transactionalId = transactionalId;
+ this.producerId = producerId;
+ this.producerEpoch = producerEpoch;
+ this.consumerGroupId = consumerGroupId;
+ }
+
+ @Override
+ public AddOffsetsToTxnRequest build(short version) {
+ return new AddOffsetsToTxnRequest(version, transactionalId, producerId, producerEpoch, consumerGroupId);
+ }
+ }
+
+ private final String transactionalId;
+ private final long producerId;
+ private final short producerEpoch;
+ private final String consumerGroupId;
+
+ private AddOffsetsToTxnRequest(short version, String transactionalId, long producerId, short producerEpoch, String consumerGroupId) {
+ super(version);
+ this.transactionalId = transactionalId;
+ this.producerId = producerId;
+ this.producerEpoch = producerEpoch;
+ this.consumerGroupId = consumerGroupId;
+ }
+
+ public AddOffsetsToTxnRequest(Struct struct, short version) {
+ super(version);
+ this.transactionalId = struct.getString(TRANSACTIONAL_ID_KEY_NAME);
+ this.producerId = struct.getLong(PID_KEY_NAME);
+ this.producerEpoch = struct.getShort(EPOCH_KEY_NAME);
+ this.consumerGroupId = struct.getString(CONSUMER_GROUP_ID_KEY_NAME);
+ }
+
+ public String transactionalId() {
+ return transactionalId;
+ }
+
+ public long producerId() {
+ return producerId;
+ }
+
+ public short producerEpoch() {
+ return producerEpoch;
+ }
+
+ public String consumerGroupId() {
+ return consumerGroupId;
+ }
+
+ @Override
+ protected Struct toStruct() {
+ Struct struct = new Struct(ApiKeys.ADD_OFFSETS_TO_TXN.requestSchema(version()));
+ struct.set(TRANSACTIONAL_ID_KEY_NAME, transactionalId);
+ struct.set(PID_KEY_NAME, producerId);
+ struct.set(EPOCH_KEY_NAME, producerEpoch);
+ struct.set(CONSUMER_GROUP_ID_KEY_NAME, consumerGroupId);
+ return struct;
+ }
+
+ @Override
+ public AddOffsetsToTxnResponse getErrorResponse(Throwable e) {
+ return new AddOffsetsToTxnResponse(Errors.forException(e));
+ }
+
+ public static AddOffsetsToTxnRequest parse(ByteBuffer buffer, short version) {
+ return new AddOffsetsToTxnRequest(ApiKeys.ADD_OFFSETS_TO_TXN.parseRequest(version, buffer), version);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/865d82af/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
new file mode 100644
index 0000000..6ac49fb
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+public class AddOffsetsToTxnResponse extends AbstractResponse {
+ private static final String ERROR_CODE_KEY_NAME = "error_code";
+
+ // Possible error codes:
+ // NotCoordinator
+ // CoordinatorNotAvailable
+ // CoordinatorLoadInProgress
+ // InvalidPidMapping
+ // InvalidTxnState
+ // GroupAuthorizationFailed
+
+ private final Errors error;
+
+ public AddOffsetsToTxnResponse(Errors error) {
+ this.error = error;
+ }
+
+ public AddOffsetsToTxnResponse(Struct struct) {
+ this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
+ }
+
+ public Errors error() {
+ return error;
+ }
+
+ @Override
+ protected Struct toStruct(short version) {
+ Struct struct = new Struct(ApiKeys.ADD_OFFSETS_TO_TXN.responseSchema(version));
+ struct.set(ERROR_CODE_KEY_NAME, error.code());
+ return struct;
+ }
+
+ public static AddOffsetsToTxnResponse parse(ByteBuffer buffer, short version) {
+ return new AddOffsetsToTxnResponse(ApiKeys.ADD_PARTITIONS_TO_TXN.parseResponse(version, buffer));
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/865d82af/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
new file mode 100644
index 0000000..9a983d0
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.CollectionUtils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class AddPartitionsToTxnRequest extends AbstractRequest {
+ private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id";
+ private static final String PID_KEY_NAME = "producer_id";
+ private static final String EPOCH_KEY_NAME = "producer_epoch";
+ private static final String TOPIC_PARTITIONS_KEY_NAME = "topics";
+ private static final String TOPIC_KEY_NAME = "topic";
+ private static final String PARTITIONS_KEY_NAME = "partitions";
+
+ public static class Builder extends AbstractRequest.Builder<AddPartitionsToTxnRequest> {
+ private final String transactionalId;
+ private final long producerId;
+ private final short producerEpoch;
+ private final List<TopicPartition> partitions;
+
+ public Builder(String transactionalId, long producerId, short producerEpoch, List<TopicPartition> partitions) {
+ super(ApiKeys.ADD_PARTITIONS_TO_TXN);
+ this.transactionalId = transactionalId;
+ this.producerId = producerId;
+ this.producerEpoch = producerEpoch;
+ this.partitions = partitions;
+ }
+
+ @Override
+ public AddPartitionsToTxnRequest build(short version) {
+ return new AddPartitionsToTxnRequest(version, transactionalId, producerId, producerEpoch, partitions);
+ }
+ }
+
+ private final String transactionalId;
+ private final long producerId;
+ private final short producerEpoch;
+ private final List<TopicPartition> partitions;
+
+ private AddPartitionsToTxnRequest(short version, String transactionalId, long producerId, short producerEpoch,
+ List<TopicPartition> partitions) {
+ super(version);
+ this.transactionalId = transactionalId;
+ this.producerId = producerId;
+ this.producerEpoch = producerEpoch;
+ this.partitions = partitions;
+ }
+
+ public AddPartitionsToTxnRequest(Struct struct, short version) {
+ super(version);
+ this.transactionalId = struct.getString(TRANSACTIONAL_ID_KEY_NAME);
+ this.producerId = struct.getLong(PID_KEY_NAME);
+ this.producerEpoch = struct.getShort(EPOCH_KEY_NAME);
+
+ List<TopicPartition> partitions = new ArrayList<>();
+ Object[] topicPartitionsArray = struct.getArray(TOPIC_PARTITIONS_KEY_NAME);
+ for (Object topicPartitionObj : topicPartitionsArray) {
+ Struct topicPartitionStruct = (Struct) topicPartitionObj;
+ String topic = topicPartitionStruct.getString(TOPIC_KEY_NAME);
+ for (Object partitionObj : topicPartitionStruct.getArray(PARTITIONS_KEY_NAME)) {
+ partitions.add(new TopicPartition(topic, (Integer) partitionObj));
+ }
+ }
+ this.partitions = partitions;
+ }
+
+ public String transactionalId() {
+ return transactionalId;
+ }
+
+ public long producerId() {
+ return producerId;
+ }
+
+ public short producerEpoch() {
+ return producerEpoch;
+ }
+
+ public List<TopicPartition> partitions() {
+ return partitions;
+ }
+
+ @Override
+ protected Struct toStruct() {
+ Struct struct = new Struct(ApiKeys.ADD_PARTITIONS_TO_TXN.requestSchema(version()));
+ struct.set(TRANSACTIONAL_ID_KEY_NAME, transactionalId);
+ struct.set(PID_KEY_NAME, producerId);
+ struct.set(EPOCH_KEY_NAME, producerEpoch);
+
+ Map<String, List<Integer>> mappedPartitions = CollectionUtils.groupDataByTopic(partitions);
+ Object[] partitionsArray = new Object[mappedPartitions.size()];
+ int i = 0;
+ for (Map.Entry<String, List<Integer>> topicAndPartitions : mappedPartitions.entrySet()) {
+ Struct topicPartitionsStruct = struct.instance(TOPIC_PARTITIONS_KEY_NAME);
+ topicPartitionsStruct.set(TOPIC_KEY_NAME, topicAndPartitions.getKey());
+ topicPartitionsStruct.set(PARTITIONS_KEY_NAME, topicAndPartitions.getValue().toArray());
+ partitionsArray[i++] = topicPartitionsStruct;
+ }
+
+ struct.set(TOPIC_PARTITIONS_KEY_NAME, partitionsArray);
+ return struct;
+ }
+
+ @Override
+ public AddPartitionsToTxnResponse getErrorResponse(Throwable e) {
+ return new AddPartitionsToTxnResponse(Errors.forException(e));
+ }
+
+ public static AddPartitionsToTxnRequest parse(ByteBuffer buffer, short version) {
+ return new AddPartitionsToTxnRequest(ApiKeys.ADD_PARTITIONS_TO_TXN.parseRequest(version, buffer), version);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/865d82af/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
new file mode 100644
index 0000000..3de6295
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+public class AddPartitionsToTxnResponse extends AbstractResponse {
+ private static final String ERROR_CODE_KEY_NAME = "error_code";
+
+ // Possible error codes:
+ // NotCoordinator
+ // CoordinatorNotAvailable
+ // CoordinatorLoadInProgress
+ // InvalidTxnState
+ // InvalidPidMapping
+ // TopicAuthorizationFailed
+
+ private final Errors error;
+
+ public AddPartitionsToTxnResponse(Errors error) {
+ this.error = error;
+ }
+
+ public AddPartitionsToTxnResponse(Struct struct) {
+ this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
+ }
+
+ public Errors error() {
+ return error;
+ }
+
+ @Override
+ protected Struct toStruct(short version) {
+ Struct struct = new Struct(ApiKeys.ADD_PARTITIONS_TO_TXN.responseSchema(version));
+ struct.set(ERROR_CODE_KEY_NAME, error.code());
+ return struct;
+ }
+
+ public static AddPartitionsToTxnResponse parse(ByteBuffer buffer, short version) {
+ return new AddPartitionsToTxnResponse(ApiKeys.ADD_PARTITIONS_TO_TXN.parseResponse(version, buffer));
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/865d82af/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java
new file mode 100644
index 0000000..e6eb54e
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+public class EndTxnRequest extends AbstractRequest {
+ private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id";
+ private static final String PID_KEY_NAME = "producer_id";
+ private static final String EPOCH_KEY_NAME = "producer_epoch";
+ private static final String TRANSACTION_RESULT_KEY_NAME = "transaction_result";
+
+ public static class Builder extends AbstractRequest.Builder<EndTxnRequest> {
+ private final String transactionalId;
+ private final long producerId;
+ private final short producerEpoch;
+ private final TransactionResult result;
+
+ public Builder(String transactionalId, long producerId, short producerEpoch, TransactionResult result) {
+ super(ApiKeys.END_TXN);
+ this.transactionalId = transactionalId;
+ this.producerId = producerId;
+ this.producerEpoch = producerEpoch;
+ this.result = result;
+ }
+
+ @Override
+ public EndTxnRequest build(short version) {
+ return new EndTxnRequest(version, transactionalId, producerId, producerEpoch, result);
+ }
+ }
+
+ private final String transactionalId;
+ private final long producerId;
+ private final short producerEpoch;
+ private final TransactionResult result;
+
+ private EndTxnRequest(short version, String transactionalId, long producerId, short producerEpoch, TransactionResult result) {
+ super(version);
+ this.transactionalId = transactionalId;
+ this.producerId = producerId;
+ this.producerEpoch = producerEpoch;
+ this.result = result;
+ }
+
+ public EndTxnRequest(Struct struct, short version) {
+ super(version);
+ this.transactionalId = struct.getString(TRANSACTIONAL_ID_KEY_NAME);
+ this.producerId = struct.getLong(PID_KEY_NAME);
+ this.producerEpoch = struct.getShort(EPOCH_KEY_NAME);
+ this.result = TransactionResult.forId(struct.getBoolean(TRANSACTION_RESULT_KEY_NAME));
+ }
+
+ public String transactionalId() {
+ return transactionalId;
+ }
+
+ public long producerId() {
+ return producerId;
+ }
+
+ public short producerEpoch() {
+ return producerEpoch;
+ }
+
+ public TransactionResult command() {
+ return result;
+ }
+
+ @Override
+ protected Struct toStruct() {
+ Struct struct = new Struct(ApiKeys.END_TXN.requestSchema(version()));
+ struct.set(TRANSACTIONAL_ID_KEY_NAME, transactionalId);
+ struct.set(PID_KEY_NAME, producerId);
+ struct.set(EPOCH_KEY_NAME, producerEpoch);
+ struct.set(TRANSACTION_RESULT_KEY_NAME, result.id);
+ return struct;
+ }
+
+ @Override
+ public EndTxnResponse getErrorResponse(Throwable e) {
+ return new EndTxnResponse(Errors.forException(e));
+ }
+
+ public static EndTxnRequest parse(ByteBuffer buffer, short version) {
+ return new EndTxnRequest(ApiKeys.END_TXN.parseRequest(version, buffer), version);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/865d82af/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java
new file mode 100644
index 0000000..627eb64
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+public class EndTxnResponse extends AbstractResponse {
+ private static final String ERROR_CODE_KEY_NAME = "error_code";
+
+ // Possible error codes:
+ // NotCoordinator
+ // CoordinatorNotAvailable
+ // CoordinatorLoadInProgress
+ // InvalidTxnState
+ // InvalidPidMapping
+
+ private final Errors error;
+
+ public EndTxnResponse(Errors error) {
+ this.error = error;
+ }
+
+ public EndTxnResponse(Struct struct) {
+ this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
+ }
+
+ public Errors error() {
+ return error;
+ }
+
+ @Override
+ protected Struct toStruct(short version) {
+ Struct struct = new Struct(ApiKeys.END_TXN.responseSchema(version));
+ struct.set(ERROR_CODE_KEY_NAME, error.code());
+ return struct;
+ }
+
+ public static EndTxnResponse parse(ByteBuffer buffer, short version) {
+ return new EndTxnResponse(ApiKeys.END_TXN.parseResponse(version, buffer));
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/865d82af/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index 56eb838..db12d26 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -56,7 +56,7 @@ public class FetchResponse extends AbstractResponse {
private static final String RECORD_SET_KEY_NAME = "record_set";
// aborted transaction field names
- private static final String PID_KEY_NAME = "pid";
+ private static final String PID_KEY_NAME = "producer_id";
private static final String FIRST_OFFSET_KEY_NAME = "first_offset";
private static final int DEFAULT_THROTTLE_TIME = 0;
@@ -78,11 +78,11 @@ public class FetchResponse extends AbstractResponse {
private final int throttleTimeMs;
public static final class AbortedTransaction {
- public final long pid;
+ public final long producerId;
public final long firstOffset;
- public AbortedTransaction(long pid, long firstOffset) {
- this.pid = pid;
+ public AbortedTransaction(long producerId, long firstOffset) {
+ this.producerId = producerId;
this.firstOffset = firstOffset;
}
@@ -95,19 +95,19 @@ public class FetchResponse extends AbstractResponse {
AbortedTransaction that = (AbortedTransaction) o;
- return pid == that.pid && firstOffset == that.firstOffset;
+ return producerId == that.producerId && firstOffset == that.firstOffset;
}
@Override
public int hashCode() {
- int result = (int) (pid ^ (pid >>> 32));
+ int result = (int) (producerId ^ (producerId >>> 32));
result = 31 * result + (int) (firstOffset ^ (firstOffset >>> 32));
return result;
}
@Override
public String toString() {
- return "(pid=" + pid + ", firstOffset=" + firstOffset + ")";
+ return "(producerId=" + producerId + ", firstOffset=" + firstOffset + ")";
}
}
@@ -211,9 +211,9 @@ public class FetchResponse extends AbstractResponse {
abortedTransactions = new ArrayList<>(abortedTransactionsArray.length);
for (Object abortedTransactionObj : abortedTransactionsArray) {
Struct abortedTransactionStruct = (Struct) abortedTransactionObj;
- long pid = abortedTransactionStruct.getLong(PID_KEY_NAME);
+ long producerId = abortedTransactionStruct.getLong(PID_KEY_NAME);
long firstOffset = abortedTransactionStruct.getLong(FIRST_OFFSET_KEY_NAME);
- abortedTransactions.add(new AbortedTransaction(pid, firstOffset));
+ abortedTransactions.add(new AbortedTransaction(producerId, firstOffset));
}
}
}
@@ -339,7 +339,7 @@ public class FetchResponse extends AbstractResponse {
List<Struct> abortedTransactionStructs = new ArrayList<>(fetchPartitionData.abortedTransactions.size());
for (AbortedTransaction abortedTransaction : fetchPartitionData.abortedTransactions) {
Struct abortedTransactionStruct = partitionDataHeader.instance(ABORTED_TRANSACTIONS_KEY_NAME);
- abortedTransactionStruct.set(PID_KEY_NAME, abortedTransaction.pid);
+ abortedTransactionStruct.set(PID_KEY_NAME, abortedTransaction.producerId);
abortedTransactionStruct.set(FIRST_OFFSET_KEY_NAME, abortedTransaction.firstOffset);
abortedTransactionStructs.add(abortedTransactionStruct);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/865d82af/clients/src/main/java/org/apache/kafka/common/requests/InitPidResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitPidResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/InitPidResponse.java
index ee92375..4b65aea 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/InitPidResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/InitPidResponse.java
@@ -29,8 +29,8 @@ public class InitPidResponse extends AbstractResponse {
* OK
*
*/
- private static final String PRODUCER_ID_KEY_NAME = "pid";
- private static final String EPOCH_KEY_NAME = "epoch";
+ private static final String PRODUCER_ID_KEY_NAME = "producer_id";
+ private static final String EPOCH_KEY_NAME = "producer_epoch";
private static final String ERROR_CODE_KEY_NAME = "error_code";
private final Errors error;
private final long producerId;
http://git-wip-us.apache.org/repos/asf/kafka/blob/865d82af/clients/src/main/java/org/apache/kafka/common/requests/TransactionResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TransactionResult.java b/clients/src/main/java/org/apache/kafka/common/requests/TransactionResult.java
new file mode 100644
index 0000000..d0448af
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/TransactionResult.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+public enum TransactionResult {
+ ABORT(false), COMMIT(true);
+
+ public final boolean id;
+
+ TransactionResult(boolean id) {
+ this.id = id;
+ }
+
+ public static TransactionResult forId(boolean id) {
+ if (id) {
+ return TransactionResult.COMMIT;
+ }
+ return TransactionResult.ABORT;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/865d82af/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
new file mode 100644
index 0000000..584f733
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.CollectionUtils;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+public class TxnOffsetCommitRequest extends AbstractRequest {
+ private static final String CONSUMER_GROUP_ID_KEY_NAME = "consumer_group_id";
+ private static final String PID_KEY_NAME = "producer_id";
+ private static final String EPOCH_KEY_NAME = "producer_epoch";
+ private static final String RETENTION_TIME_KEY_NAME = "retention_time";
+ private static final String TOPIC_PARTITIONS_KEY_NAME = "topics";
+ private static final String TOPIC_KEY_NAME = "topic";
+ private static final String PARTITIONS_KEY_NAME = "partitions";
+ private static final String PARTITION_KEY_NAME = "partition";
+ private static final String OFFSET_KEY_NAME = "offset";
+ private static final String METADATA_KEY_NAME = "metadata";
+
+ public static class Builder extends AbstractRequest.Builder<TxnOffsetCommitRequest> {
+ private final String consumerGroupId;
+ private final long producerId;
+ private final short producerEpoch;
+ private final long retentionTimeMs;
+ private final Map<TopicPartition, CommittedOffset> offsets;
+
+ public Builder(String consumerGroupId, long producerId, short producerEpoch, long retentionTimeMs,
+ Map<TopicPartition, CommittedOffset> offsets) {
+ super(ApiKeys.TXN_OFFSET_COMMIT);
+ this.consumerGroupId = consumerGroupId;
+ this.producerId = producerId;
+ this.producerEpoch = producerEpoch;
+ this.retentionTimeMs = retentionTimeMs;
+ this.offsets = offsets;
+ }
+
+ @Override
+ public TxnOffsetCommitRequest build(short version) {
+ return new TxnOffsetCommitRequest(version, consumerGroupId, producerId, producerEpoch, retentionTimeMs, offsets);
+ }
+ }
+
+ private final String consumerGroupId;
+ private final long producerId;
+ private final short producerEpoch;
+ private final long retentionTimeMs;
+ private final Map<TopicPartition, CommittedOffset> offsets;
+
+ public TxnOffsetCommitRequest(short version, String consumerGroupId, long producerId, short producerEpoch,
+ long retentionTimeMs, Map<TopicPartition, CommittedOffset> offsets) {
+ super(version);
+ this.consumerGroupId = consumerGroupId;
+ this.producerId = producerId;
+ this.producerEpoch = producerEpoch;
+ this.retentionTimeMs = retentionTimeMs;
+ this.offsets = offsets;
+ }
+
+ public TxnOffsetCommitRequest(Struct struct, short version) {
+ super(version);
+ this.consumerGroupId = struct.getString(CONSUMER_GROUP_ID_KEY_NAME);
+ this.producerId = struct.getLong(PID_KEY_NAME);
+ this.producerEpoch = struct.getShort(EPOCH_KEY_NAME);
+ this.retentionTimeMs = struct.getLong(RETENTION_TIME_KEY_NAME);
+
+ Map<TopicPartition, CommittedOffset> offsets = new HashMap<>();
+ Object[] topicPartitionsArray = struct.getArray(TOPIC_PARTITIONS_KEY_NAME);
+ for (Object topicPartitionObj : topicPartitionsArray) {
+ Struct topicPartitionStruct = (Struct) topicPartitionObj;
+ String topic = topicPartitionStruct.getString(TOPIC_KEY_NAME);
+ for (Object partitionObj : topicPartitionStruct.getArray(PARTITIONS_KEY_NAME)) {
+ Struct partitionStruct = (Struct) partitionObj;
+ TopicPartition partition = new TopicPartition(topic, partitionStruct.getInt(PARTITION_KEY_NAME));
+ long offset = partitionStruct.getLong(OFFSET_KEY_NAME);
+ String metadata = partitionStruct.getString(METADATA_KEY_NAME);
+ offsets.put(partition, new CommittedOffset(offset, metadata));
+ }
+ }
+ this.offsets = offsets;
+ }
+
+ public String consumerGroupId() {
+ return consumerGroupId;
+ }
+
+ public long producerId() {
+ return producerId;
+ }
+
+ public short producerEpoch() {
+ return producerEpoch;
+ }
+
+ public long retentionTimeMs() {
+ return retentionTimeMs;
+ }
+
+ public Map<TopicPartition, CommittedOffset> offsets() {
+ return offsets;
+ }
+
+ @Override
+ protected Struct toStruct() {
+ Struct struct = new Struct(ApiKeys.TXN_OFFSET_COMMIT.requestSchema(version()));
+ struct.set(CONSUMER_GROUP_ID_KEY_NAME, consumerGroupId);
+ struct.set(PID_KEY_NAME, producerId);
+ struct.set(EPOCH_KEY_NAME, producerEpoch);
+ struct.set(RETENTION_TIME_KEY_NAME, retentionTimeMs);
+
+ Map<String, Map<Integer, CommittedOffset>> mappedPartitionOffsets = CollectionUtils.groupDataByTopic(offsets);
+ Object[] partitionsArray = new Object[mappedPartitionOffsets.size()];
+ int i = 0;
+ for (Map.Entry<String, Map<Integer, CommittedOffset>> topicAndPartitions : mappedPartitionOffsets.entrySet()) {
+ Struct topicPartitionsStruct = struct.instance(TOPIC_PARTITIONS_KEY_NAME);
+ topicPartitionsStruct.set(TOPIC_KEY_NAME, topicAndPartitions.getKey());
+
+ Map<Integer, CommittedOffset> partitionOffsets = topicAndPartitions.getValue();
+ Object[] partitionOffsetsArray = new Object[partitionOffsets.size()];
+ int j = 0;
+ for (Map.Entry<Integer, CommittedOffset> partitionOffset : partitionOffsets.entrySet()) {
+ Struct partitionOffsetStruct = topicPartitionsStruct.instance(PARTITIONS_KEY_NAME);
+ partitionOffsetStruct.set(PARTITION_KEY_NAME, partitionOffset.getKey());
+ CommittedOffset committedOffset = partitionOffset.getValue();
+ partitionOffsetStruct.set(OFFSET_KEY_NAME, committedOffset.offset);
+ partitionOffsetStruct.set(METADATA_KEY_NAME, committedOffset.metadata);
+ partitionOffsetsArray[j++] = partitionOffsetStruct;
+ }
+ topicPartitionsStruct.set(PARTITIONS_KEY_NAME, partitionOffsetsArray);
+ partitionsArray[i++] = topicPartitionsStruct;
+ }
+
+ struct.set(TOPIC_PARTITIONS_KEY_NAME, partitionsArray);
+ return struct;
+ }
+
+ @Override
+ public TxnOffsetCommitResponse getErrorResponse(Throwable e) {
+ Errors error = Errors.forException(e);
+ Map<TopicPartition, Errors> errors = new HashMap<>(offsets.size());
+ for (TopicPartition partition : offsets.keySet())
+ errors.put(partition, error);
+ return new TxnOffsetCommitResponse(errors);
+ }
+
+ public static TxnOffsetCommitRequest parse(ByteBuffer buffer, short version) {
+ return new TxnOffsetCommitRequest(ApiKeys.TXN_OFFSET_COMMIT.parseRequest(version, buffer), version);
+ }
+
+ public static class CommittedOffset {
+ private final long offset;
+ private final String metadata;
+
+ public CommittedOffset(long offset, String metadata) {
+ this.offset = offset;
+ this.metadata = metadata;
+ }
+
+ @Override
+ public String toString() {
+ return "CommittedOffset(" +
+ "offset=" + offset +
+ ", metadata='" + metadata + "')";
+ }
+
+ public long offset() {
+ return offset;
+ }
+
+ public String metadata() {
+ return metadata;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/865d82af/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
new file mode 100644
index 0000000..5574aea
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.CollectionUtils;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+public class TxnOffsetCommitResponse extends AbstractResponse {
+ private static final String TOPIC_PARTITIONS_KEY_NAME = "topics";
+ private static final String PARTITIONS_KEY_NAME = "partitions";
+ private static final String TOPIC_KEY_NAME = "topic";
+ private static final String PARTITION_KEY_NAME = "partition";
+ private static final String ERROR_CODE_KEY_NAME = "error_code";
+
+ // Possible error codes:
+ // InvalidProducerEpoch
+ // NotCoordinator
+ // CoordinatorNotAvailable
+ // CoordinatorLoadInProgress
+ // OffsetMetadataTooLarge
+ // GroupAuthorizationFailed
+ // InvalidCommitOffsetSize
+
+ private final Map<TopicPartition, Errors> errors;
+
+ public TxnOffsetCommitResponse(Map<TopicPartition, Errors> errors) {
+ this.errors = errors;
+ }
+
+ public TxnOffsetCommitResponse(Struct struct) {
+ Map<TopicPartition, Errors> errors = new HashMap<>();
+ Object[] topicPartitionsArray = struct.getArray(TOPIC_PARTITIONS_KEY_NAME);
+ for (Object topicPartitionObj : topicPartitionsArray) {
+ Struct topicPartitionStruct = (Struct) topicPartitionObj;
+ String topic = topicPartitionStruct.getString(TOPIC_KEY_NAME);
+ for (Object partitionObj : topicPartitionStruct.getArray(PARTITIONS_KEY_NAME)) {
+ Struct partitionStruct = (Struct) partitionObj;
+ Integer partition = partitionStruct.getInt(PARTITION_KEY_NAME);
+ Errors error = Errors.forCode(partitionStruct.getShort(ERROR_CODE_KEY_NAME));
+ errors.put(new TopicPartition(topic, partition), error);
+ }
+ }
+ this.errors = errors;
+ }
+
+ @Override
+ protected Struct toStruct(short version) {
+ Struct struct = new Struct(ApiKeys.TXN_OFFSET_COMMIT.responseSchema(version));
+ Map<String, Map<Integer, Errors>> mappedPartitions = CollectionUtils.groupDataByTopic(errors);
+ Object[] partitionsArray = new Object[mappedPartitions.size()];
+ int i = 0;
+ for (Map.Entry<String, Map<Integer, Errors>> topicAndPartitions : mappedPartitions.entrySet()) {
+ Struct topicPartitionsStruct = struct.instance(TOPIC_PARTITIONS_KEY_NAME);
+ topicPartitionsStruct.set(TOPIC_KEY_NAME, topicAndPartitions.getKey());
+ Map<Integer, Errors> partitionAndErrors = topicAndPartitions.getValue();
+
+ Object[] partitionAndErrorsArray = new Object[partitionAndErrors.size()];
+ int j = 0;
+ for (Map.Entry<Integer, Errors> partitionAndError : partitionAndErrors.entrySet()) {
+ Struct partitionAndErrorStruct = topicPartitionsStruct.instance(PARTITIONS_KEY_NAME);
+ partitionAndErrorStruct.set(PARTITION_KEY_NAME, partitionAndError.getKey());
+ partitionAndErrorStruct.set(ERROR_CODE_KEY_NAME, partitionAndError.getValue().code());
+ partitionAndErrorsArray[j++] = partitionAndErrorStruct;
+ }
+ topicPartitionsStruct.set(PARTITIONS_KEY_NAME, partitionAndErrorsArray);
+ partitionsArray[i++] = topicPartitionsStruct;
+ }
+
+ struct.set(TOPIC_PARTITIONS_KEY_NAME, partitionsArray);
+ return struct;
+ }
+
+ public Map<TopicPartition, Errors> errors() {
+ return errors;
+ }
+
+ public static TxnOffsetCommitResponse parse(ByteBuffer buffer, short version) {
+ return new TxnOffsetCommitResponse(ApiKeys.TXN_OFFSET_COMMIT.parseResponse(version, buffer));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/865d82af/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
new file mode 100644
index 0000000..fe64603
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.CollectionUtils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class WriteTxnMarkersRequest extends AbstractRequest {
+ private static final String COORDINATOR_EPOCH_KEY_NAME = "coordinator_epoch";
+ private static final String TXN_MARKER_ENTRY_KEY_NAME = "transaction_markers";
+
+ private static final String PID_KEY_NAME = "producer_id";
+ private static final String EPOCH_KEY_NAME = "producer_epoch";
+ private static final String TRANSACTION_RESULT_KEY_NAME = "transaction_result";
+ private static final String TOPIC_PARTITIONS_KEY_NAME = "topics";
+ private static final String TOPIC_KEY_NAME = "topic";
+ private static final String PARTITIONS_KEY_NAME = "partitions";
+
+ public static class TxnMarkerEntry {
+ private final long producerId;
+ private final short producerEpoch;
+ private final TransactionResult result;
+ private final List<TopicPartition> partitions;
+
+ public TxnMarkerEntry(long producerId, short producerEpoch, TransactionResult result, List<TopicPartition> partitions) {
+ this.producerId = producerId;
+ this.producerEpoch = producerEpoch;
+ this.result = result;
+ this.partitions = partitions;
+ }
+
+ public long producerId() {
+ return producerId;
+ }
+
+ public short producerEpoch() {
+ return producerEpoch;
+ }
+
+ public TransactionResult transactionResult() {
+ return result;
+ }
+
+ public List<TopicPartition> partitions() {
+ return partitions;
+ }
+ }
+
+ public static class Builder extends AbstractRequest.Builder<WriteTxnMarkersRequest> {
+ private final int coordinatorEpoch;
+ private final List<TxnMarkerEntry> markers;
+
+ public Builder(int coordinatorEpoch, List<TxnMarkerEntry> markers) {
+ super(ApiKeys.WRITE_TXN_MARKERS);
+
+ this.markers = markers;
+ this.coordinatorEpoch = coordinatorEpoch;
+ }
+
+ @Override
+ public WriteTxnMarkersRequest build(short version) {
+ return new WriteTxnMarkersRequest(version, coordinatorEpoch, markers);
+ }
+ }
+
+ private final int coordinatorEpoch;
+ private final List<TxnMarkerEntry> markers;
+
+ private WriteTxnMarkersRequest(short version, int coordinatorEpoch, List<TxnMarkerEntry> markers) {
+ super(version);
+
+ this.markers = markers;
+ this.coordinatorEpoch = coordinatorEpoch;
+ }
+
+ public WriteTxnMarkersRequest(Struct struct, short version) {
+ super(version);
+ this.coordinatorEpoch = struct.getInt(COORDINATOR_EPOCH_KEY_NAME);
+
+ List<TxnMarkerEntry> markers = new ArrayList<>();
+ Object[] markersArray = struct.getArray(TXN_MARKER_ENTRY_KEY_NAME);
+ for (Object markerObj : markersArray) {
+ Struct markerStruct = (Struct) markerObj;
+
+ long producerId = markerStruct.getLong(PID_KEY_NAME);
+ short producerEpoch = markerStruct.getShort(EPOCH_KEY_NAME);
+ TransactionResult result = TransactionResult.forId(markerStruct.getBoolean(TRANSACTION_RESULT_KEY_NAME));
+
+ List<TopicPartition> partitions = new ArrayList<>();
+ Object[] topicPartitionsArray = markerStruct.getArray(TOPIC_PARTITIONS_KEY_NAME);
+ for (Object topicPartitionObj : topicPartitionsArray) {
+ Struct topicPartitionStruct = (Struct) topicPartitionObj;
+ String topic = topicPartitionStruct.getString(TOPIC_KEY_NAME);
+ for (Object partitionObj : topicPartitionStruct.getArray(PARTITIONS_KEY_NAME)) {
+ partitions.add(new TopicPartition(topic, (Integer) partitionObj));
+ }
+ }
+
+ markers.add(new TxnMarkerEntry(producerId, producerEpoch, result, partitions));
+ }
+
+ this.markers = markers;
+ }
+
+ public int coordinatorEpoch() {
+ return coordinatorEpoch;
+ }
+
+ public List<TxnMarkerEntry> markers() {
+ return markers;
+ }
+
+ @Override
+ protected Struct toStruct() {
+ Struct struct = new Struct(ApiKeys.WRITE_TXN_MARKERS.requestSchema(version()));
+ struct.set(COORDINATOR_EPOCH_KEY_NAME, coordinatorEpoch);
+
+ Object[] markersArray = new Object[markers.size()];
+ int i = 0;
+ for (TxnMarkerEntry entry : markers) {
+ Struct markerStruct = struct.instance(TXN_MARKER_ENTRY_KEY_NAME);
+ markerStruct.set(PID_KEY_NAME, entry.producerId);
+ markerStruct.set(EPOCH_KEY_NAME, entry.producerEpoch);
+ markerStruct.set(TRANSACTION_RESULT_KEY_NAME, entry.result.id);
+
+ Map<String, List<Integer>> mappedPartitions = CollectionUtils.groupDataByTopic(entry.partitions);
+ Object[] partitionsArray = new Object[mappedPartitions.size()];
+ int j = 0;
+ for (Map.Entry<String, List<Integer>> topicAndPartitions : mappedPartitions.entrySet()) {
+ Struct topicPartitionsStruct = markerStruct.instance(TOPIC_PARTITIONS_KEY_NAME);
+ topicPartitionsStruct.set(TOPIC_KEY_NAME, topicAndPartitions.getKey());
+ topicPartitionsStruct.set(PARTITIONS_KEY_NAME, topicAndPartitions.getValue().toArray());
+ partitionsArray[j++] = topicPartitionsStruct;
+ }
+ markerStruct.set(TOPIC_PARTITIONS_KEY_NAME, partitionsArray);
+ markersArray[i++] = markerStruct;
+ }
+ struct.set(TXN_MARKER_ENTRY_KEY_NAME, markersArray);
+
+ return struct;
+ }
+
+ @Override
+ public WriteTxnMarkersResponse getErrorResponse(Throwable e) {
+ Errors error = Errors.forException(e);
+
+ Map<Long, Map<TopicPartition, Errors>> errors = new HashMap<>(markers.size());
+ for (TxnMarkerEntry entry : markers) {
+ Map<TopicPartition, Errors> errorsPerPartition = new HashMap<>(entry.partitions.size());
+ for (TopicPartition partition : entry.partitions)
+ errorsPerPartition.put(partition, error);
+
+ errors.put(entry.producerId, errorsPerPartition);
+ }
+
+ return new WriteTxnMarkersResponse(errors);
+ }
+
+ public static WriteTxnMarkersRequest parse(ByteBuffer buffer, short version) {
+ return new WriteTxnMarkersRequest(ApiKeys.WRITE_TXN_MARKERS.parseRequest(version, buffer), version);
+ }
+
+}