You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/03/24 19:43:58 UTC

[05/11] kafka git commit: KAFKA-4816; Message format changes for idempotent/transactional producer (KIP-98)

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
index e1dcae4..ef48783 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.record;
 
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -47,29 +48,116 @@ public class MemoryRecordsBuilderTest {
         ByteBuffer buffer = ByteBuffer.allocate(128);
         buffer.position(bufferOffset);
 
-        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V0, compressionType,
-                TimestampType.CREATE_TIME, 0L, 0L, buffer.capacity());
+        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType,
+                TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
+                false, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH, buffer.capacity());
         MemoryRecords records = builder.build();
         assertEquals(0, records.sizeInBytes());
         assertEquals(bufferOffset, buffer.position());
     }
 
     @Test
+    public void testWriteTransactionalRecordSet() {
+        ByteBuffer buffer = ByteBuffer.allocate(128);
+        buffer.position(bufferOffset);
+
+        long pid = 9809;
+        short epoch = 15;
+        int sequence = 2342;
+
+        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType,
+                TimestampType.CREATE_TIME, 0L, 0L, pid, epoch, sequence, true, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH,
+                buffer.capacity());
+        builder.append(System.currentTimeMillis(), "foo".getBytes(), "bar".getBytes());
+        MemoryRecords records = builder.build();
+
+        List<MutableRecordBatch> batches = Utils.toList(records.batches().iterator());
+        assertEquals(1, batches.size());
+        assertTrue(batches.get(0).isTransactional());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testWriteTransactionalNotAllowedMagicV0() {
+        ByteBuffer buffer = ByteBuffer.allocate(128);
+        buffer.position(bufferOffset);
+
+        long pid = 9809;
+        short epoch = 15;
+        int sequence = 2342;
+
+        new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType, TimestampType.CREATE_TIME,
+                0L, 0L, pid, epoch, sequence, true, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH, buffer.capacity());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testWriteTransactionalNotAllowedMagicV1() {
+        ByteBuffer buffer = ByteBuffer.allocate(128);
+        buffer.position(bufferOffset);
+
+        long pid = 9809;
+        short epoch = 15;
+        int sequence = 2342;
+
+        new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType, TimestampType.CREATE_TIME,
+                0L, 0L, pid, epoch, sequence, true, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH, buffer.capacity());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testWriteTransactionalWithInvalidPID() {
+        ByteBuffer buffer = ByteBuffer.allocate(128);
+        buffer.position(bufferOffset);
+
+        long pid = RecordBatch.NO_PRODUCER_ID;
+        short epoch = 15;
+        int sequence = 2342;
+
+        new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME,
+                0L, 0L, pid, epoch, sequence, true, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH, buffer.capacity());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testWriteIdempotentWithInvalidEpoch() {
+        ByteBuffer buffer = ByteBuffer.allocate(128);
+        buffer.position(bufferOffset);
+
+        long pid = 9809;
+        short epoch = RecordBatch.NO_PRODUCER_EPOCH;
+        int sequence = 2342;
+
+        new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME,
+                0L, 0L, pid, epoch, sequence, true, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH, buffer.capacity());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testWriteIdempotentWithInvalidBaseSequence() {
+        ByteBuffer buffer = ByteBuffer.allocate(128);
+        buffer.position(bufferOffset);
+
+        long pid = 9809;
+        short epoch = 15;
+        int sequence = RecordBatch.NO_SEQUENCE;
+
+        new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME,
+                0L, 0L, pid, epoch, sequence, true, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH, buffer.capacity());
+    }
+
+    @Test
     public void testCompressionRateV0() {
         ByteBuffer buffer = ByteBuffer.allocate(1024);
         buffer.position(bufferOffset);
 
-        Record[] records = new Record[] {
-                Record.create(Record.MAGIC_VALUE_V0, 0L, "a".getBytes(), "1".getBytes()),
-                Record.create(Record.MAGIC_VALUE_V0, 1L, "b".getBytes(), "2".getBytes()),
-                Record.create(Record.MAGIC_VALUE_V0, 2L, "c".getBytes(), "3".getBytes()),
+        LegacyRecord[] records = new LegacyRecord[] {
+                LegacyRecord.create(RecordBatch.MAGIC_VALUE_V0, 0L, "a".getBytes(), "1".getBytes()),
+                LegacyRecord.create(RecordBatch.MAGIC_VALUE_V0, 1L, "b".getBytes(), "2".getBytes()),
+                LegacyRecord.create(RecordBatch.MAGIC_VALUE_V0, 2L, "c".getBytes(), "3".getBytes()),
         };
 
-        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V0, compressionType,
-                TimestampType.CREATE_TIME, 0L, 0L, buffer.capacity());
+        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType,
+                TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
+                false, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH, buffer.capacity());
 
         int uncompressedSize = 0;
-        for (Record record : records) {
+        for (LegacyRecord record : records) {
             uncompressedSize += record.sizeInBytes() + Records.LOG_OVERHEAD;
             builder.append(record);
         }
@@ -78,7 +166,7 @@ public class MemoryRecordsBuilderTest {
         if (compressionType == CompressionType.NONE) {
             assertEquals(1.0, builder.compressionRate(), 0.00001);
         } else {
-            int compressedSize = built.sizeInBytes() - Records.LOG_OVERHEAD - Record.RECORD_OVERHEAD_V0;
+            int compressedSize = built.sizeInBytes() - Records.LOG_OVERHEAD - LegacyRecord.RECORD_OVERHEAD_V0;
             double computedCompressionRate = (double) compressedSize / uncompressedSize;
             assertEquals(computedCompressionRate, builder.compressionRate(), 0.00001);
         }
@@ -89,17 +177,18 @@ public class MemoryRecordsBuilderTest {
         ByteBuffer buffer = ByteBuffer.allocate(1024);
         buffer.position(bufferOffset);
 
-        Record[] records = new Record[] {
-                Record.create(Record.MAGIC_VALUE_V1, 0L, "a".getBytes(), "1".getBytes()),
-                Record.create(Record.MAGIC_VALUE_V1, 1L, "b".getBytes(), "2".getBytes()),
-                Record.create(Record.MAGIC_VALUE_V1, 2L, "c".getBytes(), "3".getBytes()),
+        LegacyRecord[] records = new LegacyRecord[] {
+                LegacyRecord.create(RecordBatch.MAGIC_VALUE_V1, 0L, "a".getBytes(), "1".getBytes()),
+                LegacyRecord.create(RecordBatch.MAGIC_VALUE_V1, 1L, "b".getBytes(), "2".getBytes()),
+                LegacyRecord.create(RecordBatch.MAGIC_VALUE_V1, 2L, "c".getBytes(), "3".getBytes()),
         };
 
-        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V1, compressionType,
-                TimestampType.CREATE_TIME, 0L, 0L, buffer.capacity());
+        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType,
+                TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
+                false, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH, buffer.capacity());
 
         int uncompressedSize = 0;
-        for (Record record : records) {
+        for (LegacyRecord record : records) {
             uncompressedSize += record.sizeInBytes() + Records.LOG_OVERHEAD;
             builder.append(record);
         }
@@ -108,7 +197,7 @@ public class MemoryRecordsBuilderTest {
         if (compressionType == CompressionType.NONE) {
             assertEquals(1.0, builder.compressionRate(), 0.00001);
         } else {
-            int compressedSize = built.sizeInBytes() - Records.LOG_OVERHEAD - Record.RECORD_OVERHEAD_V1;
+            int compressedSize = built.sizeInBytes() - Records.LOG_OVERHEAD - LegacyRecord.RECORD_OVERHEAD_V1;
             double computedCompressionRate = (double) compressedSize / uncompressedSize;
             assertEquals(computedCompressionRate, builder.compressionRate(), 0.00001);
         }
@@ -120,8 +209,9 @@ public class MemoryRecordsBuilderTest {
         buffer.position(bufferOffset);
 
         long logAppendTime = System.currentTimeMillis();
-        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V1, compressionType,
-                TimestampType.LOG_APPEND_TIME, 0L, logAppendTime, buffer.capacity());
+        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType,
+                TimestampType.LOG_APPEND_TIME, 0L, logAppendTime, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH,
+                RecordBatch.NO_SEQUENCE, false, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH, buffer.capacity());
         builder.append(0L, "a".getBytes(), "1".getBytes());
         builder.append(0L, "b".getBytes(), "2".getBytes());
         builder.append(0L, "c".getBytes(), "3".getBytes());
@@ -132,33 +222,10 @@ public class MemoryRecordsBuilderTest {
 
         assertEquals(2L, info.shallowOffsetOfMaxTimestamp);
 
-        for (Record record : records.records()) {
-            assertEquals(TimestampType.LOG_APPEND_TIME, record.timestampType());
-            assertEquals(logAppendTime, record.timestamp());
-        }
-    }
-
-    @Test
-    public void convertUsingLogAppendTime() {
-        ByteBuffer buffer = ByteBuffer.allocate(1024);
-        buffer.position(bufferOffset);
-
-        long logAppendTime = System.currentTimeMillis();
-        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V1, compressionType,
-                TimestampType.LOG_APPEND_TIME, 0L, logAppendTime, buffer.capacity());
-
-        builder.convertAndAppend(Record.create(Record.MAGIC_VALUE_V0, 0L, "a".getBytes(), "1".getBytes()));
-        builder.convertAndAppend(Record.create(Record.MAGIC_VALUE_V0, 0L, "b".getBytes(), "2".getBytes()));
-        builder.convertAndAppend(Record.create(Record.MAGIC_VALUE_V0, 0L, "c".getBytes(), "3".getBytes()));
-        MemoryRecords records = builder.build();
-
-        MemoryRecordsBuilder.RecordsInfo info = builder.info();
-        assertEquals(logAppendTime, info.maxTimestamp);
-        assertEquals(2L, info.shallowOffsetOfMaxTimestamp);
-
-        for (Record record : records.records()) {
-            assertEquals(TimestampType.LOG_APPEND_TIME, record.timestampType());
-            assertEquals(logAppendTime, record.timestamp());
+        for (RecordBatch batch : records.batches()) {
+            assertEquals(TimestampType.LOG_APPEND_TIME, batch.timestampType());
+            for (Record record : batch)
+                assertEquals(logAppendTime, record.timestamp());
         }
     }
 
@@ -168,8 +235,9 @@ public class MemoryRecordsBuilderTest {
         buffer.position(bufferOffset);
 
         long logAppendTime = System.currentTimeMillis();
-        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V1, compressionType,
-                TimestampType.CREATE_TIME, 0L, logAppendTime, buffer.capacity());
+        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType,
+                TimestampType.CREATE_TIME, 0L, logAppendTime, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
+                false, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH, buffer.capacity());
         builder.append(0L, "a".getBytes(), "1".getBytes());
         builder.append(2L, "b".getBytes(), "2".getBytes());
         builder.append(1L, "c".getBytes(), "3".getBytes());
@@ -185,9 +253,10 @@ public class MemoryRecordsBuilderTest {
 
         int i = 0;
         long[] expectedTimestamps = new long[] {0L, 2L, 1L};
-        for (Record record : records.records()) {
-            assertEquals(TimestampType.CREATE_TIME, record.timestampType());
-            assertEquals(expectedTimestamps[i++], record.timestamp());
+        for (RecordBatch batch : records.batches()) {
+            assertEquals(TimestampType.CREATE_TIME, batch.timestampType());
+            for (Record record : batch)
+                assertEquals(expectedTimestamps[i++], record.timestamp());
         }
     }
 
@@ -199,15 +268,16 @@ public class MemoryRecordsBuilderTest {
         byte[] value = "bar".getBytes();
         int writeLimit = 0;
         ByteBuffer buffer = ByteBuffer.allocate(512);
-        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.CURRENT_MAGIC_VALUE, compressionType,
-                TimestampType.CREATE_TIME, 0L, Record.NO_TIMESTAMP, writeLimit);
+        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType,
+                TimestampType.CREATE_TIME, 0L, LegacyRecord.NO_TIMESTAMP, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH,
+                RecordBatch.NO_SEQUENCE, false, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH, writeLimit);
 
         assertFalse(builder.isFull());
-        assertTrue(builder.hasRoomFor(key, value));
+        assertTrue(builder.hasRoomFor(0L, key, value));
         builder.append(0L, key, value);
 
         assertTrue(builder.isFull());
-        assertFalse(builder.hasRoomFor(key, value));
+        assertFalse(builder.hasRoomFor(0L, key, value));
 
         MemoryRecords memRecords = builder.build();
         List<Record> records = TestUtils.toList(memRecords.records());
@@ -224,12 +294,13 @@ public class MemoryRecordsBuilderTest {
         buffer.position(bufferOffset);
 
         long logAppendTime = System.currentTimeMillis();
-        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V1, compressionType,
-                TimestampType.CREATE_TIME, 0L, logAppendTime, buffer.capacity());
+        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType,
+                TimestampType.CREATE_TIME, 0L, logAppendTime, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
+                false, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH, buffer.capacity());
         builder.append(0L, "a".getBytes(), "1".getBytes());
         builder.append(1L, "b".getBytes(), "2".getBytes());
 
-        assertFalse(builder.hasRoomFor("c".getBytes(), "3".getBytes()));
+        assertFalse(builder.hasRoomFor(2L, "c".getBytes(), "3".getBytes()));
         builder.append(2L, "c".getBytes(), "3".getBytes());
         MemoryRecords records = builder.build();
 
@@ -238,9 +309,10 @@ public class MemoryRecordsBuilderTest {
         assertEquals(2L, info.shallowOffsetOfMaxTimestamp);
 
         long i = 0L;
-        for (Record record : records.records()) {
-            assertEquals(TimestampType.CREATE_TIME, record.timestampType());
-            assertEquals(i++, record.timestamp());
+        for (RecordBatch batch : records.batches()) {
+            assertEquals(TimestampType.CREATE_TIME, batch.timestampType());
+            for (Record record : batch)
+                assertEquals(i++, record.timestamp());
         }
     }
 
@@ -250,8 +322,9 @@ public class MemoryRecordsBuilderTest {
         buffer.position(bufferOffset);
 
         long logAppendTime = System.currentTimeMillis();
-        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V1, compressionType,
-                TimestampType.CREATE_TIME, 0L, logAppendTime, buffer.capacity());
+        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType,
+                TimestampType.CREATE_TIME, 0L, logAppendTime, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
+                false, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH, buffer.capacity());
 
         builder.appendWithOffset(0L, System.currentTimeMillis(), "a".getBytes(), null);
 
@@ -259,40 +332,78 @@ public class MemoryRecordsBuilderTest {
         builder.appendWithOffset(0L, System.currentTimeMillis(), "b".getBytes(), null);
     }
 
-    @Test(expected = IllegalArgumentException.class)
-    public void testAppendWithInvalidMagic() {
-        ByteBuffer buffer = ByteBuffer.allocate(1024);
-        buffer.position(bufferOffset);
-
-        long logAppendTime = System.currentTimeMillis();
-        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V1, compressionType,
-                TimestampType.CREATE_TIME, 0L, logAppendTime, buffer.capacity());
+    @Test
+    public void convertV2ToV1UsingMixedCreateAndLogAppendTime() {
+        ByteBuffer buffer = ByteBuffer.allocate(512);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2,
+                compressionType, TimestampType.LOG_APPEND_TIME, 0L);
+        builder.append(10L, "1".getBytes(), "a".getBytes());
+        builder.close();
+
+        builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, compressionType,
+                TimestampType.CREATE_TIME, 1L);
+        builder.append(11L, "2".getBytes(), "b".getBytes());
+        builder.appendControlRecord(12L, ControlRecordType.COMMIT, null);
+        builder.append(13L, "3".getBytes(), "c".getBytes());
+        builder.close();
+
+        buffer.flip();
+
+        Records records = MemoryRecords.readableRecords(buffer).downConvert(RecordBatch.MAGIC_VALUE_V1);
+
+        List<? extends RecordBatch> batches = Utils.toList(records.batches().iterator());
+        if (compressionType != CompressionType.NONE) {
+            assertEquals(2, batches.size());
+            assertEquals(TimestampType.LOG_APPEND_TIME, batches.get(0).timestampType());
+            assertEquals(TimestampType.CREATE_TIME, batches.get(1).timestampType());
+        } else {
+            assertEquals(3, batches.size());
+            assertEquals(TimestampType.LOG_APPEND_TIME, batches.get(0).timestampType());
+            assertEquals(TimestampType.CREATE_TIME, batches.get(1).timestampType());
+            assertEquals(TimestampType.CREATE_TIME, batches.get(2).timestampType());
+        }
 
-        builder.append(Record.create(Record.MAGIC_VALUE_V0, 0L, "a".getBytes(), null));
+        List<Record> logRecords = Utils.toList(records.records().iterator());
+        assertEquals(3, logRecords.size());
+        assertEquals(ByteBuffer.wrap("1".getBytes()), logRecords.get(0).key());
+        assertEquals(ByteBuffer.wrap("2".getBytes()), logRecords.get(1).key());
+        assertEquals(ByteBuffer.wrap("3".getBytes()), logRecords.get(2).key());
     }
 
     @Test
-    public void convertUsingCreateTime() {
-        ByteBuffer buffer = ByteBuffer.allocate(1024);
-        buffer.position(bufferOffset);
-
-        long logAppendTime = System.currentTimeMillis();
-        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V1, compressionType,
-                TimestampType.CREATE_TIME, 0L, logAppendTime, buffer.capacity());
-
-        builder.convertAndAppend(Record.create(Record.MAGIC_VALUE_V0, 0L, "a".getBytes(), "1".getBytes()));
-        builder.convertAndAppend(Record.create(Record.MAGIC_VALUE_V0, 0L, "b".getBytes(), "2".getBytes()));
-        builder.convertAndAppend(Record.create(Record.MAGIC_VALUE_V0, 0L, "c".getBytes(), "3".getBytes()));
-        MemoryRecords records = builder.build();
-
-        MemoryRecordsBuilder.RecordsInfo info = builder.info();
-        assertEquals(Record.NO_TIMESTAMP, info.maxTimestamp);
-        assertEquals(2L, info.shallowOffsetOfMaxTimestamp);
-
-        for (Record record : records.records()) {
-            assertEquals(TimestampType.CREATE_TIME, record.timestampType());
-            assertEquals(Record.NO_TIMESTAMP, record.timestamp());
+    public void convertToV1WithMixedV0AndV2Data() {
+        ByteBuffer buffer = ByteBuffer.allocate(512);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V0,
+                compressionType, TimestampType.NO_TIMESTAMP_TYPE, 0L);
+        builder.append(RecordBatch.NO_TIMESTAMP, "1".getBytes(), "a".getBytes());
+        builder.close();
+
+        builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, compressionType,
+                TimestampType.CREATE_TIME, 1L);
+        builder.append(11L, "2".getBytes(), "b".getBytes());
+        builder.append(12L, "3".getBytes(), "c".getBytes());
+        builder.close();
+
+        buffer.flip();
+
+        Records records = MemoryRecords.readableRecords(buffer).downConvert(RecordBatch.MAGIC_VALUE_V1);
+
+        List<? extends RecordBatch> batches = Utils.toList(records.batches().iterator());
+        if (compressionType != CompressionType.NONE) {
+            assertEquals(2, batches.size());
+            assertEquals(RecordBatch.MAGIC_VALUE_V0, batches.get(0).magic());
+            assertEquals(RecordBatch.MAGIC_VALUE_V1, batches.get(1).magic());
+        } else {
+            assertEquals(3, batches.size());
+            assertEquals(RecordBatch.MAGIC_VALUE_V0, batches.get(0).magic());
+            assertEquals(RecordBatch.MAGIC_VALUE_V1, batches.get(1).magic());
+            assertEquals(RecordBatch.MAGIC_VALUE_V1, batches.get(2).magic());
         }
+
+        List<Record> logRecords = Utils.toList(records.records().iterator());
+        assertEquals(ByteBuffer.wrap("1".getBytes()), logRecords.get(0).key());
+        assertEquals(ByteBuffer.wrap("2".getBytes()), logRecords.get(1).key());
+        assertEquals(ByteBuffer.wrap("3".getBytes()), logRecords.get(2).key());
     }
 
     @Parameterized.Parameters

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index bfe0a57..8cead03 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.record;
 
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -24,13 +25,12 @@ import org.junit.runners.Parameterized;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Iterator;
 import java.util.List;
 
 import static java.util.Arrays.asList;
-import static org.apache.kafka.common.utils.Utils.toNullableArray;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 @RunWith(value = Parameterized.class)
@@ -39,57 +39,106 @@ public class MemoryRecordsTest {
     private CompressionType compression;
     private byte magic;
     private long firstOffset;
+    private long pid;
+    private short epoch;
+    private int firstSequence;
+    private long logAppendTime = System.currentTimeMillis();
+    private int partitionLeaderEpoch = 998;
 
     public MemoryRecordsTest(byte magic, long firstOffset, CompressionType compression) {
         this.magic = magic;
         this.compression = compression;
         this.firstOffset = firstOffset;
+        if (magic >= RecordBatch.MAGIC_VALUE_V2) {
+            pid = 134234L;
+            epoch = 28;
+            firstSequence = 777;
+        } else {
+            pid = RecordBatch.NO_PRODUCER_ID;
+            epoch = RecordBatch.NO_PRODUCER_EPOCH;
+            firstSequence = RecordBatch.NO_SEQUENCE;
+        }
     }
 
     @Test
     public void testIterator() {
-        MemoryRecordsBuilder builder1 = MemoryRecords.builder(ByteBuffer.allocate(1024), magic, compression, TimestampType.CREATE_TIME, firstOffset);
-        MemoryRecordsBuilder builder2 = MemoryRecords.builder(ByteBuffer.allocate(1024), magic, compression, TimestampType.CREATE_TIME, firstOffset);
-        List<Record> list = asList(
-                Record.create(magic, 1L, "a".getBytes(), "1".getBytes()),
-                Record.create(magic, 2L, "b".getBytes(), "2".getBytes()),
-                Record.create(magic, 3L, "c".getBytes(), "3".getBytes()),
-                Record.create(magic, 4L, null, "4".getBytes()),
-                Record.create(magic, 5L, "e".getBytes(), null),
-                Record.create(magic, 6L, null, null));
-
-        for (int i = 0; i < list.size(); i++) {
-            Record r = list.get(i);
-            builder1.append(r);
-            builder2.append(i + 1, toNullableArray(r.key()), toNullableArray(r.value()));
-        }
+        ByteBuffer buffer = ByteBuffer.allocate(1024);
+
+        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, compression,
+                TimestampType.CREATE_TIME, firstOffset, logAppendTime, pid, epoch, firstSequence, false,
+                partitionLeaderEpoch, buffer.limit());
 
-        MemoryRecords recs1 = builder1.build();
-        MemoryRecords recs2 = builder2.build();
+        SimpleRecord[] records = new SimpleRecord[] {
+            new SimpleRecord(1L, "a".getBytes(), "1".getBytes()),
+            new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
+            new SimpleRecord(3L, "c".getBytes(), "3".getBytes()),
+            new SimpleRecord(4L, null, "4".getBytes()),
+            new SimpleRecord(5L, "d".getBytes(), null),
+            new SimpleRecord(6L, (byte[]) null, null)
+        };
 
+        for (SimpleRecord record : records)
+            builder.append(record);
+
+        MemoryRecords memoryRecords = builder.build();
         for (int iteration = 0; iteration < 2; iteration++) {
-            for (MemoryRecords recs : asList(recs1, recs2)) {
-                Iterator<LogEntry> iter = recs.deepEntries().iterator();
-                for (int i = 0; i < list.size(); i++) {
-                    assertTrue(iter.hasNext());
-                    LogEntry entry = iter.next();
-                    assertEquals(firstOffset + i, entry.offset());
-                    assertEquals(list.get(i), entry.record());
-                    entry.record().ensureValid();
+            int total = 0;
+            for (RecordBatch batch : memoryRecords.batches()) {
+                assertTrue(batch.isValid());
+                assertEquals(compression, batch.compressionType());
+                assertEquals(firstOffset + total, batch.baseOffset());
+
+                if (magic >= RecordBatch.MAGIC_VALUE_V2) {
+                    assertEquals(pid, batch.producerId());
+                    assertEquals(epoch, batch.producerEpoch());
+                    assertEquals(firstSequence + total, batch.baseSequence());
+                    assertEquals(partitionLeaderEpoch, batch.partitionLeaderEpoch());
+                    assertEquals(records.length, batch.countOrNull().intValue());
+                    assertEquals(TimestampType.CREATE_TIME, batch.timestampType());
+                    assertEquals(records[records.length - 1].timestamp(), batch.maxTimestamp());
+                } else {
+                    assertEquals(RecordBatch.NO_PRODUCER_ID, batch.producerId());
+                    assertEquals(RecordBatch.NO_PRODUCER_EPOCH, batch.producerEpoch());
+                    assertEquals(RecordBatch.NO_SEQUENCE, batch.baseSequence());
+                    assertEquals(RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH, batch.partitionLeaderEpoch());
+                    assertNull(batch.countOrNull());
+                }
+
+                int recordCount = 0;
+                for (Record record : batch) {
+                    assertTrue(record.isValid());
+                    assertTrue(record.hasMagic(batch.magic()));
+                    assertFalse(record.isCompressed());
+                    assertEquals(firstOffset + total, record.offset());
+                    assertEquals(records[total].key(), record.key());
+                    assertEquals(records[total].value(), record.value());
+
+                    if (magic >= RecordBatch.MAGIC_VALUE_V2)
+                        assertEquals(firstSequence + total, record.sequence());
+
+                    if (magic > RecordBatch.MAGIC_VALUE_V0) {
+                        assertEquals(records[total].timestamp(), record.timestamp());
+                        if (magic < RecordBatch.MAGIC_VALUE_V2)
+                            assertTrue(record.hasTimestampType(batch.timestampType()));
+                    }
+
+                    total++;
+                    recordCount++;
                 }
-                assertFalse(iter.hasNext());
+
+                assertEquals(batch.baseOffset() + recordCount - 1, batch.lastOffset());
             }
         }
     }
 
     @Test
     public void testHasRoomForMethod() {
-        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), magic, compression, TimestampType.CREATE_TIME);
-        builder.append(Record.create(magic, 0L, "a".getBytes(), "1".getBytes()));
-
-        assertTrue(builder.hasRoomFor("b".getBytes(), "2".getBytes()));
+        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), magic, compression,
+                TimestampType.CREATE_TIME, 0L);
+        builder.append(0L, "a".getBytes(), "1".getBytes());
+        assertTrue(builder.hasRoomFor(1L, "b".getBytes(), "2".getBytes()));
         builder.close();
-        assertFalse(builder.hasRoomFor("b".getBytes(), "2".getBytes()));
+        assertFalse(builder.hasRoomFor(1L, "b".getBytes(), "2".getBytes()));
     }
 
     @Test
@@ -135,37 +184,72 @@ public class MemoryRecordsTest {
 
         MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
 
-        List<ByteBufferLogInputStream.ByteBufferLogEntry> shallowEntries = TestUtils.toList(filteredRecords.shallowEntries());
-        List<Long> expectedOffsets = compression == CompressionType.NONE ? asList(1L, 4L, 5L, 6L) : asList(1L, 5L, 6L);
-        assertEquals(expectedOffsets.size(), shallowEntries.size());
-
-        for (int i = 0; i < expectedOffsets.size(); i++) {
-            LogEntry shallowEntry = shallowEntries.get(i);
-            assertEquals(expectedOffsets.get(i).longValue(), shallowEntry.offset());
-            assertEquals(magic, shallowEntry.record().magic());
-            assertEquals(compression, shallowEntry.record().compressionType());
-            assertEquals(magic == Record.MAGIC_VALUE_V0 ? TimestampType.NO_TIMESTAMP_TYPE : TimestampType.CREATE_TIME,
-                    shallowEntry.record().timestampType());
+        List<MutableRecordBatch> batches = TestUtils.toList(filteredRecords.batches());
+        final List<Long> expectedEndOffsets;
+        final List<Long> expectedStartOffsets;
+        final List<Long> expectedMaxTimestamps;
+
+        if (magic < RecordBatch.MAGIC_VALUE_V2 && compression == CompressionType.NONE) {
+            expectedEndOffsets = asList(1L, 4L, 5L, 6L);
+            expectedStartOffsets = asList(1L, 4L, 5L, 6L);
+            expectedMaxTimestamps = asList(11L, 20L, 15L, 16L);
+        } else if (magic < RecordBatch.MAGIC_VALUE_V2) {
+            expectedEndOffsets = asList(1L, 5L, 6L);
+            expectedStartOffsets = asList(1L, 4L, 6L);
+            expectedMaxTimestamps = asList(11L, 20L, 16L);
+        } else {
+            expectedEndOffsets = asList(1L, 5L, 6L);
+            expectedStartOffsets = asList(1L, 3L, 6L);
+            expectedMaxTimestamps = asList(11L, 20L, 16L);
+        }
+
+        assertEquals(expectedEndOffsets.size(), batches.size());
+
+        for (int i = 0; i < expectedEndOffsets.size(); i++) {
+            RecordBatch batch = batches.get(i);
+            assertEquals(expectedStartOffsets.get(i).longValue(), batch.baseOffset());
+            assertEquals(expectedEndOffsets.get(i).longValue(), batch.lastOffset());
+            assertEquals(magic, batch.magic());
+            assertEquals(compression, batch.compressionType());
+            if (magic >= RecordBatch.MAGIC_VALUE_V1) {
+                assertEquals(expectedMaxTimestamps.get(i).longValue(), batch.maxTimestamp());
+                assertEquals(TimestampType.CREATE_TIME, batch.timestampType());
+            } else {
+                assertEquals(RecordBatch.NO_TIMESTAMP, batch.maxTimestamp());
+                assertEquals(TimestampType.NO_TIMESTAMP_TYPE, batch.timestampType());
+            }
         }
 
-        List<LogEntry> deepEntries = TestUtils.toList(filteredRecords.deepEntries());
-        assertEquals(4, deepEntries.size());
+        List<Record> records = TestUtils.toList(filteredRecords.records());
+        assertEquals(4, records.size());
 
-        LogEntry first = deepEntries.get(0);
+        Record first = records.get(0);
         assertEquals(1L, first.offset());
-        assertEquals(Record.create(magic, 11L, "1".getBytes(), "b".getBytes()), first.record());
+        if (magic > RecordBatch.MAGIC_VALUE_V0)
+            assertEquals(11L, first.timestamp());
+        assertEquals("1", Utils.utf8(first.key(), first.keySize()));
+        assertEquals("b", Utils.utf8(first.value(), first.valueSize()));
 
-        LogEntry second = deepEntries.get(1);
+        Record second = records.get(1);
         assertEquals(4L, second.offset());
-        assertEquals(Record.create(magic, 20L, "4".getBytes(), "e".getBytes()), second.record());
+        if (magic > RecordBatch.MAGIC_VALUE_V0)
+            assertEquals(20L, second.timestamp());
+        assertEquals("4", Utils.utf8(second.key(), second.keySize()));
+        assertEquals("e", Utils.utf8(second.value(), second.valueSize()));
 
-        LogEntry third = deepEntries.get(2);
+        Record third = records.get(2);
         assertEquals(5L, third.offset());
-        assertEquals(Record.create(magic, 15L, "5".getBytes(), "f".getBytes()), third.record());
+        if (magic > RecordBatch.MAGIC_VALUE_V0)
+            assertEquals(15L, third.timestamp());
+        assertEquals("5", Utils.utf8(third.key(), third.keySize()));
+        assertEquals("f", Utils.utf8(third.value(), third.valueSize()));
 
-        LogEntry fourth = deepEntries.get(3);
+        Record fourth = records.get(3);
         assertEquals(6L, fourth.offset());
-        assertEquals(Record.create(magic, 16L, "6".getBytes(), "g".getBytes()), fourth.record());
+        if (magic > RecordBatch.MAGIC_VALUE_V0)
+            assertEquals(16L, fourth.timestamp());
+        assertEquals("6", Utils.utf8(fourth.key(), fourth.keySize()));
+        assertEquals("g", Utils.utf8(fourth.value(), fourth.valueSize()));
     }
 
     @Test
@@ -174,16 +258,18 @@ public class MemoryRecordsTest {
 
         ByteBuffer buffer = ByteBuffer.allocate(2048);
         MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression,
-                TimestampType.LOG_APPEND_TIME, 0L, logAppendTime);
+                TimestampType.LOG_APPEND_TIME, 0L, logAppendTime, pid, epoch, firstSequence);
         builder.append(10L, null, "a".getBytes());
         builder.close();
 
-        builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.LOG_APPEND_TIME, 1L, logAppendTime);
+        builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.LOG_APPEND_TIME, 1L, logAppendTime,
+                pid, epoch, firstSequence);
         builder.append(11L, "1".getBytes(), "b".getBytes());
         builder.append(12L, null, "c".getBytes());
         builder.close();
 
-        builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.LOG_APPEND_TIME, 3L, logAppendTime);
+        builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.LOG_APPEND_TIME, 3L, logAppendTime,
+                pid, epoch, firstSequence);
         builder.append(13L, null, "d".getBytes());
         builder.append(14L, "4".getBytes(), "e".getBytes());
         builder.append(15L, "5".getBytes(), "f".getBytes());
@@ -197,14 +283,14 @@ public class MemoryRecordsTest {
         filtered.flip();
         MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
 
-        List<ByteBufferLogInputStream.ByteBufferLogEntry> shallowEntries = TestUtils.toList(filteredRecords.shallowEntries());
-        assertEquals(compression == CompressionType.NONE ? 3 : 2, shallowEntries.size());
+        List<MutableRecordBatch> batches = TestUtils.toList(filteredRecords.batches());
+        assertEquals(magic < RecordBatch.MAGIC_VALUE_V2 && compression == CompressionType.NONE ? 3 : 2, batches.size());
 
-        for (LogEntry shallowEntry : shallowEntries) {
-            assertEquals(compression, shallowEntry.record().compressionType());
-            if (magic > Record.MAGIC_VALUE_V0) {
-                assertEquals(TimestampType.LOG_APPEND_TIME, shallowEntry.record().timestampType());
-                assertEquals(logAppendTime, shallowEntry.record().timestamp());
+        for (RecordBatch batch : batches) {
+            assertEquals(compression, batch.compressionType());
+            if (magic > RecordBatch.MAGIC_VALUE_V0) {
+                assertEquals(TimestampType.LOG_APPEND_TIME, batch.timestampType());
+                assertEquals(logAppendTime, batch.maxTimestamp());
             }
         }
     }
@@ -213,16 +299,16 @@ public class MemoryRecordsTest {
     public static Collection<Object[]> data() {
         List<Object[]> values = new ArrayList<>();
         for (long firstOffset : asList(0L, 57L))
-            for (byte magic : asList(Record.MAGIC_VALUE_V0, Record.MAGIC_VALUE_V1))
+            for (byte magic : asList(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2))
                 for (CompressionType type: CompressionType.values())
                     values.add(new Object[] {magic, firstOffset, type});
         return values;
     }
 
-    private static class RetainNonNullKeysFilter implements MemoryRecords.LogEntryFilter {
+    private static class RetainNonNullKeysFilter implements MemoryRecords.RecordFilter {
         @Override
-        public boolean shouldRetain(LogEntry entry) {
-            return entry.record().hasKey();
+        public boolean shouldRetain(Record record) {
+            return record.hasKey();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java
deleted file mode 100644
index daf8a87..0000000
--- a/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.record;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-@RunWith(value = Parameterized.class)
-public class RecordTest {
-
-    private final byte magic;
-    private final long timestamp;
-    private final ByteBuffer key;
-    private final ByteBuffer value;
-    private final CompressionType compression;
-    private final TimestampType timestampType;
-    private final Record record;
-
-    public RecordTest(byte magic, long timestamp, byte[] key, byte[] value, CompressionType compression) {
-        this.magic = magic;
-        this.timestamp = timestamp;
-        this.timestampType = TimestampType.CREATE_TIME;
-        this.key = key == null ? null : ByteBuffer.wrap(key);
-        this.value = value == null ? null : ByteBuffer.wrap(value);
-        this.compression = compression;
-        this.record = Record.create(magic, timestamp, key, value, compression, timestampType);
-    }
-
-    @Test
-    public void testFields() {
-        assertEquals(compression, record.compressionType());
-        assertEquals(key != null, record.hasKey());
-        assertEquals(key, record.key());
-        if (key != null)
-            assertEquals(key.limit(), record.keySize());
-        assertEquals(magic, record.magic());
-        assertEquals(value, record.value());
-        if (value != null)
-            assertEquals(value.limit(), record.valueSize());
-        if (magic > 0) {
-            assertEquals(timestamp, record.timestamp());
-            assertEquals(timestampType, record.timestampType());
-        } else {
-            assertEquals(Record.NO_TIMESTAMP, record.timestamp());
-            assertEquals(TimestampType.NO_TIMESTAMP_TYPE, record.timestampType());
-        }
-    }
-
-    @Test
-    public void testChecksum() {
-        assertEquals(record.checksum(), record.computeChecksum());
-
-        byte attributes = Record.computeAttributes(magic, this.compression, TimestampType.CREATE_TIME);
-        assertEquals(record.checksum(), Record.computeChecksum(
-                magic,
-                attributes,
-                this.timestamp,
-                this.key == null ? null : this.key.array(),
-                this.value == null ? null : this.value.array()
-        ));
-        assertTrue(record.isValid());
-        for (int i = Record.CRC_OFFSET + Record.CRC_LENGTH; i < record.sizeInBytes(); i++) {
-            Record copy = copyOf(record);
-            copy.buffer().put(i, (byte) 69);
-            assertFalse(copy.isValid());
-            try {
-                copy.ensureValid();
-                fail("Should fail the above test.");
-            } catch (InvalidRecordException e) {
-                // this is good
-            }
-        }
-    }
-
-    private Record copyOf(Record record) {
-        ByteBuffer buffer = ByteBuffer.allocate(record.sizeInBytes());
-        record.buffer().put(buffer);
-        buffer.rewind();
-        record.buffer().rewind();
-        return new Record(buffer);
-    }
-
-    @Test
-    public void testEquality() {
-        assertEquals(record, copyOf(record));
-    }
-
-    @Parameters
-    public static Collection<Object[]> data() {
-        byte[] payload = new byte[1000];
-        Arrays.fill(payload, (byte) 1);
-        List<Object[]> values = new ArrayList<>();
-        for (byte magic : Arrays.asList(Record.MAGIC_VALUE_V0, Record.MAGIC_VALUE_V1))
-            for (long timestamp : Arrays.asList(Record.NO_TIMESTAMP, 0L, 1L))
-                for (byte[] key : Arrays.asList(null, "".getBytes(), "key".getBytes(), payload))
-                    for (byte[] value : Arrays.asList(null, "".getBytes(), "value".getBytes(), payload))
-                        for (CompressionType compression : CompressionType.values())
-                            values.add(new Object[] {magic, timestamp, key, value, compression});
-        return values;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/test/java/org/apache/kafka/common/record/SimpleLegacyRecordTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/SimpleLegacyRecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/SimpleLegacyRecordTest.java
new file mode 100644
index 0000000..dd718bf
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/record/SimpleLegacyRecordTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.apache.kafka.common.utils.Utils;
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+public class SimpleLegacyRecordTest {
+
+    @Test(expected = InvalidRecordException.class)
+    public void testCompressedIterationWithNullValue() throws Exception {
+        ByteBuffer buffer = ByteBuffer.allocate(128);
+        DataOutputStream out = new DataOutputStream(new ByteBufferOutputStream(buffer));
+        AbstractLegacyRecordBatch.writeHeader(out, 0L, LegacyRecord.RECORD_OVERHEAD_V1);
+        LegacyRecord.write(out, RecordBatch.MAGIC_VALUE_V1, 1L, (byte[]) null, null,
+                CompressionType.GZIP, TimestampType.CREATE_TIME);
+
+        buffer.flip();
+
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+        for (Record record : records.records())
+            fail("Iteration should have caused invalid record error");
+    }
+
+    @Test(expected = InvalidRecordException.class)
+    public void testCompressedIterationWithEmptyRecords() throws Exception {
+        ByteBuffer emptyCompressedValue = ByteBuffer.allocate(64);
+        OutputStream gzipOutput = CompressionType.GZIP.wrapForOutput(new ByteBufferOutputStream(emptyCompressedValue),
+                RecordBatch.MAGIC_VALUE_V1, 64);
+        gzipOutput.close();
+        emptyCompressedValue.flip();
+
+        ByteBuffer buffer = ByteBuffer.allocate(128);
+        DataOutputStream out = new DataOutputStream(new ByteBufferOutputStream(buffer));
+        AbstractLegacyRecordBatch.writeHeader(out, 0L, LegacyRecord.RECORD_OVERHEAD_V1 + emptyCompressedValue.remaining());
+        LegacyRecord.write(out, RecordBatch.MAGIC_VALUE_V1, 1L, null, Utils.toArray(emptyCompressedValue),
+                CompressionType.GZIP, TimestampType.CREATE_TIME);
+
+        buffer.flip();
+
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+        for (Record record : records.records())
+            fail("Iteration should have caused invalid record error");
+    }
+
+    /* This scenario can happen if the record size field is corrupt and we end up allocating a buffer that is too small */
+    @Test(expected = InvalidRecordException.class)
+    public void testIsValidWithTooSmallBuffer() {
+        ByteBuffer buffer = ByteBuffer.allocate(2);
+        LegacyRecord record = new LegacyRecord(buffer);
+        assertFalse(record.isValid());
+        record.ensureValid();
+    }
+
+    @Test(expected = InvalidRecordException.class)
+    public void testIsValidWithChecksumMismatch() {
+        ByteBuffer buffer = ByteBuffer.allocate(4);
+        // set checksum
+        buffer.putInt(2);
+        LegacyRecord record = new LegacyRecord(buffer);
+        assertFalse(record.isValid());
+        record.ensureValid();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java
deleted file mode 100644
index aa77ca4..0000000
--- a/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.record;
-
-import org.apache.kafka.common.utils.ByteBufferOutputStream;
-import org.apache.kafka.common.utils.Utils;
-import org.junit.Test;
-
-import java.io.DataOutputStream;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-public class SimpleRecordTest {
-
-    @Test(expected = InvalidRecordException.class)
-    public void testCompressedIterationWithNullValue() throws Exception {
-        ByteBuffer buffer = ByteBuffer.allocate(128);
-        DataOutputStream out = new DataOutputStream(new ByteBufferOutputStream(buffer));
-        LogEntry.writeHeader(out, 0L, Record.RECORD_OVERHEAD_V1);
-        Record.write(out, Record.CURRENT_MAGIC_VALUE, 1L, null, null, CompressionType.GZIP, TimestampType.CREATE_TIME);
-
-        buffer.flip();
-
-        MemoryRecords records = MemoryRecords.readableRecords(buffer);
-        for (Record record : records.records())
-            fail("Iteration should have caused invalid record error");
-    }
-
-    @Test(expected = InvalidRecordException.class)
-    public void testCompressedIterationWithEmptyRecords() throws Exception {
-        ByteBuffer emptyCompressedValue = ByteBuffer.allocate(64);
-        OutputStream gzipOutput = CompressionType.GZIP.wrapForOutput(new ByteBufferOutputStream(emptyCompressedValue),
-                Record.MAGIC_VALUE_V1, 64);
-        gzipOutput.close();
-        emptyCompressedValue.flip();
-
-        ByteBuffer buffer = ByteBuffer.allocate(128);
-        DataOutputStream out = new DataOutputStream(new ByteBufferOutputStream(buffer));
-        LogEntry.writeHeader(out, 0L, Record.RECORD_OVERHEAD_V1 + emptyCompressedValue.remaining());
-        Record.write(out, Record.CURRENT_MAGIC_VALUE, 1L, null, Utils.toArray(emptyCompressedValue),
-                CompressionType.GZIP, TimestampType.CREATE_TIME);
-
-        buffer.flip();
-
-        MemoryRecords records = MemoryRecords.readableRecords(buffer);
-        for (Record record : records.records())
-            fail("Iteration should have caused invalid record error");
-    }
-
-    /* This scenario can happen if the record size field is corrupt and we end up allocating a buffer that is too small */
-    @Test(expected = InvalidRecordException.class)
-    public void testIsValidWithTooSmallBuffer() {
-        ByteBuffer buffer = ByteBuffer.allocate(2);
-        Record record = new Record(buffer);
-        assertFalse(record.isValid());
-        record.ensureValid();
-    }
-
-    @Test(expected = InvalidRecordException.class)
-    public void testIsValidWithChecksumMismatch() {
-        ByteBuffer buffer = ByteBuffer.allocate(4);
-        // set checksum
-        buffer.putInt(2);
-        Record record = new Record(buffer);
-        assertFalse(record.isValid());
-        record.ensureValid();
-    }
-
-    @Test
-    public void testIsValidWithFourBytesBuffer() {
-        ByteBuffer buffer = ByteBuffer.allocate(4);
-        Record record = new Record(buffer);
-        // it is a bit weird that we return `true` in this case, we could extend the definition of `isValid` to
-        // something like the following to detect a clearly corrupt record:
-        // return size() >= recordSize(0, 0) && checksum() == computeChecksum();
-        assertTrue(record.isValid());
-        // no exception should be thrown
-        record.ensureValid();
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void cannotUpconvertWithNoTimestampType() {
-        Record record = Record.create(Record.MAGIC_VALUE_V0, Record.NO_TIMESTAMP, "foo".getBytes(), "bar".getBytes());
-        record.convert(Record.MAGIC_VALUE_V1, TimestampType.NO_TIMESTAMP_TYPE);
-    }
-
-    @Test
-    public void testConvertFromV0ToV1() {
-        byte[][] keys = new byte[][] {"a".getBytes(), "".getBytes(), null, "b".getBytes()};
-        byte[][] values = new byte[][] {"1".getBytes(), "".getBytes(), "2".getBytes(), null};
-
-        for (int i = 0; i < keys.length; i++) {
-            Record record = Record.create(Record.MAGIC_VALUE_V0, Record.NO_TIMESTAMP, keys[i], values[i]);
-            Record converted = record.convert(Record.MAGIC_VALUE_V1, TimestampType.CREATE_TIME);
-
-            assertEquals(Record.MAGIC_VALUE_V1, converted.magic());
-            assertEquals(Record.NO_TIMESTAMP, converted.timestamp());
-            assertEquals(TimestampType.CREATE_TIME, converted.timestampType());
-            assertEquals(record.key(), converted.key());
-            assertEquals(record.value(), converted.value());
-            assertTrue(record.isValid());
-            assertEquals(record.convertedSize(Record.MAGIC_VALUE_V1), converted.sizeInBytes());
-        }
-    }
-
-    @Test
-    public void testConvertFromV1ToV0() {
-        byte[][] keys = new byte[][] {"a".getBytes(), "".getBytes(), null, "b".getBytes()};
-        byte[][] values = new byte[][] {"1".getBytes(), "".getBytes(), "2".getBytes(), null};
-
-        for (int i = 0; i < keys.length; i++) {
-            Record record = Record.create(Record.MAGIC_VALUE_V1, System.currentTimeMillis(), keys[i], values[i]);
-            Record converted = record.convert(Record.MAGIC_VALUE_V0, TimestampType.NO_TIMESTAMP_TYPE);
-
-            assertEquals(Record.MAGIC_VALUE_V0, converted.magic());
-            assertEquals(Record.NO_TIMESTAMP, converted.timestamp());
-            assertEquals(TimestampType.NO_TIMESTAMP_TYPE, converted.timestampType());
-            assertEquals(record.key(), converted.key());
-            assertEquals(record.value(), converted.value());
-            assertTrue(record.isValid());
-            assertEquals(record.convertedSize(Record.MAGIC_VALUE_V0), converted.sizeInBytes());
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/test/java/org/apache/kafka/common/record/TimestampTypeTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/TimestampTypeTest.java b/clients/src/test/java/org/apache/kafka/common/record/TimestampTypeTest.java
deleted file mode 100644
index c262758..0000000
--- a/clients/src/test/java/org/apache/kafka/common/record/TimestampTypeTest.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.record;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-public class TimestampTypeTest {
-
-    @Test
-    public void toAndFromAttributesCreateTime() {
-        byte attributes = TimestampType.CREATE_TIME.updateAttributes((byte) 0);
-        assertEquals(TimestampType.CREATE_TIME, TimestampType.forAttributes(attributes));
-    }
-
-    @Test
-    public void toAndFromAttributesLogAppendTime() {
-        byte attributes = TimestampType.LOG_APPEND_TIME.updateAttributes((byte) 0);
-        assertEquals(TimestampType.LOG_APPEND_TIME, TimestampType.forAttributes(attributes));
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void updateAttributesNotAllowedForNoTimestampType() {
-        TimestampType.NO_TIMESTAMP_TYPE.updateAttributes((byte) 0);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 64bfdf5..b9fbf06 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -28,8 +28,13 @@ import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.InvalidRecordException;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.MemoryRecords;
-import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.TimestampType;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -63,9 +68,9 @@ public class RequestResponseTest {
         checkRequest(createControlledShutdownRequest());
         checkResponse(createControlledShutdownResponse(), 1);
         checkErrorResponse(createControlledShutdownRequest(), new UnknownServerException());
-        checkRequest(createFetchRequest(3));
-        checkErrorResponse(createFetchRequest(3), new UnknownServerException());
-        checkResponse(createFetchResponse(), 0);
+        checkRequest(createFetchRequest(4));
+        checkResponse(createFetchResponse(), 4);
+        checkErrorResponse(createFetchRequest(4), new UnknownServerException());
         checkRequest(createHeartBeatRequest());
         checkErrorResponse(createHeartBeatRequest(), new UnknownServerException());
         checkResponse(createHeartBeatResponse(), 0);
@@ -103,8 +108,10 @@ public class RequestResponseTest {
         checkErrorResponse(createOffsetFetchRequest(1), new UnknownServerException());
         checkErrorResponse(createOffsetFetchRequest(2), new UnknownServerException());
         checkResponse(createOffsetFetchResponse(), 0);
-        checkRequest(createProduceRequest());
-        checkErrorResponse(createProduceRequest(), new UnknownServerException());
+        checkRequest(createProduceRequest(2));
+        checkErrorResponse(createProduceRequest(2), new UnknownServerException());
+        checkRequest(createProduceRequest(3));
+        checkErrorResponse(createProduceRequest(3), new UnknownServerException());
         checkResponse(createProduceResponse(), 2);
         checkRequest(createStopReplicaRequest(true));
         checkRequest(createStopReplicaRequest(false));
@@ -178,6 +185,7 @@ public class RequestResponseTest {
         for (int i = 0; i < latestVersion; ++i) {
             checkErrorResponse(createFetchRequest(i), new UnknownServerException());
             checkRequest(createFetchRequest(i));
+            checkResponse(createFetchResponse(), i);
         }
     }
 
@@ -216,7 +224,7 @@ public class RequestResponseTest {
 
     @Test
     public void produceRequestToStringTest() {
-        ProduceRequest request = createProduceRequest();
+        ProduceRequest request = createProduceRequest(ApiKeys.PRODUCE.latestVersion());
         assertEquals(1, request.partitionRecordsOrFail().size());
         assertFalse(request.toString(false).contains("partitionSizes"));
         assertTrue(request.toString(false).contains("numPartitions=1"));
@@ -240,7 +248,7 @@ public class RequestResponseTest {
 
     @Test
     public void produceRequestGetErrorResponseTest() {
-        ProduceRequest request = createProduceRequest();
+        ProduceRequest request = createProduceRequest(ApiKeys.PRODUCE.latestVersion());
         Set<TopicPartition> partitions = new HashSet<>(request.partitionRecordsOrFail().keySet());
 
         ProduceResponse errorResponse = (ProduceResponse) request.getErrorResponse(new NotEnoughReplicasException());
@@ -248,7 +256,7 @@ public class RequestResponseTest {
         ProduceResponse.PartitionResponse partitionResponse = errorResponse.responses().values().iterator().next();
         assertEquals(Errors.NOT_ENOUGH_REPLICAS, partitionResponse.error);
         assertEquals(ProduceResponse.INVALID_OFFSET, partitionResponse.baseOffset);
-        assertEquals(Record.NO_TIMESTAMP, partitionResponse.logAppendTime);
+        assertEquals(RecordBatch.NO_TIMESTAMP, partitionResponse.logAppendTime);
 
         request.clearPartitionRecords();
 
@@ -258,14 +266,14 @@ public class RequestResponseTest {
         partitionResponse = errorResponse.responses().values().iterator().next();
         assertEquals(Errors.NOT_ENOUGH_REPLICAS, partitionResponse.error);
         assertEquals(ProduceResponse.INVALID_OFFSET, partitionResponse.baseOffset);
-        assertEquals(Record.NO_TIMESTAMP, partitionResponse.logAppendTime);
+        assertEquals(RecordBatch.NO_TIMESTAMP, partitionResponse.logAppendTime);
     }
 
     @Test
     public void produceResponseVersionTest() {
         Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<>();
         responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE,
-                10000, Record.NO_TIMESTAMP));
+                10000, RecordBatch.NO_TIMESTAMP));
         ProduceResponse v0Response = new ProduceResponse(responseData);
         ProduceResponse v1Response = new ProduceResponse(responseData, 10);
         ProduceResponse v2Response = new ProduceResponse(responseData, 10);
@@ -283,12 +291,63 @@ public class RequestResponseTest {
         assertEquals("Response data does not match", responseData, v2Response.responses());
     }
 
+    @Test(expected = InvalidRecordException.class)
+    public void produceRequestV3ShouldContainOnlyOneRecordBatch() {
+        ByteBuffer buffer = ByteBuffer.allocate(256);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
+        builder.append(10L, null, "a".getBytes());
+        builder.close();
+
+        builder = MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.CREATE_TIME, 1L);
+        builder.append(11L, "1".getBytes(), "b".getBytes());
+        builder.append(12L, null, "c".getBytes());
+        builder.close();
+
+        buffer.flip();
+
+        Map<TopicPartition, MemoryRecords> produceData = new HashMap<>();
+        produceData.put(new TopicPartition("test", 0), MemoryRecords.readableRecords(buffer));
+        new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, (short) 1, 5000, produceData).build().toStruct();
+    }
+
+    @Test(expected = InvalidRecordException.class)
+    public void produceRequestV3CannotHaveNoRecordBatches() {
+        Map<TopicPartition, MemoryRecords> produceData = new HashMap<>();
+        produceData.put(new TopicPartition("test", 0), MemoryRecords.EMPTY);
+        new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, (short) 1, 5000, produceData).build().toStruct();
+    }
+
+    @Test(expected = InvalidRecordException.class)
+    public void produceRequestV3CannotUseMagicV0() {
+        ByteBuffer buffer = ByteBuffer.allocate(256);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V0, CompressionType.NONE,
+                TimestampType.NO_TIMESTAMP_TYPE, 0L);
+        builder.append(10L, null, "a".getBytes());
+
+        Map<TopicPartition, MemoryRecords> produceData = new HashMap<>();
+        produceData.put(new TopicPartition("test", 0), builder.build());
+        new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, (short) 1, 5000, produceData).build().toStruct();
+    }
+
+    @Test(expected = InvalidRecordException.class)
+    public void produceRequestV3CannotUseMagicV1() {
+        ByteBuffer buffer = ByteBuffer.allocate(256);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V1, CompressionType.NONE,
+                TimestampType.CREATE_TIME, 0L);
+        builder.append(10L, null, "a".getBytes());
+
+        Map<TopicPartition, MemoryRecords> produceData = new HashMap<>();
+        produceData.put(new TopicPartition("test", 0), builder.build());
+        new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, (short) 1, 5000, produceData).build().toStruct();
+    }
+
     @Test
     public void fetchResponseVersionTest() {
         LinkedHashMap<TopicPartition, FetchResponse.PartitionData> responseData = new LinkedHashMap<>();
 
         MemoryRecords records = MemoryRecords.readableRecords(ByteBuffer.allocate(10));
-        responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData(Errors.NONE, 1000000, records));
+        responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData(Errors.NONE, 1000000,
+                FetchResponse.INVALID_LSO, null, records));
 
         FetchResponse v0Response = new FetchResponse(responseData, 0);
         FetchResponse v1Response = new FetchResponse(responseData, 10);
@@ -383,8 +442,15 @@ public class RequestResponseTest {
 
     private FetchResponse createFetchResponse() {
         LinkedHashMap<TopicPartition, FetchResponse.PartitionData> responseData = new LinkedHashMap<>();
-        MemoryRecords records = MemoryRecords.readableRecords(ByteBuffer.allocate(10));
-        responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData(Errors.NONE, 1000000, records));
+        MemoryRecords records = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("blah".getBytes()));
+        responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData(Errors.NONE,
+                1000000, FetchResponse.INVALID_LSO, null, records));
+
+        List<FetchResponse.AbortedTransaction> abortedTransactions = Collections.singletonList(
+                new FetchResponse.AbortedTransaction(234L, 999L));
+        responseData.put(new TopicPartition("test", 1), new FetchResponse.PartitionData(Errors.NONE,
+                1000000, FetchResponse.INVALID_LSO, abortedTransactions, MemoryRecords.EMPTY));
+
         return new FetchResponse(responseData, 25);
     }
 
@@ -529,16 +595,20 @@ public class RequestResponseTest {
         return new OffsetFetchResponse(Errors.NONE, responseData);
     }
 
-    private ProduceRequest createProduceRequest() {
-        Map<TopicPartition, MemoryRecords> produceData = new HashMap<>();
-        produceData.put(new TopicPartition("test", 0), MemoryRecords.readableRecords(ByteBuffer.allocate(10)));
-        return new ProduceRequest.Builder((short) 1, 5000, produceData).build();
+    private ProduceRequest createProduceRequest(int version) {
+        if (version < 2)
+            throw new IllegalArgumentException("Produce request version 2 is not supported");
+
+        byte magic = version == 2 ? RecordBatch.MAGIC_VALUE_V1 : RecordBatch.MAGIC_VALUE_V2;
+        MemoryRecords records = MemoryRecords.withRecords(magic, CompressionType.NONE, new SimpleRecord("woot".getBytes()));
+        Map<TopicPartition, MemoryRecords> produceData = Collections.singletonMap(new TopicPartition("test", 0), records);
+        return new ProduceRequest.Builder(magic, (short) 1, 5000, produceData).build((short) version);
     }
 
     private ProduceResponse createProduceResponse() {
         Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<>();
         responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE,
-                10000, Record.NO_TIMESTAMP));
+                10000, RecordBatch.NO_TIMESTAMP));
         return new ProduceResponse(responseData, 0);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/test/java/org/apache/kafka/common/utils/Crc32Test.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/Crc32Test.java b/clients/src/test/java/org/apache/kafka/common/utils/Crc32Test.java
new file mode 100644
index 0000000..adb5da7
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/utils/Crc32Test.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertEquals;
+
+public class Crc32Test {
+
+    @Test
+    public void testUpdateByteBuffer() {
+        byte[] bytes = new byte[]{0, 1, 2, 3, 4, 5};
+        doTestUpdateByteBuffer(bytes, ByteBuffer.allocate(bytes.length));
+        doTestUpdateByteBuffer(bytes, ByteBuffer.allocateDirect(bytes.length));
+    }
+
+    private void doTestUpdateByteBuffer(byte[] bytes, ByteBuffer buffer) {
+        buffer.put(bytes);
+        buffer.flip();
+        Crc32 bufferCrc = new Crc32();
+        bufferCrc.update(buffer, buffer.remaining());
+        assertEquals(Crc32.crc32(bytes), bufferCrc.getValue());
+        assertEquals(0, buffer.position());
+    }
+
+    @Test
+    public void testUpdateByteBufferWithOffsetPosition() {
+        byte[] bytes = new byte[]{-2, -1, 0, 1, 2, 3, 4, 5};
+        doTestUpdateByteBufferWithOffsetPosition(bytes, ByteBuffer.allocate(bytes.length), 2);
+        doTestUpdateByteBufferWithOffsetPosition(bytes, ByteBuffer.allocateDirect(bytes.length), 2);
+    }
+
+    private void doTestUpdateByteBufferWithOffsetPosition(byte[] bytes, ByteBuffer buffer, int offset) {
+        buffer.put(bytes);
+        buffer.flip();
+        buffer.position(offset);
+
+        Crc32 bufferCrc = new Crc32();
+        bufferCrc.update(buffer, buffer.remaining());
+        assertEquals(Crc32.crc32(bytes, offset, buffer.remaining()), bufferCrc.getValue());
+        assertEquals(offset, buffer.position());
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
index 7672335..16742d5 100755
--- a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
@@ -22,10 +22,12 @@ import org.easymock.IAnswer;
 import org.junit.Test;
 
 import java.io.Closeable;
+import java.io.DataOutputStream;
 import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.StandardOpenOption;
 import java.util.Arrays;
 import java.util.Collections;
@@ -34,6 +36,7 @@ import java.util.Random;
 import static org.apache.kafka.common.utils.Utils.formatAddress;
 import static org.apache.kafka.common.utils.Utils.getHost;
 import static org.apache.kafka.common.utils.Utils.getPort;
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -88,6 +91,95 @@ public class UtilsTest {
         assertEquals(1, Utils.abs(-1));
     }
 
+    @Test
+    public void writeToBuffer() throws IOException {
+        byte[] input = {0, 1, 2, 3, 4, 5};
+        ByteBuffer source = ByteBuffer.wrap(input);
+
+        doTestWriteToByteBuffer(source, ByteBuffer.allocate(input.length));
+        doTestWriteToByteBuffer(source, ByteBuffer.allocateDirect(input.length));
+        assertEquals(0, source.position());
+
+        source.position(2);
+        doTestWriteToByteBuffer(source, ByteBuffer.allocate(input.length));
+        doTestWriteToByteBuffer(source, ByteBuffer.allocateDirect(input.length));
+    }
+
+    private void doTestWriteToByteBuffer(ByteBuffer source, ByteBuffer dest) throws IOException {
+        int numBytes = source.remaining();
+        int position = source.position();
+        DataOutputStream out = new DataOutputStream(new ByteBufferOutputStream(dest));
+        Utils.writeTo(out, source, source.remaining());
+        dest.flip();
+        assertEquals(numBytes, dest.remaining());
+        assertEquals(position, source.position());
+        assertEquals(source, dest);
+    }
+
+    @Test
+    public void toArray() {
+        byte[] input = {0, 1, 2, 3, 4};
+        ByteBuffer buffer = ByteBuffer.wrap(input);
+        assertArrayEquals(input, Utils.toArray(buffer));
+        assertEquals(0, buffer.position());
+
+        assertArrayEquals(new byte[] {1, 2}, Utils.toArray(buffer, 1, 2));
+        assertEquals(0, buffer.position());
+
+        buffer.position(2);
+        assertArrayEquals(new byte[] {2, 3, 4}, Utils.toArray(buffer));
+        assertEquals(2, buffer.position());
+    }
+
+    @Test
+    public void toArrayDirectByteBuffer() {
+        byte[] input = {0, 1, 2, 3, 4};
+        ByteBuffer buffer = ByteBuffer.allocateDirect(5);
+        buffer.put(input);
+        buffer.rewind();
+
+        assertArrayEquals(input, Utils.toArray(buffer));
+        assertEquals(0, buffer.position());
+
+        assertArrayEquals(new byte[] {1, 2}, Utils.toArray(buffer, 1, 2));
+        assertEquals(0, buffer.position());
+
+        buffer.position(2);
+        assertArrayEquals(new byte[] {2, 3, 4}, Utils.toArray(buffer));
+        assertEquals(2, buffer.position());
+    }
+
+    @Test
+    public void utf8ByteArraySerde() {
+        String utf8String = "A\u00ea\u00f1\u00fcC";
+        byte[] utf8Bytes = utf8String.getBytes(StandardCharsets.UTF_8);
+        assertArrayEquals(utf8Bytes, Utils.utf8(utf8String));
+        assertEquals(utf8Bytes.length, Utils.utf8Length(utf8String));
+        assertEquals(utf8String, Utils.utf8(utf8Bytes));
+    }
+
+    @Test
+    public void utf8ByteBufferSerde() {
+        doTestUtf8ByteBuffer(ByteBuffer.allocate(20));
+        doTestUtf8ByteBuffer(ByteBuffer.allocateDirect(20));
+    }
+
+    private void doTestUtf8ByteBuffer(ByteBuffer utf8Buffer) {
+        String utf8String = "A\u00ea\u00f1\u00fcC";
+        byte[] utf8Bytes = utf8String.getBytes(StandardCharsets.UTF_8);
+
+        utf8Buffer.position(4);
+        utf8Buffer.put(utf8Bytes);
+
+        utf8Buffer.position(4);
+        assertEquals(utf8String, Utils.utf8(utf8Buffer, utf8Bytes.length));
+        assertEquals(4, utf8Buffer.position());
+
+        utf8Buffer.position(0);
+        assertEquals(utf8String, Utils.utf8(utf8Buffer, 4, utf8Bytes.length));
+        assertEquals(0, utf8Buffer.position());
+    }
+
     private void subTest(ByteBuffer buffer) {
         // The first byte should be 'A'
         assertEquals('A', (Utils.readBytes(buffer, 0, 1))[0]);

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index 975d423..5ab8c9c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.connect.runtime.distributed;
 
+import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.ClientUtils;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.Metadata;
@@ -101,7 +102,8 @@ public class WorkerGroupMember {
                     config.getInt(CommonClientConfigs.RECEIVE_BUFFER_CONFIG),
                     config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG),
                     time,
-                    true);
+                    true,
+                    new ApiVersions());
             this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs,
                     config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG));
             this.coordinator = new WorkerCoordinator(this.client,

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
index dabb347..913ae1f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
@@ -17,13 +17,13 @@
 package org.apache.kafka.connect.util;
 
 import org.apache.kafka.common.record.InvalidRecordException;
-import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
 
 public final class ConnectUtils {
     public static Long checkAndConvertTimestamp(Long timestamp) {
         if (timestamp == null || timestamp >= 0)
             return timestamp;
-        else if (timestamp == Record.NO_TIMESTAMP)
+        else if (timestamp == RecordBatch.NO_TIMESTAMP)
             return null;
         else
             throw new InvalidRecordException(String.format("Invalid record timestamp %d", timestamp));

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 47ce1fc..26ac486 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -24,7 +24,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.data.Schema;
@@ -533,7 +533,7 @@ public class WorkerSinkTaskTest {
     @Test
     public void testMissingTimestampPropagation() throws Exception {
         expectInitializeTask();
-        expectConsumerPoll(1, Record.NO_TIMESTAMP, TimestampType.CREATE_TIME);
+        expectConsumerPoll(1, RecordBatch.NO_TIMESTAMP, TimestampType.CREATE_TIME);
         expectConversionAndTransformation(1);
 
         Capture<Collection<SinkRecord>> records = EasyMock.newCapture(CaptureType.ALL);
@@ -662,7 +662,7 @@ public class WorkerSinkTaskTest {
     }
 
     private void expectConsumerPoll(final int numMessages) {
-        expectConsumerPoll(numMessages, Record.NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE);
+        expectConsumerPoll(numMessages, RecordBatch.NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE);
     }
 
     private void expectConsumerPoll(final int numMessages, final long timestamp, final TimestampType timestampType) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/main/scala/kafka/admin/AdminClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 4b28460..45ba58b 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -293,7 +293,8 @@ object AdminClient {
       DefaultReceiveBufferBytes,
       DefaultRequestTimeoutMs,
       time,
-      true)
+      true,
+      new ApiVersions)
 
     val highLevelClient = new ConsumerNetworkClient(
       networkClient,

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/main/scala/kafka/api/ApiVersion.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala
index 730f313..2ed6452 100644
--- a/core/src/main/scala/kafka/api/ApiVersion.scala
+++ b/core/src/main/scala/kafka/api/ApiVersion.scala
@@ -17,7 +17,7 @@
 
 package kafka.api
 
-import org.apache.kafka.common.record.Record
+import org.apache.kafka.common.record.RecordBatch
 
 /**
  * This class contains the different Kafka versions.
@@ -62,7 +62,10 @@ object ApiVersion {
     "0.10.1" -> KAFKA_0_10_1_IV2,
     // introduced UpdateMetadataRequest v3 in KIP-103
     "0.10.2-IV0" -> KAFKA_0_10_2_IV0,
-    "0.10.2" -> KAFKA_0_10_2_IV0
+    "0.10.2" -> KAFKA_0_10_2_IV0,
+    // KIP-98 (idempotent and transactional producer support)
+    "0.11.0-IV0" -> KAFKA_0_11_0_IV0,
+    "0.11.0" -> KAFKA_0_11_0_IV0
   )
 
   private val versionPattern = "\\.".r
@@ -89,60 +92,66 @@ sealed trait ApiVersion extends Ordered[ApiVersion] {
 // Keep the IDs in order of versions
 case object KAFKA_0_8_0 extends ApiVersion {
   val version: String = "0.8.0.X"
-  val messageFormatVersion: Byte = Record.MAGIC_VALUE_V0
+  val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V0
   val id: Int = 0
 }
 
 case object KAFKA_0_8_1 extends ApiVersion {
   val version: String = "0.8.1.X"
-  val messageFormatVersion: Byte = Record.MAGIC_VALUE_V0
+  val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V0
   val id: Int = 1
 }
 
 case object KAFKA_0_8_2 extends ApiVersion {
   val version: String = "0.8.2.X"
-  val messageFormatVersion: Byte = Record.MAGIC_VALUE_V0
+  val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V0
   val id: Int = 2
 }
 
 case object KAFKA_0_9_0 extends ApiVersion {
   val version: String = "0.9.0.X"
-  val messageFormatVersion: Byte = Record.MAGIC_VALUE_V0
+  val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V0
   val id: Int = 3
 }
 
 case object KAFKA_0_10_0_IV0 extends ApiVersion {
   val version: String = "0.10.0-IV0"
-  val messageFormatVersion: Byte = Record.MAGIC_VALUE_V1
+  val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V1
   val id: Int = 4
 }
 
 case object KAFKA_0_10_0_IV1 extends ApiVersion {
   val version: String = "0.10.0-IV1"
-  val messageFormatVersion: Byte = Record.MAGIC_VALUE_V1
+  val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V1
   val id: Int = 5
 }
 
 case object KAFKA_0_10_1_IV0 extends ApiVersion {
   val version: String = "0.10.1-IV0"
-  val messageFormatVersion: Byte = Record.MAGIC_VALUE_V1
+  val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V1
   val id: Int = 6
 }
 
 case object KAFKA_0_10_1_IV1 extends ApiVersion {
   val version: String = "0.10.1-IV1"
-  val messageFormatVersion: Byte = Record.MAGIC_VALUE_V1
+  val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V1
   val id: Int = 7
 }
 
 case object KAFKA_0_10_1_IV2 extends ApiVersion {
   val version: String = "0.10.1-IV2"
-  val messageFormatVersion: Byte = Record.MAGIC_VALUE_V1
+  val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V1
   val id: Int = 8
 }
 
 case object KAFKA_0_10_2_IV0 extends ApiVersion {
   val version: String = "0.10.2-IV0"
-  val messageFormatVersion: Byte = Record.MAGIC_VALUE_V1
+  val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V1
   val id: Int = 9
 }
+
+case object KAFKA_0_11_0_IV0 extends ApiVersion {
+  val version: String = "0.11.0-IV0"
+  val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V2
+  val id: Int = 10
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/main/scala/kafka/api/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index f049821..f91a3c3 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -203,7 +203,8 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
     val responseData = new util.LinkedHashMap[TopicPartition, JFetchResponse.PartitionData]
     requestInfo.foreach { case (TopicAndPartition(topic, partition), _) =>
       responseData.put(new TopicPartition(topic, partition),
-        new JFetchResponse.PartitionData(Errors.forException(e), -1, MemoryRecords.EMPTY))
+        new JFetchResponse.PartitionData(Errors.forException(e), JFetchResponse.INVALID_HIGHWATERMARK,
+          JFetchResponse.INVALID_LSO, null, MemoryRecords.EMPTY))
     }
     val errorResponse = new JFetchResponse(responseData, 0)
     // Magic value does not matter here because the message set is empty