You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/05/31 21:14:19 UTC

kafka git commit: KAFKA-5093; Avoid loading full batch data when possible when iterating FileRecords

Repository: kafka
Updated Branches:
  refs/heads/trunk da9a171c9 -> 81f0c1e8f


KAFKA-5093; Avoid loading full batch data when possible when iterating FileRecords

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Ismael Juma <is...@juma.me.uk>

Closes #3160 from hachikuji/KAFKA-5093


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/81f0c1e8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/81f0c1e8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/81f0c1e8

Branch: refs/heads/trunk
Commit: 81f0c1e8f2ba2d86f061361b5ee33bb8e6f640c5
Parents: da9a171
Author: Jason Gustafson <ja...@confluent.io>
Authored: Wed May 31 14:11:47 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Wed May 31 14:11:47 2017 -0700

----------------------------------------------------------------------
 .../record/AbstractLegacyRecordBatch.java       |  73 ++++++
 .../kafka/common/record/DefaultRecordBatch.java |  77 +++++++
 .../kafka/common/record/FileLogInputStream.java | 216 +++++++-----------
 .../apache/kafka/common/record/FileRecords.java |   4 +-
 .../kafka/common/record/LegacyRecord.java       |  43 ++--
 .../kafka/common/record/MemoryRecords.java      |  11 +-
 .../org/apache/kafka/common/record/Records.java |   2 +
 .../clients/consumer/internals/FetcherTest.java |   2 +-
 .../common/record/FileLogInputStreamTest.java   | 223 ++++++++++++++++++-
 .../kafka/common/record/FileRecordsTest.java    |   6 +-
 .../common/record/MemoryRecordsBuilderTest.java |   2 +-
 .../kafka/message/ByteBufferMessageSet.scala    |   4 +-
 .../scala/kafka/message/MessageAndOffset.scala  |  24 +-
 13 files changed, 504 insertions(+), 183 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/81f0c1e8/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
index e4938be..9b74d06 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
@@ -29,6 +29,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.util.ArrayDeque;
 import java.util.NoSuchElementException;
 
@@ -510,4 +511,76 @@ public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch impl
         }
     }
 
+    static class LegacyFileChannelRecordBatch extends FileLogInputStream.FileChannelRecordBatch {
+
+        LegacyFileChannelRecordBatch(long offset,
+                                     byte magic,
+                                     FileChannel channel,
+                                     int position,
+                                     int batchSize) {
+            super(offset, magic, channel, position, batchSize);
+        }
+
+        @Override
+        protected RecordBatch toMemoryRecordBatch(ByteBuffer buffer) {
+            return new ByteBufferLegacyRecordBatch(buffer);
+        }
+
+        @Override
+        public long baseOffset() {
+            return loadFullBatch().baseOffset();
+        }
+
+        @Override
+        public long lastOffset() {
+            return offset;
+        }
+
+        @Override
+        public long producerId() {
+            return RecordBatch.NO_PRODUCER_ID;
+        }
+
+        @Override
+        public short producerEpoch() {
+            return RecordBatch.NO_PRODUCER_EPOCH;
+        }
+
+        @Override
+        public int baseSequence() {
+            return RecordBatch.NO_SEQUENCE;
+        }
+
+        @Override
+        public int lastSequence() {
+            return RecordBatch.NO_SEQUENCE;
+        }
+
+        @Override
+        public Integer countOrNull() {
+            return null;
+        }
+
+        @Override
+        public boolean isTransactional() {
+            return false;
+        }
+
+        @Override
+        public boolean isControlBatch() {
+            return false;
+        }
+
+        @Override
+        public int partitionLeaderEpoch() {
+            return RecordBatch.NO_PARTITION_LEADER_EPOCH;
+        }
+
+        @Override
+        protected int headerSize() {
+            return LOG_OVERHEAD + LegacyRecord.headerSize(magic);
+        }
+
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/81f0c1e8/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
index 2bf889f..bdba860 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.utils.Crc32C;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -498,4 +499,80 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
 
     }
 
+    static class DefaultFileChannelRecordBatch extends FileLogInputStream.FileChannelRecordBatch {
+
+        DefaultFileChannelRecordBatch(long offset,
+                                      byte magic,
+                                      FileChannel channel,
+                                      int position,
+                                      int batchSize) {
+            super(offset, magic, channel, position, batchSize);
+        }
+
+        @Override
+        protected RecordBatch toMemoryRecordBatch(ByteBuffer buffer) {
+            return new DefaultRecordBatch(buffer);
+        }
+
+        @Override
+        public long baseOffset() {
+            return offset;
+        }
+
+        @Override
+        public long lastOffset() {
+            return loadBatchHeader().lastOffset();
+        }
+
+        @Override
+        public long producerId() {
+            return loadBatchHeader().producerId();
+        }
+
+        @Override
+        public short producerEpoch() {
+            return loadBatchHeader().producerEpoch();
+        }
+
+        @Override
+        public int baseSequence() {
+            return loadBatchHeader().baseSequence();
+        }
+
+        @Override
+        public int lastSequence() {
+            return loadBatchHeader().lastSequence();
+        }
+
+        @Override
+        public long checksum() {
+            return loadBatchHeader().checksum();
+        }
+
+        @Override
+        public Integer countOrNull() {
+            return loadBatchHeader().countOrNull();
+        }
+
+        @Override
+        public boolean isTransactional() {
+            return loadBatchHeader().isTransactional();
+        }
+
+        @Override
+        public boolean isControlBatch() {
+            return loadBatchHeader().isControlBatch();
+        }
+
+        @Override
+        public int partitionLeaderEpoch() {
+            return loadBatchHeader().partitionLeaderEpoch();
+        }
+
+        @Override
+        protected int headerSize() {
+            return RECORD_BATCH_OVERHEAD;
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/81f0c1e8/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
index 57fec4f..75eb1b3 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
@@ -18,6 +18,8 @@ package org.apache.kafka.common.record;
 
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.record.AbstractLegacyRecordBatch.LegacyFileChannelRecordBatch;
+import org.apache.kafka.common.record.DefaultRecordBatch.DefaultFileChannelRecordBatch;
 import org.apache.kafka.common.utils.CloseableIterator;
 import org.apache.kafka.common.utils.Utils;
 
@@ -27,6 +29,10 @@ import java.nio.channels.FileChannel;
 import java.util.Iterator;
 
 import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;
+import static org.apache.kafka.common.record.Records.HEADER_SIZE_UP_TO_MAGIC;
+import static org.apache.kafka.common.record.Records.MAGIC_OFFSET;
+import static org.apache.kafka.common.record.Records.OFFSET_OFFSET;
+import static org.apache.kafka.common.record.Records.SIZE_OFFSET;
 
 /**
  * A log input stream which is backed by a {@link FileChannel}.
@@ -35,7 +41,7 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil
     private int position;
     private final int end;
     private final FileChannel channel;
-    private final ByteBuffer logHeaderBuffer = ByteBuffer.allocate(LOG_OVERHEAD);
+    private final ByteBuffer logHeaderBuffer = ByteBuffer.allocate(HEADER_SIZE_UP_TO_MAGIC);
 
     /**
      * Create a new log input stream over the FileChannel
@@ -53,15 +59,15 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil
 
     @Override
     public FileChannelRecordBatch nextBatch() throws IOException {
-        if (position + LOG_OVERHEAD >= end)
+        if (position + HEADER_SIZE_UP_TO_MAGIC >= end)
             return null;
 
         logHeaderBuffer.rewind();
         Utils.readFullyOrFail(channel, logHeaderBuffer, position, "log header");
 
         logHeaderBuffer.rewind();
-        long offset = logHeaderBuffer.getLong();
-        int size = logHeaderBuffer.getInt();
+        long offset = logHeaderBuffer.getLong(OFFSET_OFFSET);
+        int size = logHeaderBuffer.getInt(SIZE_OFFSET);
 
         // V0 has the smallest overhead, stricter checking is done later
         if (size < LegacyRecord.RECORD_OVERHEAD_V0)
@@ -70,7 +76,14 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil
         if (position + LOG_OVERHEAD + size > end)
             return null;
 
-        FileChannelRecordBatch batch = new FileChannelRecordBatch(offset, channel, position, size);
+        byte magic = logHeaderBuffer.get(MAGIC_OFFSET);
+        final FileChannelRecordBatch batch;
+
+        if (magic < RecordBatch.MAGIC_VALUE_V2)
+            batch = new LegacyFileChannelRecordBatch(offset, magic, channel, position, size);
+        else
+            batch = new DefaultFileChannelRecordBatch(offset, magic, channel, position, size);
+
         position += batch.sizeInBytes();
         return batch;
     }
@@ -80,71 +93,46 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil
      * without needing to read the record data into memory until it is needed. The downside
      * is that entries will generally no longer be readable when the underlying channel is closed.
      */
-    public static class FileChannelRecordBatch extends AbstractRecordBatch {
-        private final long offset;
-        private final FileChannel channel;
-        private final int position;
-        private final int batchSize;
-        private RecordBatch underlying;
-        private Byte magic;
-
-        private FileChannelRecordBatch(long offset,
-                                       FileChannel channel,
-                                       int position,
-                                       int batchSize) {
+    public abstract static class FileChannelRecordBatch extends AbstractRecordBatch {
+        protected final long offset;
+        protected final byte magic;
+        protected final FileChannel channel;
+        protected final int position;
+        protected final int batchSize;
+
+        private RecordBatch fullBatch;
+        private RecordBatch batchHeader;
+
+        FileChannelRecordBatch(long offset,
+                               byte magic,
+                               FileChannel channel,
+                               int position,
+                               int batchSize) {
             this.offset = offset;
+            this.magic = magic;
             this.channel = channel;
             this.position = position;
             this.batchSize = batchSize;
         }
 
         @Override
-        public long baseOffset() {
-            if (magic() >= RecordBatch.MAGIC_VALUE_V2)
-                return offset;
-
-            loadUnderlyingRecordBatch();
-            return underlying.baseOffset();
-        }
-
-        @Override
         public CompressionType compressionType() {
-            loadUnderlyingRecordBatch();
-            return underlying.compressionType();
+            return loadBatchHeader().compressionType();
         }
 
         @Override
         public TimestampType timestampType() {
-            loadUnderlyingRecordBatch();
-            return underlying.timestampType();
+            return loadBatchHeader().timestampType();
         }
 
         @Override
-        public long maxTimestamp() {
-            loadUnderlyingRecordBatch();
-            return underlying.maxTimestamp();
+        public long checksum() {
+            return loadBatchHeader().checksum();
         }
 
         @Override
-        public long lastOffset() {
-            if (magic() < RecordBatch.MAGIC_VALUE_V2)
-                return offset;
-            else if (underlying != null)
-                return underlying.lastOffset();
-
-            try {
-                // TODO: this logic probably should be moved into DefaultRecordBatch somehow
-                // maybe we just need two separate implementations
-
-                byte[] offsetDelta = new byte[4];
-                ByteBuffer buf = ByteBuffer.wrap(offsetDelta);
-                channel.read(buf, position + DefaultRecordBatch.LAST_OFFSET_DELTA_OFFSET);
-                if (buf.hasRemaining())
-                    throw new KafkaException("Failed to read magic byte from FileChannel " + channel);
-                return offset + buf.getInt(0);
-            } catch (IOException e) {
-                throw new KafkaException(e);
-            }
+        public long maxTimestamp() {
+            return loadBatchHeader().maxTimestamp();
         }
 
         public int position() {
@@ -153,92 +141,27 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil
 
         @Override
         public byte magic() {
-            if (magic != null)
-                return magic;
-            if (underlying != null)
-                return underlying.magic();
-
-            try {
-                ByteBuffer buf = ByteBuffer.wrap(new byte[1]);
-                Utils.readFullyOrFail(channel, buf, position + Records.MAGIC_OFFSET, "magic byte");
-                magic = buf.get(0);
-                return magic;
-            } catch (IOException e) {
-                throw new KafkaException(e);
-            }
-        }
-
-        @Override
-        public long producerId() {
-            loadUnderlyingRecordBatch();
-            return underlying.producerId();
-        }
-
-        @Override
-        public short producerEpoch() {
-            loadUnderlyingRecordBatch();
-            return underlying.producerEpoch();
-        }
-
-        @Override
-        public int baseSequence() {
-            loadUnderlyingRecordBatch();
-            return underlying.baseSequence();
-        }
-
-        @Override
-        public int lastSequence() {
-            loadUnderlyingRecordBatch();
-            return underlying.lastSequence();
-        }
-
-        private void loadUnderlyingRecordBatch() {
-            try {
-                if (underlying != null)
-                    return;
-
-                ByteBuffer batchBuffer = ByteBuffer.allocate(sizeInBytes());
-                Utils.readFullyOrFail(channel, batchBuffer, position, "full record batch");
-                batchBuffer.rewind();
-
-                byte magic = batchBuffer.get(Records.MAGIC_OFFSET);
-                if (magic > RecordBatch.MAGIC_VALUE_V1)
-                    underlying = new DefaultRecordBatch(batchBuffer);
-                else
-                    underlying = new AbstractLegacyRecordBatch.ByteBufferLegacyRecordBatch(batchBuffer);
-            } catch (IOException e) {
-                throw new KafkaException("Failed to load record batch at position " + position + " from file channel " + channel);
-            }
+            return magic;
         }
 
         @Override
         public Iterator<Record> iterator() {
-            loadUnderlyingRecordBatch();
-            return underlying.iterator();
+            return loadFullBatch().iterator();
         }
 
         @Override
         public CloseableIterator<Record> streamingIterator(BufferSupplier bufferSupplier) {
-            loadUnderlyingRecordBatch();
-            return underlying.streamingIterator(bufferSupplier);
+            return loadFullBatch().streamingIterator(bufferSupplier);
         }
 
         @Override
         public boolean isValid() {
-            loadUnderlyingRecordBatch();
-            return underlying.isValid();
+            return loadFullBatch().isValid();
         }
 
         @Override
         public void ensureValid() {
-            loadUnderlyingRecordBatch();
-            underlying.ensureValid();
-        }
-
-        @Override
-        public long checksum() {
-            loadUnderlyingRecordBatch();
-            return underlying.checksum();
+            loadFullBatch().ensureValid();
         }
 
         @Override
@@ -247,12 +170,6 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil
         }
 
         @Override
-        public Integer countOrNull() {
-            loadUnderlyingRecordBatch();
-            return underlying.countOrNull();
-        }
-
-        @Override
         public void writeTo(ByteBuffer buffer) {
             try {
                 int limit = buffer.limit();
@@ -265,22 +182,37 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil
             }
         }
 
-        @Override
-        public boolean isTransactional() {
-            loadUnderlyingRecordBatch();
-            return underlying.isTransactional();
+        protected abstract RecordBatch toMemoryRecordBatch(ByteBuffer buffer);
+
+        protected abstract int headerSize();
+
+        protected RecordBatch loadFullBatch() {
+            if (fullBatch == null) {
+                batchHeader = null;
+                fullBatch = loadBatchWithSize(sizeInBytes(), "full record batch");
+            }
+            return fullBatch;
         }
 
-        @Override
-        public boolean isControlBatch() {
-            loadUnderlyingRecordBatch();
-            return underlying.isControlBatch();
+        protected RecordBatch loadBatchHeader() {
+            if (fullBatch != null)
+                return fullBatch;
+
+            if (batchHeader == null)
+                batchHeader = loadBatchWithSize(headerSize(), "record batch header");
+
+            return batchHeader;
         }
 
-        @Override
-        public int partitionLeaderEpoch() {
-            loadUnderlyingRecordBatch();
-            return underlying.partitionLeaderEpoch();
+        private RecordBatch loadBatchWithSize(int size, String description) {
+            try {
+                ByteBuffer buffer = ByteBuffer.allocate(size);
+                Utils.readFullyOrFail(channel, buffer, position, description);
+                buffer.rewind();
+                return toMemoryRecordBatch(buffer);
+            } catch (IOException e) {
+                throw new KafkaException(e);
+            }
         }
 
         @Override
@@ -309,7 +241,9 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil
 
         @Override
         public String toString() {
-            return "FileChannelRecordBatch(magic: " + magic() + ", offsets: [" + baseOffset() + ", " + lastOffset() + "])";
+            return "FileChannelRecordBatch(magic: " + magic +
+                    ", offset: " + offset +
+                    ", size: " + batchSize + ")";
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/81f0c1e8/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
index a72ba8b..32ca1a7 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -202,7 +202,7 @@ public class FileRecords extends AbstractRecords implements Closeable {
     public void renameTo(File f) throws IOException {
         try {
             Utils.atomicMoveWithFallback(file.toPath(), f.toPath());
-        }  finally {
+        } finally {
             this.file = f;
         }
     }
@@ -391,7 +391,7 @@ public class FileRecords extends AbstractRecords implements Closeable {
      * @param mutable mutable
      * @param fileAlreadyExists File already exists or not
      * @param initFileSize The size used for pre allocate file, for example 512 * 1025 *1024
-     * @param preallocate Pre allocate file or not, gotten from configuration.
+     * @param preallocate Pre-allocate file or not, gotten from configuration.
      */
     private static FileChannel openChannel(File file,
                                            boolean mutable,

http://git-wip-us.apache.org/repos/asf/kafka/blob/81f0c1e8/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java b/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java
index 25185b0..482c4a6 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java
@@ -45,10 +45,10 @@ public final class LegacyRecord {
     public static final int MAGIC_OFFSET = CRC_OFFSET + CRC_LENGTH;
     public static final int MAGIC_LENGTH = 1;
     public static final int ATTRIBUTES_OFFSET = MAGIC_OFFSET + MAGIC_LENGTH;
-    public static final int ATTRIBUTE_LENGTH = 1;
-    public static final int TIMESTAMP_OFFSET = ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH;
+    public static final int ATTRIBUTES_LENGTH = 1;
+    public static final int TIMESTAMP_OFFSET = ATTRIBUTES_OFFSET + ATTRIBUTES_LENGTH;
     public static final int TIMESTAMP_LENGTH = 8;
-    public static final int KEY_SIZE_OFFSET_V0 = ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH;
+    public static final int KEY_SIZE_OFFSET_V0 = ATTRIBUTES_OFFSET + ATTRIBUTES_LENGTH;
     public static final int KEY_SIZE_OFFSET_V1 = TIMESTAMP_OFFSET + TIMESTAMP_LENGTH;
     public static final int KEY_SIZE_LENGTH = 4;
     public static final int KEY_OFFSET_V0 = KEY_SIZE_OFFSET_V0 + KEY_SIZE_LENGTH;
@@ -58,17 +58,18 @@ public final class LegacyRecord {
     /**
      * The size for the record header
      */
-    public static final int HEADER_SIZE = CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTE_LENGTH;
+    public static final int HEADER_SIZE_V0 = CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTES_LENGTH;
+    public static final int HEADER_SIZE_V1 = CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTES_LENGTH + TIMESTAMP_LENGTH;
 
     /**
      * The amount of overhead bytes in a record
      */
-    public static final int RECORD_OVERHEAD_V0 = HEADER_SIZE + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH;
+    public static final int RECORD_OVERHEAD_V0 = HEADER_SIZE_V0 + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH;
 
     /**
      * The amount of overhead bytes in a record
      */
-    public static final int RECORD_OVERHEAD_V1 = HEADER_SIZE + TIMESTAMP_LENGTH + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH;
+    public static final int RECORD_OVERHEAD_V1 = HEADER_SIZE_V1 + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH;
 
     /**
      * Specifies the mask for the compression code. 3 bits to hold the compression codec. 0 is reserved to indicate no
@@ -483,19 +484,11 @@ public final class LegacyRecord {
         }
     }
 
-    public static int recordSize(byte[] key, byte[] value) {
-        return recordSize(RecordBatch.CURRENT_MAGIC_VALUE, key, value);
-    }
-
-    public static int recordSize(byte magic, byte[] key, byte[] value) {
-        return recordSize(magic, key == null ? 0 : key.length, value == null ? 0 : value.length);
-    }
-
-    public static int recordSize(byte magic, ByteBuffer key, ByteBuffer value) {
+    static int recordSize(byte magic, ByteBuffer key, ByteBuffer value) {
         return recordSize(magic, key == null ? 0 : key.limit(), value == null ? 0 : value.limit());
     }
 
-    private static int recordSize(byte magic, int keySize, int valueSize) {
+    public static int recordSize(byte magic, int keySize, int valueSize) {
         return recordOverhead(magic) + keySize + valueSize;
     }
 
@@ -547,16 +540,28 @@ public final class LegacyRecord {
         return crc.getValue();
     }
 
-    public static int recordOverhead(byte magic) {
+    static int recordOverhead(byte magic) {
         if (magic == 0)
             return RECORD_OVERHEAD_V0;
-        return RECORD_OVERHEAD_V1;
+        else if (magic == 1)
+            return RECORD_OVERHEAD_V1;
+        throw new IllegalArgumentException("Invalid magic used in LegacyRecord: " + magic);
+    }
+
+    static int headerSize(byte magic) {
+        if (magic == 0)
+            return HEADER_SIZE_V0;
+        else if (magic == 1)
+            return HEADER_SIZE_V1;
+        throw new IllegalArgumentException("Invalid magic used in LegacyRecord: " + magic);
     }
 
     private static int keyOffset(byte magic) {
         if (magic == 0)
             return KEY_OFFSET_V0;
-        return KEY_OFFSET_V1;
+        else if (magic == 1)
+            return KEY_OFFSET_V1;
+        throw new IllegalArgumentException("Invalid magic used in LegacyRecord: " + magic);
     }
 
     public static TimestampType timestampType(byte magic, TimestampType wrapperRecordTimestampType, byte attributes) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/81f0c1e8/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 528095e..1d45635 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -506,11 +506,18 @@ public class MemoryRecords extends AbstractRecords {
                 producerId, producerEpoch, baseSequence, RecordBatch.NO_PARTITION_LEADER_EPOCH, true, records);
     }
 
+    public static MemoryRecords withTransactionalRecords(byte magic, long initialOffset, CompressionType compressionType,
+                                                         long producerId, short producerEpoch, int baseSequence,
+                                                         int partitionLeaderEpoch, SimpleRecord... records) {
+        return withRecords(magic, initialOffset, compressionType, TimestampType.CREATE_TIME, producerId, producerEpoch,
+                baseSequence, partitionLeaderEpoch, true, records);
+    }
+
     public static MemoryRecords withTransactionalRecords(long initialOffset, CompressionType compressionType, long producerId,
                                                          short producerEpoch, int baseSequence, int partitionLeaderEpoch,
                                                          SimpleRecord... records) {
-        return withRecords(RecordBatch.CURRENT_MAGIC_VALUE, initialOffset, compressionType, TimestampType.CREATE_TIME,
-                producerId, producerEpoch, baseSequence, partitionLeaderEpoch, true, records);
+        return withTransactionalRecords(RecordBatch.CURRENT_MAGIC_VALUE, initialOffset, compressionType,
+                producerId, producerEpoch, baseSequence, partitionLeaderEpoch, records);
     }
 
     public static MemoryRecords withRecords(byte magic, long initialOffset, CompressionType compressionType,

http://git-wip-us.apache.org/repos/asf/kafka/blob/81f0c1e8/clients/src/main/java/org/apache/kafka/common/record/Records.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Records.java b/clients/src/main/java/org/apache/kafka/common/record/Records.java
index 6a4d1a1..a5a5036 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Records.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Records.java
@@ -49,6 +49,8 @@ public interface Records {
     // the magic offset is at the same offset for all current message formats, but the 4 bytes
     // between the size and the magic is dependent on the version.
     int MAGIC_OFFSET = 16;
+    int MAGIC_LENGTH = 1;
+    int HEADER_SIZE_UP_TO_MAGIC = MAGIC_OFFSET + MAGIC_LENGTH;
 
     /**
      * The size of these records in bytes.

http://git-wip-us.apache.org/repos/asf/kafka/blob/81f0c1e8/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 7d48623..97a6259 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -303,7 +303,7 @@ public class FetcherTest {
         long offset = 0;
         long timestamp = 500L;
 
-        int size = LegacyRecord.recordSize(key, value);
+        int size = LegacyRecord.recordSize(magic, key.length, value.length);
         byte attributes = LegacyRecord.computeAttributes(magic, CompressionType.NONE, TimestampType.CREATE_TIME);
         long crc = LegacyRecord.computeChecksum(magic, attributes, timestamp, key, value);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/81f0c1e8/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java
index 7c37354..d5de4bd 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java
@@ -16,33 +16,56 @@
  */
 package org.apache.kafka.common.record;
 
+import org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.test.TestUtils;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 
+import static java.util.Arrays.asList;
+import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V0;
+import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V1;
+import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V2;
+import static org.apache.kafka.common.record.RecordBatch.NO_TIMESTAMP;
+import static org.apache.kafka.common.record.TimestampType.CREATE_TIME;
+import static org.apache.kafka.common.record.TimestampType.NO_TIMESTAMP_TYPE;
 import static org.apache.kafka.test.TestUtils.tempFile;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+@RunWith(value = Parameterized.class)
 public class FileLogInputStreamTest {
 
+    private final byte magic;
+    private final CompressionType compression;
+
+    public FileLogInputStreamTest(byte magic, CompressionType compression) {
+        this.magic = magic;
+        this.compression = compression;
+    }
+
     @Test
     public void testWriteTo() throws IOException {
         try (FileRecords fileRecords = FileRecords.open(tempFile())) {
-            fileRecords.append(MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("foo".getBytes()),
-                    new SimpleRecord("bar".getBytes())));
+            fileRecords.append(MemoryRecords.withRecords(magic, compression, new SimpleRecord("foo".getBytes())));
             fileRecords.flush();
 
             FileLogInputStream logInputStream = new FileLogInputStream(fileRecords.channel(), 0,
                     fileRecords.sizeInBytes());
 
-            FileLogInputStream.FileChannelRecordBatch batch = logInputStream.nextBatch();
+            FileChannelRecordBatch batch = logInputStream.nextBatch();
             assertNotNull(batch);
-            assertEquals(RecordBatch.MAGIC_VALUE_V2, batch.magic());
+            assertEquals(magic, batch.magic());
 
             ByteBuffer buffer = ByteBuffer.allocate(128);
             batch.writeTo(buffer);
@@ -50,13 +73,195 @@ public class FileLogInputStreamTest {
 
             MemoryRecords memRecords = MemoryRecords.readableRecords(buffer);
             List<Record> records = Utils.toList(memRecords.records().iterator());
-            assertEquals(2, records.size());
+            assertEquals(1, records.size());
             Record record0 = records.get(0);
-            assertTrue(record0.hasMagic(RecordBatch.MAGIC_VALUE_V2));
+            assertTrue(record0.hasMagic(magic));
             assertEquals("foo", Utils.utf8(record0.value(), record0.valueSize()));
-            Record record1 = records.get(1);
-            assertTrue(record1.hasMagic(RecordBatch.MAGIC_VALUE_V2));
-            assertEquals("bar", Utils.utf8(record1.value(), record1.valueSize()));
         }
     }
+
+    @Test
+    public void testSimpleBatchIteration() throws IOException {
+        try (FileRecords fileRecords = FileRecords.open(tempFile())) {
+            SimpleRecord firstBatchRecord = new SimpleRecord(3241324L, "a".getBytes(), "foo".getBytes());
+            SimpleRecord secondBatchRecord = new SimpleRecord(234280L, "b".getBytes(), "bar".getBytes());
+
+            fileRecords.append(MemoryRecords.withRecords(magic, 0L, compression, CREATE_TIME, firstBatchRecord));
+            fileRecords.append(MemoryRecords.withRecords(magic, 1L, compression, CREATE_TIME, secondBatchRecord));
+            fileRecords.flush();
+
+            FileLogInputStream logInputStream = new FileLogInputStream(fileRecords.channel(), 0,
+                    fileRecords.sizeInBytes());
+
+            FileChannelRecordBatch firstBatch = logInputStream.nextBatch();
+            assertGenericRecordBatchData(firstBatch, 0L, 3241324L, firstBatchRecord);
+            assertNoProducerData(firstBatch);
+
+            FileChannelRecordBatch secondBatch = logInputStream.nextBatch();
+            assertGenericRecordBatchData(secondBatch, 1L, 234280L, secondBatchRecord);
+            assertNoProducerData(secondBatch);
+
+            assertNull(logInputStream.nextBatch());
+        }
+    }
+
+    @Test
+    public void testBatchIterationWithMultipleRecordsPerBatch() throws IOException {
+        if (magic < MAGIC_VALUE_V2 && compression == CompressionType.NONE)
+            return;
+
+        try (FileRecords fileRecords = FileRecords.open(tempFile())) {
+            SimpleRecord[] firstBatchRecords = new SimpleRecord[]{
+                new SimpleRecord(3241324L, "a".getBytes(), "1".getBytes()),
+                new SimpleRecord(234280L, "b".getBytes(), "2".getBytes())
+
+            };
+            SimpleRecord[] secondBatchRecords = new SimpleRecord[]{
+                new SimpleRecord(238423489L, "c".getBytes(), "3".getBytes()),
+                new SimpleRecord(897839L, null, "4".getBytes()),
+                new SimpleRecord(8234020L, "e".getBytes(), null)
+            };
+
+            fileRecords.append(MemoryRecords.withRecords(magic, 0L, compression, CREATE_TIME, firstBatchRecords));
+            fileRecords.append(MemoryRecords.withRecords(magic, 1L, compression, CREATE_TIME, secondBatchRecords));
+            fileRecords.flush();
+
+            FileLogInputStream logInputStream = new FileLogInputStream(fileRecords.channel(), 0,
+                    fileRecords.sizeInBytes());
+
+            FileChannelRecordBatch firstBatch = logInputStream.nextBatch();
+            assertNoProducerData(firstBatch);
+            assertGenericRecordBatchData(firstBatch, 0L, 3241324L, firstBatchRecords);
+
+            FileChannelRecordBatch secondBatch = logInputStream.nextBatch();
+            assertNoProducerData(secondBatch);
+            assertGenericRecordBatchData(secondBatch, 1L, 238423489L, secondBatchRecords);
+
+            assertNull(logInputStream.nextBatch());
+        }
+    }
+
+    @Test
+    public void testBatchIterationV2() throws IOException {
+        if (magic != MAGIC_VALUE_V2)
+            return;
+
+        try (FileRecords fileRecords = FileRecords.open(tempFile())) {
+            long producerId = 83843L;
+            short producerEpoch = 15;
+            int baseSequence = 234;
+            int partitionLeaderEpoch = 9832;
+
+            SimpleRecord[] firstBatchRecords = new SimpleRecord[]{
+                new SimpleRecord(3241324L, "a".getBytes(), "1".getBytes()),
+                new SimpleRecord(234280L, "b".getBytes(), "2".getBytes())
+
+            };
+            SimpleRecord[] secondBatchRecords = new SimpleRecord[]{
+                new SimpleRecord(238423489L, "c".getBytes(), "3".getBytes()),
+                new SimpleRecord(897839L, null, "4".getBytes()),
+                new SimpleRecord(8234020L, "e".getBytes(), null)
+            };
+
+            fileRecords.append(MemoryRecords.withIdempotentRecords(magic, 15L, compression, producerId,
+                    producerEpoch, baseSequence, partitionLeaderEpoch, firstBatchRecords));
+            fileRecords.append(MemoryRecords.withTransactionalRecords(magic, 27L, compression, producerId,
+                    producerEpoch, baseSequence + firstBatchRecords.length, partitionLeaderEpoch, secondBatchRecords));
+            fileRecords.flush();
+
+            FileLogInputStream logInputStream = new FileLogInputStream(fileRecords.channel(), 0,
+                    fileRecords.sizeInBytes());
+
+            FileChannelRecordBatch firstBatch = logInputStream.nextBatch();
+            assertProducerData(firstBatch, producerId, producerEpoch, baseSequence, false, firstBatchRecords);
+            assertGenericRecordBatchData(firstBatch, 15L, 3241324L, firstBatchRecords);
+            assertEquals(partitionLeaderEpoch, firstBatch.partitionLeaderEpoch());
+
+            FileChannelRecordBatch secondBatch = logInputStream.nextBatch();
+            assertProducerData(secondBatch, producerId, producerEpoch, baseSequence + firstBatchRecords.length,
+                    true, secondBatchRecords);
+            assertGenericRecordBatchData(secondBatch, 27L, 238423489L, secondBatchRecords);
+            assertEquals(partitionLeaderEpoch, secondBatch.partitionLeaderEpoch());
+
+            assertNull(logInputStream.nextBatch());
+        }
+    }
+
+    @Test
+    public void testBatchIterationIncompleteBatch() throws IOException {
+        try (FileRecords fileRecords = FileRecords.open(tempFile())) {
+            SimpleRecord firstBatchRecord = new SimpleRecord(100L, "foo".getBytes());
+            SimpleRecord secondBatchRecord = new SimpleRecord(200L, "bar".getBytes());
+
+            fileRecords.append(MemoryRecords.withRecords(magic, 0L, compression, CREATE_TIME, firstBatchRecord));
+            fileRecords.append(MemoryRecords.withRecords(magic, 1L, compression, CREATE_TIME, secondBatchRecord));
+            fileRecords.flush();
+            fileRecords.truncateTo(fileRecords.sizeInBytes() - 13);
+
+            FileLogInputStream logInputStream = new FileLogInputStream(fileRecords.channel(), 0,
+                    fileRecords.sizeInBytes());
+
+            FileChannelRecordBatch firstBatch = logInputStream.nextBatch();
+            assertNoProducerData(firstBatch);
+            assertGenericRecordBatchData(firstBatch, 0L, 100L, firstBatchRecord);
+
+            assertNull(logInputStream.nextBatch());
+        }
+    }
+
+    private void assertProducerData(RecordBatch batch, long producerId, short producerEpoch, int baseSequence,
+                                    boolean isTransactional, SimpleRecord ... records) {
+        assertEquals(producerId, batch.producerId());
+        assertEquals(producerEpoch, batch.producerEpoch());
+        assertEquals(baseSequence, batch.baseSequence());
+        assertEquals(baseSequence + records.length - 1, batch.lastSequence());
+        assertEquals(isTransactional, batch.isTransactional());
+    }
+
+    private void assertNoProducerData(RecordBatch batch) {
+        assertEquals(RecordBatch.NO_PRODUCER_ID, batch.producerId());
+        assertEquals(RecordBatch.NO_PRODUCER_EPOCH, batch.producerEpoch());
+        assertEquals(RecordBatch.NO_SEQUENCE, batch.baseSequence());
+        assertEquals(RecordBatch.NO_SEQUENCE, batch.lastSequence());
+        assertFalse(batch.isTransactional());
+    }
+
+    private void assertGenericRecordBatchData(RecordBatch batch, long baseOffset, long maxTimestamp, SimpleRecord ... records) {
+        assertEquals(magic, batch.magic());
+        assertEquals(compression, batch.compressionType());
+
+        if (magic == MAGIC_VALUE_V0) {
+            assertEquals(NO_TIMESTAMP_TYPE, batch.timestampType());
+        } else {
+            assertEquals(CREATE_TIME, batch.timestampType());
+            assertEquals(maxTimestamp, batch.maxTimestamp());
+        }
+
+        assertEquals(baseOffset + records.length - 1, batch.lastOffset());
+        if (magic >= MAGIC_VALUE_V2)
+            assertEquals(Integer.valueOf(records.length), batch.countOrNull());
+
+        assertEquals(baseOffset, batch.baseOffset());
+        assertTrue(batch.isValid());
+
+        List<Record> batchRecords = TestUtils.toList(batch);
+        for (int i = 0; i < records.length; i++) {
+            assertEquals(baseOffset + i, batchRecords.get(i).offset());
+            assertEquals(records[i].key(), batchRecords.get(i).key());
+            assertEquals(records[i].value(), batchRecords.get(i).value());
+            if (magic == MAGIC_VALUE_V0)
+                assertEquals(NO_TIMESTAMP, batchRecords.get(i).timestamp());
+            else
+                assertEquals(records[i].timestamp(), batchRecords.get(i).timestamp());
+        }
+    }
+
+    @Parameterized.Parameters(name = "magic={0}, compression={1}")
+    public static Collection<Object[]> data() {
+        List<Object[]> values = new ArrayList<>();
+        for (byte magic : asList(MAGIC_VALUE_V0, MAGIC_VALUE_V1, MAGIC_VALUE_V2))
+            for (CompressionType type: CompressionType.values())
+                values.add(new Object[] {magic, type});
+        return values;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/81f0c1e8/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
index 11ee419..8b9c900 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
@@ -26,11 +26,11 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 
+import static java.util.Arrays.asList;
 import static org.apache.kafka.test.TestUtils.tempFile;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -324,8 +324,8 @@ public class FileRecordsTest {
     }
 
     private void doTestConversion(CompressionType compressionType, byte toMagic) throws IOException {
-        List<Long> offsets = Arrays.asList(0L, 2L, 3L, 9L, 11L, 15L);
-        List<SimpleRecord> records = Arrays.asList(
+        List<Long> offsets = asList(0L, 2L, 3L, 9L, 11L, 15L);
+        List<SimpleRecord> records = asList(
                 new SimpleRecord(1L, "k1".getBytes(), "hello".getBytes()),
                 new SimpleRecord(2L, "k2".getBytes(), "goodbye".getBytes()),
                 new SimpleRecord(3L, "k3".getBytes(), "hello again".getBytes()),

http://git-wip-us.apache.org/repos/asf/kafka/blob/81f0c1e8/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
index 0922c48..c621d53 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
@@ -554,7 +554,7 @@ public class MemoryRecordsBuilderTest {
         }
     }
 
-    @Parameterized.Parameters
+    @Parameterized.Parameters(name = "bufferOffset={0}, compression={1}")
     public static Collection<Object[]> data() {
         List<Object[]> values = new ArrayList<>();
         for (int bufferOffset : Arrays.asList(0, 15))

http://git-wip-us.apache.org/repos/asf/kafka/blob/81f0c1e8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
index e1d2882..c6fa1ce 100644
--- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
@@ -166,9 +166,9 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
   /** When flag isShallow is set to be true, we do a shallow iteration: just traverse the first level of messages. **/
   private def internalIterator(isShallow: Boolean = false): Iterator[MessageAndOffset] = {
     if (isShallow)
-      asRecords.batches.asScala.iterator.map(batch => MessageAndOffset.fromRecordBatch(batch.asInstanceOf[AbstractLegacyRecordBatch]))
+      asRecords.batches.asScala.iterator.map(MessageAndOffset.fromRecordBatch)
     else
-      asRecords.records.asScala.iterator.map(record => MessageAndOffset.fromRecordBatch(record.asInstanceOf[AbstractLegacyRecordBatch]))
+      asRecords.records.asScala.iterator.map(MessageAndOffset.fromRecord)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/81f0c1e8/core/src/main/scala/kafka/message/MessageAndOffset.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/MessageAndOffset.scala b/core/src/main/scala/kafka/message/MessageAndOffset.scala
index 349e90b..8de0f81 100644
--- a/core/src/main/scala/kafka/message/MessageAndOffset.scala
+++ b/core/src/main/scala/kafka/message/MessageAndOffset.scala
@@ -17,11 +17,29 @@
 
 package kafka.message
 
-import org.apache.kafka.common.record.AbstractLegacyRecordBatch
+import org.apache.kafka.common.record.{AbstractLegacyRecordBatch, Record, RecordBatch}
 
 object MessageAndOffset {
-  def fromRecordBatch(recordBatch: AbstractLegacyRecordBatch): MessageAndOffset = {
-    MessageAndOffset(Message.fromRecord(recordBatch.outerRecord), recordBatch.lastOffset)
+  def fromRecordBatch(batch: RecordBatch): MessageAndOffset = {
+    batch match {
+      case legacyBatch: AbstractLegacyRecordBatch =>
+        MessageAndOffset(Message.fromRecord(legacyBatch.outerRecord), legacyBatch.lastOffset)
+
+      case _ =>
+        throw new IllegalArgumentException(s"Illegal batch type ${batch.getClass}. The older message format classes " +
+          s"only support conversion from ${classOf[AbstractLegacyRecordBatch]}, which is used for magic v0 and v1")
+    }
+  }
+
+  def fromRecord(record: Record): MessageAndOffset = {
+    record match {
+      case legacyBatch: AbstractLegacyRecordBatch =>
+        MessageAndOffset(Message.fromRecord(legacyBatch.outerRecord), legacyBatch.lastOffset)
+
+      case _ =>
+        throw new IllegalArgumentException(s"Illegal record type ${record.getClass}. The older message format classes " +
+          s"only support conversion from ${classOf[AbstractLegacyRecordBatch]}, which is used for magic v0 and v1")
+    }
   }
 }