You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/04/07 11:08:46 UTC

[2/2] kafka git commit: KAFKA-4990; Request/response classes for transactions (KIP-98)

KAFKA-4990; Request/response classes for transactions (KIP-98)

Author: Matthias J. Sax <ma...@confluent.io>
Author: Guozhang Wang <wa...@gmail.com>
Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Apurva Mehta <ap...@confluent.io>, Jun Rao <ju...@gmail.com>, Ismael Juma <is...@juma.me.uk>

Closes #2799 from mjsax/kafka-4990-add-api-stub-config-parameters-request-types


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/865d82af
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/865d82af
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/865d82af

Branch: refs/heads/trunk
Commit: 865d82af2cc050d10544d70b95468da90c1d800b
Parents: 2f4f3b9
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Fri Apr 7 11:22:09 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri Apr 7 12:07:25 2017 +0100

----------------------------------------------------------------------
 checkstyle/suppressions.xml                     |   4 +
 .../apache/kafka/clients/producer/Producer.java |  12 +-
 .../errors/InvalidPidMappingException.java      |  23 +++
 .../common/errors/InvalidTxnStateException.java |  23 +++
 .../apache/kafka/common/protocol/ApiKeys.java   |   7 +-
 .../apache/kafka/common/protocol/Errors.java    |  36 ++--
 .../apache/kafka/common/protocol/Protocol.java  | 185 +++++++++++++++++-
 .../kafka/common/record/ControlRecordType.java  |   8 +-
 .../kafka/common/requests/AbstractRequest.java  |  15 ++
 .../kafka/common/requests/AbstractResponse.java |  10 +
 .../common/requests/AddOffsetsToTxnRequest.java | 107 ++++++++++
 .../requests/AddOffsetsToTxnResponse.java       |  60 ++++++
 .../requests/AddPartitionsToTxnRequest.java     | 136 +++++++++++++
 .../requests/AddPartitionsToTxnResponse.java    |  60 ++++++
 .../kafka/common/requests/EndTxnRequest.java    | 107 ++++++++++
 .../kafka/common/requests/EndTxnResponse.java   |  59 ++++++
 .../kafka/common/requests/FetchResponse.java    |  20 +-
 .../kafka/common/requests/InitPidResponse.java  |   4 +-
 .../common/requests/TransactionResult.java      |  34 ++++
 .../common/requests/TxnOffsetCommitRequest.java | 195 +++++++++++++++++++
 .../requests/TxnOffsetCommitResponse.java       | 102 ++++++++++
 .../common/requests/WriteTxnMarkersRequest.java | 186 ++++++++++++++++++
 .../requests/WriteTxnMarkersResponse.java       | 130 +++++++++++++
 .../common/requests/RequestResponseTest.java    |  89 ++++++++-
 .../src/main/scala/kafka/server/KafkaApis.scala |  32 ++-
 25 files changed, 1597 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/865d82af/checkstyle/suppressions.xml
----------------------------------------------------------------------
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 39d61e2..607ba69 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -13,6 +13,10 @@
               files=".*/protocol/Errors.java"/>
     <suppress checks="ClassFanOutComplexity"
               files=".*/common/utils/Utils.java"/>
+    <suppress checks="ClassFanOutComplexity"
+              files=".*/requests/AbstractRequest.java"/>
+    <suppress checks="ClassFanOutComplexity"
+              files=".*/requests/AbstractResponse.java"/>
 
     <suppress checks="MethodLength"
               files="KerberosLogin.java"/>

http://git-wip-us.apache.org/repos/asf/kafka/blob/865d82af/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
index a77ecd0..4da8681 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
@@ -16,16 +16,16 @@
  */
 package org.apache.kafka.clients.producer;
 
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+
 import java.io.Closeable;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.MetricName;
-
 
 /**
  * The interface for the {@link KafkaProducer}
@@ -36,7 +36,7 @@ public interface Producer<K, V> extends Closeable {
 
     /**
      * Send the given record asynchronously and return a future which will eventually contain the response information.
-     * 
+     *
      * @param record The record to send
      * @return A future which will eventually contain the response information
      */
@@ -46,7 +46,7 @@ public interface Producer<K, V> extends Closeable {
      * Send a record and invoke the given callback when the record has been acknowledged by the server
      */
     public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
-    
+
     /**
      * Flush any accumulated records from the producer. Blocks until all sends are complete.
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/865d82af/clients/src/main/java/org/apache/kafka/common/errors/InvalidPidMappingException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/InvalidPidMappingException.java b/clients/src/main/java/org/apache/kafka/common/errors/InvalidPidMappingException.java
new file mode 100644
index 0000000..69fb71e
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/InvalidPidMappingException.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class InvalidPidMappingException extends ApiException {
+    public InvalidPidMappingException(String message) {
+        super(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/865d82af/clients/src/main/java/org/apache/kafka/common/errors/InvalidTxnStateException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/InvalidTxnStateException.java b/clients/src/main/java/org/apache/kafka/common/errors/InvalidTxnStateException.java
new file mode 100644
index 0000000..ff06904
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/InvalidTxnStateException.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class InvalidTxnStateException extends ApiException {
+    public InvalidTxnStateException(String message) {
+        super(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/865d82af/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index b65defb..63bcfec 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -48,7 +48,12 @@ public enum ApiKeys {
     DELETE_TOPICS(20, "DeleteTopics"),
     DELETE_RECORDS(21, "DeleteRecords"),
     INIT_PRODUCER_ID(22, "InitProducerId"),
-    OFFSET_FOR_LEADER_EPOCH(23, "OffsetForLeaderEpoch");
+    OFFSET_FOR_LEADER_EPOCH(23, "OffsetForLeaderEpoch"),
+    ADD_PARTITIONS_TO_TXN(24, "AddPartitionsToTxn"),
+    ADD_OFFSETS_TO_TXN(25, "AddOffsetsToTxn"),
+    END_TXN(26, "EndTxn"),
+    WRITE_TXN_MARKERS(27, "WriteTxnMarkers"),
+    TXN_OFFSET_COMMIT(28, "TxnOffsetCommit");
 
     private static final ApiKeys[] ID_TO_TYPE;
     private static final int MIN_API_KEY = 0;

http://git-wip-us.apache.org/repos/asf/kafka/blob/865d82af/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 519e52c..ccebd93 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -16,9 +16,6 @@
  */
 package org.apache.kafka.common.protocol;
 
-import java.util.HashMap;
-import java.util.Map;
-
 import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.errors.BrokerNotAvailableException;
 import org.apache.kafka.common.errors.ClusterAuthorizationException;
@@ -36,17 +33,16 @@ import org.apache.kafka.common.errors.InvalidConfigurationException;
 import org.apache.kafka.common.errors.InvalidFetchSizeException;
 import org.apache.kafka.common.errors.InvalidGroupIdException;
 import org.apache.kafka.common.errors.InvalidPartitionsException;
+import org.apache.kafka.common.errors.InvalidPidMappingException;
 import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
 import org.apache.kafka.common.errors.InvalidReplicationFactorException;
 import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.common.errors.InvalidRequiredAcksException;
-import org.apache.kafka.common.errors.OutOfOrderSequenceException;
 import org.apache.kafka.common.errors.InvalidSessionTimeoutException;
 import org.apache.kafka.common.errors.InvalidTimestampException;
 import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.InvalidTxnStateException;
 import org.apache.kafka.common.errors.LeaderNotAvailableException;
-import org.apache.kafka.common.errors.ProducerFencedException;
-import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
 import org.apache.kafka.common.errors.NetworkException;
 import org.apache.kafka.common.errors.NotControllerException;
 import org.apache.kafka.common.errors.NotCoordinatorForGroupException;
@@ -55,23 +51,29 @@ import org.apache.kafka.common.errors.NotEnoughReplicasException;
 import org.apache.kafka.common.errors.NotLeaderForPartitionException;
 import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
 import org.apache.kafka.common.errors.OffsetOutOfRangeException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
 import org.apache.kafka.common.errors.PolicyViolationException;
+import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.errors.RebalanceInProgressException;
 import org.apache.kafka.common.errors.RecordBatchTooLargeException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.errors.ReplicaNotAvailableException;
 import org.apache.kafka.common.errors.RetriableException;
-import org.apache.kafka.common.errors.TopicExistsException;
-import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
-import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.errors.TopicExistsException;
 import org.apache.kafka.common.errors.UnknownMemberIdException;
 import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
+import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
+import java.util.Map;
+
 /**
  * This class contains all the client-server errors--those errors that must be sent from the server to the client. These
  * are thus part of the protocol. The names can be changed but the error code cannot.
@@ -169,10 +171,18 @@ public enum Errors {
             " the message was sent to an incompatible broker. See the broker logs for more details.")),
     UNSUPPORTED_FOR_MESSAGE_FORMAT(43,
         new UnsupportedForMessageFormatException("The message format version on the broker does not support the request.")),
-    POLICY_VIOLATION(44, new PolicyViolationException("Request parameters do not satisfy the configured policy.")),
-    OUT_OF_ORDER_SEQUENCE_NUMBER(45, new OutOfOrderSequenceException("The broker received an out of order sequence number")),
-    DUPLICATE_SEQUENCE_NUMBER(46, new DuplicateSequenceNumberException("The broker received a duplicate sequence number")),
-    PRODUCER_FENCED(47, new ProducerFencedException("Producer attempted an operation with an old epoch"));
+    POLICY_VIOLATION(44,
+        new PolicyViolationException("Request parameters do not satisfy the configured policy.")),
+    OUT_OF_ORDER_SEQUENCE_NUMBER(45,
+        new OutOfOrderSequenceException("The broker received an out of order sequence number")),
+    DUPLICATE_SEQUENCE_NUMBER(46,
+        new DuplicateSequenceNumberException("The broker received a duplicate sequence number")),
+    INVALID_PRODUCER_EPOCH(47,
+        new ProducerFencedException("Producer attempted an operation with an old epoch")),
+    INVALID_TXN_STATE(48,
+        new InvalidTxnStateException("The producer attempted a transactional operation in an invalid state")),
+    INVALID_PID_MAPPING(49,
+        new InvalidPidMappingException("The PID mapping is invalid"));
 
     private static final Logger log = LoggerFactory.getLogger(Errors.class);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/865d82af/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index cc228c5..4c58bb8 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -647,7 +647,7 @@ public class Protocol {
     // The v4 Fetch Response adds features for transactional consumption (the aborted transaction list and the
     // last stable offset). It also exposes messages with magic v2 (along with older formats).
     private static final Schema FETCH_RESPONSE_ABORTED_TRANSACTION_V4 = new Schema(
-            new Field("pid", INT64, "The producer ID (PID) associated with the aborted transactions"),
+            new Field("producer_id", INT64, "The producer id associated with the aborted transactions"),
             new Field("first_offset", INT64, "The first offset in the aborted transaction"));
 
     public static final Schema FETCH_RESPONSE_ABORTED_TRANSACTION_V5 = FETCH_RESPONSE_ABORTED_TRANSACTION_V4;
@@ -1180,19 +1180,19 @@ public class Protocol {
     public static final Schema INIT_PRODUCER_ID_REQUEST_V0 = new Schema(
             new Field("transactional_id",
                     NULLABLE_STRING,
-                    "The transactional id whose pid we want to retrieve or generate.")
+                    "The transactional id whose producer id we want to retrieve or generate.")
     );
 
     public static final Schema INIT_PRODUCER_ID_RESPONSE_V0 = new Schema(
             new Field("error_code",
                     INT16,
                     "An integer error code."),
-            new Field("pid",
+            new Field("producer_id",
                     INT64,
-                    "The pid for the input transactional id. If the input id was empty, then this is used only for ensuring idempotence of messages"),
-            new Field("epoch",
+                    "The producer id for the input transactional id. If the input id was empty, then this is used only for ensuring idempotence of messages."),
+            new Field("producer_epoch",
                     INT16,
-                    "The epoch for the pid. Will always be 0 if no transactional id was specified in the request.")
+                    "The epoch for the producer id. Will always be 0 if no transactional id was specified in the request.")
     );
 
     public static final Schema[] INIT_PRODUCER_ID_REQUEST = new Schema[] {INIT_PRODUCER_ID_REQUEST_V0};
@@ -1249,6 +1249,169 @@ public class Protocol {
     public static final Schema[] OFFSET_FOR_LEADER_EPOCH_REQUEST = new Schema[] {OFFSET_FOR_LEADER_EPOCH_REQUEST_V0};
     public static final Schema[] OFFSET_FOR_LEADER_EPOCH_RESPONSE = new Schema[] {OFFSET_FOR_LEADER_EPOCH_RESPONSE_V0};
 
+    public static final Schema ADD_PARTITIONS_TO_TXN_REQUEST_V0 = new Schema(
+            new Field("transactional_id",
+                    STRING,
+                    "The transactional id corresponding to the transaction."),
+            new Field("producer_id",
+                    INT64,
+                    "Current producer id in use by the transactional id."),
+            new Field("producer_epoch",
+                    INT16,
+                    "Current epoch associated with the producer id."),
+            new Field("topics",
+                    new ArrayOf(new Schema(
+                            new Field("topic", STRING),
+                            new Field("partitions", new ArrayOf(INT32)))),
+                    "The partitions to add to the transaction.")
+    );
+    public static final Schema ADD_PARTITIONS_TO_TXN_RESPONSE_V0 = new Schema(
+            new Field("error_code",
+                    INT16,
+                    "An integer error code.")
+    );
+
+    public static final Schema[] ADD_PARTITIONS_TO_TXN_REQUEST = new Schema[] {ADD_PARTITIONS_TO_TXN_REQUEST_V0};
+    public static final Schema[] ADD_PARTITIONS_TO_TXN_RESPONSE = new Schema[] {ADD_PARTITIONS_TO_TXN_RESPONSE_V0};
+
+    public static final Schema ADD_OFFSETS_TO_TXN_REQUEST_V0 = new Schema(
+            new Field("transactional_id",
+                    STRING,
+                    "The transactional id corresponding to the transaction."),
+            new Field("producer_id",
+                    INT64,
+                    "Current producer id in use by the transactional id."),
+            new Field("producer_epoch",
+                    INT16,
+                    "Current epoch associated with the producer id."),
+            new Field("consumer_group_id",
+                    STRING,
+                    "Consumer group id whose offsets should be included in the transaction.")
+    );
+    public static final Schema ADD_OFFSETS_TO_TXN_RESPONSE_V0 = new Schema(
+            new Field("error_code",
+                    INT16,
+                    "An integer error code.")
+    );
+
+    public static final Schema[] ADD_OFFSETS_TO_TXN_REQUEST = new Schema[] {ADD_OFFSETS_TO_TXN_REQUEST_V0};
+    public static final Schema[] ADD_OFFSETS_TO_TXN_RESPONSE = new Schema[] {ADD_OFFSETS_TO_TXN_RESPONSE_V0};
+
+    public static final Schema END_TXN_REQUEST_V0 = new Schema(
+            new Field("transactional_id",
+                    STRING,
+                    "The transactional id corresponding to the transaction."),
+            new Field("producer_id",
+                    INT64,
+                    "Current producer id in use by the transactional id."),
+            new Field("producer_epoch",
+                    INT16,
+                    "Current epoch associated with the producer id."),
+            new Field("transaction_result",
+                    BOOLEAN,
+                    "The result of the transaction (0 = ABORT, 1 = COMMIT)")
+    );
+
+    public static final Schema END_TXN_RESPONSE_V0 = new Schema(
+            new Field("error_code",
+                    INT16,
+                    "An integer error code.")
+    );
+
+    public static final Schema[] END_TXN_REQUEST = new Schema[] {END_TXN_REQUEST_V0};
+    public static final Schema[] END_TXN_RESPONSE = new Schema[] {END_TXN_RESPONSE_V0};
+
+    public static final Schema WRITE_TXN_MARKERS_ENTRY_V0 = new Schema(
+            new Field("producer_id",
+                    INT64,
+                    "Current producer id in use by the transactional id."),
+            new Field("producer_epoch",
+                    INT16,
+                    "Current epoch associated with the producer id."),
+            new Field("transaction_result",
+                    BOOLEAN,
+                    "The result of the transaction to write to the partitions (false = ABORT, true = COMMIT)."),
+            new Field("topics",
+                    new ArrayOf(new Schema(
+                            new Field("topic", STRING),
+                            new Field("partitions", new ArrayOf(INT32)))),
+                    "The partitions to write markers for.")
+    );
+
+    public static final Schema WRITE_TXN_MARKERS_REQUEST_V0 = new Schema(
+            new Field("coordinator_epoch",
+                    INT32,
+                    "Epoch associated with the transaction state partition hosted by this transaction coordinator."),
+            new Field("transaction_markers",
+                    new ArrayOf(WRITE_TXN_MARKERS_ENTRY_V0),
+                    "The transaction markers to be written.")
+    );
+
+    public static final Schema WRITE_TXN_MARKERS_PARTITION_ERROR_RESPONSE_V0 = new Schema(
+            new Field("partition", INT32),
+            new Field("error_code", INT16)
+    );
+
+    public static final Schema WRITE_TXN_MARKERS_ENTRY_RESPONSE_V0 = new Schema(
+            new Field("producer_id",
+                    INT64,
+                    "Current producer id in use by the transactional id."),
+            new Field("topics",
+                    new ArrayOf(new Schema(
+                            new Field("topic", STRING),
+                            new Field("partitions", new ArrayOf(WRITE_TXN_MARKERS_PARTITION_ERROR_RESPONSE_V0)))),
+                    "Errors per partition from writing markers.")
+    );
+
+    public static final Schema WRITE_TXN_MARKERS_RESPONSE_V0 = new Schema(
+            new Field("transaction_markers", new ArrayOf(WRITE_TXN_MARKERS_ENTRY_RESPONSE_V0), "Errors per partition from writing markers.")
+    );
+
+    public static final Schema[] WRITE_TXN_REQUEST = new Schema[] {WRITE_TXN_MARKERS_REQUEST_V0};
+    public static final Schema[] WRITE_TXN_RESPONSE = new Schema[] {WRITE_TXN_MARKERS_RESPONSE_V0};
+
+    public static final Schema TXN_OFFSET_COMMIT_PARTITION_OFFSET_METADATA_REQUEST_V0 = new Schema(
+            new Field("partition", INT32),
+            new Field("offset", INT64),
+            new Field("metadata", NULLABLE_STRING)
+    );
+
+    public static final Schema TXN_OFFSET_COMMIT_REQUEST_V0 = new Schema(
+            new Field("consumer_group_id",
+                    STRING,
+                    "Id of the associated consumer group to commit offsets for."),
+            new Field("producer_id",
+                    INT64,
+                    "Current producer id in use by the transactional id."),
+            new Field("producer_epoch",
+                    INT16,
+                    "Current epoch associated with the producer id."),
+            new Field("retention_time",
+                    INT64,
+                    "The time in ms to retain the offset."),
+            new Field("topics",
+                    new ArrayOf(new Schema(
+                            new Field("topic", STRING),
+                            new Field("partitions", new ArrayOf(TXN_OFFSET_COMMIT_PARTITION_OFFSET_METADATA_REQUEST_V0)))),
+                    "The partitions to commit offsets for.")
+    );
+
+    public static final Schema TXN_OFFSET_COMMIT_PARTITION_ERROR_RESPONSE_V0 = new Schema(
+            new Field("partition", INT32),
+            new Field("error_code", INT16)
+    );
+
+    public static final Schema TXN_OFFSET_COMMIT_RESPONSE_V0 = new Schema(
+            new Field("topics",
+                    new ArrayOf(new Schema(
+                            new Field("topic", STRING),
+                            new Field("partitions", new ArrayOf(TXN_OFFSET_COMMIT_PARTITION_ERROR_RESPONSE_V0)))),
+                    "Errors per partition from committing offsets.")
+    );
+
+    public static final Schema[] TXN_OFFSET_COMMIT_REQUEST = new Schema[] {TXN_OFFSET_COMMIT_REQUEST_V0};
+    public static final Schema[] TXN_OFFSET_COMMIT_RESPONSE = new Schema[] {TXN_OFFSET_COMMIT_RESPONSE_V0};
+
     /* an array of all requests and responses with all schema versions; a null value in the inner array means that the
      * particular version is not supported */
     public static final Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 1][];
@@ -1283,6 +1446,11 @@ public class Protocol {
         REQUESTS[ApiKeys.DELETE_RECORDS.id] = DELETE_RECORDS_REQUEST;
         REQUESTS[ApiKeys.INIT_PRODUCER_ID.id] = INIT_PRODUCER_ID_REQUEST;
         REQUESTS[ApiKeys.OFFSET_FOR_LEADER_EPOCH.id] = OFFSET_FOR_LEADER_EPOCH_REQUEST;
+        REQUESTS[ApiKeys.ADD_PARTITIONS_TO_TXN.id] = ADD_PARTITIONS_TO_TXN_REQUEST;
+        REQUESTS[ApiKeys.ADD_OFFSETS_TO_TXN.id] = ADD_OFFSETS_TO_TXN_REQUEST;
+        REQUESTS[ApiKeys.END_TXN.id] = END_TXN_REQUEST;
+        REQUESTS[ApiKeys.WRITE_TXN_MARKERS.id] = WRITE_TXN_REQUEST;
+        REQUESTS[ApiKeys.TXN_OFFSET_COMMIT.id] = TXN_OFFSET_COMMIT_REQUEST;
 
         RESPONSES[ApiKeys.PRODUCE.id] = PRODUCE_RESPONSE;
         RESPONSES[ApiKeys.FETCH.id] = FETCH_RESPONSE;
@@ -1308,6 +1476,11 @@ public class Protocol {
         RESPONSES[ApiKeys.DELETE_RECORDS.id] = DELETE_RECORDS_RESPONSE;
         RESPONSES[ApiKeys.INIT_PRODUCER_ID.id] = INIT_PRODUCER_ID_RESPONSE;
         RESPONSES[ApiKeys.OFFSET_FOR_LEADER_EPOCH.id] = OFFSET_FOR_LEADER_EPOCH_RESPONSE;
+        RESPONSES[ApiKeys.ADD_PARTITIONS_TO_TXN.id] = ADD_PARTITIONS_TO_TXN_RESPONSE;
+        RESPONSES[ApiKeys.ADD_OFFSETS_TO_TXN.id] = ADD_OFFSETS_TO_TXN_RESPONSE;
+        RESPONSES[ApiKeys.END_TXN.id] = END_TXN_RESPONSE;
+        RESPONSES[ApiKeys.WRITE_TXN_MARKERS.id] = WRITE_TXN_RESPONSE;
+        RESPONSES[ApiKeys.TXN_OFFSET_COMMIT.id] = TXN_OFFSET_COMMIT_RESPONSE;
 
         /* set the minimum and maximum version of each api */
         for (ApiKeys api : ApiKeys.values()) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/865d82af/clients/src/main/java/org/apache/kafka/common/record/ControlRecordType.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/ControlRecordType.java b/clients/src/main/java/org/apache/kafka/common/record/ControlRecordType.java
index 6bd614a..723af66 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/ControlRecordType.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/ControlRecordType.java
@@ -40,8 +40,8 @@ import java.nio.ByteBuffer;
  * The schema for the value field is left to the control record type to specify.
  */
 public enum ControlRecordType {
-    COMMIT((short) 0),
-    ABORT((short) 1),
+    ABORT((short) 0),
+    COMMIT((short) 1),
 
     // UNKNOWN is used to indicate a control type which the client is not aware of and should be ignored
     UNKNOWN((short) -1);
@@ -77,9 +77,9 @@ public enum ControlRecordType {
         short type = key.getShort(2);
         switch (type) {
             case 0:
-                return COMMIT;
-            case 1:
                 return ABORT;
+            case 1:
+                return COMMIT;
             default:
                 return UNKNOWN;
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/865d82af/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 7ce3518..bd4bc49 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -177,6 +177,21 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
             case OFFSET_FOR_LEADER_EPOCH:
                 request = new OffsetsForLeaderEpochRequest(struct, version);
                 break;
+            case ADD_PARTITIONS_TO_TXN:
+                request = new AddPartitionsToTxnRequest(struct, version);
+                break;
+            case ADD_OFFSETS_TO_TXN:
+                request = new AddOffsetsToTxnRequest(struct, version);
+                break;
+            case END_TXN:
+                request = new EndTxnRequest(struct, version);
+                break;
+            case WRITE_TXN_MARKERS:
+                request = new WriteTxnMarkersRequest(struct, version);
+                break;
+            case TXN_OFFSET_COMMIT:
+                request = new TxnOffsetCommitRequest(struct, version);
+                break;
             default:
                 throw new AssertionError(String.format("ApiKey %s is not currently handled in `getRequest`, the " +
                         "code should be updated to do so.", apiKey));

http://git-wip-us.apache.org/repos/asf/kafka/blob/865d82af/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 1ae30d1..433539c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -97,6 +97,16 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
                 return new InitPidResponse(struct);
             case OFFSET_FOR_LEADER_EPOCH:
                 return new OffsetsForLeaderEpochResponse(struct);
+            case ADD_PARTITIONS_TO_TXN:
+                return new AddPartitionsToTxnResponse(struct);
+            case ADD_OFFSETS_TO_TXN:
+                return new AddOffsetsToTxnResponse(struct);
+            case END_TXN:
+                return new EndTxnResponse(struct);
+            case WRITE_TXN_MARKERS:
+                return new WriteTxnMarkersResponse(struct);
+            case TXN_OFFSET_COMMIT:
+                return new TxnOffsetCommitResponse(struct);
             default:
                 throw new AssertionError(String.format("ApiKey %s is not currently handled in `getResponse`, the " +
                         "code should be updated to do so.", apiKey));

http://git-wip-us.apache.org/repos/asf/kafka/blob/865d82af/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
new file mode 100644
index 0000000..4245e82
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+public class AddOffsetsToTxnRequest extends AbstractRequest {
+    private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id";
+    private static final String PID_KEY_NAME = "producer_id";
+    private static final String EPOCH_KEY_NAME = "producer_epoch";
+    private static final String CONSUMER_GROUP_ID_KEY_NAME = "consumer_group_id";
+
+    public static class Builder extends AbstractRequest.Builder<AddOffsetsToTxnRequest> {
+        private final String transactionalId;
+        private final long producerId;
+        private final short producerEpoch;
+        private final String consumerGroupId;
+
+        public Builder(String transactionalId, long producerId, short producerEpoch, String consumerGroupId) {
+            super(ApiKeys.ADD_OFFSETS_TO_TXN);
+            this.transactionalId = transactionalId;
+            this.producerId = producerId;
+            this.producerEpoch = producerEpoch;
+            this.consumerGroupId = consumerGroupId;
+        }
+
+        @Override
+        public AddOffsetsToTxnRequest build(short version) {
+            return new AddOffsetsToTxnRequest(version, transactionalId, producerId, producerEpoch, consumerGroupId);
+        }
+    }
+
+    private final String transactionalId;
+    private final long producerId;
+    private final short producerEpoch;
+    private final String consumerGroupId;
+
+    private AddOffsetsToTxnRequest(short version, String transactionalId, long producerId, short producerEpoch, String consumerGroupId) {
+        super(version);
+        this.transactionalId = transactionalId;
+        this.producerId = producerId;
+        this.producerEpoch = producerEpoch;
+        this.consumerGroupId = consumerGroupId;
+    }
+
+    public AddOffsetsToTxnRequest(Struct struct, short version) {
+        super(version);
+        this.transactionalId = struct.getString(TRANSACTIONAL_ID_KEY_NAME);
+        this.producerId = struct.getLong(PID_KEY_NAME);
+        this.producerEpoch = struct.getShort(EPOCH_KEY_NAME);
+        this.consumerGroupId = struct.getString(CONSUMER_GROUP_ID_KEY_NAME);
+    }
+
+    public String transactionalId() {
+        return transactionalId;
+    }
+
+    public long producerId() {
+        return producerId;
+    }
+
+    public short producerEpoch() {
+        return producerEpoch;
+    }
+
+    public String consumerGroupId() {
+        return consumerGroupId;
+    }
+
+    @Override
+    protected Struct toStruct() {
+        Struct struct = new Struct(ApiKeys.ADD_OFFSETS_TO_TXN.requestSchema(version()));
+        struct.set(TRANSACTIONAL_ID_KEY_NAME, transactionalId);
+        struct.set(PID_KEY_NAME, producerId);
+        struct.set(EPOCH_KEY_NAME, producerEpoch);
+        struct.set(CONSUMER_GROUP_ID_KEY_NAME, consumerGroupId);
+        return struct;
+    }
+
+    @Override
+    public AddOffsetsToTxnResponse getErrorResponse(Throwable e) {
+        return new AddOffsetsToTxnResponse(Errors.forException(e));
+    }
+
+    public static AddOffsetsToTxnRequest parse(ByteBuffer buffer, short version) {
+        return new AddOffsetsToTxnRequest(ApiKeys.ADD_OFFSETS_TO_TXN.parseRequest(version, buffer), version);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/865d82af/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
new file mode 100644
index 0000000..6ac49fb
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+public class AddOffsetsToTxnResponse extends AbstractResponse {
+    private static final String ERROR_CODE_KEY_NAME = "error_code";
+
+    // Possible error codes:
+    //   NotCoordinator
+    //   CoordinatorNotAvailable
+    //   CoordinatorLoadInProgress
+    //   InvalidPidMapping
+    //   InvalidTxnState
+    //   GroupAuthorizationFailed
+
+    private final Errors error;
+
+    public AddOffsetsToTxnResponse(Errors error) {
+        this.error = error;
+    }
+
+    public AddOffsetsToTxnResponse(Struct struct) {
+        this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
+    }
+
+    public Errors error() {
+        return error;
+    }
+
+    @Override
+    protected Struct toStruct(short version) {
+        Struct struct = new Struct(ApiKeys.ADD_OFFSETS_TO_TXN.responseSchema(version));
+        struct.set(ERROR_CODE_KEY_NAME, error.code());
+        return struct;
+    }
+
+    public static AddOffsetsToTxnResponse parse(ByteBuffer buffer, short version) {
+        return new AddOffsetsToTxnResponse(ApiKeys.ADD_OFFSETS_TO_TXN.parseResponse(version, buffer));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/865d82af/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
new file mode 100644
index 0000000..9a983d0
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.CollectionUtils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class AddPartitionsToTxnRequest extends AbstractRequest {
+    private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id";
+    private static final String PID_KEY_NAME = "producer_id";
+    private static final String EPOCH_KEY_NAME = "producer_epoch";
+    private static final String TOPIC_PARTITIONS_KEY_NAME = "topics";
+    private static final String TOPIC_KEY_NAME = "topic";
+    private static final String PARTITIONS_KEY_NAME = "partitions";
+
+    public static class Builder extends AbstractRequest.Builder<AddPartitionsToTxnRequest> {
+        private final String transactionalId;
+        private final long producerId;
+        private final short producerEpoch;
+        private final List<TopicPartition> partitions;
+
+        public Builder(String transactionalId, long producerId, short producerEpoch, List<TopicPartition> partitions) {
+            super(ApiKeys.ADD_PARTITIONS_TO_TXN);
+            this.transactionalId = transactionalId;
+            this.producerId = producerId;
+            this.producerEpoch = producerEpoch;
+            this.partitions = partitions;
+        }
+
+        @Override
+        public AddPartitionsToTxnRequest build(short version) {
+            return new AddPartitionsToTxnRequest(version, transactionalId, producerId, producerEpoch, partitions);
+        }
+    }
+
+    private final String transactionalId;
+    private final long producerId;
+    private final short producerEpoch;
+    private final List<TopicPartition> partitions;
+
+    private AddPartitionsToTxnRequest(short version, String transactionalId, long producerId, short producerEpoch,
+                                      List<TopicPartition> partitions) {
+        super(version);
+        this.transactionalId = transactionalId;
+        this.producerId = producerId;
+        this.producerEpoch = producerEpoch;
+        this.partitions = partitions;
+    }
+
+    public AddPartitionsToTxnRequest(Struct struct, short version) {
+        super(version);
+        this.transactionalId = struct.getString(TRANSACTIONAL_ID_KEY_NAME);
+        this.producerId = struct.getLong(PID_KEY_NAME);
+        this.producerEpoch = struct.getShort(EPOCH_KEY_NAME);
+
+        List<TopicPartition> partitions = new ArrayList<>();
+        Object[] topicPartitionsArray = struct.getArray(TOPIC_PARTITIONS_KEY_NAME);
+        for (Object topicPartitionObj : topicPartitionsArray) {
+            Struct topicPartitionStruct = (Struct) topicPartitionObj;
+            String topic = topicPartitionStruct.getString(TOPIC_KEY_NAME);
+            for (Object partitionObj : topicPartitionStruct.getArray(PARTITIONS_KEY_NAME)) {
+                partitions.add(new TopicPartition(topic, (Integer) partitionObj));
+            }
+        }
+        this.partitions = partitions;
+    }
+
+    public String transactionalId() {
+        return transactionalId;
+    }
+
+    public long producerId() {
+        return producerId;
+    }
+
+    public short producerEpoch() {
+        return producerEpoch;
+    }
+
+    public List<TopicPartition> partitions() {
+        return partitions;
+    }
+
+    @Override
+    protected Struct toStruct() {
+        Struct struct = new Struct(ApiKeys.ADD_PARTITIONS_TO_TXN.requestSchema(version()));
+        struct.set(TRANSACTIONAL_ID_KEY_NAME, transactionalId);
+        struct.set(PID_KEY_NAME, producerId);
+        struct.set(EPOCH_KEY_NAME, producerEpoch);
+
+        Map<String, List<Integer>> mappedPartitions = CollectionUtils.groupDataByTopic(partitions);
+        Object[] partitionsArray = new Object[mappedPartitions.size()];
+        int i = 0;
+        for (Map.Entry<String, List<Integer>> topicAndPartitions : mappedPartitions.entrySet()) {
+            Struct topicPartitionsStruct = struct.instance(TOPIC_PARTITIONS_KEY_NAME);
+            topicPartitionsStruct.set(TOPIC_KEY_NAME, topicAndPartitions.getKey());
+            topicPartitionsStruct.set(PARTITIONS_KEY_NAME, topicAndPartitions.getValue().toArray());
+            partitionsArray[i++] = topicPartitionsStruct;
+        }
+
+        struct.set(TOPIC_PARTITIONS_KEY_NAME, partitionsArray);
+        return struct;
+    }
+
+    @Override
+    public AddPartitionsToTxnResponse getErrorResponse(Throwable e) {
+        return new AddPartitionsToTxnResponse(Errors.forException(e));
+    }
+
+    public static AddPartitionsToTxnRequest parse(ByteBuffer buffer, short version) {
+        return new AddPartitionsToTxnRequest(ApiKeys.ADD_PARTITIONS_TO_TXN.parseRequest(version, buffer), version);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/865d82af/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
new file mode 100644
index 0000000..3de6295
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+public class AddPartitionsToTxnResponse extends AbstractResponse {
+    private static final String ERROR_CODE_KEY_NAME = "error_code";
+
+    // Possible error codes:
+    //   NotCoordinator
+    //   CoordinatorNotAvailable
+    //   CoordinatorLoadInProgress
+    //   InvalidTxnState
+    //   InvalidPidMapping
+    //   TopicAuthorizationFailed
+
+    private final Errors error;
+
+    public AddPartitionsToTxnResponse(Errors error) {
+        this.error = error;
+    }
+
+    public AddPartitionsToTxnResponse(Struct struct) {
+        this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
+    }
+
+    public Errors error() {
+        return error;
+    }
+
+    @Override
+    protected Struct toStruct(short version) {
+        Struct struct = new Struct(ApiKeys.ADD_PARTITIONS_TO_TXN.responseSchema(version));
+        struct.set(ERROR_CODE_KEY_NAME, error.code());
+        return struct;
+    }
+
+    public static AddPartitionsToTxnResponse parse(ByteBuffer buffer, short version) {
+        return new AddPartitionsToTxnResponse(ApiKeys.ADD_PARTITIONS_TO_TXN.parseResponse(version, buffer));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/865d82af/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java
new file mode 100644
index 0000000..e6eb54e
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+public class EndTxnRequest extends AbstractRequest {
+    private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id";
+    private static final String PID_KEY_NAME = "producer_id";
+    private static final String EPOCH_KEY_NAME = "producer_epoch";
+    private static final String TRANSACTION_RESULT_KEY_NAME = "transaction_result";
+
+    public static class Builder extends AbstractRequest.Builder<EndTxnRequest> {
+        private final String transactionalId;
+        private final long producerId;
+        private final short producerEpoch;
+        private final TransactionResult result;
+
+        public Builder(String transactionalId, long producerId, short producerEpoch, TransactionResult result) {
+            super(ApiKeys.END_TXN);
+            this.transactionalId = transactionalId;
+            this.producerId = producerId;
+            this.producerEpoch = producerEpoch;
+            this.result = result;
+        }
+
+        @Override
+        public EndTxnRequest build(short version) {
+            return new EndTxnRequest(version, transactionalId, producerId, producerEpoch, result);
+        }
+    }
+
+    private final String transactionalId;
+    private final long producerId;
+    private final short producerEpoch;
+    private final TransactionResult result;
+
+    private EndTxnRequest(short version, String transactionalId, long producerId, short producerEpoch, TransactionResult result) {
+        super(version);
+        this.transactionalId = transactionalId;
+        this.producerId = producerId;
+        this.producerEpoch = producerEpoch;
+        this.result = result;
+    }
+
+    public EndTxnRequest(Struct struct, short version) {
+        super(version);
+        this.transactionalId = struct.getString(TRANSACTIONAL_ID_KEY_NAME);
+        this.producerId = struct.getLong(PID_KEY_NAME);
+        this.producerEpoch = struct.getShort(EPOCH_KEY_NAME);
+        this.result = TransactionResult.forId(struct.getBoolean(TRANSACTION_RESULT_KEY_NAME));
+    }
+
+    public String transactionalId() {
+        return transactionalId;
+    }
+
+    public long producerId() {
+        return producerId;
+    }
+
+    public short producerEpoch() {
+        return producerEpoch;
+    }
+
+    public TransactionResult command() {
+        return result;
+    }
+
+    @Override
+    protected Struct toStruct() {
+        Struct struct = new Struct(ApiKeys.END_TXN.requestSchema(version()));
+        struct.set(TRANSACTIONAL_ID_KEY_NAME, transactionalId);
+        struct.set(PID_KEY_NAME, producerId);
+        struct.set(EPOCH_KEY_NAME, producerEpoch);
+        struct.set(TRANSACTION_RESULT_KEY_NAME, result.id);
+        return struct;
+    }
+
+    @Override
+    public EndTxnResponse getErrorResponse(Throwable e) {
+        return new EndTxnResponse(Errors.forException(e));
+    }
+
+    public static EndTxnRequest parse(ByteBuffer buffer, short version) {
+        return new EndTxnRequest(ApiKeys.END_TXN.parseRequest(version, buffer), version);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/865d82af/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java
new file mode 100644
index 0000000..627eb64
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+public class EndTxnResponse extends AbstractResponse {
+    private static final String ERROR_CODE_KEY_NAME = "error_code";
+
+    // Possible error codes:
+    //   NotCoordinator
+    //   CoordinatorNotAvailable
+    //   CoordinatorLoadInProgress
+    //   InvalidTxnState
+    //   InvalidPidMapping
+
+    private final Errors error;
+
+    public EndTxnResponse(Errors error) {
+        this.error = error;
+    }
+
+    public EndTxnResponse(Struct struct) {
+        this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
+    }
+
+    public Errors error() {
+        return error;
+    }
+
+    @Override
+    protected Struct toStruct(short version) {
+        Struct struct = new Struct(ApiKeys.END_TXN.responseSchema(version));
+        struct.set(ERROR_CODE_KEY_NAME, error.code());
+        return struct;
+    }
+
+    public static EndTxnResponse parse(ByteBuffer buffer, short version) {
+        return new EndTxnResponse(ApiKeys.END_TXN.parseResponse(version, buffer));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/865d82af/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index 56eb838..db12d26 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -56,7 +56,7 @@ public class FetchResponse extends AbstractResponse {
     private static final String RECORD_SET_KEY_NAME = "record_set";
 
     // aborted transaction field names
-    private static final String PID_KEY_NAME = "pid";
+    private static final String PID_KEY_NAME = "producer_id";
     private static final String FIRST_OFFSET_KEY_NAME = "first_offset";
 
     private static final int DEFAULT_THROTTLE_TIME = 0;
@@ -78,11 +78,11 @@ public class FetchResponse extends AbstractResponse {
     private final int throttleTimeMs;
 
     public static final class AbortedTransaction {
-        public final long pid;
+        public final long producerId;
         public final long firstOffset;
 
-        public AbortedTransaction(long pid, long firstOffset) {
-            this.pid = pid;
+        public AbortedTransaction(long producerId, long firstOffset) {
+            this.producerId = producerId;
             this.firstOffset = firstOffset;
         }
 
@@ -95,19 +95,19 @@ public class FetchResponse extends AbstractResponse {
 
             AbortedTransaction that = (AbortedTransaction) o;
 
-            return pid == that.pid && firstOffset == that.firstOffset;
+            return producerId == that.producerId && firstOffset == that.firstOffset;
         }
 
         @Override
         public int hashCode() {
-            int result = (int) (pid ^ (pid >>> 32));
+            int result = (int) (producerId ^ (producerId >>> 32));
             result = 31 * result + (int) (firstOffset ^ (firstOffset >>> 32));
             return result;
         }
 
         @Override
         public String toString() {
-            return "(pid=" + pid + ", firstOffset=" + firstOffset + ")";
+            return "(producerId=" + producerId + ", firstOffset=" + firstOffset + ")";
         }
     }
 
@@ -211,9 +211,9 @@ public class FetchResponse extends AbstractResponse {
                         abortedTransactions = new ArrayList<>(abortedTransactionsArray.length);
                         for (Object abortedTransactionObj : abortedTransactionsArray) {
                             Struct abortedTransactionStruct = (Struct) abortedTransactionObj;
-                            long pid = abortedTransactionStruct.getLong(PID_KEY_NAME);
+                            long producerId = abortedTransactionStruct.getLong(PID_KEY_NAME);
                             long firstOffset = abortedTransactionStruct.getLong(FIRST_OFFSET_KEY_NAME);
-                            abortedTransactions.add(new AbortedTransaction(pid, firstOffset));
+                            abortedTransactions.add(new AbortedTransaction(producerId, firstOffset));
                         }
                     }
                 }
@@ -339,7 +339,7 @@ public class FetchResponse extends AbstractResponse {
                         List<Struct> abortedTransactionStructs = new ArrayList<>(fetchPartitionData.abortedTransactions.size());
                         for (AbortedTransaction abortedTransaction : fetchPartitionData.abortedTransactions) {
                             Struct abortedTransactionStruct = partitionDataHeader.instance(ABORTED_TRANSACTIONS_KEY_NAME);
-                            abortedTransactionStruct.set(PID_KEY_NAME, abortedTransaction.pid);
+                            abortedTransactionStruct.set(PID_KEY_NAME, abortedTransaction.producerId);
                             abortedTransactionStruct.set(FIRST_OFFSET_KEY_NAME, abortedTransaction.firstOffset);
                             abortedTransactionStructs.add(abortedTransactionStruct);
                         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/865d82af/clients/src/main/java/org/apache/kafka/common/requests/InitPidResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitPidResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/InitPidResponse.java
index ee92375..4b65aea 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/InitPidResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/InitPidResponse.java
@@ -29,8 +29,8 @@ public class InitPidResponse extends AbstractResponse {
      * OK
      *
      */
-    private static final String PRODUCER_ID_KEY_NAME = "pid";
-    private static final String EPOCH_KEY_NAME = "epoch";
+    private static final String PRODUCER_ID_KEY_NAME = "producer_id";
+    private static final String EPOCH_KEY_NAME = "producer_epoch";
     private static final String ERROR_CODE_KEY_NAME = "error_code";
     private final Errors error;
     private final long producerId;

http://git-wip-us.apache.org/repos/asf/kafka/blob/865d82af/clients/src/main/java/org/apache/kafka/common/requests/TransactionResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TransactionResult.java b/clients/src/main/java/org/apache/kafka/common/requests/TransactionResult.java
new file mode 100644
index 0000000..d0448af
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/TransactionResult.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+public enum TransactionResult {
+    ABORT(false), COMMIT(true);
+
+    public final boolean id;
+
+    TransactionResult(boolean id) {
+        this.id = id;
+    }
+
+    public static TransactionResult forId(boolean id) {
+        if (id) {
+            return TransactionResult.COMMIT;
+        }
+        return TransactionResult.ABORT;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/865d82af/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
new file mode 100644
index 0000000..584f733
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.CollectionUtils;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+public class TxnOffsetCommitRequest extends AbstractRequest { // Request for ApiKeys.TXN_OFFSET_COMMIT: per-partition offsets for a consumer group, sent together with a producer id and epoch.
+    private static final String CONSUMER_GROUP_ID_KEY_NAME = "consumer_group_id"; // field names used to read and write the request Struct
+    private static final String PID_KEY_NAME = "producer_id";
+    private static final String EPOCH_KEY_NAME = "producer_epoch";
+    private static final String RETENTION_TIME_KEY_NAME = "retention_time";
+    private static final String TOPIC_PARTITIONS_KEY_NAME = "topics"; // top-level array: one struct per topic
+    private static final String TOPIC_KEY_NAME = "topic";
+    private static final String PARTITIONS_KEY_NAME = "partitions"; // array nested inside each topic struct
+    private static final String PARTITION_KEY_NAME = "partition";
+    private static final String OFFSET_KEY_NAME = "offset";
+    private static final String METADATA_KEY_NAME = "metadata";
+
+    public static class Builder extends AbstractRequest.Builder<TxnOffsetCommitRequest> { // Holds the request fields until build(version) is called.
+        private final String consumerGroupId;
+        private final long producerId;
+        private final short producerEpoch;
+        private final long retentionTimeMs;
+        private final Map<TopicPartition, CommittedOffset> offsets;
+
+        public Builder(String consumerGroupId, long producerId, short producerEpoch, long retentionTimeMs,
+                       Map<TopicPartition, CommittedOffset> offsets) {
+            super(ApiKeys.TXN_OFFSET_COMMIT);
+            this.consumerGroupId = consumerGroupId;
+            this.producerId = producerId;
+            this.producerEpoch = producerEpoch;
+            this.retentionTimeMs = retentionTimeMs;
+            this.offsets = offsets; // stored by reference, not copied
+        }
+
+        @Override
+        public TxnOffsetCommitRequest build(short version) { // creates the request for the given version from the stored fields
+            return new TxnOffsetCommitRequest(version, consumerGroupId, producerId, producerEpoch, retentionTimeMs, offsets);
+        }
+    }
+
+    private final String consumerGroupId;
+    private final long producerId;
+    private final short producerEpoch;
+    private final long retentionTimeMs;
+    private final Map<TopicPartition, CommittedOffset> offsets;
+
+    public TxnOffsetCommitRequest(short version, String consumerGroupId, long producerId, short producerEpoch,
+                                  long retentionTimeMs, Map<TopicPartition, CommittedOffset> offsets) { // field-wise constructor used by Builder.build
+        super(version);
+        this.consumerGroupId = consumerGroupId;
+        this.producerId = producerId;
+        this.producerEpoch = producerEpoch;
+        this.retentionTimeMs = retentionTimeMs;
+        this.offsets = offsets;
+    }
+
+    public TxnOffsetCommitRequest(Struct struct, short version) { // reads every field from a Struct; mirrors toStruct()
+        super(version);
+        this.consumerGroupId = struct.getString(CONSUMER_GROUP_ID_KEY_NAME);
+        this.producerId = struct.getLong(PID_KEY_NAME);
+        this.producerEpoch = struct.getShort(EPOCH_KEY_NAME);
+        this.retentionTimeMs = struct.getLong(RETENTION_TIME_KEY_NAME);
+
+        Map<TopicPartition, CommittedOffset> offsets = new HashMap<>(); // nested topics -> partitions arrays are flattened into this single map
+        Object[] topicPartitionsArray = struct.getArray(TOPIC_PARTITIONS_KEY_NAME);
+        for (Object topicPartitionObj : topicPartitionsArray) {
+            Struct topicPartitionStruct = (Struct) topicPartitionObj;
+            String topic = topicPartitionStruct.getString(TOPIC_KEY_NAME);
+            for (Object partitionObj : topicPartitionStruct.getArray(PARTITIONS_KEY_NAME)) {
+                Struct partitionStruct = (Struct) partitionObj;
+                TopicPartition partition = new TopicPartition(topic, partitionStruct.getInt(PARTITION_KEY_NAME));
+                long offset = partitionStruct.getLong(OFFSET_KEY_NAME);
+                String metadata = partitionStruct.getString(METADATA_KEY_NAME);
+                offsets.put(partition, new CommittedOffset(offset, metadata)); // a repeated topic/partition pair replaces the earlier entry
+            }
+        }
+        this.offsets = offsets;
+    }
+
+    public String consumerGroupId() {
+        return consumerGroupId;
+    }
+
+    public long producerId() {
+        return producerId;
+    }
+
+    public short producerEpoch() {
+        return producerEpoch;
+    }
+
+    public long retentionTimeMs() {
+        return retentionTimeMs;
+    }
+
+    public Map<TopicPartition, CommittedOffset> offsets() { // returns the internal map itself, not a copy
+        return offsets;
+    }
+
+    @Override
+    protected Struct toStruct() { // writes all fields into a Struct built from the request schema for version()
+        Struct struct = new Struct(ApiKeys.TXN_OFFSET_COMMIT.requestSchema(version()));
+        struct.set(CONSUMER_GROUP_ID_KEY_NAME, consumerGroupId);
+        struct.set(PID_KEY_NAME, producerId);
+        struct.set(EPOCH_KEY_NAME, producerEpoch);
+        struct.set(RETENTION_TIME_KEY_NAME, retentionTimeMs);
+
+        Map<String, Map<Integer, CommittedOffset>> mappedPartitionOffsets = CollectionUtils.groupDataByTopic(offsets); // topic -> (partition -> committed offset)
+        Object[] partitionsArray = new Object[mappedPartitionOffsets.size()]; // one element per topic
+        int i = 0;
+        for (Map.Entry<String, Map<Integer, CommittedOffset>> topicAndPartitions : mappedPartitionOffsets.entrySet()) {
+            Struct topicPartitionsStruct = struct.instance(TOPIC_PARTITIONS_KEY_NAME);
+            topicPartitionsStruct.set(TOPIC_KEY_NAME, topicAndPartitions.getKey());
+
+            Map<Integer, CommittedOffset> partitionOffsets = topicAndPartitions.getValue();
+            Object[] partitionOffsetsArray = new Object[partitionOffsets.size()]; // one element per partition of this topic
+            int j = 0;
+            for (Map.Entry<Integer, CommittedOffset> partitionOffset : partitionOffsets.entrySet()) {
+                Struct partitionOffsetStruct = topicPartitionsStruct.instance(PARTITIONS_KEY_NAME);
+                partitionOffsetStruct.set(PARTITION_KEY_NAME, partitionOffset.getKey());
+                CommittedOffset committedOffset = partitionOffset.getValue();
+                partitionOffsetStruct.set(OFFSET_KEY_NAME, committedOffset.offset);
+                partitionOffsetStruct.set(METADATA_KEY_NAME, committedOffset.metadata);
+                partitionOffsetsArray[j++] = partitionOffsetStruct;
+            }
+            topicPartitionsStruct.set(PARTITIONS_KEY_NAME, partitionOffsetsArray);
+            partitionsArray[i++] = topicPartitionsStruct;
+        }
+
+        struct.set(TOPIC_PARTITIONS_KEY_NAME, partitionsArray);
+        return struct;
+    }
+
+    @Override
+    public TxnOffsetCommitResponse getErrorResponse(Throwable e) { // builds a response that reports the same error for every partition in this request
+        Errors error = Errors.forException(e);
+        Map<TopicPartition, Errors> errors = new HashMap<>(offsets.size());
+        for (TopicPartition partition : offsets.keySet())
+            errors.put(partition, error);
+        return new TxnOffsetCommitResponse(errors);
+    }
+
+    public static TxnOffsetCommitRequest parse(ByteBuffer buffer, short version) { // passes the buffer to ApiKeys.TXN_OFFSET_COMMIT.parseRequest and wraps the resulting Struct
+        return new TxnOffsetCommitRequest(ApiKeys.TXN_OFFSET_COMMIT.parseRequest(version, buffer), version);
+    }
+
+    public static class CommittedOffset { // Immutable pair of an offset and a metadata string for one partition.
+        private final long offset;
+        private final String metadata;
+
+        public CommittedOffset(long offset, String metadata) {
+            this.offset = offset;
+            this.metadata = metadata;
+        }
+
+        @Override
+        public String toString() { // renders as CommittedOffset(offset=<n>, metadata='<text>')
+            return "CommittedOffset(" +
+                    "offset=" + offset +
+                    ", metadata='" + metadata + "')";
+        }
+
+        public long offset() {
+            return offset;
+        }
+
+        public String metadata() {
+            return metadata;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/865d82af/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
new file mode 100644
index 0000000..5574aea
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.CollectionUtils;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+public class TxnOffsetCommitResponse extends AbstractResponse { // Response for ApiKeys.TXN_OFFSET_COMMIT: one Errors value per topic partition.
+    private static final String TOPIC_PARTITIONS_KEY_NAME = "topics"; // top-level array: one struct per topic
+    private static final String PARTITIONS_KEY_NAME = "partitions"; // array nested inside each topic struct
+    private static final String TOPIC_KEY_NAME = "topic";
+    private static final String PARTITION_KEY_NAME = "partition";
+    private static final String ERROR_CODE_KEY_NAME = "error_code";
+
+    // Possible error codes:
+    //   InvalidProducerEpoch
+    //   NotCoordinator
+    //   CoordinatorNotAvailable
+    //   CoordinatorLoadInProgress
+    //   OffsetMetadataTooLarge
+    //   GroupAuthorizationFailed
+    //   InvalidCommitOffsetSize
+
+    private final Map<TopicPartition, Errors> errors;
+
+    public TxnOffsetCommitResponse(Map<TopicPartition, Errors> errors) { // keeps a reference to the given map, not a copy
+        this.errors = errors;
+    }
+
+    public TxnOffsetCommitResponse(Struct struct) { // rebuilds the error map from a Struct; mirrors toStruct(version)
+        Map<TopicPartition, Errors> errors = new HashMap<>(); // nested topics -> partitions arrays are flattened into this single map
+        Object[] topicPartitionsArray = struct.getArray(TOPIC_PARTITIONS_KEY_NAME);
+        for (Object topicPartitionObj : topicPartitionsArray) {
+            Struct topicPartitionStruct = (Struct) topicPartitionObj;
+            String topic = topicPartitionStruct.getString(TOPIC_KEY_NAME);
+            for (Object partitionObj : topicPartitionStruct.getArray(PARTITIONS_KEY_NAME)) {
+                Struct partitionStruct = (Struct) partitionObj;
+                Integer partition = partitionStruct.getInt(PARTITION_KEY_NAME);
+                Errors error = Errors.forCode(partitionStruct.getShort(ERROR_CODE_KEY_NAME)); // short code from the Struct -> Errors constant
+                errors.put(new TopicPartition(topic, partition), error);
+            }
+        }
+        this.errors = errors;
+    }
+
+    @Override
+    protected Struct toStruct(short version) { // writes the error map into a Struct built from the response schema for the given version
+        Struct struct = new Struct(ApiKeys.TXN_OFFSET_COMMIT.responseSchema(version));
+        Map<String, Map<Integer, Errors>> mappedPartitions = CollectionUtils.groupDataByTopic(errors); // topic -> (partition -> error)
+        Object[] partitionsArray = new Object[mappedPartitions.size()]; // one element per topic
+        int i = 0;
+        for (Map.Entry<String, Map<Integer, Errors>> topicAndPartitions : mappedPartitions.entrySet()) {
+            Struct topicPartitionsStruct = struct.instance(TOPIC_PARTITIONS_KEY_NAME);
+            topicPartitionsStruct.set(TOPIC_KEY_NAME, topicAndPartitions.getKey());
+            Map<Integer, Errors> partitionAndErrors = topicAndPartitions.getValue();
+
+            Object[] partitionAndErrorsArray = new Object[partitionAndErrors.size()]; // one element per partition of this topic
+            int j = 0;
+            for (Map.Entry<Integer, Errors> partitionAndError : partitionAndErrors.entrySet()) {
+                Struct partitionAndErrorStruct = topicPartitionsStruct.instance(PARTITIONS_KEY_NAME);
+                partitionAndErrorStruct.set(PARTITION_KEY_NAME, partitionAndError.getKey());
+                partitionAndErrorStruct.set(ERROR_CODE_KEY_NAME, partitionAndError.getValue().code()); // Errors constant -> short code
+                partitionAndErrorsArray[j++] = partitionAndErrorStruct;
+            }
+            topicPartitionsStruct.set(PARTITIONS_KEY_NAME, partitionAndErrorsArray);
+            partitionsArray[i++] = topicPartitionsStruct;
+        }
+
+        struct.set(TOPIC_PARTITIONS_KEY_NAME, partitionsArray);
+        return struct;
+    }
+
+    public Map<TopicPartition, Errors> errors() { // returns the internal map itself, not a copy
+        return errors;
+    }
+
+    public static TxnOffsetCommitResponse parse(ByteBuffer buffer, short version) { // passes the buffer to ApiKeys.TXN_OFFSET_COMMIT.parseResponse and wraps the resulting Struct
+        return new TxnOffsetCommitResponse(ApiKeys.TXN_OFFSET_COMMIT.parseResponse(version, buffer));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/865d82af/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
new file mode 100644
index 0000000..fe64603
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.CollectionUtils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class WriteTxnMarkersRequest extends AbstractRequest { // Request for ApiKeys.WRITE_TXN_MARKERS: a coordinator epoch plus a list of marker entries.
+    private static final String COORDINATOR_EPOCH_KEY_NAME = "coordinator_epoch"; // top-level fields of the request Struct
+    private static final String TXN_MARKER_ENTRY_KEY_NAME = "transaction_markers";
+
+    private static final String PID_KEY_NAME = "producer_id"; // fields of each struct in the "transaction_markers" array
+    private static final String EPOCH_KEY_NAME = "producer_epoch";
+    private static final String TRANSACTION_RESULT_KEY_NAME = "transaction_result";
+    private static final String TOPIC_PARTITIONS_KEY_NAME = "topics";
+    private static final String TOPIC_KEY_NAME = "topic"; // fields of each struct in a marker's "topics" array
+    private static final String PARTITIONS_KEY_NAME = "partitions";
+
+    public static class TxnMarkerEntry { // One marker: producer id and epoch, a transaction result, and a list of partitions.
+        private final long producerId;
+        private final short producerEpoch;
+        private final TransactionResult result;
+        private final List<TopicPartition> partitions;
+
+        public TxnMarkerEntry(long producerId, short producerEpoch, TransactionResult result, List<TopicPartition> partitions) {
+            this.producerId = producerId;
+            this.producerEpoch = producerEpoch;
+            this.result = result;
+            this.partitions = partitions; // stored by reference, not copied
+        }
+
+        public long producerId() {
+            return producerId;
+        }
+
+        public short producerEpoch() {
+            return producerEpoch;
+        }
+
+        public TransactionResult transactionResult() {
+            return result;
+        }
+
+        public List<TopicPartition> partitions() { // returns the internal list itself, not a copy
+            return partitions;
+        }
+    }
+
+    public static class Builder extends AbstractRequest.Builder<WriteTxnMarkersRequest> { // Holds the request fields until build(version) is called.
+        private final int coordinatorEpoch;
+        private final List<TxnMarkerEntry> markers;
+
+        public Builder(int coordinatorEpoch, List<TxnMarkerEntry> markers) {
+            super(ApiKeys.WRITE_TXN_MARKERS);
+
+            this.markers = markers;
+            this.coordinatorEpoch = coordinatorEpoch;
+        }
+
+        @Override
+        public WriteTxnMarkersRequest build(short version) { // creates the request for the given version from the stored fields
+            return new WriteTxnMarkersRequest(version, coordinatorEpoch, markers);
+        }
+    }
+
+    private final int coordinatorEpoch;
+    private final List<TxnMarkerEntry> markers;
+
+    private WriteTxnMarkersRequest(short version, int coordinatorEpoch, List<TxnMarkerEntry> markers) { // private: within this class it is only called from Builder.build
+        super(version);
+
+        this.markers = markers;
+        this.coordinatorEpoch = coordinatorEpoch;
+    }
+
+    public WriteTxnMarkersRequest(Struct struct, short version) { // reads every field from a Struct; mirrors toStruct()
+        super(version);
+        this.coordinatorEpoch = struct.getInt(COORDINATOR_EPOCH_KEY_NAME);
+
+        List<TxnMarkerEntry> markers = new ArrayList<>();
+        Object[] markersArray = struct.getArray(TXN_MARKER_ENTRY_KEY_NAME);
+        for (Object markerObj : markersArray) {
+            Struct markerStruct = (Struct) markerObj;
+
+            long producerId = markerStruct.getLong(PID_KEY_NAME);
+            short producerEpoch = markerStruct.getShort(EPOCH_KEY_NAME);
+            TransactionResult result = TransactionResult.forId(markerStruct.getBoolean(TRANSACTION_RESULT_KEY_NAME)); // boolean field: true -> COMMIT, false -> ABORT
+
+            List<TopicPartition> partitions = new ArrayList<>(); // nested topics -> partitions arrays are flattened into this list
+            Object[] topicPartitionsArray = markerStruct.getArray(TOPIC_PARTITIONS_KEY_NAME);
+            for (Object topicPartitionObj : topicPartitionsArray) {
+                Struct topicPartitionStruct = (Struct) topicPartitionObj;
+                String topic = topicPartitionStruct.getString(TOPIC_KEY_NAME);
+                for (Object partitionObj : topicPartitionStruct.getArray(PARTITIONS_KEY_NAME)) {
+                    partitions.add(new TopicPartition(topic, (Integer) partitionObj)); // array elements are cast directly to Integer partition ids, not nested structs
+                }
+            }
+
+            markers.add(new TxnMarkerEntry(producerId, producerEpoch, result, partitions));
+        }
+
+        this.markers = markers;
+    }
+
+    public int coordinatorEpoch() {
+        return coordinatorEpoch;
+    }
+
+    public List<TxnMarkerEntry> markers() { // returns the internal list itself, not a copy
+        return markers;
+    }
+
+    @Override
+    protected Struct toStruct() { // writes all fields into a Struct built from the request schema for version()
+        Struct struct = new Struct(ApiKeys.WRITE_TXN_MARKERS.requestSchema(version()));
+        struct.set(COORDINATOR_EPOCH_KEY_NAME, coordinatorEpoch);
+
+        Object[] markersArray = new Object[markers.size()]; // one element per marker entry
+        int i = 0;
+        for (TxnMarkerEntry entry : markers) {
+            Struct markerStruct = struct.instance(TXN_MARKER_ENTRY_KEY_NAME);
+            markerStruct.set(PID_KEY_NAME, entry.producerId);
+            markerStruct.set(EPOCH_KEY_NAME, entry.producerEpoch);
+            markerStruct.set(TRANSACTION_RESULT_KEY_NAME, entry.result.id); // the enum is written as its boolean id
+
+            Map<String, List<Integer>> mappedPartitions = CollectionUtils.groupDataByTopic(entry.partitions); // topic -> list of partition ids
+            Object[] partitionsArray = new Object[mappedPartitions.size()]; // one element per topic
+            int j = 0;
+            for (Map.Entry<String, List<Integer>> topicAndPartitions : mappedPartitions.entrySet()) {
+                Struct topicPartitionsStruct = markerStruct.instance(TOPIC_PARTITIONS_KEY_NAME);
+                topicPartitionsStruct.set(TOPIC_KEY_NAME, topicAndPartitions.getKey());
+                topicPartitionsStruct.set(PARTITIONS_KEY_NAME, topicAndPartitions.getValue().toArray()); // partition ids are written as a plain Object[] of Integers
+                partitionsArray[j++] = topicPartitionsStruct;
+            }
+            markerStruct.set(TOPIC_PARTITIONS_KEY_NAME, partitionsArray);
+            markersArray[i++] = markerStruct;
+        }
+        struct.set(TXN_MARKER_ENTRY_KEY_NAME, markersArray);
+
+        return struct;
+    }
+
+    @Override
+    public WriteTxnMarkersResponse getErrorResponse(Throwable e) { // builds a response that reports the same error for every partition of every marker
+        Errors error = Errors.forException(e);
+
+        Map<Long, Map<TopicPartition, Errors>> errors = new HashMap<>(markers.size()); // producer id -> (partition -> error)
+        for (TxnMarkerEntry entry : markers) {
+            Map<TopicPartition, Errors> errorsPerPartition = new HashMap<>(entry.partitions.size());
+            for (TopicPartition partition : entry.partitions)
+                errorsPerPartition.put(partition, error);
+
+            errors.put(entry.producerId, errorsPerPartition); // NOTE(review): entries that share a producerId replace each other here - confirm ids are unique per request
+        }
+
+        return new WriteTxnMarkersResponse(errors);
+    }
+
+    public static WriteTxnMarkersRequest parse(ByteBuffer buffer, short version) { // passes the buffer to ApiKeys.WRITE_TXN_MARKERS.parseRequest and wraps the resulting Struct
+        return new WriteTxnMarkersRequest(ApiKeys.WRITE_TXN_MARKERS.parseRequest(version, buffer), version);
+    }
+
+}