Posted to commits@kafka.apache.org by jg...@apache.org on 2017/05/06 18:51:15 UTC

[6/6] kafka git commit: KAFKA-5121; Implement transaction index for KIP-98

KAFKA-5121; Implement transaction index for KIP-98

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Ismael Juma <is...@juma.me.uk>, Jun Rao <ju...@gmail.com>

Closes #2910 from hachikuji/eos-txn-index


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e71dce89
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e71dce89
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e71dce89

Branch: refs/heads/trunk
Commit: e71dce89c0da50f3eccc47d0fc050c92d5a99b88
Parents: 29994dd
Author: Jason Gustafson <ja...@confluent.io>
Authored: Sat May 6 11:49:35 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Sat May 6 11:49:35 2017 -0700

----------------------------------------------------------------------
 .../clients/consumer/internals/Fetcher.java     |  79 +--
 .../TransactionCoordinatorFencedException.java  |  30 +
 .../apache/kafka/common/protocol/Errors.java    |  17 +-
 .../apache/kafka/common/protocol/Protocol.java  |  17 +-
 .../record/AbstractLegacyRecordBatch.java       |   2 +-
 .../kafka/common/record/AbstractRecords.java    |   9 +-
 .../kafka/common/record/ControlRecordType.java  |  24 +-
 .../kafka/common/record/DefaultRecord.java      |  50 +-
 .../kafka/common/record/DefaultRecordBatch.java |  22 +-
 .../common/record/EndTransactionMarker.java     | 124 ++++
 .../kafka/common/record/FileLogInputStream.java |   6 +
 .../apache/kafka/common/record/FileRecords.java |  13 +-
 .../kafka/common/record/MemoryRecords.java      | 109 +++-
 .../common/record/MemoryRecordsBuilder.java     |  66 +-
 .../org/apache/kafka/common/record/Record.java  |   9 -
 .../apache/kafka/common/record/RecordBatch.java |   8 +
 .../common/requests/ListOffsetRequest.java      |  34 +-
 .../org/apache/kafka/common/utils/Utils.java    |   5 +
 .../clients/consumer/internals/FetcherTest.java |  97 +--
 .../common/record/DefaultRecordBatchTest.java   |  31 +-
 .../kafka/common/record/DefaultRecordTest.java  |  45 +-
 .../common/record/EndTransactionMarkerTest.java |  70 +++
 .../kafka/common/record/FileRecordsTest.java    |  11 +-
 .../common/record/MemoryRecordsBuilderTest.java |  93 ++-
 .../kafka/common/record/MemoryRecordsTest.java  |  80 ++-
 .../common/requests/RequestResponseTest.java    |  22 +-
 .../main/scala/kafka/cluster/Partition.scala    |   4 +-
 core/src/main/scala/kafka/cluster/Replica.scala |  23 +-
 .../group/GroupMetadataManager.scala            |  27 +-
 .../transaction/TransactionStateManager.scala   |  19 +-
 .../main/scala/kafka/log/AbstractIndex.scala    |  23 +-
 core/src/main/scala/kafka/log/Log.scala         | 611 ++++++++++++-------
 core/src/main/scala/kafka/log/LogCleaner.scala  |  34 +-
 core/src/main/scala/kafka/log/LogManager.scala  |   6 +-
 core/src/main/scala/kafka/log/LogSegment.scala  | 132 ++--
 .../src/main/scala/kafka/log/LogValidator.scala |  83 +--
 core/src/main/scala/kafka/log/OffsetIndex.scala |  20 +-
 .../scala/kafka/log/ProducerIdMapping.scala     | 384 ------------
 .../scala/kafka/log/ProducerStateManager.scala  | 590 ++++++++++++++++++
 core/src/main/scala/kafka/log/TimeIndex.scala   |   7 +-
 .../main/scala/kafka/log/TransactionIndex.scala | 243 ++++++++
 .../main/scala/kafka/server/DelayedFetch.scala  |  14 +-
 .../main/scala/kafka/server/FetchDataInfo.scala |   4 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |  67 +-
 .../scala/kafka/server/LogOffsetMetadata.scala  |   8 +-
 .../scala/kafka/server/ReplicaManager.scala     |  31 +-
 .../scala/kafka/tools/DumpLogSegments.scala     |  29 +-
 .../kafka/api/AuthorizerIntegrationTest.scala   |   2 +-
 .../group/GroupCoordinatorResponseTest.scala    |   6 +-
 .../group/GroupMetadataManagerTest.scala        |  22 +-
 .../TransactionStateManagerTest.scala           |   7 +-
 .../unit/kafka/log/BrokerCompressionTest.scala  |   2 +-
 .../kafka/log/LogCleanerIntegrationTest.scala   |   2 +-
 .../log/LogCleanerLagIntegrationTest.scala      |   3 +-
 .../scala/unit/kafka/log/LogCleanerTest.scala   |   2 +-
 .../scala/unit/kafka/log/LogManagerTest.scala   |   5 +-
 .../scala/unit/kafka/log/LogSegmentTest.scala   | 104 +++-
 .../src/test/scala/unit/kafka/log/LogTest.scala | 555 +++++++++++++++--
 .../scala/unit/kafka/log/LogValidatorTest.scala | 209 +++++--
 .../scala/unit/kafka/log/OffsetIndexTest.scala  |  24 +-
 .../unit/kafka/log/ProducerIdMappingTest.scala  | 291 ---------
 .../kafka/log/ProducerStateManagerTest.scala    | 562 +++++++++++++++++
 .../unit/kafka/log/TransactionIndexTest.scala   | 173 ++++++
 .../scala/unit/kafka/server/LogOffsetTest.scala |  12 +-
 .../kafka/server/ReplicaManagerQuotasTest.scala |   7 +-
 .../unit/kafka/server/ReplicaManagerTest.scala  | 151 ++++-
 .../unit/kafka/server/RequestQuotaTest.scala    |   7 +-
 .../unit/kafka/server/SimpleFetchTest.scala     |   7 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala |   2 +-
 69 files changed, 4099 insertions(+), 1488 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 0c5c385..dc6c338 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -29,7 +29,6 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.CorruptRecordException;
 import org.apache.kafka.common.errors.InvalidMetadataException;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
@@ -669,7 +668,8 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
     private RequestFuture<Map<TopicPartition, OffsetData>> sendListOffsetRequest(final Node node,
                                                                                  final Map<TopicPartition, Long> timestampsToSearch,
                                                                                  boolean requireTimestamp) {
-        ListOffsetRequest.Builder builder = ListOffsetRequest.Builder.forConsumer(requireTimestamp)
+        ListOffsetRequest.Builder builder = ListOffsetRequest.Builder
+                .forConsumer(requireTimestamp, isolationLevel)
                 .setTargetTimes(timestampsToSearch);
 
         log.trace("Sending ListOffsetRequest {} to broker {}", builder, node);
@@ -1003,12 +1003,18 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
                         return null;
                     }
                     currentBatch = batches.next();
-
                     maybeEnsureValid(currentBatch);
 
-                    if (isolationLevel == IsolationLevel.READ_COMMITTED && isBatchAborted(currentBatch)) {
-                        nextFetchOffset = currentBatch.lastOffset() + 1;
-                        continue;
+                    if (isolationLevel == IsolationLevel.READ_COMMITTED && currentBatch.hasProducerId()) {
+                        long producerId = currentBatch.producerId();
+                        if (containsAbortMarker(currentBatch)) {
+                            abortedProducerIds.remove(producerId);
+                        } else if (isBatchAborted(currentBatch)) {
+                            log.trace("Skipping aborted record batch with producerId {} and base offset {}, partition: {}",
+                                    producerId, currentBatch.baseOffset(), partition);
+                            nextFetchOffset = currentBatch.lastOffset() + 1;
+                            continue;
+                        }
                     }
 
                     records = currentBatch.streamingIterator();
@@ -1022,7 +1028,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
                     nextFetchOffset = record.offset() + 1;
 
                     // control records are not returned to the user
-                    if (!record.isControlRecord())
+                    if (!currentBatch.isControlBatch())
                          return record;
                 }
             }
@@ -1046,7 +1052,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
         }
 
         private boolean isBatchAborted(RecordBatch batch) {
-           /* When in READ_COMMITTED mode, we need to do the following for each incoming entry:
+            /* When in READ_COMMITTED mode, we need to do the following for each incoming entry:
             *   0. Check whether the pid is in the 'abortedProducerIds' set && the entry does not include an abort marker.
             *      If so, skip the entry.
             *   1. If the pid is in aborted pids and the entry contains an abort marker, remove the pid from
@@ -1056,47 +1062,48 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
             *      this means that the entry has been aborted. Add the pid to the aborted pids set, and remove
             *      the entry from the abort index.
             */
-            FetchResponse.AbortedTransaction nextAbortedTransaction = abortedTransactions.peek();
-            if (abortedProducerIds.contains(batch.producerId())
-                    || (nextAbortedTransaction != null && nextAbortedTransaction.producerId == batch.producerId() && nextAbortedTransaction.firstOffset <= batch.baseOffset())) {
-                if (abortedProducerIds.contains(batch.producerId()) && containsAbortMarker(batch)) {
-                    abortedProducerIds.remove(batch.producerId());
-                } else if (nextAbortedTransaction != null && nextAbortedTransaction.producerId == batch.producerId() && nextAbortedTransaction.firstOffset <= batch.baseOffset()) {
-                    abortedProducerIds.add(batch.producerId());
+            long producerId = batch.producerId();
+            if (abortedProducerIds.contains(producerId)) {
+                return true;
+            } else if (abortedTransactions != null && !abortedTransactions.isEmpty()) {
+                FetchResponse.AbortedTransaction nextAbortedTransaction = abortedTransactions.peek();
+                if (nextAbortedTransaction.producerId == producerId && nextAbortedTransaction.firstOffset <= batch.baseOffset()) {
+                    abortedProducerIds.add(producerId);
                     abortedTransactions.poll();
+                    return true;
                 }
-                log.trace("Skipping aborted record batch with producerId {} and base offset {}, partition: {}", batch.producerId(), batch.baseOffset(), partition);
-                return true;
             }
             return false;
         }
 
         private PriorityQueue<FetchResponse.AbortedTransaction> abortedTransactions(FetchResponse.PartitionData partition) {
-            PriorityQueue<FetchResponse.AbortedTransaction> abortedTransactions = null;
-            if (partition.abortedTransactions != null && !partition.abortedTransactions.isEmpty()) {
-                abortedTransactions = new PriorityQueue<>(
-                        partition.abortedTransactions.size(),
-                        new Comparator<FetchResponse.AbortedTransaction>() {
-                            @Override
-                            public int compare(FetchResponse.AbortedTransaction o1, FetchResponse.AbortedTransaction o2) {
-                                return Long.compare(o1.firstOffset, o2.firstOffset);
-                            }
+            if (partition.abortedTransactions == null || partition.abortedTransactions.isEmpty())
+                return null;
+
+            PriorityQueue<FetchResponse.AbortedTransaction> abortedTransactions = new PriorityQueue<>(
+                    partition.abortedTransactions.size(),
+                    new Comparator<FetchResponse.AbortedTransaction>() {
+                        @Override
+                        public int compare(FetchResponse.AbortedTransaction o1, FetchResponse.AbortedTransaction o2) {
+                            return Long.compare(o1.firstOffset, o2.firstOffset);
                         }
-                );
-                abortedTransactions.addAll(partition.abortedTransactions);
-            } else {
-                abortedTransactions = new PriorityQueue<>();
-            }
+                    }
+            );
+            abortedTransactions.addAll(partition.abortedTransactions);
             return abortedTransactions;
         }
 
         private boolean containsAbortMarker(RecordBatch batch) {
+            if (!batch.isControlBatch())
+                return false;
+
             Iterator<Record> batchIterator = batch.iterator();
-            Record firstRecord = batchIterator.hasNext() ? batchIterator.next() : null;
-            boolean containsAbortMarker = firstRecord != null && firstRecord.isControlRecord() && ControlRecordType.ABORT == ControlRecordType.parse(firstRecord.key());
-            if (containsAbortMarker && batchIterator.hasNext())
-                throw new CorruptRecordException("A record batch containing a control message contained more than one record. partition: " + partition + ", offset: " + batch.baseOffset());
-            return containsAbortMarker;
+            if (!batchIterator.hasNext())
+                throw new InvalidRecordException("Invalid batch for partition " + partition + " at offset " +
+                        batch.baseOffset() + " with control sequence set, but no records");
+
+            Record firstRecord = batchIterator.next();
+            return ControlRecordType.ABORT == ControlRecordType.parse(firstRecord.key());
         }
     }
 

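A note on the READ_COMMITTED bookkeeping reworked above: the fetcher keeps a priority queue of aborted transactions (ordered by first offset) plus a set of producer ids whose current transaction is known to be aborted. The self-contained sketch below mirrors that flow with hypothetical AbortedTxn/Batch stand-ins; it is an illustration of the logic in isBatchAborted/containsAbortMarker, not the Fetcher's actual classes.

    import java.util.HashSet;
    import java.util.PriorityQueue;
    import java.util.Set;

    // Hypothetical stand-ins for FetchResponse.AbortedTransaction and RecordBatch.
    class AbortedTxn { long producerId; long firstOffset; }
    class Batch { long producerId; long baseOffset; boolean isAbortMarker; }

    class ReadCommittedFilter {
        // Aborted transactions reported by the broker, ordered by first offset
        // (as built in abortedTransactions()).
        private final PriorityQueue<AbortedTxn> abortedTransactions =
                new PriorityQueue<AbortedTxn>((a, b) -> Long.compare(a.firstOffset, b.firstOffset));
        // Producer ids whose open transaction is known to be aborted.
        private final Set<Long> abortedProducerIds = new HashSet<>();

        // Returns true if the batch belongs to an aborted transaction and should be skipped.
        boolean shouldSkip(Batch batch) {
            if (batch.isAbortMarker) {
                // The abort marker ends the transaction, so stop filtering this producer's batches.
                abortedProducerIds.remove(batch.producerId);
                return false; // control batches are dropped separately, never handed to the user
            }
            // Promote any reported aborted transaction that starts at or before this batch.
            AbortedTxn next = abortedTransactions.peek();
            if (next != null && next.producerId == batch.producerId && next.firstOffset <= batch.baseOffset) {
                abortedProducerIds.add(batch.producerId);
                abortedTransactions.poll();
            }
            return abortedProducerIds.contains(batch.producerId);
        }
    }
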
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/main/java/org/apache/kafka/common/errors/TransactionCoordinatorFencedException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/TransactionCoordinatorFencedException.java b/clients/src/main/java/org/apache/kafka/common/errors/TransactionCoordinatorFencedException.java
new file mode 100644
index 0000000..583ce04
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/TransactionCoordinatorFencedException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class TransactionCoordinatorFencedException extends ApiException {
+
+    private static final long serialVersionUID = 1L;
+
+    public TransactionCoordinatorFencedException(String message) {
+        super(message);
+    }
+
+    public TransactionCoordinatorFencedException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 65bec4a..960fdda 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -21,11 +21,11 @@ import org.apache.kafka.common.errors.BrokerNotAvailableException;
 import org.apache.kafka.common.errors.ClusterAuthorizationException;
 import org.apache.kafka.common.errors.ConcurrentTransactionsException;
 import org.apache.kafka.common.errors.ControllerMovedException;
+import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
+import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
 import org.apache.kafka.common.errors.CorruptRecordException;
 import org.apache.kafka.common.errors.DuplicateSequenceNumberException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
-import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
-import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
 import org.apache.kafka.common.errors.IllegalGenerationException;
 import org.apache.kafka.common.errors.IllegalSaslStateException;
 import org.apache.kafka.common.errors.InconsistentGroupProtocolException;
@@ -39,12 +39,11 @@ import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
 import org.apache.kafka.common.errors.InvalidReplicationFactorException;
 import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.common.errors.InvalidRequiredAcksException;
-import org.apache.kafka.common.errors.InvalidTxnTimeoutException;
-import org.apache.kafka.common.errors.OutOfOrderSequenceException;
 import org.apache.kafka.common.errors.InvalidSessionTimeoutException;
 import org.apache.kafka.common.errors.InvalidTimestampException;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.InvalidTxnStateException;
+import org.apache.kafka.common.errors.InvalidTxnTimeoutException;
 import org.apache.kafka.common.errors.LeaderNotAvailableException;
 import org.apache.kafka.common.errors.NetworkException;
 import org.apache.kafka.common.errors.NotControllerException;
@@ -54,6 +53,7 @@ import org.apache.kafka.common.errors.NotEnoughReplicasException;
 import org.apache.kafka.common.errors.NotLeaderForPartitionException;
 import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
 import org.apache.kafka.common.errors.OffsetOutOfRangeException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
 import org.apache.kafka.common.errors.PolicyViolationException;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.errors.RebalanceInProgressException;
@@ -64,6 +64,7 @@ import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.errors.TopicExistsException;
+import org.apache.kafka.common.errors.TransactionCoordinatorFencedException;
 import org.apache.kafka.common.errors.UnknownMemberIdException;
 import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
@@ -460,6 +461,14 @@ public enum Errors {
             public ApiException build(String message) {
                 return new ConcurrentTransactionsException(message);
             }
+        }),
+    TRANSACTION_COORDINATOR_FENCED(52, "Indicates that the transaction coordinator sending a WriteTxnMarker " +
+            "is no longer the current coordinator for a given producer",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new TransactionCoordinatorFencedException(message);
+            }
         });
              
     private interface ApiExceptionBuilder {

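For context on the new TRANSACTION_COORDINATOR_FENCED entry: each Errors constant binds a wire error code to a builder that produces the matching exception, so a response carrying code 52 maps back to the fenced-coordinator exception on the client. A reduced, hypothetical sketch of that code-to-exception pattern (the real enum is org.apache.kafka.common.protocol.Errors and carries many more constants):

    // Reduced sketch: wire error code -> exception factory, as in the Errors enum.
    enum ErrorsSketch {
        NONE((short) 0, message -> null),
        TRANSACTION_COORDINATOR_FENCED((short) 52, FencedCoordinatorSketchException::new);

        interface Builder {
            RuntimeException build(String message);
        }

        private final short code;
        private final Builder builder;

        ErrorsSketch(short code, Builder builder) {
            this.code = code;
            this.builder = builder;
        }

        RuntimeException exception(String message) {
            return builder.build(message);
        }

        static ErrorsSketch forCode(short code) {
            for (ErrorsSketch error : values())
                if (error.code == code)
                    return error;
            throw new IllegalArgumentException("Unknown error code: " + code);
        }
    }

    // Hypothetical local exception; the committed class is
    // org.apache.kafka.common.errors.TransactionCoordinatorFencedException.
    class FencedCoordinatorSketchException extends RuntimeException {
        FencedCoordinatorSketchException(String message) {
            super(message);
        }
    }
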
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 16ec9ea..fb3c8c9 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -467,8 +467,21 @@ public class Protocol {
                                                                              new ArrayOf(LIST_OFFSET_REQUEST_TOPIC_V1),
                                                                              "Topics to list offsets."));
 
-    /* v2 request is the same as v1. Throttle time has been added to response */
-    public static final Schema LIST_OFFSET_REQUEST_V2 = LIST_OFFSET_REQUEST_V1;
+    public static final Schema LIST_OFFSET_REQUEST_V2 = new Schema(
+            new Field("replica_id",
+                    INT32,
+                    "Broker id of the follower. For normal consumers, use -1."),
+            new Field("isolation_level",
+                    INT8,
+                    "This setting controls the visibility of transactional records. Using READ_UNCOMMITTED " +
+                            "(isolation_level = 0) makes all records visible. With READ_COMMITTED (isolation_level = 1), " +
+                            "non-transactional and COMMITTED transactional records are visible. To be more concrete, " +
+                            "READ_COMMITTED returns all data from offsets smaller than the current LSO (last stable offset), " +
+                            "and enables the inclusion of the list of aborted transactions in the result, which allows " +
+                            "consumers to discard ABORTED transactional records"),
+            new Field("topics",
+                    new ArrayOf(LIST_OFFSET_REQUEST_TOPIC_V1),
+                    "Topics to list offsets."));;
 
     public static final Schema LIST_OFFSET_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
                                                                                         INT32,

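The isolation_level field added to LIST_OFFSET_REQUEST_V2 is a single INT8 on the wire: 0 for READ_UNCOMMITTED, 1 for READ_COMMITTED. A minimal sketch of mapping an enum onto that byte, assuming this encoding (the client has its own IsolationLevel type; this is only an illustration):

    // Sketch: isolation level <-> the INT8 wire value described in the schema above.
    enum IsolationLevelSketch {
        READ_UNCOMMITTED((byte) 0),
        READ_COMMITTED((byte) 1);

        private final byte id;

        IsolationLevelSketch(byte id) {
            this.id = id;
        }

        byte id() {
            return id;
        }

        static IsolationLevelSketch forId(byte id) {
            switch (id) {
                case 0: return READ_UNCOMMITTED;
                case 1: return READ_COMMITTED;
                default: throw new IllegalArgumentException("Unknown isolation level: " + id);
            }
        }
    }
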
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
index 85fcb2a..7be4bdd 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
@@ -206,7 +206,7 @@ public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch impl
     }
 
     @Override
-    public boolean isControlRecord() {
+    public boolean isControlBatch() {
         return false;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
index 87df7e4..cfda8a4 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
@@ -56,6 +56,9 @@ public abstract class AbstractRecords implements Records {
         int totalSizeEstimate = 0;
 
         for (RecordBatch batch : batches) {
+            if (toMagic < RecordBatch.MAGIC_VALUE_V2 && batch.isControlBatch())
+                continue;
+
             if (batch.magic() <= toMagic) {
                 totalSizeEstimate += batch.sizeInBytes();
                 recordBatchAndRecordsList.add(new RecordBatchAndRecords(batch, null, null));
@@ -94,12 +97,8 @@ public abstract class AbstractRecords implements Records {
 
         MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, batch.compressionType(),
                 timestampType, recordBatchAndRecords.baseOffset, logAppendTime);
-        for (Record record : recordBatchAndRecords.records) {
-            // control messages are only supported in v2 and above, so skip when down-converting
-            if (magic < RecordBatch.MAGIC_VALUE_V2 && record.isControlRecord())
-                continue;
+        for (Record record : recordBatchAndRecords.records)
             builder.append(record);
-        }
 
         builder.close();
         return builder.buffer();

http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/main/java/org/apache/kafka/common/record/ControlRecordType.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/ControlRecordType.java b/clients/src/main/java/org/apache/kafka/common/record/ControlRecordType.java
index 790b2ee..d5ead14 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/ControlRecordType.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/ControlRecordType.java
@@ -49,6 +49,7 @@ public enum ControlRecordType {
     private static final Logger log = LoggerFactory.getLogger(ControlRecordType.class);
 
     static final short CURRENT_CONTROL_RECORD_KEY_VERSION = 0;
+    static final int CURRENT_CONTROL_RECORD_KEY_SIZE = 4;
     private static final Schema CONTROL_RECORD_KEY_SCHEMA_VERSION_V0 = new Schema(
             new Field("version", Type.INT16),
             new Field("type", Type.INT16));
@@ -69,13 +70,24 @@ public enum ControlRecordType {
         return struct;
     }
 
-    public static ControlRecordType parse(ByteBuffer key) {
+    public static short parseTypeId(ByteBuffer key) {
+        if (key.remaining() < CURRENT_CONTROL_RECORD_KEY_SIZE)
+            throw new InvalidRecordException("Invalid value size found for end control record key. Must have " +
+                    "at least " + CURRENT_CONTROL_RECORD_KEY_SIZE + " bytes, but found only " + key.remaining());
+
         short version = key.getShort(0);
+        if (version < 0)
+            throw new InvalidRecordException("Invalid version found for control record: " + version +
+                    ". May indicate data corruption");
+
         if (version != CURRENT_CONTROL_RECORD_KEY_VERSION)
-            log.debug("Received unknown control record key version {}. Parsing as version {}", version, 
+            log.debug("Received unknown control record key version {}. Parsing as version {}", version,
                     CURRENT_CONTROL_RECORD_KEY_VERSION);
-        short type = key.getShort(2);
-        switch (type) {
+        return key.getShort(2);
+    }
+
+    public static ControlRecordType fromTypeId(short typeId) {
+        switch (typeId) {
             case 0:
                 return ABORT;
             case 1:
@@ -84,4 +96,8 @@ public enum ControlRecordType {
                 return UNKNOWN;
         }
     }
+
+    public static ControlRecordType parse(ByteBuffer key) {
+        return fromTypeId(parseTypeId(key));
+    }
 }

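As the parseTypeId change above shows, a control record key is two big-endian INT16 fields: a version at offset 0 and a type id at offset 2 (0 = ABORT, 1 = COMMIT), four bytes in total. A standalone sketch of encoding and decoding that key under the same layout:

    import java.nio.ByteBuffer;

    class ControlKeySketch {
        static final short KEY_VERSION = 0;
        static final short TYPE_ABORT = 0;
        static final short TYPE_COMMIT = 1;

        // Build a 4-byte control record key: [version INT16][type INT16].
        static ByteBuffer encode(short typeId) {
            ByteBuffer key = ByteBuffer.allocate(4);
            key.putShort(KEY_VERSION);
            key.putShort(typeId);
            key.flip();
            return key;
        }

        // Read the type id back, mirroring ControlRecordType.parseTypeId above.
        static short decodeTypeId(ByteBuffer key) {
            if (key.remaining() < 4)
                throw new IllegalArgumentException("Control record key must be at least 4 bytes");
            short version = key.getShort(0);
            if (version < 0)
                throw new IllegalArgumentException("Invalid control record key version: " + version);
            return key.getShort(2);
        }
    }
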
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
index e0794d8..669c75d 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
@@ -55,11 +55,9 @@ import static org.apache.kafka.common.utils.Utils.wrapNullable;
  *
  * The current record attributes are depicted below:
  *
- *  -----------------------------------
- *  | Unused (1-7) | Control Flag (0) |
- *  -----------------------------------
- *
- * The control flag is used to implement control records (see {@link ControlRecordType}).
+ *  ----------------
+ *  | Unused (0-7) |
+ *  ----------------
  *
  * The offset and timestamp deltas compute the difference relative to the base offset and
  * base timestamp of the log entry that this record is contained in.
@@ -69,7 +67,6 @@ public class DefaultRecord implements Record {
     // excluding key, value and headers: 5 bytes length + 10 bytes timestamp + 5 bytes offset + 1 byte attributes
     public static final int MAX_RECORD_OVERHEAD = 21;
 
-    private static final int CONTROL_FLAG_MASK = 0x01;
     private static final int NULL_VARINT_SIZE_BYTES = ByteUtils.sizeOfVarint(-1);
 
     private final int sizeInBytes;
@@ -180,7 +177,6 @@ public class DefaultRecord implements Record {
      * Write the record to `out` and return its crc.
      */
     public static long writeTo(DataOutputStream out,
-                               boolean isControlRecord,
                                int offsetDelta,
                                long timestampDelta,
                                ByteBuffer key,
@@ -189,7 +185,7 @@ public class DefaultRecord implements Record {
         int sizeInBytes = sizeOfBodyInBytes(offsetDelta, timestampDelta, key, value, headers);
         ByteUtils.writeVarint(sizeInBytes, out);
 
-        byte attributes = computeAttributes(isControlRecord);
+        byte attributes = 0; // there are no used record attributes at the moment
         out.write(attributes);
 
         ByteUtils.writeVarlong(timestampDelta, out);
@@ -241,15 +237,14 @@ public class DefaultRecord implements Record {
      * Write the record to `out` and return its crc.
      */
     public static long writeTo(ByteBuffer out,
-                               boolean isControlRecord,
                                int offsetDelta,
                                long timestampDelta,
                                ByteBuffer key,
                                ByteBuffer value,
                                Header[] headers) {
         try {
-            return writeTo(new DataOutputStream(new ByteBufferOutputStream(out)), isControlRecord, offsetDelta,
-                    timestampDelta, key, value, headers);
+            return writeTo(new DataOutputStream(new ByteBufferOutputStream(out)), offsetDelta, timestampDelta,
+                    key, value, headers);
         } catch (IOException e) {
             // cannot actually be raised by ByteBufferOutputStream
             throw new IllegalStateException("Unexpected exception raised from ByteBufferOutputStream", e);
@@ -290,11 +285,6 @@ public class DefaultRecord implements Record {
     }
 
     @Override
-    public boolean isControlRecord() {
-        return (attributes & CONTROL_FLAG_MASK) != 0;
-    }
-
-    @Override
     public String toString() {
         return String.format("DefaultRecord(offset=%d, timestamp=%d, key=%d bytes, value=%d bytes)",
                 offset,
@@ -421,10 +411,6 @@ public class DefaultRecord implements Record {
         return new DefaultRecord(sizeInBytes, attributes, offset, timestamp, sequence, key, value, headers);
     }
 
-    private static byte computeAttributes(boolean isControlRecord) {
-        return isControlRecord ? CONTROL_FLAG_MASK : (byte) 0;
-    }
-
     public static int sizeInBytes(int offsetDelta,
                                   long timestampDelta,
                                   byte[] key,
@@ -441,19 +427,35 @@ public class DefaultRecord implements Record {
         return bodySize + ByteUtils.sizeOfVarint(bodySize);
     }
 
+    public static int sizeInBytes(int offsetDelta,
+                                  long timestampDelta,
+                                  int keySize,
+                                  int valueSize,
+                                  Header[] headers) {
+        int bodySize = sizeOfBodyInBytes(offsetDelta, timestampDelta, keySize, valueSize, headers);
+        return bodySize + ByteUtils.sizeOfVarint(bodySize);
+    }
+
     private static int sizeOfBodyInBytes(int offsetDelta,
                                          long timestampDelta,
                                          ByteBuffer key,
                                          ByteBuffer value,
                                          Header[] headers) {
-        int size = 1; // always one byte for attributes
-        size += ByteUtils.sizeOfVarint(offsetDelta);
-        size += ByteUtils.sizeOfVarlong(timestampDelta);
 
         int keySize = key == null ? -1 : key.remaining();
         int valueSize = value == null ? -1 : value.remaining();
-        size += sizeOf(keySize, valueSize, headers);
+        return sizeOfBodyInBytes(offsetDelta, timestampDelta, keySize, valueSize, headers);
+    }
 
+    private static int sizeOfBodyInBytes(int offsetDelta,
+                                         long timestampDelta,
+                                         int keySize,
+                                         int valueSize,
+                                         Header[] headers) {
+        int size = 1; // always one byte for attributes
+        size += ByteUtils.sizeOfVarint(offsetDelta);
+        size += ByteUtils.sizeOfVarlong(timestampDelta);
+        size += sizeOf(keySize, valueSize, headers);
         return size;
     }
 

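The sizeOfBodyInBytes refactoring keeps the v2 record size arithmetic: one attributes byte, zigzag varints for the offset and timestamp deltas, varint-prefixed key and value, a varint header count, and finally a varint prefix carrying the body length itself. A hedged sketch of that arithmetic using a local varint-length helper rather than Kafka's ByteUtils, and assuming zero headers:

    class RecordSizeSketch {
        // Length in bytes of a zigzag-encoded varint, as used by the v2 record format.
        static int sizeOfVarint(long value) {
            long v = (value << 1) ^ (value >> 63); // zigzag encode
            int bytes = 1;
            while ((v & 0xffffffffffffff80L) != 0) {
                bytes++;
                v >>>= 7;
            }
            return bytes;
        }

        // Body size for a record with no headers:
        // attributes + deltas + key/value length prefixes + payloads + header count.
        static int sizeOfBody(int offsetDelta, long timestampDelta, int keySize, int valueSize) {
            int size = 1;                                               // attributes byte (currently unused)
            size += sizeOfVarint(offsetDelta);                          // offset delta
            size += sizeOfVarint(timestampDelta);                       // timestamp delta
            size += sizeOfVarint(keySize) + Math.max(keySize, 0);       // key length (-1 for null) + key bytes
            size += sizeOfVarint(valueSize) + Math.max(valueSize, 0);   // value length (-1 for null) + value bytes
            size += sizeOfVarint(0);                                    // header count (assumed zero here)
            return size;
        }

        // Total record size = body plus the varint prefix carrying the body length.
        static int totalSize(int offsetDelta, long timestampDelta, int keySize, int valueSize) {
            int body = sizeOfBody(offsetDelta, timestampDelta, keySize, valueSize);
            return body + sizeOfVarint(body);
        }
    }
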
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
index 93cd2eb..f321c3b 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
@@ -62,9 +62,9 @@ import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;
  *
  * The current attributes are given below:
  *
- *  -----------------------------------------------------------------------------------
- *  | Unused (5-15) | Transactional (4) | Timestamp Type (3) | Compression Type (0-2) |
- *  -----------------------------------------------------------------------------------
+ *  -------------------------------------------------------------------------------------------------
+ *  | Unused (6-15) | Control (5) | Transactional (4) | Timestamp Type (3) | Compression Type (0-2) |
+ *  -------------------------------------------------------------------------------------------------
  */
 public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRecordBatch {
     static final int BASE_OFFSET_OFFSET = 0;
@@ -98,6 +98,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
 
     private static final byte COMPRESSION_CODEC_MASK = 0x07;
     private static final byte TRANSACTIONAL_FLAG_MASK = 0x10;
+    private static final int CONTROL_FLAG_MASK = 0x20;
     private static final byte TIMESTAMP_TYPE_MASK = 0x08;
 
     private final ByteBuffer buffer;
@@ -203,6 +204,11 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
     }
 
     @Override
+    public boolean isControlBatch() {
+        return (attributes() & CONTROL_FLAG_MASK) > 0;
+    }
+
+    @Override
     public int partitionLeaderEpoch() {
         return buffer.getInt(PARTITION_LEADER_EPOCH_OFFSET);
     }
@@ -284,7 +290,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
         if (timestampType() == timestampType && currentMaxTimestamp == maxTimestamp)
             return;
 
-        byte attributes = computeAttributes(compressionType(), timestampType, isTransactional());
+        byte attributes = computeAttributes(compressionType(), timestampType, isTransactional(), isControlBatch());
         buffer.putShort(ATTRIBUTES_OFFSET, attributes);
         buffer.putLong(MAX_TIMESTAMP_OFFSET, maxTimestamp);
         long crc = computeChecksum();
@@ -330,12 +336,15 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
         return buffer != null ? buffer.hashCode() : 0;
     }
 
-    private static byte computeAttributes(CompressionType type, TimestampType timestampType, boolean isTransactional) {
+    private static byte computeAttributes(CompressionType type, TimestampType timestampType,
+                                          boolean isTransactional, boolean isControl) {
         if (timestampType == TimestampType.NO_TIMESTAMP_TYPE)
             throw new IllegalArgumentException("Timestamp type must be provided to compute attributes for message " +
                     "format v2 and above");
 
         byte attributes = isTransactional ? TRANSACTIONAL_FLAG_MASK : 0;
+        if (isControl)
+            attributes |= CONTROL_FLAG_MASK;
         if (type.id > 0)
             attributes |= COMPRESSION_CODEC_MASK & type.id;
         if (timestampType == TimestampType.LOG_APPEND_TIME)
@@ -356,6 +365,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
                             short epoch,
                             int sequence,
                             boolean isTransactional,
+                            boolean isControlBatch,
                             int partitionLeaderEpoch,
                             int numRecords) {
         if (magic < RecordBatch.CURRENT_MAGIC_VALUE)
@@ -363,7 +373,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
         if (baseTimestamp < 0 && baseTimestamp != NO_TIMESTAMP)
             throw new IllegalArgumentException("Invalid message timestamp " + baseTimestamp);
 
-        short attributes = computeAttributes(compressionType, timestampType, isTransactional);
+        short attributes = computeAttributes(compressionType, timestampType, isTransactional, isControlBatch);
 
         int position = buffer.position();
         buffer.putLong(position + BASE_OFFSET_OFFSET, baseOffset);

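The new attributes layout packs the compression codec into bits 0-2, the timestamp type into bit 3, the transactional flag into bit 4, and the control flag into bit 5. A self-contained sketch of that bit arithmetic, with the masks copied from the diff above:

    class BatchAttributesSketch {
        static final int COMPRESSION_CODEC_MASK = 0x07;   // bits 0-2
        static final int TIMESTAMP_TYPE_MASK = 0x08;      // bit 3 (set for log append time)
        static final int TRANSACTIONAL_FLAG_MASK = 0x10;  // bit 4
        static final int CONTROL_FLAG_MASK = 0x20;        // bit 5

        static short compute(int compressionId, boolean logAppendTime, boolean transactional, boolean control) {
            int attributes = transactional ? TRANSACTIONAL_FLAG_MASK : 0;
            if (control)
                attributes |= CONTROL_FLAG_MASK;
            if (compressionId > 0)
                attributes |= COMPRESSION_CODEC_MASK & compressionId;
            if (logAppendTime)
                attributes |= TIMESTAMP_TYPE_MASK;
            return (short) attributes;
        }

        static boolean isControlBatch(short attributes) {
            return (attributes & CONTROL_FLAG_MASK) > 0;
        }
    }
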
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/main/java/org/apache/kafka/common/record/EndTransactionMarker.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/EndTransactionMarker.java b/clients/src/main/java/org/apache/kafka/common/record/EndTransactionMarker.java
new file mode 100644
index 0000000..726b52a
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/EndTransactionMarker.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.protocol.types.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+
+/**
+ * This class represents the control record which is written to the log to indicate the completion
+ * of a transaction. The record key specifies the {@link ControlRecordType control type} and the
+ * value embeds information useful for write validation (for now, just the coordinator epoch).
+ */
+public class EndTransactionMarker {
+    private static final Logger log = LoggerFactory.getLogger(EndTransactionMarker.class);
+
+    private static final short CURRENT_END_TXN_MARKER_VERSION = 0;
+    private static final Schema END_TXN_MARKER_SCHEMA_VERSION_V0 = new Schema(
+            new Field("version", Type.INT16),
+            new Field("coordinator_epoch", Type.INT32));
+    static final int CURRENT_END_TXN_MARKER_VALUE_SIZE = 6;
+    static final int CURRENT_END_TXN_SCHEMA_RECORD_SIZE = DefaultRecord.sizeInBytes(0, 0L,
+            ControlRecordType.CURRENT_CONTROL_RECORD_KEY_SIZE,
+            EndTransactionMarker.CURRENT_END_TXN_MARKER_VALUE_SIZE,
+            Record.EMPTY_HEADERS);
+
+    private final ControlRecordType type;
+    private final int coordinatorEpoch;
+
+    public EndTransactionMarker(ControlRecordType type, int coordinatorEpoch) {
+        ensureTransactionMarkerControlType(type);
+        this.type = type;
+        this.coordinatorEpoch = coordinatorEpoch;
+    }
+
+    public int coordinatorEpoch() {
+        return coordinatorEpoch;
+    }
+
+    public ControlRecordType controlType() {
+        return type;
+    }
+
+    private Struct buildRecordValue() {
+        Struct struct = new Struct(END_TXN_MARKER_SCHEMA_VERSION_V0);
+        struct.set("version", CURRENT_END_TXN_MARKER_VERSION);
+        struct.set("coordinator_epoch", coordinatorEpoch);
+        return struct;
+    }
+
+    public ByteBuffer serializeValue() {
+        Struct valueStruct = buildRecordValue();
+        ByteBuffer value = ByteBuffer.allocate(valueStruct.sizeOf());
+        valueStruct.writeTo(value);
+        value.flip();
+        return value;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        EndTransactionMarker that = (EndTransactionMarker) o;
+        return coordinatorEpoch == that.coordinatorEpoch && type == that.type;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = type != null ? type.hashCode() : 0;
+        result = 31 * result + coordinatorEpoch;
+        return result;
+    }
+
+    private static void ensureTransactionMarkerControlType(ControlRecordType type) {
+        if (type != ControlRecordType.COMMIT && type != ControlRecordType.ABORT)
+            throw new IllegalArgumentException("Invalid control record type for end transaction marker" + type);
+    }
+
+    public static EndTransactionMarker deserialize(Record record) {
+        ControlRecordType type = ControlRecordType.parse(record.key());
+        return deserializeValue(type, record.value());
+    }
+
+    static EndTransactionMarker deserializeValue(ControlRecordType type, ByteBuffer value) {
+        ensureTransactionMarkerControlType(type);
+
+        if (value.remaining() < CURRENT_END_TXN_MARKER_VALUE_SIZE)
+            throw new InvalidRecordException("Invalid value size found for end transaction marker. Must have " +
+                    "at least " + CURRENT_END_TXN_MARKER_VALUE_SIZE + " bytes, but found only " + value.remaining());
+
+        short version = value.getShort(0);
+        if (version < 0)
+            throw new InvalidRecordException("Invalid version found for end transaction marker: " + version +
+                    ". May indicate data corruption");
+
+        if (version > CURRENT_END_TXN_MARKER_VERSION)
+            log.debug("Received end transaction marker value version {}. Parsing as version {}", version,
+                    CURRENT_END_TXN_MARKER_VERSION);
+
+        int coordinatorEpoch = value.getInt(2);
+        return new EndTransactionMarker(type, coordinatorEpoch);
+    }
+
+}

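The marker value serialized above is a fixed six-byte struct: the version as INT16 followed by coordinator_epoch as INT32 (CURRENT_END_TXN_MARKER_VALUE_SIZE = 6). A sketch of the same round trip with a plain ByteBuffer; the real class goes through Kafka's Struct/Schema machinery:

    import java.nio.ByteBuffer;

    class EndTxnMarkerValueSketch {
        static final short VERSION = 0;

        // Serialize: [version INT16][coordinator_epoch INT32] = 6 bytes.
        static ByteBuffer serialize(int coordinatorEpoch) {
            ByteBuffer value = ByteBuffer.allocate(6);
            value.putShort(VERSION);
            value.putInt(coordinatorEpoch);
            value.flip();
            return value;
        }

        // Deserialize, mirroring EndTransactionMarker.deserializeValue above.
        static int coordinatorEpoch(ByteBuffer value) {
            if (value.remaining() < 6)
                throw new IllegalArgumentException("End transaction marker value must be at least 6 bytes");
            short version = value.getShort(0);
            if (version < 0)
                throw new IllegalArgumentException("Invalid end transaction marker version: " + version);
            return value.getInt(2);
        }
    }
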
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
index d5f10dc..1af5527 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
@@ -279,6 +279,12 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil
         }
 
         @Override
+        public boolean isControlBatch() {
+            loadUnderlyingRecordBatch();
+            return underlying.isControlBatch();
+        }
+
+        @Override
         public int partitionLeaderEpoch() {
             loadUnderlyingRecordBatch();
             return underlying.partitionLeaderEpoch();

http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
index dcd7845..16d3777 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -224,7 +224,6 @@ public class FileRecords extends AbstractRecords implements Closeable {
                     " size of this log segment is " + originalSize + " bytes.");
         if (targetSize < (int) channel.size()) {
             channel.truncate(targetSize);
-            channel.position(targetSize);
             size.set(targetSize);
         }
         return originalSize - targetSize;
@@ -276,11 +275,11 @@ public class FileRecords extends AbstractRecords implements Closeable {
      * @param targetOffset The offset to search for.
      * @param startingPosition The starting position in the file to begin searching from.
      */
-    public LogEntryPosition searchForOffsetWithSize(long targetOffset, int startingPosition) {
+    public LogOffsetPosition searchForOffsetWithSize(long targetOffset, int startingPosition) {
         for (FileChannelRecordBatch batch : batchesFrom(startingPosition)) {
             long offset = batch.lastOffset();
             if (offset >= targetOffset)
-                return new LogEntryPosition(offset, batch.position(), batch.sizeInBytes());
+                return new LogOffsetPosition(offset, batch.position(), batch.sizeInBytes());
         }
         return null;
     }
@@ -429,12 +428,12 @@ public class FileRecords extends AbstractRecords implements Closeable {
         }
     }
 
-    public static class LogEntryPosition {
+    public static class LogOffsetPosition {
         public final long offset;
         public final int position;
         public final int size;
 
-        public LogEntryPosition(long offset, int position, int size) {
+        public LogOffsetPosition(long offset, int position, int size) {
             this.offset = offset;
             this.position = position;
             this.size = size;
@@ -447,7 +446,7 @@ public class FileRecords extends AbstractRecords implements Closeable {
             if (o == null || getClass() != o.getClass())
                 return false;
 
-            LogEntryPosition that = (LogEntryPosition) o;
+            LogOffsetPosition that = (LogOffsetPosition) o;
 
             return offset == that.offset &&
                     position == that.position &&
@@ -465,7 +464,7 @@ public class FileRecords extends AbstractRecords implements Closeable {
 
         @Override
         public String toString() {
-            return "LogEntryPosition(" +
+            return "LogOffsetPosition(" +
                     "offset=" + offset +
                     ", position=" + position +
                     ", size=" + size +

http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 548cd45..c8754c7 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -292,6 +292,16 @@ public class MemoryRecords extends AbstractRecords {
         return builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, timestampType, baseOffset);
     }
 
+    public static MemoryRecordsBuilder idempotentBuilder(ByteBuffer buffer,
+                                                         CompressionType compressionType,
+                                                         long baseOffset,
+                                                         long producerId,
+                                                         short producerEpoch,
+                                                         int baseSequence) {
+        return builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME,
+                baseOffset, System.currentTimeMillis(), producerId, producerEpoch, baseSequence);
+    }
+
     public static MemoryRecordsBuilder builder(ByteBuffer buffer,
                                                byte magic,
                                                CompressionType compressionType,
@@ -307,7 +317,8 @@ public class MemoryRecords extends AbstractRecords {
                                                long baseOffset,
                                                long logAppendTime) {
         return builder(buffer, magic, compressionType, timestampType, baseOffset, logAppendTime,
-                RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false, RecordBatch.NO_PARTITION_LEADER_EPOCH);
+                RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false,
+                RecordBatch.NO_PARTITION_LEADER_EPOCH);
     }
 
     public static MemoryRecordsBuilder builder(ByteBuffer buffer,
@@ -320,7 +331,8 @@ public class MemoryRecords extends AbstractRecords {
         if (timestampType == TimestampType.LOG_APPEND_TIME)
             logAppendTime = System.currentTimeMillis();
         return builder(buffer, magic, compressionType, timestampType, baseOffset, logAppendTime,
-                RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, isTransactional, RecordBatch.NO_PARTITION_LEADER_EPOCH);
+                RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, isTransactional,
+                RecordBatch.NO_PARTITION_LEADER_EPOCH);
     }
 
     public static MemoryRecordsBuilder builder(ByteBuffer buffer,
@@ -335,6 +347,18 @@ public class MemoryRecords extends AbstractRecords {
     }
 
     public static MemoryRecordsBuilder builder(ByteBuffer buffer,
+                                               CompressionType compressionType,
+                                               long baseOffset,
+                                               long producerId,
+                                               short producerEpoch,
+                                               int baseSequence,
+                                               boolean isTransactional) {
+        return builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME, baseOffset,
+                RecordBatch.NO_TIMESTAMP, producerId, producerEpoch, baseSequence, isTransactional,
+                RecordBatch.NO_PARTITION_LEADER_EPOCH);
+    }
+
+    public static MemoryRecordsBuilder builder(ByteBuffer buffer,
                                                byte magic,
                                                CompressionType compressionType,
                                                TimestampType timestampType,
@@ -359,18 +383,18 @@ public class MemoryRecords extends AbstractRecords {
                                                boolean isTransactional,
                                                int partitionLeaderEpoch) {
         return new MemoryRecordsBuilder(buffer, magic, compressionType, timestampType, baseOffset,
-                logAppendTime, producerId, producerEpoch, baseSequence, isTransactional, partitionLeaderEpoch,
+                logAppendTime, producerId, producerEpoch, baseSequence, isTransactional, false, partitionLeaderEpoch,
                 buffer.remaining());
     }
 
-
     public static MemoryRecords withRecords(CompressionType compressionType, SimpleRecord... records) {
         return withRecords(RecordBatch.CURRENT_MAGIC_VALUE, compressionType, records);
     }
 
     public static MemoryRecords withRecords(CompressionType compressionType, int partitionLeaderEpoch, SimpleRecord... records) {
-        return withRecords(RecordBatch.CURRENT_MAGIC_VALUE, 0L, compressionType, TimestampType.CREATE_TIME, RecordBatch.NO_PRODUCER_ID,
-                RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, partitionLeaderEpoch, records);
+        return withRecords(RecordBatch.CURRENT_MAGIC_VALUE, 0L, compressionType, TimestampType.CREATE_TIME,
+                RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
+                partitionLeaderEpoch, false, records);
     }
 
     public static MemoryRecords withRecords(byte magic, CompressionType compressionType, SimpleRecord... records) {
@@ -378,30 +402,52 @@ public class MemoryRecords extends AbstractRecords {
     }
 
     public static MemoryRecords withRecords(long initialOffset, CompressionType compressionType, SimpleRecord... records) {
-        return withRecords(RecordBatch.CURRENT_MAGIC_VALUE, initialOffset, compressionType, TimestampType.CREATE_TIME, records);
+        return withRecords(RecordBatch.CURRENT_MAGIC_VALUE, initialOffset, compressionType, TimestampType.CREATE_TIME,
+                records);
     }
 
     public static MemoryRecords withRecords(long initialOffset, CompressionType compressionType, Integer partitionLeaderEpoch, SimpleRecord... records) {
         return withRecords(RecordBatch.CURRENT_MAGIC_VALUE, initialOffset, compressionType, TimestampType.CREATE_TIME, RecordBatch.NO_PRODUCER_ID,
-                RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, partitionLeaderEpoch, records);
+                RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, partitionLeaderEpoch, false, records);
     }
 
-    public static MemoryRecords withRecords(byte magic, long initialOffset, CompressionType compressionType,
-                                            long producerId, short producerEpoch, int baseSequence,
-                                            int partitionLeaderEpoch, SimpleRecord... records) {
+    public static MemoryRecords withIdempotentRecords(byte magic, long initialOffset, CompressionType compressionType,
+                                                      long producerId, short producerEpoch, int baseSequence,
+                                                      int partitionLeaderEpoch, SimpleRecord... records) {
         return withRecords(magic, initialOffset, compressionType, TimestampType.CREATE_TIME, producerId, producerEpoch,
-                baseSequence, partitionLeaderEpoch, records);
+                baseSequence, partitionLeaderEpoch, false, records);
+    }
+
+    public static MemoryRecords withIdempotentRecords(long initialOffset, CompressionType compressionType, long producerId,
+                                                      short producerEpoch, int baseSequence, int partitionLeaderEpoch,
+                                                      SimpleRecord... records) {
+        return withRecords(RecordBatch.CURRENT_MAGIC_VALUE, initialOffset, compressionType, TimestampType.CREATE_TIME,
+                producerId, producerEpoch, baseSequence, partitionLeaderEpoch, false, records);
+    }
+
+    public static MemoryRecords withTransactionalRecords(CompressionType compressionType, long producerId,
+                                                         short producerEpoch, int baseSequence, SimpleRecord... records) {
+        return withRecords(RecordBatch.CURRENT_MAGIC_VALUE, 0L, compressionType, TimestampType.CREATE_TIME,
+                producerId, producerEpoch, baseSequence, RecordBatch.NO_PARTITION_LEADER_EPOCH, true, records);
+    }
+
+    public static MemoryRecords withTransactionalRecords(long initialOffset, CompressionType compressionType, long producerId,
+                                                         short producerEpoch, int baseSequence, SimpleRecord... records) {
+        return withRecords(RecordBatch.CURRENT_MAGIC_VALUE, initialOffset, compressionType, TimestampType.CREATE_TIME,
+                producerId, producerEpoch, baseSequence, RecordBatch.NO_PARTITION_LEADER_EPOCH, true, records);
     }
 
     public static MemoryRecords withRecords(byte magic, long initialOffset, CompressionType compressionType,
                                             TimestampType timestampType, SimpleRecord... records) {
         return withRecords(magic, initialOffset, compressionType, timestampType, RecordBatch.NO_PRODUCER_ID,
-                RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, RecordBatch.NO_PARTITION_LEADER_EPOCH, records);
+                RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, RecordBatch.NO_PARTITION_LEADER_EPOCH,
+                false, records);
     }
 
-    private static MemoryRecords withRecords(byte magic, long initialOffset, CompressionType compressionType,
-                                             TimestampType timestampType, long producerId, short producerEpoch,
-                                             int baseSequence, int partitionLeaderEpoch, SimpleRecord ... records) {
+    public static MemoryRecords withRecords(byte magic, long initialOffset, CompressionType compressionType,
+                                            TimestampType timestampType, long producerId, short producerEpoch,
+                                            int baseSequence, int partitionLeaderEpoch, boolean isTransactional,
+                                            SimpleRecord ... records) {
         if (records.length == 0)
             return MemoryRecords.EMPTY;
         int sizeEstimate = AbstractRecords.estimateSizeInBytes(magic, compressionType, Arrays.asList(records));
@@ -409,11 +455,38 @@ public class MemoryRecords extends AbstractRecords {
         long logAppendTime = RecordBatch.NO_TIMESTAMP;
         if (timestampType == TimestampType.LOG_APPEND_TIME)
             logAppendTime = System.currentTimeMillis();
-        MemoryRecordsBuilder builder = builder(buffer, magic, compressionType, timestampType, initialOffset,
-                logAppendTime, producerId, producerEpoch, baseSequence, false, partitionLeaderEpoch);
+        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, compressionType, timestampType,
+                initialOffset, logAppendTime, producerId, producerEpoch, baseSequence, isTransactional, false,
+                partitionLeaderEpoch, buffer.capacity());
         for (SimpleRecord record : records)
             builder.append(record);
         return builder.build();
     }
 
+    public static MemoryRecords withEndTransactionMarker(long producerId, short producerEpoch, EndTransactionMarker marker) {
+        return withEndTransactionMarker(0L, producerId, producerEpoch, marker);
+    }
+
+    public static MemoryRecords withEndTransactionMarker(long initialOffset, long producerId, short producerEpoch,
+                                                         EndTransactionMarker marker) {
+        int endTxnMarkerBatchSize = DefaultRecordBatch.RECORD_BATCH_OVERHEAD +
+                EndTransactionMarker.CURRENT_END_TXN_SCHEMA_RECORD_SIZE;
+        ByteBuffer buffer = ByteBuffer.allocate(endTxnMarkerBatchSize);
+        writeEndTransactionalMarker(buffer, initialOffset, producerId, producerEpoch, marker);
+        buffer.flip();
+        return MemoryRecords.readableRecords(buffer);
+    }
+
+    public static void writeEndTransactionalMarker(ByteBuffer buffer, long initialOffset, long producerId,
+                                                   short producerEpoch, EndTransactionMarker marker) {
+        boolean isTransactional = true;
+        boolean isControlBatch = true;
+        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE,
+                TimestampType.CREATE_TIME, initialOffset, RecordBatch.NO_TIMESTAMP, producerId, producerEpoch,
+                RecordBatch.NO_SEQUENCE, isTransactional, isControlBatch, RecordBatch.NO_PARTITION_LEADER_EPOCH,
+                buffer.capacity());
+        builder.appendEndTxnMarker(System.currentTimeMillis(), marker);
+        builder.close();
+    }
+
 }

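The new factory methods above split the old withRecords overload into explicit idempotent and transactional variants and add withEndTransactionMarker for building control batches. A minimal sketch of how they fit together follows; the producerId, producerEpoch, baseSequence and coordinatorEpoch values are hypothetical, and the EndTransactionMarker(ControlRecordType, int coordinatorEpoch) constructor is assumed from the new EndTransactionMarker class introduced elsewhere in this commit.

    import org.apache.kafka.common.record.*;

    public class TransactionalBatchSketch {
        public static void main(String[] args) {
            long producerId = 1000L;     // hypothetical producer id
            short producerEpoch = 0;     // hypothetical producer epoch
            int baseSequence = 0;        // sequence of the first record in the batch
            int coordinatorEpoch = 0;    // hypothetical transaction coordinator epoch

            // Transactional data batch: the transactional bit is set and the producer
            // id/epoch/sequence are written into the v2 batch header.
            MemoryRecords data = MemoryRecords.withTransactionalRecords(
                    CompressionType.NONE, producerId, producerEpoch, baseSequence,
                    new SimpleRecord("key".getBytes(), "value".getBytes()));

            // Separate control batch carrying the COMMIT marker for the same producer.
            EndTransactionMarker commit = new EndTransactionMarker(ControlRecordType.COMMIT, coordinatorEpoch);
            MemoryRecords marker = MemoryRecords.withEndTransactionMarker(producerId, producerEpoch, commit);

            System.out.println(data.sizeInBytes() + " bytes of data, " + marker.sizeInBytes() + " byte marker batch");
        }
    }
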
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index b9d65a5..f7451cf 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -66,6 +66,7 @@ public class MemoryRecordsBuilder {
     private final long baseOffset;
     private final long logAppendTime;
     private final boolean isTransactional;
+    private final boolean isControlBatch;
     private final int partitionLeaderEpoch;
     private final int writeLimit;
     private final int initialCapacity;
@@ -112,17 +113,18 @@ public class MemoryRecordsBuilder {
                                 short producerEpoch,
                                 int baseSequence,
                                 boolean isTransactional,
+                                boolean isControlBatch,
                                 int partitionLeaderEpoch,
                                 int writeLimit) {
         if (magic > RecordBatch.MAGIC_VALUE_V0 && timestampType == TimestampType.NO_TIMESTAMP_TYPE)
             throw new IllegalArgumentException("TimestampType must be set for magic >= 0");
-
-        if (isTransactional) {
-            if (magic < RecordBatch.MAGIC_VALUE_V2)
-                throw new IllegalArgumentException("Transactional messages are not supported for magic " + magic);
+        if (magic < RecordBatch.MAGIC_VALUE_V2) {
+            if (isTransactional)
+                throw new IllegalArgumentException("Transactional records are not supported for magic " + magic);
+            if (isControlBatch)
+                throw new IllegalArgumentException("Control records are not supported for magic " + magic);
         }
 
-
         this.magic = magic;
         this.timestampType = timestampType;
         this.compressionType = compressionType;
@@ -137,6 +139,7 @@ public class MemoryRecordsBuilder {
         this.producerEpoch = producerEpoch;
         this.baseSequence = baseSequence;
         this.isTransactional = isTransactional;
+        this.isControlBatch = isControlBatch;
         this.partitionLeaderEpoch = partitionLeaderEpoch;
         this.writeLimit = writeLimit;
         this.initialCapacity = buffer.capacity();
@@ -254,7 +257,7 @@ public class MemoryRecordsBuilder {
             if (producerEpoch == RecordBatch.NO_PRODUCER_EPOCH)
                 throw new IllegalArgumentException("Invalid negative producer epoch");
 
-            if (baseSequence == RecordBatch.NO_SEQUENCE)
+            if (baseSequence < 0 && !isControlBatch)
                 throw new IllegalArgumentException("Invalid negative sequence number used");
 
             if (magic < RecordBatch.MAGIC_VALUE_V2)
@@ -298,7 +301,7 @@ public class MemoryRecordsBuilder {
         }
 
         DefaultRecordBatch.writeHeader(buffer, baseOffset, offsetDelta, size, magic, compressionType, timestampType,
-                baseTimestamp, maxTimestamp, producerId, producerEpoch, baseSequence, isTransactional,
+                baseTimestamp, maxTimestamp, producerId, producerEpoch, baseSequence, isTransactional, isControlBatch,
                 partitionLeaderEpoch, numRecords);
 
         buffer.position(pos);
@@ -326,26 +329,26 @@ public class MemoryRecordsBuilder {
     }
 
     private long appendWithOffset(long offset, boolean isControlRecord, long timestamp, ByteBuffer key,
-                                 ByteBuffer value, Header[] headers) {
+                                  ByteBuffer value, Header[] headers) {
         try {
+            if (isControlRecord != isControlBatch)
+                throw new IllegalArgumentException("Control records can only be appended to control batches");
+
             if (lastOffset != null && offset <= lastOffset)
-                throw new IllegalArgumentException(String.format("Illegal offset %s following previous offset %s (Offsets must increase monotonically).", offset, lastOffset));
+                throw new IllegalArgumentException(String.format("Illegal offset %s following previous offset %s " +
+                        "(Offsets must increase monotonically).", offset, lastOffset));
 
             if (timestamp < 0 && timestamp != RecordBatch.NO_TIMESTAMP)
                 throw new IllegalArgumentException("Invalid negative timestamp " + timestamp);
 
-            if (magic < RecordBatch.MAGIC_VALUE_V2) {
-                if (isControlRecord)
-                    throw new IllegalArgumentException("Magic v" + magic + " does not support control records");
-                if (headers != null && headers.length > 0)
-                    throw new IllegalArgumentException("Magic v" + magic + " does not support record headers");
-            }
+            if (magic < RecordBatch.MAGIC_VALUE_V2 && headers != null && headers.length > 0)
+                throw new IllegalArgumentException("Magic v" + magic + " does not support record headers");
 
             if (baseTimestamp == null)
                 baseTimestamp = timestamp;
 
             if (magic > RecordBatch.MAGIC_VALUE_V1)
-                return appendDefaultRecord(offset, isControlRecord, timestamp, key, value, headers);
+                return appendDefaultRecord(offset, timestamp, key, value, headers);
             else
                 return appendLegacyRecord(offset, timestamp, key, value);
         } catch (IOException e) {
@@ -388,7 +391,7 @@ public class MemoryRecordsBuilder {
      * @return crc of the record
      */
     public long appendWithOffset(long offset, long timestamp, byte[] key, byte[] value) {
-        return appendWithOffset(offset, false, timestamp, wrapNullable(key), wrapNullable(value), Record.EMPTY_HEADERS);
+        return appendWithOffset(offset, timestamp, wrapNullable(key), wrapNullable(value), Record.EMPTY_HEADERS);
     }
 
     /**
@@ -400,7 +403,7 @@ public class MemoryRecordsBuilder {
      * @return crc of the record
      */
     public long appendWithOffset(long offset, long timestamp, ByteBuffer key, ByteBuffer value) {
-        return appendWithOffset(offset, false, timestamp, key, value, Record.EMPTY_HEADERS);
+        return appendWithOffset(offset, timestamp, key, value, Record.EMPTY_HEADERS);
     }
 
     /**
@@ -410,7 +413,7 @@ public class MemoryRecordsBuilder {
      * @return crc of the record
      */
     public long appendWithOffset(long offset, SimpleRecord record) {
-        return appendWithOffset(offset, false, record.timestamp(), record.key(), record.value(), record.headers());
+        return appendWithOffset(offset, record.timestamp(), record.key(), record.value(), record.headers());
     }
 
 
@@ -434,7 +437,7 @@ public class MemoryRecordsBuilder {
      * @return crc of the record
      */
     public long append(long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) {
-        return appendWithOffset(nextSequentialOffset(), false, timestamp, key, value, headers);
+        return appendWithOffset(nextSequentialOffset(), timestamp, key, value, headers);
     }
 
     /**
@@ -476,7 +479,7 @@ public class MemoryRecordsBuilder {
      * @param value The control record value
      * @return crc of the record
      */
-    public long appendControlRecord(long timestamp, ControlRecordType type, ByteBuffer value) {
+    private long appendControlRecord(long timestamp, ControlRecordType type, ByteBuffer value) {
         Struct keyStruct = type.recordKey();
         ByteBuffer key = ByteBuffer.allocate(keyStruct.sizeOf());
         keyStruct.writeTo(key);
@@ -484,6 +487,15 @@ public class MemoryRecordsBuilder {
         return appendWithOffset(nextSequentialOffset(), true, timestamp, key, value, Record.EMPTY_HEADERS);
     }
 
+    public long appendEndTxnMarker(long timestamp, EndTransactionMarker marker) {
+        if (producerId == RecordBatch.NO_PRODUCER_ID)
+            throw new IllegalArgumentException("End transaction marker requires a valid producerId");
+        if (!isTransactional)
+            throw new IllegalArgumentException("End transaction marker depends on batch transactional flag being enabled");
+        ByteBuffer value = marker.serializeValue();
+        return appendControlRecord(timestamp, marker.controlType(), value);
+    }
+
     /**
      * Add a legacy record without doing offset/magic validation (this should only be used in testing).
      * @param offset The offset of the record
@@ -509,8 +521,7 @@ public class MemoryRecordsBuilder {
      * @param record the record to add
      */
     public void append(Record record) {
-        appendWithOffset(record.offset(), record.isControlRecord(), record.timestamp(), record.key(), record.value(),
-                record.headers());
+        appendWithOffset(record.offset(), record.timestamp(), record.key(), record.value(), record.headers());
     }
 
     /**
@@ -519,8 +530,7 @@ public class MemoryRecordsBuilder {
      * @param record The record to add
      */
     public void appendWithOffset(long offset, Record record) {
-        appendWithOffset(offset, record.isControlRecord(), record.timestamp(), record.key(), record.value(),
-                record.headers());
+        appendWithOffset(offset, record.timestamp(), record.key(), record.value(), record.headers());
     }
 
     /**
@@ -542,12 +552,12 @@ public class MemoryRecordsBuilder {
         appendWithOffset(nextSequentialOffset(), record);
     }
 
-    private long appendDefaultRecord(long offset, boolean isControlRecord, long timestamp,
-                                     ByteBuffer key, ByteBuffer value, Header[] headers) throws IOException {
+    private long appendDefaultRecord(long offset, long timestamp, ByteBuffer key, ByteBuffer value,
+                                     Header[] headers) throws IOException {
         ensureOpenForRecordAppend();
         int offsetDelta = (int) (offset - baseOffset);
         long timestampDelta = timestamp - baseTimestamp;
-        long crc = DefaultRecord.writeTo(appendStream, isControlRecord, offsetDelta, timestampDelta, key, value, headers);
+        long crc = DefaultRecord.writeTo(appendStream, offsetDelta, timestampDelta, key, value, headers);
         // TODO: The crc is useless for the new message format. Maybe we should let writeTo return the written size?
         recordWritten(offset, timestamp, DefaultRecord.sizeInBytes(offsetDelta, timestampDelta, key, value, headers));
         return crc;

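A minimal sketch of driving MemoryRecordsBuilder directly with the new isControlBatch flag, mirroring what writeEndTransactionalMarker above does. The constructor arguments follow the signature shown in this diff; the producer values are hypothetical, the ABORT control type and the EndTransactionMarker constructor come from other files in this commit, and the sketch assumes the constructor is accessible to the caller (MemoryRecords invokes it from the same package).

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.record.*;

    public class EndTxnMarkerBuilderSketch {
        public static void main(String[] args) {
            long producerId = 1000L;                       // hypothetical
            short producerEpoch = 0;                       // hypothetical
            ByteBuffer buffer = ByteBuffer.allocate(128);  // room for batch overhead plus one marker record

            // Both isTransactional and isControlBatch are true; appendEndTxnMarker rejects
            // builders where either flag is missing or the producerId is unset.
            MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE,
                    CompressionType.NONE, TimestampType.CREATE_TIME, 0L, RecordBatch.NO_TIMESTAMP,
                    producerId, producerEpoch, RecordBatch.NO_SEQUENCE, true, true,
                    RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());

            builder.appendEndTxnMarker(System.currentTimeMillis(),
                    new EndTransactionMarker(ControlRecordType.ABORT, 0));
            MemoryRecords records = builder.build();
            System.out.println("abort marker batch: " + records.sizeInBytes() + " bytes");
        }
    }
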
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/main/java/org/apache/kafka/common/record/Record.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Record.java b/clients/src/main/java/org/apache/kafka/common/record/Record.java
index fdf41b3..cba6fc5 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Record.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Record.java
@@ -133,15 +133,6 @@ public interface Record {
     boolean hasTimestampType(TimestampType timestampType);
 
     /**
-     * Check whether this is a control record (i.e. whether the control bit is set in the record attributes).
-     * For magic versions prior to 2, this is always false.
-     *
-     * @return Whether this is a control record
-     */
-    boolean isControlRecord();
-
-
-    /**
      * Get the headers. For magic versions 1 and below, this always returns an empty array.
      *
      * @return the array of headers

http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
index 4fd03e1..c984deb 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
@@ -216,4 +216,12 @@ public interface RecordBatch extends Iterable<Record> {
      * @return The closeable iterator
      */
     CloseableIterator<Record> streamingIterator();
+
+    /**
+     * Check whether this is a control batch (i.e. whether the control bit is set in the batch attributes).
+     * For magic versions prior to 2, this is always false.
+     *
+     * @return Whether this is a batch containing control records
+     */
+    boolean isControlBatch();
 }

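With the control flag moved from Record (removed above) to RecordBatch, readers test the batch once instead of checking every record. A rough sketch of the consumer-side pattern, assuming ControlRecordType.parse reads the marker type from a control record's key as the updated Fetcher does:

    import org.apache.kafka.common.record.*;

    public class ControlBatchReaderSketch {
        static void printBatches(MemoryRecords records) {
            for (RecordBatch batch : records.batches()) {
                if (batch.isControlBatch()) {
                    // Control batches are not handed to applications; the marker type
                    // (e.g. commit or abort) is encoded in each control record's key.
                    for (Record record : batch)
                        System.out.println("control record: " + ControlRecordType.parse(record.key()));
                } else {
                    for (Record record : batch)
                        System.out.println("data record at offset " + record.offset());
                }
            }
        }
    }
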
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
index 7dbffd1..03f6ee5 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
@@ -40,6 +40,7 @@ public class ListOffsetRequest extends AbstractRequest {
     public static final int DEBUGGING_REPLICA_ID = -2;
 
     private static final String REPLICA_ID_KEY_NAME = "replica_id";
+    private static final String ISOLATION_LEVEL_KEY_NAME = "isolation_level";
     private static final String TOPICS_KEY_NAME = "topics";
 
     // topic level field names
@@ -52,6 +53,7 @@ public class ListOffsetRequest extends AbstractRequest {
     private static final String MAX_NUM_OFFSETS_KEY_NAME = "max_num_offsets";
 
     private final int replicaId;
+    private final IsolationLevel isolationLevel;
     private final Map<TopicPartition, PartitionData> offsetData;
     private final Map<TopicPartition, Long> partitionTimestamps;
     private final Set<TopicPartition> duplicatePartitions;
@@ -59,23 +61,29 @@ public class ListOffsetRequest extends AbstractRequest {
     public static class Builder extends AbstractRequest.Builder<ListOffsetRequest> {
         private final int replicaId;
         private final short minVersion;
+        private final IsolationLevel isolationLevel;
         private Map<TopicPartition, PartitionData> offsetData = null;
         private Map<TopicPartition, Long> partitionTimestamps = null;
 
         public static Builder forReplica(short desiredVersion, int replicaId) {
-            return new Builder((short) 0, desiredVersion, replicaId);
+            return new Builder((short) 0, desiredVersion, replicaId, IsolationLevel.READ_UNCOMMITTED);
         }
 
-        public static Builder forConsumer(boolean requireTimestamp) {
+        public static Builder forConsumer(boolean requireTimestamp, IsolationLevel isolationLevel) {
             // If we need a timestamp in the response, the minimum RPC version we can send is v1. Otherwise, v0 is OK.
-            short minVersion = requireTimestamp ? (short) 1 : (short) 0;
-            return new Builder(minVersion, null, CONSUMER_REPLICA_ID);
+            short minVersion = 0;
+            if (isolationLevel == IsolationLevel.READ_COMMITTED)
+                minVersion = 2;
+            else if (requireTimestamp)
+                minVersion = 1;
+            return new Builder(minVersion, null, CONSUMER_REPLICA_ID, isolationLevel);
         }
 
-        private Builder(short minVersion, Short desiredVersion, int replicaId) {
+        private Builder(short minVersion, Short desiredVersion, int replicaId, IsolationLevel isolationLevel) {
             super(ApiKeys.LIST_OFFSETS, desiredVersion);
             this.minVersion = minVersion;
             this.replicaId = replicaId;
+            this.isolationLevel = isolationLevel;
         }
 
         public Builder setOffsetData(Map<TopicPartition, PartitionData> offsetData) {
@@ -118,7 +126,7 @@ public class ListOffsetRequest extends AbstractRequest {
                 }
             }
             Map<TopicPartition, ?> m = (version == 0) ?  offsetData : partitionTimestamps;
-            return new ListOffsetRequest(replicaId, m, version);
+            return new ListOffsetRequest(replicaId, m, isolationLevel, version);
         }
 
         @Override
@@ -165,9 +173,10 @@ public class ListOffsetRequest extends AbstractRequest {
      * Private constructor with a specified version.
      */
     @SuppressWarnings("unchecked")
-    private ListOffsetRequest(int replicaId, Map<TopicPartition, ?> targetTimes, short version) {
+    private ListOffsetRequest(int replicaId, Map<TopicPartition, ?> targetTimes, IsolationLevel isolationLevel, short version) {
         super(version);
         this.replicaId = replicaId;
+        this.isolationLevel = isolationLevel;
         this.offsetData = version == 0 ? (Map<TopicPartition, PartitionData>) targetTimes : null;
         this.partitionTimestamps = version >= 1 ? (Map<TopicPartition, Long>) targetTimes : null;
         this.duplicatePartitions = Collections.emptySet();
@@ -177,6 +186,9 @@ public class ListOffsetRequest extends AbstractRequest {
         super(version);
         Set<TopicPartition> duplicatePartitions = new HashSet<>();
         replicaId = struct.getInt(REPLICA_ID_KEY_NAME);
+        isolationLevel = struct.hasField(ISOLATION_LEVEL_KEY_NAME) ?
+                IsolationLevel.forId(struct.getByte(ISOLATION_LEVEL_KEY_NAME)) :
+                IsolationLevel.READ_UNCOMMITTED;
         offsetData = new HashMap<>();
         partitionTimestamps = new HashMap<>();
         for (Object topicResponseObj : struct.getArray(TOPICS_KEY_NAME)) {
@@ -223,7 +235,6 @@ public class ListOffsetRequest extends AbstractRequest {
         switch (versionId) {
             case 0:
             case 1:
-                return new ListOffsetResponse(responseData);
             case 2:
                 return new ListOffsetResponse(throttleTimeMs, responseData);
             default:
@@ -236,6 +247,10 @@ public class ListOffsetRequest extends AbstractRequest {
         return replicaId;
     }
 
+    public IsolationLevel isolationLevel() {
+        return isolationLevel;
+    }
+
     @Deprecated
     public Map<TopicPartition, PartitionData> offsetData() {
         return offsetData;
@@ -262,6 +277,9 @@ public class ListOffsetRequest extends AbstractRequest {
         Map<String, Map<Integer, Object>> topicsData = CollectionUtils.groupDataByTopic(targetTimes);
 
         struct.set(REPLICA_ID_KEY_NAME, replicaId);
+
+        if (struct.hasField(ISOLATION_LEVEL_KEY_NAME))
+            struct.set(ISOLATION_LEVEL_KEY_NAME, isolationLevel.id());
         List<Struct> topicArray = new ArrayList<>();
         for (Map.Entry<String, Map<Integer, Object>> topicEntry: topicsData.entrySet()) {
             Struct topicData = struct.instance(TOPICS_KEY_NAME);

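ListOffsetRequest now carries the consumer's isolation level, and forConsumer pins the minimum version to 2 for READ_COMMITTED so the broker can answer with the last stable offset instead of the high watermark. A small sketch of building such a request; IsolationLevel is assumed to live in org.apache.kafka.common.requests in this version, and setTargetTimes and EARLIEST_TIMESTAMP are taken from the existing builder API.

    import java.util.Collections;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.requests.IsolationLevel;
    import org.apache.kafka.common.requests.ListOffsetRequest;

    public class ListOffsetSketch {
        public static void main(String[] args) {
            // requireTimestamp alone would allow v1; READ_COMMITTED raises the minimum version to v2.
            ListOffsetRequest request = ListOffsetRequest.Builder
                    .forConsumer(true /* requireTimestamp */, IsolationLevel.READ_COMMITTED)
                    .setTargetTimes(Collections.singletonMap(
                            new TopicPartition("my-topic", 0), ListOffsetRequest.EARLIEST_TIMESTAMP))
                    .build();
            System.out.println("isolation level: " + request.isolationLevel());
        }
    }
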
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index cb0ff89..24ee788 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -762,6 +762,11 @@ public class Utils {
         } while (bytesRead != -1 && destinationBuffer.hasRemaining());
     }
 
+    public static void writeFully(FileChannel channel, ByteBuffer sourceBuffer) throws IOException {
+        while (sourceBuffer.hasRemaining())
+            channel.write(sourceBuffer);
+    }
+
     /**
      * Write the contents of a buffer to an output stream. The bytes are copied from the current position
      * in the buffer.