Posted to commits@kafka.apache.org by jg...@apache.org on 2017/05/06 18:51:14 UTC

[5/6] kafka git commit: KAFKA-5121; Implement transaction index for KIP-98

http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index e38e583..0a0f3d9 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -44,6 +44,7 @@ import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.ControlRecordType;
 import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.record.EndTransactionMarker;
 import org.apache.kafka.common.record.LegacyRecord;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.MemoryRecordsBuilder;
@@ -181,16 +182,18 @@ public class FetcherTest {
         assertEquals(1, fetcher.sendFetches());
         assertFalse(fetcher.hasCompletedFetches());
 
+        long producerId = 1;
+        short epoch = 0;
+        int baseSequence = 0;
+
         ByteBuffer buffer = ByteBuffer.allocate(1024);
-        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
-        builder.append(0L, "key".getBytes(), null);
-        builder.appendControlRecord(0L, ControlRecordType.COMMIT, null);
+        MemoryRecordsBuilder builder = MemoryRecords.idempotentBuilder(buffer, CompressionType.NONE, 0L, producerId,
+                epoch, baseSequence);
         builder.append(0L, "key".getBytes(), null);
         builder.close();
 
-        builder = MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.CREATE_TIME, 3L);
-        builder.appendControlRecord(0L, ControlRecordType.ABORT, null);
-        builder.close();
+        MemoryRecords.writeEndTransactionalMarker(buffer, 1L, producerId, epoch, new EndTransactionMarker(ControlRecordType.ABORT, 0)
+        );
 
         buffer.flip();
 
@@ -202,10 +205,11 @@ public class FetcherTest {
         assertTrue(partitionRecords.containsKey(tp1));
 
         List<ConsumerRecord<byte[], byte[]>> records = partitionRecords.get(tp1);
-        assertEquals(2, records.size());
-        assertEquals(4L, subscriptions.position(tp1).longValue());
-        for (ConsumerRecord<byte[], byte[]> record : records)
-            assertArrayEquals("key".getBytes(), record.key());
+        assertEquals(1, records.size());
+        assertEquals(2L, subscriptions.position(tp1).longValue());
+
+        ConsumerRecord<byte[], byte[]> record = records.get(0);
+        assertArrayEquals("key".getBytes(), record.key());
     }
 
     @Test
@@ -814,6 +818,29 @@ public class FetcherTest {
     }
 
     @Test
+    public void testListOffsetsSendsIsolationLevel() {
+        for (final IsolationLevel isolationLevel : IsolationLevel.values()) {
+            Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(), new ByteArrayDeserializer(),
+                    new ByteArrayDeserializer(), Integer.MAX_VALUE, isolationLevel);
+
+            subscriptions.assignFromUser(singleton(tp1));
+            subscriptions.needOffsetReset(tp1, OffsetResetStrategy.LATEST);
+
+            client.prepareResponse(new MockClient.RequestMatcher() {
+                @Override
+                public boolean matches(AbstractRequest body) {
+                    ListOffsetRequest request = (ListOffsetRequest) body;
+                    return request.isolationLevel() == isolationLevel;
+                }
+            }, listOffsetResponse(Errors.NONE, 1L, 5L));
+            fetcher.updateFetchPositions(singleton(tp1));
+            assertFalse(subscriptions.isOffsetResetNeeded(tp1));
+            assertTrue(subscriptions.isFetchable(tp1));
+            assertEquals(5, subscriptions.position(tp1).longValue());
+        }
+    }
+
+    @Test
     public void testUpdateFetchPositionResetToEarliestOffset() {
         subscriptions.assignFromUser(singleton(tp1));
         subscriptions.needOffsetReset(tp1, OffsetResetStrategy.EARLIEST);
@@ -1206,7 +1233,7 @@ public class FetcherTest {
                 new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()),
                 new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()));
 
-        currentOffset += abortTransaction(buffer, 1L, currentOffset, time.milliseconds());
+        currentOffset += abortTransaction(buffer, 1L, currentOffset);
 
         buffer.flip();
 
@@ -1240,7 +1267,7 @@ public class FetcherTest {
                 new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()),
                 new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()));
 
-        currentOffset += commitTransaction(buffer, 1L, currentOffset, time.milliseconds());
+        currentOffset += commitTransaction(buffer, 1L, currentOffset);
         buffer.flip();
 
         List<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<>();
@@ -1278,7 +1305,7 @@ public class FetcherTest {
         List<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<>();
 
         int currOffset = 0;
-        // Appends for producer 1 (evetually committed)
+        // Appends for producer 1 (eventually committed)
         currOffset += appendTransactionalRecords(buffer, 1L, currOffset,
                 new SimpleRecord(time.milliseconds(), "commit1-1".getBytes(), "value".getBytes()),
                 new SimpleRecord(time.milliseconds(), "commit1-2".getBytes(), "value".getBytes()));
@@ -1288,13 +1315,13 @@ public class FetcherTest {
                 new SimpleRecord(time.milliseconds(), "abort2-1".getBytes(), "value".getBytes()));
 
         // commit producer 1
-        currOffset += commitTransaction(buffer, 1L, currOffset, time.milliseconds());
+        currOffset += commitTransaction(buffer, 1L, currOffset);
         // append more for producer 2 (eventually aborted)
         currOffset += appendTransactionalRecords(buffer, 2L, currOffset,
                 new SimpleRecord(time.milliseconds(), "abort2-2".getBytes(), "value".getBytes()));
 
         // abort producer 2
-        currOffset += abortTransaction(buffer, 2L, currOffset, time.milliseconds());
+        currOffset += abortTransaction(buffer, 2L, currOffset);
         abortedTransactions.add(new FetchResponse.AbortedTransaction(2, 2));
 
         // New transaction for producer 1 (eventually aborted)
@@ -1310,11 +1337,11 @@ public class FetcherTest {
                 new SimpleRecord(time.milliseconds(), "abort1-2".getBytes(), "value".getBytes()));
 
         // abort producer 1
-        currOffset += abortTransaction(buffer, 1L, currOffset, time.milliseconds());
+        currOffset += abortTransaction(buffer, 1L, currOffset);
         abortedTransactions.add(new FetchResponse.AbortedTransaction(1, 6));
 
         // commit producer 2
-        currOffset += commitTransaction(buffer, 2L, currOffset, time.milliseconds());
+        currOffset += commitTransaction(buffer, 2L, currOffset);
 
         buffer.flip();
 
@@ -1335,12 +1362,11 @@ public class FetcherTest {
         assertTrue(fetchedRecords.containsKey(tp1));
         // There are only 3 committed records
         List<ConsumerRecord<byte[], byte[]>> fetchedConsumerRecords = fetchedRecords.get(tp1);
-        Set<String> committedKeys = new HashSet<>(Arrays.asList("commit1-1", "commit1-2", "commit2-1"));
-        Set<String> actuallyCommittedKeys = new HashSet<>();
+        Set<String> fetchedKeys = new HashSet<>();
         for (ConsumerRecord<byte[], byte[]> consumerRecord : fetchedConsumerRecords) {
-            actuallyCommittedKeys.add(new String(consumerRecord.key(), StandardCharsets.UTF_8));
+            fetchedKeys.add(new String(consumerRecord.key(), StandardCharsets.UTF_8));
         }
-        assertTrue(actuallyCommittedKeys.equals(committedKeys));
+        assertEquals(Utils.mkSet("commit1-1", "commit1-2", "commit2-1"), fetchedKeys);
     }
 
     @Test
@@ -1354,14 +1380,14 @@ public class FetcherTest {
                 new SimpleRecord(time.milliseconds(), "abort1-1".getBytes(), "value".getBytes()),
                 new SimpleRecord(time.milliseconds(), "abort1-2".getBytes(), "value".getBytes()));
 
-        currentOffset += abortTransaction(buffer, 1L, currentOffset, time.milliseconds());
+        currentOffset += abortTransaction(buffer, 1L, currentOffset);
         // Duplicate abort -- should be ignored.
-        currentOffset += abortTransaction(buffer, 1L, currentOffset, time.milliseconds());
+        currentOffset += abortTransaction(buffer, 1L, currentOffset);
         // Now commit a transaction.
         currentOffset += appendTransactionalRecords(buffer, 1L, currentOffset,
                 new SimpleRecord(time.milliseconds(), "commit1-1".getBytes(), "value".getBytes()),
                 new SimpleRecord(time.milliseconds(), "commit1-2".getBytes(), "value".getBytes()));
-        currentOffset += commitTransaction(buffer, 1L, currentOffset, time.milliseconds());
+        currentOffset += commitTransaction(buffer, 1L, currentOffset);
         buffer.flip();
 
         List<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<>();
@@ -1402,7 +1428,7 @@ public class FetcherTest {
                 new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()),
                 new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()));
 
-        currentOffset += abortTransaction(buffer, 1L, currentOffset, time.milliseconds());
+        currentOffset += abortTransaction(buffer, 1L, currentOffset);
 
         buffer.flip();
 
@@ -1436,7 +1462,7 @@ public class FetcherTest {
                 new SimpleRecord(time.milliseconds(), "abort1-1".getBytes(), "value".getBytes()),
                 new SimpleRecord(time.milliseconds(), "abort1-2".getBytes(), "value".getBytes()));
 
-        currentOffset += abortTransaction(buffer, 1L, currentOffset, time.milliseconds());
+        currentOffset += abortTransaction(buffer, 1L, currentOffset);
         buffer.flip();
 
         List<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<>();
@@ -1463,7 +1489,8 @@ public class FetcherTest {
 
     private int appendTransactionalRecords(ByteBuffer buffer, long pid, long baseOffset, SimpleRecord... records) {
         MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE,
-                TimestampType.LOG_APPEND_TIME, baseOffset, time.milliseconds(), pid, (short) 0, (int) baseOffset, true, RecordBatch.NO_PARTITION_LEADER_EPOCH);
+                TimestampType.LOG_APPEND_TIME, baseOffset, time.milliseconds(), pid, (short) 0, (int) baseOffset, true,
+                RecordBatch.NO_PARTITION_LEADER_EPOCH);
 
         for (SimpleRecord record : records) {
             builder.append(record);
@@ -1472,19 +1499,15 @@ public class FetcherTest {
         return records.length;
     }
 
-    private int commitTransaction(ByteBuffer buffer, long pid, int baseOffset, long timestamp) {
-        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE,
-                TimestampType.LOG_APPEND_TIME, baseOffset, time.milliseconds(), pid, (short) 0, baseOffset, true, RecordBatch.NO_PARTITION_LEADER_EPOCH);
-        builder.appendControlRecord(timestamp, ControlRecordType.COMMIT, null);
-        builder.build();
+    private int commitTransaction(ByteBuffer buffer, long producerId, int baseOffset) {
+        MemoryRecords.writeEndTransactionalMarker(buffer, baseOffset, producerId, (short) 0,
+                new EndTransactionMarker(ControlRecordType.COMMIT, 0));
         return 1;
     }
 
-    private int abortTransaction(ByteBuffer buffer, long pid, long baseOffset, long timestamp) {
-        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE,
-                TimestampType.LOG_APPEND_TIME, baseOffset, time.milliseconds(), pid, (short) 0, (int) baseOffset, true, RecordBatch.NO_PARTITION_LEADER_EPOCH);
-        builder.appendControlRecord(timestamp, ControlRecordType.ABORT, null);
-        builder.build();
+    private int abortTransaction(ByteBuffer buffer, long producerId, long baseOffset) {
+        MemoryRecords.writeEndTransactionalMarker(buffer, baseOffset, producerId, (short) 0,
+                new EndTransactionMarker(ControlRecordType.ABORT, 0));
         return 1;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
index 57f4663..ec858aa 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
@@ -224,25 +224,32 @@ public class DefaultRecordBatchTest {
     }
 
     @Test
-    public void testReadAndWriteControlRecord() {
+    public void testReadAndWriteControlBatch() {
+        long producerId = 1L;
+        short producerEpoch = 0;
+        int coordinatorEpoch = 15;
+
         ByteBuffer buffer = ByteBuffer.allocate(128);
-        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE,
-                TimestampType.CREATE_TIME, 0L);
+        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE,
+                CompressionType.NONE, TimestampType.CREATE_TIME, 0L, RecordBatch.NO_TIMESTAMP, producerId,
+                producerEpoch, RecordBatch.NO_SEQUENCE, true, true, RecordBatch.NO_PARTITION_LEADER_EPOCH,
+                buffer.remaining());
 
-        builder.appendControlRecord(System.currentTimeMillis(), ControlRecordType.COMMIT, null);
-        builder.appendControlRecord(System.currentTimeMillis(), ControlRecordType.ABORT, null);
+        EndTransactionMarker marker = new EndTransactionMarker(ControlRecordType.COMMIT, coordinatorEpoch);
+        builder.appendEndTxnMarker(System.currentTimeMillis(), marker);
         MemoryRecords records = builder.build();
 
+        List<MutableRecordBatch> batches = TestUtils.toList(records.batches());
+        assertEquals(1, batches.size());
+
+        MutableRecordBatch batch = batches.get(0);
+        assertTrue(batch.isControlBatch());
+
         List<Record> logRecords = TestUtils.toList(records.records());
-        assertEquals(2, logRecords.size());
+        assertEquals(1, logRecords.size());
 
         Record commitRecord = logRecords.get(0);
-        assertTrue(commitRecord.isControlRecord());
-        assertEquals(ControlRecordType.COMMIT, ControlRecordType.parse(commitRecord.key()));
-
-        Record abortRecord = logRecords.get(1);
-        assertTrue(abortRecord.isControlRecord());
-        assertEquals(ControlRecordType.ABORT, ControlRecordType.parse(abortRecord.key()));
+        assertEquals(marker, EndTransactionMarker.deserialize(commitRecord));
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
index 251db15..61b7b00 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.header.internals.RecordHeader;
 import org.junit.Test;
 
 import java.nio.ByteBuffer;
-import java.util.Arrays;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
@@ -45,31 +44,27 @@ public class DefaultRecordTest {
             new SimpleRecord(15L, "hi".getBytes(), "there".getBytes(), headers)
         };
 
-        for (boolean isControlRecord : Arrays.asList(true, false)) {
-            for (SimpleRecord record : records) {
-                int baseSequence = 723;
-                long baseOffset = 37;
-                int offsetDelta = 10;
-                long baseTimestamp = System.currentTimeMillis();
-                long timestampDelta = 323;
+        for (SimpleRecord record : records) {
+            int baseSequence = 723;
+            long baseOffset = 37;
+            int offsetDelta = 10;
+            long baseTimestamp = System.currentTimeMillis();
+            long timestampDelta = 323;
 
-                ByteBuffer buffer = ByteBuffer.allocate(1024);
-                DefaultRecord.writeTo(buffer, isControlRecord, offsetDelta, timestampDelta, record.key(),
-                        record.value(), record.headers());
-                buffer.flip();
+            ByteBuffer buffer = ByteBuffer.allocate(1024);
+            DefaultRecord.writeTo(buffer, offsetDelta, timestampDelta, record.key(), record.value(), record.headers());
+            buffer.flip();
 
-                DefaultRecord logRecord = DefaultRecord.readFrom(buffer, baseOffset, baseTimestamp, baseSequence, null);
-                assertNotNull(logRecord);
-                assertEquals(baseOffset + offsetDelta, logRecord.offset());
-                assertEquals(baseSequence + offsetDelta, logRecord.sequence());
-                assertEquals(baseTimestamp + timestampDelta, logRecord.timestamp());
-                assertEquals(record.key(), logRecord.key());
-                assertEquals(record.value(), logRecord.value());
-                assertEquals(isControlRecord, logRecord.isControlRecord());
-                assertArrayEquals(record.headers(), logRecord.headers());
-                assertEquals(DefaultRecord.sizeInBytes(offsetDelta, timestampDelta, record.key(), record.value(),
-                        record.headers()), logRecord.sizeInBytes());
-            }
+            DefaultRecord logRecord = DefaultRecord.readFrom(buffer, baseOffset, baseTimestamp, baseSequence, null);
+            assertNotNull(logRecord);
+            assertEquals(baseOffset + offsetDelta, logRecord.offset());
+            assertEquals(baseSequence + offsetDelta, logRecord.sequence());
+            assertEquals(baseTimestamp + timestampDelta, logRecord.timestamp());
+            assertEquals(record.key(), logRecord.key());
+            assertEquals(record.value(), logRecord.value());
+            assertArrayEquals(record.headers(), logRecord.headers());
+            assertEquals(DefaultRecord.sizeInBytes(offsetDelta, timestampDelta, record.key(), record.value(),
+                    record.headers()), logRecord.sizeInBytes());
         }
     }
 
@@ -83,7 +78,7 @@ public class DefaultRecordTest {
         long timestampDelta = 323;
 
         ByteBuffer buffer = ByteBuffer.allocate(1024);
-        DefaultRecord.writeTo(buffer, false, offsetDelta, timestampDelta, key, value, new Header[0]);
+        DefaultRecord.writeTo(buffer, offsetDelta, timestampDelta, key, value, new Header[0]);
         buffer.flip();
 
         DefaultRecord record = DefaultRecord.readFrom(buffer, baseOffset, baseTimestamp, RecordBatch.NO_SEQUENCE, null);

http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/test/java/org/apache/kafka/common/record/EndTransactionMarkerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/EndTransactionMarkerTest.java b/clients/src/test/java/org/apache/kafka/common/record/EndTransactionMarkerTest.java
new file mode 100644
index 0000000..903f674
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/record/EndTransactionMarkerTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertEquals;
+
+public class EndTransactionMarkerTest {
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testUnknownControlTypeNotAllowed() {
+        new EndTransactionMarker(ControlRecordType.UNKNOWN, 24);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testCannotDeserializeUnknownControlType() {
+        EndTransactionMarker.deserializeValue(ControlRecordType.UNKNOWN, ByteBuffer.wrap(new byte[0]));
+    }
+
+    @Test(expected = InvalidRecordException.class)
+    public void testIllegalNegativeVersion() {
+        ByteBuffer buffer = ByteBuffer.allocate(2);
+        buffer.putShort((short) -1);
+        buffer.flip();
+        EndTransactionMarker.deserializeValue(ControlRecordType.ABORT, buffer);
+    }
+
+    @Test(expected = InvalidRecordException.class)
+    public void testNotEnoughBytes() {
+        EndTransactionMarker.deserializeValue(ControlRecordType.COMMIT, ByteBuffer.wrap(new byte[0]));
+    }
+
+    @Test
+    public void testSerde() {
+        int coordinatorEpoch = 79;
+        EndTransactionMarker marker = new EndTransactionMarker(ControlRecordType.COMMIT, coordinatorEpoch);
+        ByteBuffer buffer = marker.serializeValue();
+        EndTransactionMarker deserialized = EndTransactionMarker.deserializeValue(ControlRecordType.COMMIT, buffer);
+        assertEquals(coordinatorEpoch, deserialized.coordinatorEpoch());
+    }
+
+    @Test
+    public void testDeserializeNewerVersion() {
+        int coordinatorEpoch = 79;
+        ByteBuffer buffer = ByteBuffer.allocate(8);
+        buffer.putShort((short) 5);
+        buffer.putInt(coordinatorEpoch);
+        buffer.putShort((short) 0); // unexpected data
+        buffer.flip();
+        EndTransactionMarker deserialized = EndTransactionMarker.deserializeValue(ControlRecordType.COMMIT, buffer);
+        assertEquals(coordinatorEpoch, deserialized.coordinatorEpoch());
+    }
+}
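
The testDeserializeNewerVersion case above implies that the marker value is laid out as a 2-byte version followed by the 4-byte coordinator epoch, and that readers tolerate extra bytes appended by newer versions. A minimal, hypothetical reader following that assumed layout (a sketch, not the actual EndTransactionMarker code) could look like this:

    import java.nio.ByteBuffer;

    // Sketch only: the real parsing lives in EndTransactionMarker.deserializeValue.
    // Assumed layout (inferred from the test above): int16 version, int32 coordinatorEpoch,
    // with any trailing bytes from newer versions ignored.
    final class EndTxnMarkerValueSketch {
        static int readCoordinatorEpoch(ByteBuffer value) {
            if (value.remaining() < 6)
                throw new IllegalArgumentException("Not enough bytes for an end transaction marker value");
            short version = value.getShort();
            if (version < 0)
                throw new IllegalArgumentException("Invalid marker version: " + version);
            // Newer versions may append fields after the coordinator epoch; an older
            // reader only consumes the prefix it understands and ignores the rest.
            return value.getInt();
        }
    }

This forward-tolerant parsing is what lets the test write version 5 plus an unexpected trailing short and still read back coordinator epoch 79.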

http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
index 294f2f8..11ee419 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
@@ -140,25 +140,25 @@ public class FileRecordsTest {
 
         int message1Size = batches.get(0).sizeInBytes();
         assertEquals("Should be able to find the first message by its offset",
-                new FileRecords.LogEntryPosition(0L, position, message1Size),
+                new FileRecords.LogOffsetPosition(0L, position, message1Size),
                 fileRecords.searchForOffsetWithSize(0, 0));
         position += message1Size;
 
         int message2Size = batches.get(1).sizeInBytes();
         assertEquals("Should be able to find second message when starting from 0",
-                new FileRecords.LogEntryPosition(1L, position, message2Size),
+                new FileRecords.LogOffsetPosition(1L, position, message2Size),
                 fileRecords.searchForOffsetWithSize(1, 0));
         assertEquals("Should be able to find second message starting from its offset",
-                new FileRecords.LogEntryPosition(1L, position, message2Size),
+                new FileRecords.LogOffsetPosition(1L, position, message2Size),
                 fileRecords.searchForOffsetWithSize(1, position));
         position += message2Size + batches.get(2).sizeInBytes();
 
         int message4Size = batches.get(3).sizeInBytes();
         assertEquals("Should be able to find fourth message from a non-existant offset",
-                new FileRecords.LogEntryPosition(50L, position, message4Size),
+                new FileRecords.LogOffsetPosition(50L, position, message4Size),
                 fileRecords.searchForOffsetWithSize(3, position));
         assertEquals("Should be able to find fourth message by correct offset",
-                new FileRecords.LogEntryPosition(50L, position, message4Size),
+                new FileRecords.LogOffsetPosition(50L, position, message4Size),
                 fileRecords.searchForOffsetWithSize(50,  position));
     }
 
@@ -241,7 +241,6 @@ public class FileRecordsTest {
         EasyMock.expect(channelMock.size()).andReturn(42L).atLeastOnce();
         EasyMock.expect(channelMock.position(42L)).andReturn(null).once();
         EasyMock.expect(channelMock.truncate(23L)).andReturn(null).once();
-        EasyMock.expect(channelMock.position(23L)).andReturn(null).once();
         EasyMock.replay(channelMock);
 
         FileRecords fileRecords = new FileRecords(tempFile(), channelMock, 0, Integer.MAX_VALUE, false);

http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
index 330879f..0467522 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
@@ -50,7 +50,7 @@ public class MemoryRecordsBuilderTest {
 
         MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType,
                 TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
-                false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
+                false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
         MemoryRecords records = builder.build();
         assertEquals(0, records.sizeInBytes());
         assertEquals(bufferOffset, buffer.position());
@@ -66,8 +66,8 @@ public class MemoryRecordsBuilderTest {
         int sequence = 2342;
 
         MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType,
-                TimestampType.CREATE_TIME, 0L, 0L, pid, epoch, sequence, true, RecordBatch.NO_PARTITION_LEADER_EPOCH,
-                buffer.capacity());
+                TimestampType.CREATE_TIME, 0L, 0L, pid, epoch, sequence, true, false,
+                RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
         builder.append(System.currentTimeMillis(), "foo".getBytes(), "bar".getBytes());
         MemoryRecords records = builder.build();
 
@@ -86,7 +86,7 @@ public class MemoryRecordsBuilderTest {
         int sequence = 2342;
 
         new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType, TimestampType.CREATE_TIME,
-                0L, 0L, pid, epoch, sequence, true, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
+                0L, 0L, pid, epoch, sequence, true, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
     }
 
     @Test(expected = IllegalArgumentException.class)
@@ -99,7 +99,33 @@ public class MemoryRecordsBuilderTest {
         int sequence = 2342;
 
         new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType, TimestampType.CREATE_TIME,
-                0L, 0L, pid, epoch, sequence, true, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
+                0L, 0L, pid, epoch, sequence, true, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testWriteControlBatchNotAllowedMagicV0() {
+        ByteBuffer buffer = ByteBuffer.allocate(128);
+        buffer.position(bufferOffset);
+
+        long pid = 9809;
+        short epoch = 15;
+        int sequence = 2342;
+
+        new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType, TimestampType.CREATE_TIME,
+                0L, 0L, pid, epoch, sequence, false, true, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testWriteControlBatchNotAllowedMagicV1() {
+        ByteBuffer buffer = ByteBuffer.allocate(128);
+        buffer.position(bufferOffset);
+
+        long pid = 9809;
+        short epoch = 15;
+        int sequence = 2342;
+
+        new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType, TimestampType.CREATE_TIME,
+                0L, 0L, pid, epoch, sequence, false, true, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
     }
 
     @Test(expected = IllegalArgumentException.class)
@@ -112,7 +138,7 @@ public class MemoryRecordsBuilderTest {
         int sequence = 2342;
 
         MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME,
-                0L, 0L, pid, epoch, sequence, true, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
+                0L, 0L, pid, epoch, sequence, true, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
         builder.close();
     }
 
@@ -126,7 +152,7 @@ public class MemoryRecordsBuilderTest {
         int sequence = 2342;
 
         MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME,
-                0L, 0L, pid, epoch, sequence, true, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
+                0L, 0L, pid, epoch, sequence, true, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
         builder.close();
     }
 
@@ -140,10 +166,38 @@ public class MemoryRecordsBuilderTest {
         int sequence = RecordBatch.NO_SEQUENCE;
 
         MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME,
-                0L, 0L, pid, epoch, sequence, true, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
+                0L, 0L, pid, epoch, sequence, true, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
         builder.close();
     }
 
+    @Test(expected = IllegalArgumentException.class)
+    public void testWriteEndTxnMarkerNonTransactionalBatch() {
+        ByteBuffer buffer = ByteBuffer.allocate(128);
+        buffer.position(bufferOffset);
+
+        long pid = 9809;
+        short epoch = 15;
+        int sequence = RecordBatch.NO_SEQUENCE;
+
+        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME,
+                0L, 0L, pid, epoch, sequence, false, true, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
+        builder.appendEndTxnMarker(RecordBatch.NO_TIMESTAMP, new EndTransactionMarker(ControlRecordType.ABORT, 0));
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testWriteEndTxnMarkerNonControlBatch() {
+        ByteBuffer buffer = ByteBuffer.allocate(128);
+        buffer.position(bufferOffset);
+
+        long pid = 9809;
+        short epoch = 15;
+        int sequence = RecordBatch.NO_SEQUENCE;
+
+        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME,
+                0L, 0L, pid, epoch, sequence, true, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
+        builder.appendEndTxnMarker(RecordBatch.NO_TIMESTAMP, new EndTransactionMarker(ControlRecordType.ABORT, 0));
+    }
+
     @Test
     public void testCompressionRateV0() {
         ByteBuffer buffer = ByteBuffer.allocate(1024);
@@ -157,7 +211,7 @@ public class MemoryRecordsBuilderTest {
 
         MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType,
                 TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
-                false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
+                false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
 
         int uncompressedSize = 0;
         for (LegacyRecord record : records) {
@@ -188,7 +242,7 @@ public class MemoryRecordsBuilderTest {
 
         MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType,
                 TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
-                false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
+                false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
 
         int uncompressedSize = 0;
         for (LegacyRecord record : records) {
@@ -214,7 +268,7 @@ public class MemoryRecordsBuilderTest {
         long logAppendTime = System.currentTimeMillis();
         MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType,
                 TimestampType.LOG_APPEND_TIME, 0L, logAppendTime, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH,
-                RecordBatch.NO_SEQUENCE, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
+                RecordBatch.NO_SEQUENCE, false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
         builder.append(0L, "a".getBytes(), "1".getBytes());
         builder.append(0L, "b".getBytes(), "2".getBytes());
         builder.append(0L, "c".getBytes(), "3".getBytes());
@@ -243,7 +297,7 @@ public class MemoryRecordsBuilderTest {
         long logAppendTime = System.currentTimeMillis();
         MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType,
                 TimestampType.CREATE_TIME, 0L, logAppendTime, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
-                false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
+                false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
         builder.append(0L, "a".getBytes(), "1".getBytes());
         builder.append(2L, "b".getBytes(), "2".getBytes());
         builder.append(1L, "c".getBytes(), "3".getBytes());
@@ -276,7 +330,7 @@ public class MemoryRecordsBuilderTest {
         ByteBuffer buffer = ByteBuffer.allocate(512);
         MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType,
                 TimestampType.CREATE_TIME, 0L, LegacyRecord.NO_TIMESTAMP, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH,
-                RecordBatch.NO_SEQUENCE, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, writeLimit);
+                RecordBatch.NO_SEQUENCE, false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, writeLimit);
 
         assertFalse(builder.isFull());
         assertTrue(builder.hasRoomFor(0L, key, value));
@@ -302,7 +356,7 @@ public class MemoryRecordsBuilderTest {
         long logAppendTime = System.currentTimeMillis();
         MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType,
                 TimestampType.CREATE_TIME, 0L, logAppendTime, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
-                false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
+                false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
         builder.append(0L, "a".getBytes(), "1".getBytes());
         builder.append(1L, "b".getBytes(), "2".getBytes());
 
@@ -330,7 +384,7 @@ public class MemoryRecordsBuilderTest {
         long logAppendTime = System.currentTimeMillis();
         MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType,
                 TimestampType.CREATE_TIME, 0L, logAppendTime, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
-                false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
+                false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
 
         builder.appendWithOffset(0L, System.currentTimeMillis(), "a".getBytes(), null);
 
@@ -346,13 +400,18 @@ public class MemoryRecordsBuilderTest {
         builder.append(10L, "1".getBytes(), "a".getBytes());
         builder.close();
 
+        MemoryRecords.writeEndTransactionalMarker(buffer, 1L, 15L, (short) 0,
+                new EndTransactionMarker(ControlRecordType.ABORT, 0));
+
         builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, compressionType,
                 TimestampType.CREATE_TIME, 1L);
-        builder.append(11L, "2".getBytes(), "b".getBytes());
-        builder.appendControlRecord(12L, ControlRecordType.COMMIT, null);
+        builder.append(12L, "2".getBytes(), "b".getBytes());
         builder.append(13L, "3".getBytes(), "c".getBytes());
         builder.close();
 
+        MemoryRecords.writeEndTransactionalMarker(buffer, 14L, 1L, (short) 0,
+                new EndTransactionMarker(ControlRecordType.COMMIT, 0));
+
         buffer.flip();
 
         Records records = MemoryRecords.readableRecords(buffer).downConvert(RecordBatch.MAGIC_VALUE_V1);

http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index 49e1429..014a5bd 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -65,7 +65,7 @@ public class MemoryRecordsTest {
         ByteBuffer buffer = ByteBuffer.allocate(1024);
 
         MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, compression,
-                TimestampType.CREATE_TIME, firstOffset, logAppendTime, pid, epoch, firstSequence, false,
+                TimestampType.CREATE_TIME, firstOffset, logAppendTime, pid, epoch, firstSequence, false, false,
                 partitionLeaderEpoch, buffer.limit());
 
         SimpleRecord[] records = new SimpleRecord[] {
@@ -216,9 +216,44 @@ public class MemoryRecordsTest {
     }
 
     @Test
+    public void testBuildEndTxnMarker() {
+        if (magic >= RecordBatch.MAGIC_VALUE_V2) {
+            long producerId = 73;
+            short producerEpoch = 13;
+            long initialOffset = 983L;
+            int coordinatorEpoch = 347;
+            EndTransactionMarker marker = new EndTransactionMarker(ControlRecordType.COMMIT, coordinatorEpoch);
+            MemoryRecords records = MemoryRecords.withEndTransactionMarker(initialOffset, producerId, producerEpoch, marker);
+            // verify that buffer allocation was precise
+            assertEquals(records.buffer().remaining(), records.buffer().capacity());
+
+            List<MutableRecordBatch> batches = TestUtils.toList(records.batches());
+            assertEquals(1, batches.size());
+
+            RecordBatch batch = batches.get(0);
+            assertTrue(batch.isControlBatch());
+            assertEquals(producerId, batch.producerId());
+            assertEquals(producerEpoch, batch.producerEpoch());
+            assertEquals(initialOffset, batch.baseOffset());
+            assertTrue(batch.isValid());
+
+            List<Record> createdRecords = TestUtils.toList(batch);
+            assertEquals(1, createdRecords.size());
+
+            Record record = createdRecords.get(0);
+            assertTrue(record.isValid());
+            EndTransactionMarker deserializedMarker = EndTransactionMarker.deserialize(record);
+            assertEquals(ControlRecordType.COMMIT, deserializedMarker.controlType());
+            assertEquals(coordinatorEpoch, deserializedMarker.coordinatorEpoch());
+        }
+    }
+
+    @Test
     public void testFilterToPreservesProducerInfo() {
         if (magic >= RecordBatch.MAGIC_VALUE_V2) {
             ByteBuffer buffer = ByteBuffer.allocate(2048);
+
+            // non-idempotent, non-transactional
             MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 0L);
             builder.append(10L, null, "a".getBytes());
             builder.append(11L, "1".getBytes(), "b".getBytes());
@@ -226,17 +261,28 @@ public class MemoryRecordsTest {
 
             builder.close();
 
-            long pid = 23L;
-            short epoch = 5;
-            int baseSequence = 10;
-
+            // idempotent
+            long pid1 = 23L;
+            short epoch1 = 5;
+            int baseSequence1 = 10;
             builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 3L,
-                    RecordBatch.NO_TIMESTAMP, pid, epoch, baseSequence);
+                    RecordBatch.NO_TIMESTAMP, pid1, epoch1, baseSequence1);
             builder.append(13L, null, "d".getBytes());
             builder.append(14L, "4".getBytes(), "e".getBytes());
             builder.append(15L, "5".getBytes(), "f".getBytes());
             builder.close();
 
+            // transactional
+            long pid2 = 99384L;
+            short epoch2 = 234;
+            int baseSequence2 = 15;
+            builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 3L,
+                    RecordBatch.NO_TIMESTAMP, pid2, epoch2, baseSequence2, true, RecordBatch.NO_PARTITION_LEADER_EPOCH);
+            builder.append(16L, "6".getBytes(), "g".getBytes());
+            builder.append(17L, null, "h".getBytes());
+            builder.append(18L, "8".getBytes(), "i".getBytes());
+            builder.close();
+
             buffer.flip();
 
             ByteBuffer filtered = ByteBuffer.allocate(2048);
@@ -246,7 +292,7 @@ public class MemoryRecordsTest {
             MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
 
             List<MutableRecordBatch> batches = TestUtils.toList(filteredRecords.batches());
-            assertEquals(2, batches.size());
+            assertEquals(3, batches.size());
 
             MutableRecordBatch firstBatch = batches.get(0);
             assertEquals(1, firstBatch.countOrNull().intValue());
@@ -256,15 +302,27 @@ public class MemoryRecordsTest {
             assertEquals(RecordBatch.NO_PRODUCER_EPOCH, firstBatch.producerEpoch());
             assertEquals(RecordBatch.NO_SEQUENCE, firstBatch.baseSequence());
             assertEquals(RecordBatch.NO_SEQUENCE, firstBatch.lastSequence());
+            assertFalse(firstBatch.isTransactional());
 
             MutableRecordBatch secondBatch = batches.get(1);
             assertEquals(2, secondBatch.countOrNull().intValue());
             assertEquals(3L, secondBatch.baseOffset());
             assertEquals(5L, secondBatch.lastOffset());
-            assertEquals(pid, secondBatch.producerId());
-            assertEquals(epoch, secondBatch.producerEpoch());
-            assertEquals(baseSequence, secondBatch.baseSequence());
-            assertEquals(baseSequence + 2, secondBatch.lastSequence());
+            assertEquals(pid1, secondBatch.producerId());
+            assertEquals(epoch1, secondBatch.producerEpoch());
+            assertEquals(baseSequence1, secondBatch.baseSequence());
+            assertEquals(baseSequence1 + 2, secondBatch.lastSequence());
+            assertFalse(secondBatch.isTransactional());
+
+            MutableRecordBatch thirdBatch = batches.get(2);
+            assertEquals(2, thirdBatch.countOrNull().intValue());
+            assertEquals(3L, thirdBatch.baseOffset());
+            assertEquals(5L, thirdBatch.lastOffset());
+            assertEquals(pid2, thirdBatch.producerId());
+            assertEquals(epoch2, thirdBatch.producerEpoch());
+            assertEquals(baseSequence2, thirdBatch.baseSequence());
+            assertEquals(baseSequence2 + 2, thirdBatch.lastSequence());
+            assertTrue(thirdBatch.isTransactional());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index c948fd1..6443e4d 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -94,6 +94,9 @@ public class RequestResponseTest {
         checkRequest(createListOffsetRequest(1));
         checkErrorResponse(createListOffsetRequest(1), new UnknownServerException());
         checkResponse(createListOffsetResponse(1), 1);
+        checkRequest(createListOffsetRequest(2));
+        checkErrorResponse(createListOffsetRequest(2), new UnknownServerException());
+        checkResponse(createListOffsetResponse(2), 2);
         checkRequest(MetadataRequest.Builder.allTopics().build((short) 2));
         checkRequest(createMetadataRequest(1, asList("topic1")));
         checkErrorResponse(createMetadataRequest(1, asList("topic1")), new UnknownServerException());
@@ -621,11 +624,24 @@ public class RequestResponseTest {
             Map<TopicPartition, ListOffsetRequest.PartitionData> offsetData = Collections.singletonMap(
                     new TopicPartition("test", 0),
                     new ListOffsetRequest.PartitionData(1000000L, 10));
-            return ListOffsetRequest.Builder.forConsumer(false).setOffsetData(offsetData).build((short) version);
+            return ListOffsetRequest.Builder
+                    .forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
+                    .setOffsetData(offsetData)
+                    .build((short) version);
         } else if (version == 1) {
             Map<TopicPartition, Long> offsetData = Collections.singletonMap(
                     new TopicPartition("test", 0), 1000000L);
-            return ListOffsetRequest.Builder.forConsumer(true).setTargetTimes(offsetData).build((short) version);
+            return ListOffsetRequest.Builder
+                    .forConsumer(true, IsolationLevel.READ_UNCOMMITTED)
+                    .setTargetTimes(offsetData)
+                    .build((short) version);
+        } else if (version == 2) {
+            Map<TopicPartition, Long> offsetData = Collections.singletonMap(
+                    new TopicPartition("test", 0), 1000000L);
+            return ListOffsetRequest.Builder
+                    .forConsumer(true, IsolationLevel.READ_COMMITTED)
+                    .setTargetTimes(offsetData)
+                    .build((short) version);
         } else {
             throw new IllegalArgumentException("Illegal ListOffsetRequest version " + version);
         }
@@ -638,7 +654,7 @@ public class RequestResponseTest {
             responseData.put(new TopicPartition("test", 0),
                     new ListOffsetResponse.PartitionData(Errors.NONE, asList(100L)));
             return new ListOffsetResponse(responseData);
-        } else if (version == 1) {
+        } else if (version == 1 || version == 2) {
             Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<>();
             responseData.put(new TopicPartition("test", 0),
                     new ListOffsetResponse.PartitionData(Errors.NONE, 10000L, 100L));

http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 1eea8dc..1d13689 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -456,7 +456,7 @@ class Partition(val topic: String,
     laggingReplicas
   }
 
-  def appendRecordsToLeader(records: MemoryRecords, requiredAcks: Int = 0) = {
+  def appendRecordsToLeader(records: MemoryRecords, isFromClient: Boolean, requiredAcks: Int = 0) = {
     val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
       leaderReplicaIfLocal match {
         case Some(leaderReplica) =>
@@ -470,7 +470,7 @@ class Partition(val topic: String,
               .format(topicPartition, inSyncSize, minIsr))
           }
 
-          val info = log.appendAsLeader(records, leaderEpoch = this.leaderEpoch)
+          val info = log.appendAsLeader(records, leaderEpoch = this.leaderEpoch, isFromClient)
           // probably unblock some follower fetch requests since log end offset has been updated
           replicaManager.tryCompleteDelayedFetch(TopicPartitionOperationKey(this.topic, this.partitionId))
           // we may need to increment high watermark since ISR could be down to 1

http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/main/scala/kafka/cluster/Replica.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index a604b87..e3b1f2d 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -135,6 +135,7 @@ class Replica(val brokerId: Int,
   def highWatermark_=(newHighWatermark: LogOffsetMetadata) {
     if (isLocal) {
       highWatermarkMetadata = newHighWatermark
+      log.foreach(_.onHighWatermarkIncremented(newHighWatermark.messageOffset))
       trace(s"Setting high watermark for replica $brokerId partition $topicPartition to [$newHighWatermark]")
     } else {
       throw new KafkaException(s"Should not set high watermark on partition $topicPartition's non-local replica $brokerId")
@@ -143,6 +144,23 @@ class Replica(val brokerId: Int,
 
   def highWatermark = highWatermarkMetadata
 
+  /**
+   * The last stable offset (LSO) is defined as the first offset such that all lower offsets have been "decided."
+   * Non-transactional messages are considered decided immediately, but transactional messages are only decided when
+   * the corresponding COMMIT or ABORT marker is written. This implies that the last stable offset will be equal
+   * to the high watermark if there are no transactional messages in the log. Note also that the LSO cannot advance
+   * beyond the high watermark.
+   */
+  def lastStableOffset: LogOffsetMetadata = {
+    log.map { log =>
+      log.firstUnstableOffset match {
+        case Some(offsetMetadata) if offsetMetadata.messageOffset < highWatermark.messageOffset => offsetMetadata
+        case _ => highWatermark
+      }
+    }.getOrElse(throw new KafkaException(s"Cannot fetch last stable offset on partition $topicPartition's " +
+      s"non-local replica $brokerId"))
+  }
+
   def convertHWToLocalOffsetMetadata() = {
     if (isLocal) {
       highWatermarkMetadata = log.get.convertToOffsetMetadata(highWatermarkMetadata.messageOffset)
@@ -165,7 +183,10 @@ class Replica(val brokerId: Int,
     replicaString.append("; Partition: " + partition.partitionId)
     replicaString.append("; isLocal: " + isLocal)
     replicaString.append("; lastCaughtUpTimeMs: " + lastCaughtUpTimeMs)
-    if (isLocal) replicaString.append("; Highwatermark: " + highWatermark)
+    if (isLocal) {
+      replicaString.append("; Highwatermark: " + highWatermark)
+      replicaString.append("; LastStableOffset: " + lastStableOffset)
+    }
     replicaString.toString
   }
 }
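
The lastStableOffset doc comment above defines the LSO as the first unstable offset when one exists below the high watermark, and the high watermark otherwise. A rough, hypothetical sketch of that rule over plain offsets (the real code in Replica.lastStableOffset operates on LogOffsetMetadata, not bare longs):

    import java.util.OptionalLong;

    // Illustration of the last stable offset rule described above, using bare offsets.
    final class LastStableOffsetSketch {
        static long lastStableOffset(OptionalLong firstUnstableOffset, long highWatermark) {
            // An open transaction below the high watermark pins the LSO at its first offset;
            // otherwise the LSO is simply the high watermark.
            if (firstUnstableOffset.isPresent() && firstUnstableOffset.getAsLong() < highWatermark)
                return firstUnstableOffset.getAsLong();
            return highWatermark;
        }
    }

For example, with the high watermark at 50 and an open transaction whose first offset is 42, this returns 42; once the transaction's COMMIT or ABORT marker is written and no unstable offset remains below the high watermark, it returns 50.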

http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index e711392..3eafdb7 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -38,7 +38,7 @@ import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.protocol.types.Type._
 import org.apache.kafka.common.protocol.types.{ArrayOf, Field, Schema, Struct}
 import org.apache.kafka.common.record._
-import org.apache.kafka.common.requests.OffsetFetchResponse
+import org.apache.kafka.common.requests.{IsolationLevel, OffsetFetchResponse}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.{Time, Utils}
 
@@ -225,7 +225,8 @@ class GroupMetadataManager(brokerId: Int,
     replicaManager.appendRecords(
       config.offsetCommitTimeoutMs.toLong,
       config.offsetCommitRequiredAcks,
-      true, // allow appending to internal offset topic
+      internalTopicsAllowed = true,
+      isFromClient = false,
       delayedStore.partitionRecords,
       delayedStore.callback)
   }
@@ -429,7 +430,8 @@ class GroupMetadataManager(brokerId: Int,
 
       case Some(log) =>
         var currOffset = log.logStartOffset
-        val buffer = ByteBuffer.allocate(config.loadBufferSize)
+        lazy val buffer = ByteBuffer.allocate(config.loadBufferSize)
+
         // loop breaks if leader changes at any time during the load, since getHighWatermark is -1
         val loadedOffsets = mutable.Map[GroupTopicPartition, OffsetAndMetadata]()
         val removedOffsets = mutable.Set[GroupTopicPartition]()
@@ -437,12 +439,18 @@ class GroupMetadataManager(brokerId: Int,
         val removedGroups = mutable.Set[String]()
 
         while (currOffset < highWaterMark && !shuttingDown.get()) {
-          buffer.clear()
-          val fileRecords = log.read(currOffset, config.loadBufferSize, maxOffset = None, minOneMessage = true)
-            .records.asInstanceOf[FileRecords]
-          val bufferRead = fileRecords.readInto(buffer, 0)
+          val fetchDataInfo = log.read(currOffset, config.loadBufferSize, maxOffset = None, minOneMessage = true,
+            isolationLevel = IsolationLevel.READ_UNCOMMITTED)
+
+          val memRecords = fetchDataInfo.records match {
+            case records: MemoryRecords => records
+            case fileRecords: FileRecords =>
+              buffer.clear()
+              val bufferRead = fileRecords.readInto(buffer, 0)
+              MemoryRecords.readableRecords(bufferRead)
+          }
 
-          MemoryRecords.readableRecords(bufferRead).batches.asScala.foreach { batch =>
+          memRecords.batches.asScala.foreach { batch =>
             for (record <- batch.asScala) {
               require(record.hasKey, "Group metadata/offset entry key should not be null")
               GroupMetadataManager.readMessageKey(record.key) match {
@@ -630,7 +638,8 @@ class GroupMetadataManager(brokerId: Int,
                 // do not need to require acks since even if the tombstone is lost,
                 // it will be appended again in the next purge cycle
                 val records = MemoryRecords.withRecords(magicValue, 0L, compressionType, timestampType, tombstones: _*)
-                partition.appendRecordsToLeader(records)
+                partition.appendRecordsToLeader(records, isFromClient = false, requiredAcks = 0)
+
                 offsetsRemoved += removedOffsets.size
                 trace(s"Successfully appended ${tombstones.size} tombstones to $appendPartition for expired/deleted " +
                   s"offsets and/or metadata for group $groupId")

http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index f07ca91..7930cd0 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -31,6 +31,7 @@ import kafka.utils.{Logging, Pool, Scheduler, ZkUtils}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.{FileRecords, MemoryRecords, SimpleRecord}
+import org.apache.kafka.common.requests.IsolationLevel
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.{Time, Utils}
 
@@ -158,8 +159,7 @@ class TransactionStateManager(brokerId: Int,
         warn(s"Attempted to load offsets and group metadata from $topicPartition, but found no log")
 
       case Some(log) =>
-        val buffer = ByteBuffer.allocate(config.transactionLogLoadBufferSize)
-
+        lazy val buffer = ByteBuffer.allocate(config.transactionLogLoadBufferSize)
         val loadedTransactions = mutable.Map.empty[String, TransactionMetadata]
         val removedTransactionalIds = mutable.Set.empty[String]
 
@@ -169,11 +169,17 @@ class TransactionStateManager(brokerId: Int,
                 && loadingPartitions.contains(topicPartition.partition())
                 && !shuttingDown.get()) {
           buffer.clear()
-          val fileRecords = log.read(currOffset, config.transactionLogLoadBufferSize, maxOffset = None, minOneMessage = true)
-            .records.asInstanceOf[FileRecords]
-          val bufferRead = fileRecords.readInto(buffer, 0)
+          val fetchDataInfo = log.read(currOffset, config.transactionLogLoadBufferSize, maxOffset = None,
+            minOneMessage = true, isolationLevel = IsolationLevel.READ_UNCOMMITTED)
+          val memRecords = fetchDataInfo.records match {
+            case records: MemoryRecords => records
+            case fileRecords: FileRecords =>
+              buffer.clear()
+              val bufferRead = fileRecords.readInto(buffer, 0)
+              MemoryRecords.readableRecords(bufferRead)
+          }
 
-          MemoryRecords.readableRecords(bufferRead).batches.asScala.foreach { batch =>
+          memRecords.batches.asScala.foreach { batch =>
             for (record <- batch.asScala) {
               require(record.hasKey, "Transaction state log's key should not be null")
               TransactionLog.readMessageKey(record.key) match {
@@ -414,6 +420,7 @@ class TransactionStateManager(brokerId: Int,
       txnMetadata.txnTimeoutMs.toLong,
       TransactionLog.EnforcedRequiredAcks,
       internalTopicsAllowed = true,
+      isFromClient = false,
       recordsPerPartition,
       updateCacheCallback)
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/main/scala/kafka/log/AbstractIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala b/core/src/main/scala/kafka/log/AbstractIndex.scala
index f7478ad..a125676 100644
--- a/core/src/main/scala/kafka/log/AbstractIndex.scala
+++ b/core/src/main/scala/kafka/log/AbstractIndex.scala
@@ -246,14 +246,26 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
    * @param target The index key to look for
    * @return The slot found or -1 if the least entry in the index is larger than the target key or the index is empty
    */
-  protected def indexSlotFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchEntity): Int = {
+  protected def largestLowerBoundSlotFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchEntity): Int =
+    indexSlotRangeFor(idx, target, searchEntity)._1
+
+  /**
+   * Find the smallest entry greater than or equal the target key or value. If none can be found, -1 is returned.
+   */
+  protected def smallestUpperBoundSlotFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchEntity): Int =
+    indexSlotRangeFor(idx, target, searchEntity)._2
+
+  /**
+   * Lookup lower and upper bounds for the given target.
+   */
+  private def indexSlotRangeFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchEntity): (Int, Int) = {
     // check if the index is empty
     if(_entries == 0)
-      return -1
+      return (-1, -1)
 
     // check if the target offset is smaller than the least offset
     if(compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0)
-      return -1
+      return (-1, 0)
 
     // binary search for the entry
     var lo = 0
@@ -267,9 +279,10 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
       else if(compareResult < 0)
         lo = mid
       else
-        return mid
+        return (mid, mid)
     }
-    lo
+
+    (lo, if (lo == _entries - 1) -1 else lo + 1)
   }
 
   private def compareIndexEntry(indexEntry: IndexEntry, target: Long, searchEntity: IndexSearchEntity): Int = {