Posted to commits@kafka.apache.org by jg...@apache.org on 2016/12/13 18:41:34 UTC

[7/9] kafka git commit: KAFKA-4390; Replace MessageSet usage with client-side alternatives

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java b/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java
index 1bc8a65..4a678d5 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.common.record;
 
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.CorruptRecordException;
 import org.apache.kafka.common.utils.AbstractIterator;
 import org.apache.kafka.common.utils.Utils;
 
@@ -25,51 +26,58 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayDeque;
+import java.util.Iterator;
 
+/**
+ * An iterator which handles both the shallow and deep iteration of record sets.
+ */
 public class RecordsIterator extends AbstractIterator<LogEntry> {
-    private final LogInputStream logStream;
     private final boolean shallow;
+    private final boolean ensureMatchingMagic;
+    private final int maxRecordSize;
+    private final ShallowRecordsIterator<?> shallowIter;
     private DeepRecordsIterator innerIter;
 
-    public RecordsIterator(LogInputStream logStream, boolean shallow) {
-        this.logStream = logStream;
+    public RecordsIterator(LogInputStream<?> logInputStream,
+                           boolean shallow,
+                           boolean ensureMatchingMagic,
+                           int maxRecordSize) {
+        this.shallowIter = new ShallowRecordsIterator<>(logInputStream);
         this.shallow = shallow;
+        this.ensureMatchingMagic = ensureMatchingMagic;
+        this.maxRecordSize = maxRecordSize;
     }
 
-    /*
-     * Read the next record from the buffer.
-     *
-     * Note that in the compressed message set, each message value size is set as the size of the un-compressed
-     * version of the message value, so when we do de-compression allocating an array of the specified size for
-     * reading compressed value data is sufficient.
+    /**
+     * Get a shallow iterator over the given input stream.
+     * @param logInputStream The log input stream to read the entries from
+     * @param <T> The type of the log entry
+     * @return The shallow iterator.
      */
+    public static <T extends LogEntry> Iterator<T> shallowIterator(LogInputStream<T> logInputStream) {
+        return new ShallowRecordsIterator<>(logInputStream);
+    }
+
     @Override
     protected LogEntry makeNext() {
         if (innerDone()) {
-            try {
-                LogEntry entry = logStream.nextEntry();
-                // No more record to return.
-                if (entry == null)
-                    return allDone();
-
-                // decide whether to go shallow or deep iteration if it is compressed
-                CompressionType compressionType = entry.record().compressionType();
-                if (compressionType == CompressionType.NONE || shallow) {
-                    return entry;
-                } else {
-                    // init the inner iterator with the value payload of the message,
-                    // which will de-compress the payload to a set of messages;
-                    // since we assume nested compression is not allowed, the deep iterator
-                    // would not try to further decompress underlying messages
-                    // There will be at least one element in the inner iterator, so we don't
-                    // need to call hasNext() here.
-                    innerIter = new DeepRecordsIterator(entry);
-                    return innerIter.next();
-                }
-            } catch (EOFException e) {
+            if (!shallowIter.hasNext())
                 return allDone();
-            } catch (IOException e) {
-                throw new KafkaException(e);
+
+            LogEntry entry = shallowIter.next();
+
+            // decide whether to go shallow or deep iteration if it is compressed
+            if (shallow || !entry.isCompressed()) {
+                return entry;
+            } else {
+                // init the inner iterator with the value payload of the message,
+                // which will de-compress the payload to a set of messages;
+                // since we assume nested compression is not allowed, the deep iterator
+                // would not try to further decompress underlying messages
+                // There will be at least one element in the inner iterator, so we don't
+                // need to call hasNext() here.
+                innerIter = new DeepRecordsIterator(entry, ensureMatchingMagic, maxRecordSize);
+                return innerIter.next();
             }
         } else {
             return innerIter.next();
@@ -80,38 +88,70 @@ public class RecordsIterator extends AbstractIterator<LogEntry> {
         return innerIter == null || !innerIter.hasNext();
     }
 
-    private static class DataLogInputStream implements LogInputStream {
+    private static class DataLogInputStream implements LogInputStream<LogEntry> {
         private final DataInputStream stream;
+        protected final int maxMessageSize;
 
-        private DataLogInputStream(DataInputStream stream) {
+        DataLogInputStream(DataInputStream stream, int maxMessageSize) {
             this.stream = stream;
+            this.maxMessageSize = maxMessageSize;
         }
 
         public LogEntry nextEntry() throws IOException {
-            long offset = stream.readLong();
-            int size = stream.readInt();
-            if (size < 0)
-                throw new IllegalStateException("Record with size " + size);
-
-            byte[] recordBuffer = new byte[size];
-            stream.readFully(recordBuffer, 0, size);
-            ByteBuffer buf = ByteBuffer.wrap(recordBuffer);
-            return new LogEntry(offset, new Record(buf));
+            try {
+                long offset = stream.readLong();
+                int size = stream.readInt();
+                if (size < Record.RECORD_OVERHEAD_V0)
+                    throw new CorruptRecordException(String.format("Record size is less than the minimum record overhead (%d)", Record.RECORD_OVERHEAD_V0));
+                if (size > maxMessageSize)
+                    throw new CorruptRecordException(String.format("Record size exceeds the largest allowable message size (%d).", maxMessageSize));
+
+                byte[] recordBuffer = new byte[size];
+                stream.readFully(recordBuffer, 0, size);
+                ByteBuffer buf = ByteBuffer.wrap(recordBuffer);
+                return LogEntry.create(offset, new Record(buf));
+            } catch (EOFException e) {
+                return null;
+            }
         }
     }
 
-    private static class DeepRecordsIterator extends AbstractIterator<LogEntry> {
+    private static class ShallowRecordsIterator<T extends LogEntry> extends AbstractIterator<T> {
+        private final LogInputStream<T> logStream;
+
+        public ShallowRecordsIterator(LogInputStream<T> logStream) {
+            this.logStream = logStream;
+        }
+
+        @Override
+        protected T makeNext() {
+            try {
+                T entry = logStream.nextEntry();
+                if (entry == null)
+                    return allDone();
+                return entry;
+            } catch (IOException e) {
+                throw new KafkaException(e);
+            }
+        }
+    }
+
+    public static class DeepRecordsIterator extends AbstractIterator<LogEntry> {
         private final ArrayDeque<LogEntry> logEntries;
         private final long absoluteBaseOffset;
+        private final byte wrapperMagic;
+
+        public DeepRecordsIterator(LogEntry wrapperEntry, boolean ensureMatchingMagic, int maxMessageSize) {
+            Record wrapperRecord = wrapperEntry.record();
+            this.wrapperMagic = wrapperRecord.magic();
 
-        private DeepRecordsIterator(LogEntry entry) {
-            CompressionType compressionType = entry.record().compressionType();
-            ByteBuffer buffer = entry.record().value();
-            DataInputStream stream = Compressor.wrapForInput(new ByteBufferInputStream(buffer), compressionType, entry.record().magic());
-            LogInputStream logStream = new DataLogInputStream(stream);
+            CompressionType compressionType = wrapperRecord.compressionType();
+            ByteBuffer buffer = wrapperRecord.value();
+            DataInputStream stream = MemoryRecordsBuilder.wrapForInput(new ByteBufferInputStream(buffer), compressionType, wrapperRecord.magic());
+            LogInputStream logStream = new DataLogInputStream(stream, maxMessageSize);
 
-            long wrapperRecordOffset = entry.offset();
-            long wrapperRecordTimestamp = entry.record().timestamp();
+            long wrapperRecordOffset = wrapperEntry.offset();
+            long wrapperRecordTimestamp = wrapperRecord.timestamp();
             this.logEntries = new ArrayDeque<>();
 
             // If relative offset is used, we need to decompress the entire message first to compute
@@ -119,22 +159,27 @@ public class RecordsIterator extends AbstractIterator<LogEntry> {
             // do the same for message format version 0
             try {
                 while (true) {
-                    try {
-                        LogEntry logEntry = logStream.nextEntry();
-                        if (entry.record().magic() > Record.MAGIC_VALUE_V0) {
-                            Record recordWithTimestamp = new Record(
-                                    logEntry.record().buffer(),
-                                    wrapperRecordTimestamp,
-                                    entry.record().timestampType()
-                            );
-                            logEntry = new LogEntry(logEntry.offset(), recordWithTimestamp);
-                        }
-                        logEntries.add(logEntry);
-                    } catch (EOFException e) {
+                    LogEntry logEntry = logStream.nextEntry();
+                    if (logEntry == null)
                         break;
+
+                    Record record = logEntry.record();
+                    byte magic = record.magic();
+
+                    if (ensureMatchingMagic && magic != wrapperMagic)
+                        throw new InvalidRecordException("Compressed message magic does not match wrapper magic");
+
+                    if (magic > Record.MAGIC_VALUE_V0) {
+                        Record recordWithTimestamp = new Record(
+                                record.buffer(),
+                                wrapperRecordTimestamp,
+                                wrapperRecord.timestampType()
+                        );
+                        logEntry = LogEntry.create(logEntry.offset(), recordWithTimestamp);
                     }
+                    logEntries.addLast(logEntry);
                 }
-                if (entry.record().magic() > Record.MAGIC_VALUE_V0)
+                if (wrapperMagic > Record.MAGIC_VALUE_V0)
                     this.absoluteBaseOffset = wrapperRecordOffset - logEntries.getLast().offset();
                 else
                     this.absoluteBaseOffset = -1;
@@ -155,12 +200,10 @@ public class RecordsIterator extends AbstractIterator<LogEntry> {
             // Convert offset to absolute offset if needed.
             if (absoluteBaseOffset >= 0) {
                 long absoluteOffset = absoluteBaseOffset + entry.offset();
-                entry = new LogEntry(absoluteOffset, entry.record());
+                entry = LogEntry.create(absoluteOffset, entry.record());
             }
 
-            // decide whether to go shallow or deep iteration if it is compressed
-            CompressionType compression = entry.record().compressionType();
-            if (compression != CompressionType.NONE)
+            if (entry.isCompressed())
                 throw new InvalidRecordException("Inner messages must not be compressed");
 
             return entry;
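
As context for the iterator rework above, here is a minimal usage sketch (not part of this patch) of how a caller might build a compressed batch and walk it with the deep iterator. The builder and iterator calls are the ones exercised by the tests later in this change; the class and package names are purely illustrative.

package example; // hypothetical illustration, not part of the commit

import java.nio.ByteBuffer;
import java.util.Iterator;

import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.LogEntry;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.TimestampType;

public class IteratorSketch {
    public static void main(String[] args) {
        // Build a small compressed batch with the new MemoryRecordsBuilder.
        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024),
                CompressionType.GZIP, TimestampType.CREATE_TIME);
        builder.append(0L, 100L, "key".getBytes(), "value-0".getBytes());
        builder.append(1L, 200L, "key".getBytes(), "value-1".getBytes());
        MemoryRecords records = builder.build();

        // records.shallowIterator() would yield only the single compressed wrapper entry;
        // the deep iterator decompresses the wrapper and yields the two inner entries
        // with their offsets converted back to absolute values.
        Iterator<LogEntry> deep = records.deepIterator();
        while (deep.hasNext()) {
            LogEntry entry = deep.next();
            System.out.println(entry.offset() + " -> " + entry.record());
        }
    }
}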

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/common/record/TimestampType.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/TimestampType.java b/clients/src/main/java/org/apache/kafka/common/record/TimestampType.java
index 62fd814..55c966a 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/TimestampType.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/TimestampType.java
@@ -27,6 +27,7 @@ public enum TimestampType {
 
     public final int id;
     public final String name;
+
     TimestampType(int id, String name) {
         this.id = id;
         this.name = name;

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index c3c1045..c5e6716 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -266,6 +266,24 @@ public class Utils {
     }
 
     /**
+     * Convert a ByteBuffer to a nullable array.
+     * @param buffer The buffer to convert
+     * @return The resulting array or null if the buffer is null
+     */
+    public static byte[] toNullableArray(ByteBuffer buffer) {
+        return buffer == null ? null : toArray(buffer);
+    }
+
+    /**
+     * Wrap an array as a nullable ByteBuffer.
+     * @param array The nullable array to wrap
+     * @return The wrapping ByteBuffer or null if array is null
+     */
+    public static ByteBuffer wrapNullable(byte[] array) {
+        return array == null ? null : ByteBuffer.wrap(array);
+    }
+
+    /**
      * Read a byte array from the given offset and size in the buffer
      */
     public static byte[] toArray(ByteBuffer buffer, int offset, int size) {
@@ -733,4 +751,37 @@ public class Utils {
     public static int longHashcode(long value) {
         return (int) (value ^ (value >>> 32));
     }
+
+    /**
+     * Read a size-delimited byte buffer starting at the given offset.
+     * @param buffer Buffer containing the size and data
+     * @param start Offset in the buffer to read from
+     * @return A slice of the buffer containing only the delimited data (excluding the size)
+     */
+    public static ByteBuffer sizeDelimited(ByteBuffer buffer, int start) {
+        int size = buffer.getInt(start);
+        if (size < 0) {
+            return null;
+        } else {
+            ByteBuffer b = buffer.duplicate();
+            b.position(start + 4);
+            b = b.slice();
+            b.limit(size);
+            b.rewind();
+            return b;
+        }
+    }
+
+    /**
+     * Compute the checksum of a range of data
+     * @param buffer Buffer containing the data to checksum
+     * @param start Offset in the buffer to read from
+     * @param size The number of bytes to include
+     */
+    public static long computeChecksum(ByteBuffer buffer, int start, int size) {
+        Crc32 crc = new Crc32();
+        crc.update(buffer.array(), buffer.arrayOffset() + start, size);
+        return crc.getValue();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index ad6c127..a4386f8 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -39,6 +39,8 @@ import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.FetchResponse;
 import org.apache.kafka.common.requests.FetchResponse.PartitionData;
@@ -1323,11 +1325,10 @@ public class KafkaConsumerTest {
             TopicPartition partition = fetchEntry.getKey();
             long fetchOffset = fetchEntry.getValue().offset;
             int fetchCount = fetchEntry.getValue().count;
-            MemoryRecords records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
+            MemoryRecordsBuilder records = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME);
             for (int i = 0; i < fetchCount; i++)
                 records.append(fetchOffset + i, 0L, ("key-" + i).getBytes(), ("value-" + i).getBytes());
-            records.close();
-            tpResponses.put(partition, new FetchResponse.PartitionData(Errors.NONE.code(), 0, records));
+            tpResponses.put(partition, new FetchResponse.PartitionData(Errors.NONE.code(), 0, records.build()));
         }
         return new FetchResponse(tpResponses, 0);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 6d5896f..15075cb 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -37,10 +37,12 @@ import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.ByteBufferOutputStream;
 import org.apache.kafka.common.record.CompressionType;
-import org.apache.kafka.common.record.Compressor;
 import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
 import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.common.requests.FetchResponse;
@@ -93,8 +95,8 @@ public class FetcherTest {
     private static final double EPSILON = 0.0001;
     private ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, 1000);
 
-    private MemoryRecords records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
-    private MemoryRecords nextRecords = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
+    private MemoryRecords records;
+    private MemoryRecords nextRecords;
     private Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, metrics);
     private Metrics fetcherMetrics = new Metrics(time);
     private Fetcher<byte[], byte[]> fetcherNoAutoReset = createFetcher(subscriptionsNoAutoReset, fetcherMetrics);
@@ -104,14 +106,16 @@ public class FetcherTest {
         metadata.update(cluster, time.milliseconds());
         client.setNode(node);
 
-        records.append(1L, 0L, "key".getBytes(), "value-1".getBytes());
-        records.append(2L, 0L, "key".getBytes(), "value-2".getBytes());
-        records.append(3L, 0L, "key".getBytes(), "value-3".getBytes());
-        records.close();
+        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME);
+        builder.append(1L, 0L, "key".getBytes(), "value-1".getBytes());
+        builder.append(2L, 0L, "key".getBytes(), "value-2".getBytes());
+        builder.append(3L, 0L, "key".getBytes(), "value-3".getBytes());
+        records = builder.build();
 
-        nextRecords.append(4L, 0L, "key".getBytes(), "value-4".getBytes());
-        nextRecords.append(5L, 0L, "key".getBytes(), "value-5".getBytes());
-        nextRecords.close();
+        builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME);
+        builder.append(4L, 0L, "key".getBytes(), "value-4".getBytes());
+        builder.append(5L, 0L, "key".getBytes(), "value-5".getBytes());
+        nextRecords = builder.build();
     }
 
     @After
@@ -129,7 +133,7 @@ public class FetcherTest {
         assertEquals(1, fetcher.sendFetches());
         assertFalse(fetcher.hasCompletedFetches());
 
-        client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0));
+        client.prepareResponse(fetchResponse(this.records, Errors.NONE.code(), 100L, 0));
         consumerClient.poll(0);
         assertTrue(fetcher.hasCompletedFetches());
 
@@ -154,7 +158,7 @@ public class FetcherTest {
         assertEquals(1, fetcher.sendFetches());
         assertFalse(fetcher.hasCompletedFetches());
 
-        client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NOT_LEADER_FOR_PARTITION.code(), 100L, 0));
+        client.prepareResponse(fetchResponse(this.records, Errors.NOT_LEADER_FOR_PARTITION.code(), 100L, 0));
         consumerClient.poll(0);
         assertTrue(fetcher.hasCompletedFetches());
 
@@ -192,7 +196,7 @@ public class FetcherTest {
         subscriptions.assignFromUser(singleton(tp));
         subscriptions.seek(tp, 1);
 
-        client.prepareResponse(matchesOffset(tp, 1), fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0));
+        client.prepareResponse(matchesOffset(tp, 1), fetchResponse(this.records, Errors.NONE.code(), 100L, 0));
 
         assertEquals(1, fetcher.sendFetches());
         consumerClient.poll(0);
@@ -206,29 +210,30 @@ public class FetcherTest {
     }
 
     @Test
-    public void testParseInvalidRecord() {
+    public void testParseInvalidRecord() throws Exception {
         ByteBuffer buffer = ByteBuffer.allocate(1024);
-        Compressor compressor = new Compressor(buffer, CompressionType.NONE);
+        ByteBufferOutputStream out = new ByteBufferOutputStream(buffer);
 
+        byte magic = Record.CURRENT_MAGIC_VALUE;
         byte[] key = "foo".getBytes();
         byte[] value = "baz".getBytes();
         long offset = 0;
         long timestamp = 500L;
 
         int size = Record.recordSize(key, value);
-        long crc = Record.computeChecksum(timestamp, key, value, CompressionType.NONE, 0, -1);
+        byte attributes = Record.computeAttributes(magic, CompressionType.NONE, TimestampType.CREATE_TIME);
+        long crc = Record.computeChecksum(magic, attributes, timestamp, key, value);
 
         // write one valid record
-        compressor.putLong(offset);
-        compressor.putInt(size);
-        Record.write(compressor, crc, Record.computeAttributes(CompressionType.NONE), timestamp, key, value, 0, -1);
+        out.writeLong(offset);
+        out.writeInt(size);
+        Record.write(out, magic, crc, Record.computeAttributes(magic, CompressionType.NONE, TimestampType.CREATE_TIME), timestamp, key, value);
 
         // and one invalid record (note the crc)
-        compressor.putLong(offset);
-        compressor.putInt(size);
-        Record.write(compressor, crc + 1, Record.computeAttributes(CompressionType.NONE), timestamp, key, value, 0, -1);
+        out.writeLong(offset);
+        out.writeInt(size);
+        Record.write(out, magic, crc + 1, Record.computeAttributes(magic, CompressionType.NONE, TimestampType.CREATE_TIME), timestamp, key, value);
 
-        compressor.close();
         buffer.flip();
 
         subscriptions.assignFromUser(singleton(tp));
@@ -236,7 +241,7 @@ public class FetcherTest {
 
         // normal fetch
         assertEquals(1, fetcher.sendFetches());
-        client.prepareResponse(fetchResponse(buffer, Errors.NONE.code(), 100L, 0));
+        client.prepareResponse(fetchResponse(MemoryRecords.readableRecords(buffer), Errors.NONE.code(), 100L, 0));
         consumerClient.poll(0);
         try {
             fetcher.fetchedRecords();
@@ -255,8 +260,8 @@ public class FetcherTest {
         subscriptions.assignFromUser(singleton(tp));
         subscriptions.seek(tp, 1);
 
-        client.prepareResponse(matchesOffset(tp, 1), fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0));
-        client.prepareResponse(matchesOffset(tp, 4), fetchResponse(this.nextRecords.buffer(), Errors.NONE.code(), 100L, 0));
+        client.prepareResponse(matchesOffset(tp, 1), fetchResponse(this.records, Errors.NONE.code(), 100L, 0));
+        client.prepareResponse(matchesOffset(tp, 4), fetchResponse(this.nextRecords, Errors.NONE.code(), 100L, 0));
 
         assertEquals(1, fetcher.sendFetches());
         consumerClient.poll(0);
@@ -287,11 +292,11 @@ public class FetcherTest {
         // if we are fetching from a compacted topic, there may be gaps in the returned records
         // this test verifies the fetcher updates the current fetched/consumed positions correctly for this case
 
-        MemoryRecords records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
-        records.append(15L, 0L, "key".getBytes(), "value-1".getBytes());
-        records.append(20L, 0L, "key".getBytes(), "value-2".getBytes());
-        records.append(30L, 0L, "key".getBytes(), "value-3".getBytes());
-        records.close();
+        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME);
+        builder.append(15L, 0L, "key".getBytes(), "value-1".getBytes());
+        builder.append(20L, 0L, "key".getBytes(), "value-2".getBytes());
+        builder.append(30L, 0L, "key".getBytes(), "value-3".getBytes());
+        MemoryRecords records = builder.build();
 
         List<ConsumerRecord<byte[], byte[]>> consumerRecords;
         subscriptions.assignFromUser(singleton(tp));
@@ -299,7 +304,7 @@ public class FetcherTest {
 
         // normal fetch
         assertEquals(1, fetcher.sendFetches());
-        client.prepareResponse(fetchResponse(records.buffer(), Errors.NONE.code(), 100L, 0));
+        client.prepareResponse(fetchResponse(records, Errors.NONE.code(), 100L, 0));
         consumerClient.poll(0);
         consumerRecords = fetcher.fetchedRecords().get(tp);
         assertEquals(3, consumerRecords.size());
@@ -317,7 +322,7 @@ public class FetcherTest {
 
         // resize the limit of the buffer to pretend it is only fetch-size large
         assertEquals(1, fetcher.sendFetches());
-        client.prepareResponse(fetchResponse(this.records.buffer(), Errors.TOPIC_AUTHORIZATION_FAILED.code(), 100L, 0));
+        client.prepareResponse(fetchResponse(this.records, Errors.TOPIC_AUTHORIZATION_FAILED.code(), 100L, 0));
         consumerClient.poll(0);
         try {
             fetcher.fetchedRecords();
@@ -337,7 +342,7 @@ public class FetcherTest {
 
         // Now the rebalance happens and fetch positions are cleared
         subscriptions.assignFromSubscribed(singleton(tp));
-        client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0));
+        client.prepareResponse(fetchResponse(this.records, Errors.NONE.code(), 100L, 0));
         consumerClient.poll(0);
 
         // The active fetch should be ignored since its position is no longer valid
@@ -352,7 +357,7 @@ public class FetcherTest {
         assertEquals(1, fetcher.sendFetches());
         subscriptions.pause(tp);
 
-        client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0));
+        client.prepareResponse(fetchResponse(this.records, Errors.NONE.code(), 100L, 0));
         consumerClient.poll(0);
         assertNull(fetcher.fetchedRecords().get(tp));
     }
@@ -373,7 +378,7 @@ public class FetcherTest {
         subscriptions.seek(tp, 0);
 
         assertEquals(1, fetcher.sendFetches());
-        client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NOT_LEADER_FOR_PARTITION.code(), 100L, 0));
+        client.prepareResponse(fetchResponse(this.records, Errors.NOT_LEADER_FOR_PARTITION.code(), 100L, 0));
         consumerClient.poll(0);
         assertEquals(0, fetcher.fetchedRecords().size());
         assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds()));
@@ -385,7 +390,7 @@ public class FetcherTest {
         subscriptions.seek(tp, 0);
 
         assertEquals(1, fetcher.sendFetches());
-        client.prepareResponse(fetchResponse(this.records.buffer(), Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), 100L, 0));
+        client.prepareResponse(fetchResponse(this.records, Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), 100L, 0));
         consumerClient.poll(0);
         assertEquals(0, fetcher.fetchedRecords().size());
         assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds()));
@@ -397,7 +402,7 @@ public class FetcherTest {
         subscriptions.seek(tp, 0);
 
         assertEquals(1, fetcher.sendFetches());
-        client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0));
+        client.prepareResponse(fetchResponse(this.records, Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0));
         consumerClient.poll(0);
         assertEquals(0, fetcher.fetchedRecords().size());
         assertTrue(subscriptions.isOffsetResetNeeded(tp));
@@ -412,7 +417,7 @@ public class FetcherTest {
         subscriptions.seek(tp, 0);
 
         assertEquals(1, fetcher.sendFetches());
-        client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0));
+        client.prepareResponse(fetchResponse(this.records, Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0));
         subscriptions.seek(tp, 1);
         consumerClient.poll(0);
         assertEquals(0, fetcher.fetchedRecords().size());
@@ -426,7 +431,7 @@ public class FetcherTest {
         subscriptionsNoAutoReset.seek(tp, 0);
 
         assertTrue(fetcherNoAutoReset.sendFetches() > 0);
-        client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0));
+        client.prepareResponse(fetchResponse(this.records, Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0));
         consumerClient.poll(0);
         assertFalse(subscriptionsNoAutoReset.isOffsetResetNeeded(tp));
         subscriptionsNoAutoReset.seek(tp, 2);
@@ -439,7 +444,7 @@ public class FetcherTest {
         subscriptionsNoAutoReset.seek(tp, 0);
 
         fetcherNoAutoReset.sendFetches();
-        client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0));
+        client.prepareResponse(fetchResponse(this.records, Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0));
         consumerClient.poll(0);
 
         assertFalse(subscriptionsNoAutoReset.isOffsetResetNeeded(tp));
@@ -459,7 +464,7 @@ public class FetcherTest {
         subscriptions.seek(tp, 0);
 
         assertEquals(1, fetcher.sendFetches());
-        client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0), true);
+        client.prepareResponse(fetchResponse(this.records, Errors.NONE.code(), 100L, 0), true);
         consumerClient.poll(0);
         assertEquals(0, fetcher.fetchedRecords().size());
 
@@ -611,14 +616,14 @@ public class FetcherTest {
             // We need to make sure the message offset grows. Otherwise they will be considered as already consumed
             // and filtered out by consumer.
             if (i > 1) {
-                this.records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
+                MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME);
                 for (int v = 0; v < 3; v++) {
-                    this.records.append((long) i * 3 + v, Record.NO_TIMESTAMP, "key".getBytes(), String.format("value-%d", v).getBytes());
+                    builder.append((long) i * 3 + v, Record.NO_TIMESTAMP, "key".getBytes(), String.format("value-%d", v).getBytes());
                 }
-                this.records.close();
+                this.records = builder.build();
             }
             assertEquals(1, fetcher.sendFetches());
-            client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 100 * i));
+            client.prepareResponse(fetchResponse(this.records, Errors.NONE.code(), 100L, 100 * i));
             consumerClient.poll(0);
             records = fetcher.fetchedRecords().get(tp);
             assertEquals(3, records.size());
@@ -722,8 +727,7 @@ public class FetcherTest {
         return new ListOffsetResponse(allPartitionData, 1);
     }
 
-    private FetchResponse fetchResponse(ByteBuffer buffer, short error, long hw, int throttleTime) {
-        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+    private FetchResponse fetchResponse(MemoryRecords records, short error, long hw, int throttleTime) {
         return new FetchResponse(Collections.singletonMap(tp, new FetchResponse.PartitionData(error, hw, records)), throttleTime);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index 28521e8..4f25bdf 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -12,23 +12,6 @@
  */
 package org.apache.kafka.clients.producer.internals;
 
-import static java.util.Arrays.asList;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Deque;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.Cluster;
@@ -45,6 +28,23 @@ import org.apache.kafka.common.utils.Time;
 import org.junit.After;
 import org.junit.Test;
 
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 public class RecordAccumulatorTest {
 
     private String topic = "test";
@@ -84,7 +84,7 @@ public class RecordAccumulatorTest {
             accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
             Deque<RecordBatch> partitionBatches = accum.batches().get(tp1);
             assertEquals(1, partitionBatches.size());
-            assertTrue(partitionBatches.peekFirst().records.isWritable());
+            assertTrue(partitionBatches.peekFirst().isWritable());
             assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size());
         }
 
@@ -93,15 +93,15 @@ public class RecordAccumulatorTest {
         Deque<RecordBatch> partitionBatches = accum.batches().get(tp1);
         assertEquals(2, partitionBatches.size());
         Iterator<RecordBatch> partitionBatchesIterator = partitionBatches.iterator();
-        assertFalse(partitionBatchesIterator.next().records.isWritable());
-        assertTrue(partitionBatchesIterator.next().records.isWritable());
+        assertFalse(partitionBatchesIterator.next().isWritable());
+        assertTrue(partitionBatchesIterator.next().isWritable());
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
 
         List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id());
         assertEquals(1, batches.size());
         RecordBatch batch = batches.get(0);
 
-        Iterator<LogEntry> iter = batch.records.iterator();
+        Iterator<LogEntry> iter = batch.records().deepIterator();
         for (int i = 0; i < appends; i++) {
             LogEntry entry = iter.next();
             assertEquals("Keys should match", ByteBuffer.wrap(key), entry.record().key());
@@ -130,7 +130,7 @@ public class RecordAccumulatorTest {
         assertEquals(1, batches.size());
         RecordBatch batch = batches.get(0);
 
-        Iterator<LogEntry> iter = batch.records.iterator();
+        Iterator<LogEntry> iter = batch.records().deepIterator();
         LogEntry entry = iter.next();
         assertEquals("Keys should match", ByteBuffer.wrap(key), entry.record().key());
         assertEquals("Values should match", ByteBuffer.wrap(value), entry.record().value());
@@ -159,7 +159,7 @@ public class RecordAccumulatorTest {
         final int msgs = 10000;
         final int numParts = 2;
         final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 0L, 100L, metrics, time);
-        List<Thread> threads = new ArrayList<Thread>();
+        List<Thread> threads = new ArrayList<>();
         for (int i = 0; i < numThreads; i++) {
             threads.add(new Thread() {
                 public void run() {
@@ -182,8 +182,11 @@ public class RecordAccumulatorTest {
             List<RecordBatch> batches = accum.drain(cluster, nodes, 5 * 1024, 0).get(node1.id());
             if (batches != null) {
                 for (RecordBatch batch : batches) {
-                    for (LogEntry entry : batch.records)
+                    Iterator<LogEntry> deepEntries = batch.records().deepIterator();
+                    while (deepEntries.hasNext()) {
+                        deepEntries.next();
                         read++;
+                    }
                     accum.deallocate(batch);
                 }
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/test/java/org/apache/kafka/common/record/ByteBufferLogInputStreamTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/ByteBufferLogInputStreamTest.java b/clients/src/test/java/org/apache/kafka/common/record/ByteBufferLogInputStreamTest.java
new file mode 100644
index 0000000..62e8a05
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/record/ByteBufferLogInputStreamTest.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.common.record;
+
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ByteBufferLogInputStreamTest {
+
+    @Test
+    public void iteratorIgnoresIncompleteEntries() {
+        ByteBuffer buffer = ByteBuffer.allocate(2048);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Record.MAGIC_VALUE_V1, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
+        builder.append(0L, 15L, "a".getBytes(), "1".getBytes());
+        builder.append(1L, 20L, "b".getBytes(), "2".getBytes());
+
+        ByteBuffer recordsBuffer = builder.build().buffer();
+        recordsBuffer.limit(recordsBuffer.limit() - 5);
+
+        Iterator<ByteBufferLogInputStream.ByteBufferLogEntry> iterator = MemoryRecords.readableRecords(recordsBuffer).shallowIterator();
+        assertTrue(iterator.hasNext());
+        ByteBufferLogInputStream.ByteBufferLogEntry first = iterator.next();
+        assertEquals(0L, first.offset());
+
+        assertFalse(iterator.hasNext());
+    }
+
+    @Test
+    public void testSetCreateTimeV1() {
+        ByteBuffer buffer = ByteBuffer.allocate(2048);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Record.MAGIC_VALUE_V1, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
+        builder.append(0L, 15L, "a".getBytes(), "1".getBytes());
+        Iterator<ByteBufferLogInputStream.ByteBufferLogEntry> iterator = builder.build().shallowIterator();
+
+        assertTrue(iterator.hasNext());
+        ByteBufferLogInputStream.ByteBufferLogEntry entry = iterator.next();
+
+        long createTimeMs = 20L;
+        entry.setCreateTime(createTimeMs);
+
+        assertEquals(TimestampType.CREATE_TIME, entry.record().timestampType());
+        assertEquals(createTimeMs, entry.record().timestamp());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testSetCreateTimeNotAllowedV0() {
+        ByteBuffer buffer = ByteBuffer.allocate(2048);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Record.MAGIC_VALUE_V0, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
+        builder.append(0L, 15L, "a".getBytes(), "1".getBytes());
+        Iterator<ByteBufferLogInputStream.ByteBufferLogEntry> iterator = builder.build().shallowIterator();
+
+        assertTrue(iterator.hasNext());
+        ByteBufferLogInputStream.ByteBufferLogEntry entry = iterator.next();
+
+        long createTimeMs = 20L;
+        entry.setCreateTime(createTimeMs);
+    }
+
+    @Test
+    public void testSetLogAppendTimeV1() {
+        ByteBuffer buffer = ByteBuffer.allocate(2048);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Record.MAGIC_VALUE_V1, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
+        builder.append(0L, 15L, "a".getBytes(), "1".getBytes());
+        Iterator<ByteBufferLogInputStream.ByteBufferLogEntry> iterator = builder.build().shallowIterator();
+
+        assertTrue(iterator.hasNext());
+        ByteBufferLogInputStream.ByteBufferLogEntry entry = iterator.next();
+
+        long logAppendTime = 20L;
+        entry.setLogAppendTime(logAppendTime);
+
+        assertEquals(TimestampType.LOG_APPEND_TIME, entry.record().timestampType());
+        assertEquals(logAppendTime, entry.record().timestamp());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testSetLogAppendTimeNotAllowedV0() {
+        ByteBuffer buffer = ByteBuffer.allocate(2048);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Record.MAGIC_VALUE_V0, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
+        builder.append(0L, 15L, "a".getBytes(), "1".getBytes());
+        Iterator<ByteBufferLogInputStream.ByteBufferLogEntry> iterator = builder.build().shallowIterator();
+
+        assertTrue(iterator.hasNext());
+        ByteBufferLogInputStream.ByteBufferLogEntry entry = iterator.next();
+
+        long logAppendTime = 20L;
+        entry.setLogAppendTime(logAppendTime);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
new file mode 100644
index 0000000..7e2c256
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
@@ -0,0 +1,410 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.common.record;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.test.TestUtils;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.apache.kafka.test.TestUtils.tempFile;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class FileRecordsTest {
+
+    private Record[] records = new Record[] {
+            Record.create("abcd".getBytes()),
+            Record.create("efgh".getBytes()),
+            Record.create("ijkl".getBytes())
+    };
+    private FileRecords fileRecords;
+
+    @Before
+    public void setup() throws IOException {
+        this.fileRecords = createFileRecords(records);
+    }
+
+    /**
+     * Test that the cached size variable matches the actual file size as we append messages
+     */
+    @Test
+    public void testFileSize() throws IOException {
+        assertEquals(fileRecords.channel().size(), fileRecords.sizeInBytes());
+        for (int i = 0; i < 20; i++) {
+            fileRecords.append(MemoryRecords.withRecords(Record.create("abcd".getBytes())));
+            assertEquals(fileRecords.channel().size(), fileRecords.sizeInBytes());
+        }
+    }
+
+    /**
+     * Test that adding invalid bytes to the end of the log doesn't break iteration
+     */
+    @Test
+    public void testIterationOverPartialAndTruncation() throws IOException {
+        testPartialWrite(0, fileRecords);
+        testPartialWrite(2, fileRecords);
+        testPartialWrite(4, fileRecords);
+        testPartialWrite(5, fileRecords);
+        testPartialWrite(6, fileRecords);
+    }
+
+    private void testPartialWrite(int size, FileRecords fileRecords) throws IOException {
+        ByteBuffer buffer = ByteBuffer.allocate(size);
+        for (int i = 0; i < size; i++)
+            buffer.put((byte) 0);
+
+        buffer.rewind();
+
+        fileRecords.channel().write(buffer);
+
+        // appending those bytes should not change the contents
+        TestUtils.checkEquals(Arrays.asList(records).iterator(), fileRecords.records());
+    }
+
+    /**
+     * Iterating over the file does file reads but shouldn't change the position of the underlying FileChannel.
+     */
+    @Test
+    public void testIterationDoesntChangePosition() throws IOException {
+        long position = fileRecords.channel().position();
+        TestUtils.checkEquals(Arrays.asList(records).iterator(), fileRecords.records());
+        assertEquals(position, fileRecords.channel().position());
+    }
+
+    /**
+     * Test a simple append and read.
+     */
+    @Test
+    public void testRead() throws IOException {
+        FileRecords read = fileRecords.read(0, fileRecords.sizeInBytes());
+        TestUtils.checkEquals(fileRecords.shallowIterator(), read.shallowIterator());
+
+        List<LogEntry> items = shallowEntries(read);
+        LogEntry second = items.get(1);
+
+        read = fileRecords.read(second.sizeInBytes(), fileRecords.sizeInBytes());
+        assertEquals("Try a read starting from the second message",
+                items.subList(1, 3), shallowEntries(read));
+
+        read = fileRecords.read(second.sizeInBytes(), second.sizeInBytes());
+        assertEquals("Try a read of a single message starting from the second message",
+                Collections.singletonList(second), shallowEntries(read));
+    }
+
+    /**
+     * Test the MessageSet.searchFor API.
+     */
+    @Test
+    public void testSearch() throws IOException {
+        // append a new message with a high offset
+        Record lastMessage = Record.create("test".getBytes());
+        fileRecords.append(MemoryRecords.withRecords(50L, lastMessage));
+
+        List<LogEntry> entries = shallowEntries(fileRecords);
+        int position = 0;
+
+        int message1Size = entries.get(0).sizeInBytes();
+        assertEquals("Should be able to find the first message by its offset",
+                new FileRecords.LogEntryPosition(0L, position, message1Size),
+                fileRecords.searchForOffsetWithSize(0, 0));
+        position += message1Size;
+
+        int message2Size = entries.get(1).sizeInBytes();
+        assertEquals("Should be able to find second message when starting from 0",
+                new FileRecords.LogEntryPosition(1L, position, message2Size),
+                fileRecords.searchForOffsetWithSize(1, 0));
+        assertEquals("Should be able to find second message starting from its offset",
+                new FileRecords.LogEntryPosition(1L, position, message2Size),
+                fileRecords.searchForOffsetWithSize(1, position));
+        position += message2Size + entries.get(2).sizeInBytes();
+
+        int message4Size = entries.get(3).sizeInBytes();
+        assertEquals("Should be able to find fourth message from a non-existant offset",
+                new FileRecords.LogEntryPosition(50L, position, message4Size),
+                fileRecords.searchForOffsetWithSize(3, position));
+        assertEquals("Should be able to find fourth message by correct offset",
+                new FileRecords.LogEntryPosition(50L, position, message4Size),
+                fileRecords.searchForOffsetWithSize(50,  position));
+    }
+
+    /**
+     * Test that the message set iterator obeys start and end slicing
+     */
+    @Test
+    public void testIteratorWithLimits() throws IOException {
+        LogEntry entry = shallowEntries(fileRecords).get(1);
+        int start = fileRecords.searchForOffsetWithSize(1, 0).position;
+        int size = entry.sizeInBytes();
+        FileRecords slice = fileRecords.read(start, size);
+        assertEquals(Collections.singletonList(entry), shallowEntries(slice));
+        FileRecords slice2 = fileRecords.read(start, size - 1);
+        assertEquals(Collections.emptyList(), shallowEntries(slice2));
+    }
+
+    /**
+     * Test the truncateTo method lops off messages and appropriately updates the size
+     */
+    @Test
+    public void testTruncate() throws IOException {
+        LogEntry entry = shallowEntries(fileRecords).get(0);
+        int end = fileRecords.searchForOffsetWithSize(1, 0).position;
+        fileRecords.truncateTo(end);
+        assertEquals(Collections.singletonList(entry), shallowEntries(fileRecords));
+        assertEquals(entry.sizeInBytes(), fileRecords.sizeInBytes());
+    }
+
+    /**
+     * Test that truncateTo only calls truncate on the FileChannel if the size of the
+     * FileChannel is bigger than the target size. This is important because some JVMs
+     * change the mtime of the file, even if truncate should do nothing.
+     */
+    @Test
+    public void testTruncateNotCalledIfSizeIsSameAsTargetSize() throws IOException {
+        FileChannel channelMock = EasyMock.createMock(FileChannel.class);
+
+        EasyMock.expect(channelMock.size()).andReturn(42L).atLeastOnce();
+        EasyMock.expect(channelMock.position(42L)).andReturn(null);
+        EasyMock.replay(channelMock);
+
+        FileRecords fileRecords = new FileRecords(tempFile(), channelMock, 0, Integer.MAX_VALUE, false);
+        fileRecords.truncateTo(42);
+
+        EasyMock.verify(channelMock);
+    }
+
+    /**
+     * Expect a KafkaException if targetSize is bigger than the size of
+     * the FileRecords.
+     */
+    @Test
+    public void testTruncateNotCalledIfSizeIsBiggerThanTargetSize() throws IOException {
+        FileChannel channelMock = EasyMock.createMock(FileChannel.class);
+
+        EasyMock.expect(channelMock.size()).andReturn(42L).atLeastOnce();
+        EasyMock.expect(channelMock.position(42L)).andReturn(null);
+        EasyMock.replay(channelMock);
+
+        FileRecords fileRecords = new FileRecords(tempFile(), channelMock, 0, Integer.MAX_VALUE, false);
+
+        try {
+            fileRecords.truncateTo(43);
+            fail("Should throw KafkaException");
+        } catch (KafkaException e) {
+            // expected
+        }
+
+        EasyMock.verify(channelMock);
+    }
+
+    /**
+     * see #testTruncateNotCalledIfSizeIsSameAsTargetSize
+     */
+    @Test
+    public void testTruncateIfSizeIsDifferentToTargetSize() throws IOException {
+        FileChannel channelMock = EasyMock.createMock(FileChannel.class);
+
+        EasyMock.expect(channelMock.size()).andReturn(42L).atLeastOnce();
+        EasyMock.expect(channelMock.position(42L)).andReturn(null).once();
+        EasyMock.expect(channelMock.truncate(23L)).andReturn(null).once();
+        EasyMock.expect(channelMock.position(23L)).andReturn(null).once();
+        EasyMock.replay(channelMock);
+
+        FileRecords fileRecords = new FileRecords(tempFile(), channelMock, 0, Integer.MAX_VALUE, false);
+        fileRecords.truncateTo(23);
+
+        EasyMock.verify(channelMock);
+    }
+
+    /**
+     * Test the new FileRecords with preallocate as true
+     */
+    @Test
+    public void testPreallocateTrue() throws IOException {
+        File temp = tempFile();
+        FileRecords fileRecords = FileRecords.open(temp, false, 512 * 1024 * 1024, true);
+        long position = fileRecords.channel().position();
+        int size = fileRecords.sizeInBytes();
+        assertEquals(0, position);
+        assertEquals(0, size);
+        assertEquals(512 * 1024 * 1024, temp.length());
+    }
+
+    /**
+     * Test the new FileRecords with preallocate as false
+     */
+    @Test
+    public void testPreallocateFalse() throws IOException {
+        File temp = tempFile();
+        FileRecords set = FileRecords.open(temp, false, 512 * 1024 * 1024, false);
+        long position = set.channel().position();
+        int size = set.sizeInBytes();
+        assertEquals(0, position);
+        assertEquals(0, size);
+        assertEquals(0, temp.length());
+    }
+
+    /**
+     * Test the new FileRecords with preallocate as true and a cleanly shut down file; the file should be truncated to the end of valid data.
+     */
+    @Test
+    public void testPreallocateClearShutdown() throws IOException {
+        File temp = tempFile();
+        FileRecords set = FileRecords.open(temp, false, 512 * 1024 * 1024, true);
+        set.append(MemoryRecords.withRecords(records));
+
+        int oldPosition = (int) set.channel().position();
+        int oldSize = set.sizeInBytes();
+        assertEquals(fileRecords.sizeInBytes(), oldPosition);
+        assertEquals(fileRecords.sizeInBytes(), oldSize);
+        set.close();
+
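+        // Reopen the same file; a clean close should have trimmed the preallocated file down to the valid data.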
+        File tempReopen = new File(temp.getAbsolutePath());
+        FileRecords setReopen = FileRecords.open(tempReopen, true, 512 * 1024 * 1024, true);
+        int position = (int) setReopen.channel().position();
+        int size = setReopen.sizeInBytes();
+
+        assertEquals(oldPosition, position);
+        assertEquals(oldPosition, size);
+        assertEquals(oldPosition, tempReopen.length());
+    }
+
+    @Test
+    public void testFormatConversionWithPartialMessage() throws IOException {
+        LogEntry entry = shallowEntries(fileRecords).get(1);
+        int start = fileRecords.searchForOffsetWithSize(1, 0).position;
+        int size = entry.sizeInBytes();
+        FileRecords slice = fileRecords.read(start, size - 1);
+        Records messageV0 = slice.toMessageFormat(Record.MAGIC_VALUE_V0);
+        assertTrue("No message should be there", shallowEntries(messageV0).isEmpty());
+        assertEquals("There should be " + (size - 1) + " bytes", size - 1, messageV0.sizeInBytes());
+    }
+
+    @Test
+    public void testConvertNonCompressedToMagic1() throws IOException {
+        List<LogEntry> entries = Arrays.asList(
+                LogEntry.create(0L, Record.create(Record.MAGIC_VALUE_V0, Record.NO_TIMESTAMP, "k1".getBytes(), "hello".getBytes())),
+                LogEntry.create(2L, Record.create(Record.MAGIC_VALUE_V0, Record.NO_TIMESTAMP, "k2".getBytes(), "goodbye".getBytes())));
+        MemoryRecords records = MemoryRecords.withLogEntries(CompressionType.NONE, entries);
+
+        // Up conversion for non-compressed messages. In practice we only do down conversion,
+        // but up conversion should work as well.
+        try (FileRecords fileRecords = FileRecords.open(tempFile())) {
+            fileRecords.append(records);
+            fileRecords.flush();
+            Records convertedRecords = fileRecords.toMessageFormat(Record.MAGIC_VALUE_V1);
+            verifyConvertedMessageSet(entries, convertedRecords, Record.MAGIC_VALUE_V1);
+        }
+    }
+
+    @Test
+    public void testConvertCompressedToMagic1() throws IOException {
+        List<LogEntry> entries = Arrays.asList(
+                LogEntry.create(0L, Record.create(Record.MAGIC_VALUE_V0, Record.NO_TIMESTAMP, "k1".getBytes(), "hello".getBytes())),
+                LogEntry.create(2L, Record.create(Record.MAGIC_VALUE_V0, Record.NO_TIMESTAMP, "k2".getBytes(), "goodbye".getBytes())));
+        MemoryRecords records = MemoryRecords.withLogEntries(CompressionType.GZIP, entries);
+
+        // up conversion for compressed messages
+        try (FileRecords fileRecords = FileRecords.open(tempFile())) {
+            fileRecords.append(records);
+            fileRecords.flush();
+            Records convertedRecords = fileRecords.toMessageFormat(Record.MAGIC_VALUE_V1);
+            verifyConvertedMessageSet(entries, convertedRecords, Record.MAGIC_VALUE_V1);
+        }
+    }
+
+    @Test
+    public void testConvertNonCompressedToMagic0() throws IOException {
+        List<LogEntry> entries = Arrays.asList(
+                LogEntry.create(0L, Record.create(Record.MAGIC_VALUE_V1, 1L, "k1".getBytes(), "hello".getBytes())),
+                LogEntry.create(2L, Record.create(Record.MAGIC_VALUE_V1, 2L, "k2".getBytes(), "goodbye".getBytes())));
+        MemoryRecords records = MemoryRecords.withLogEntries(CompressionType.NONE, entries);
+
+        // down conversion for non-compressed messages
+        try (FileRecords fileRecords = FileRecords.open(tempFile())) {
+            fileRecords.append(records);
+            fileRecords.flush();
+            Records convertedRecords = fileRecords.toMessageFormat(Record.MAGIC_VALUE_V0);
+            verifyConvertedMessageSet(entries, convertedRecords, Record.MAGIC_VALUE_V0);
+        }
+    }
+
+    @Test
+    public void testConvertCompressedToMagic0() throws IOException {
+        List<LogEntry> entries = Arrays.asList(
+                LogEntry.create(0L, Record.create(Record.MAGIC_VALUE_V1, 1L, "k1".getBytes(), "hello".getBytes())),
+                LogEntry.create(2L, Record.create(Record.MAGIC_VALUE_V1, 2L, "k2".getBytes(), "goodbye".getBytes())));
+        MemoryRecords records = MemoryRecords.withLogEntries(CompressionType.GZIP, entries);
+
+        // down conversion for compressed messages
+        try (FileRecords fileRecords = FileRecords.open(tempFile())) {
+            fileRecords.append(records);
+            fileRecords.flush();
+            Records convertedRecords = fileRecords.toMessageFormat(Record.MAGIC_VALUE_V0);
+            verifyConvertedMessageSet(entries, convertedRecords, Record.MAGIC_VALUE_V0);
+        }
+    }
+
+    private void verifyConvertedMessageSet(List<LogEntry> initialEntries, Records convertedRecords, byte magicByte) {
+        int i = 0;
+        for (LogEntry logEntry : deepEntries(convertedRecords)) {
+            assertEquals("magic byte should be " + magicByte, magicByte, logEntry.record().magic());
+            assertEquals("offset should not change", initialEntries.get(i).offset(), logEntry.offset());
+            assertEquals("key should not change", initialEntries.get(i).record().key(), logEntry.record().key());
+            assertEquals("payload should not change", initialEntries.get(i).record().value(), logEntry.record().value());
+            i += 1;
+        }
+    }
+
+    private static List<LogEntry> shallowEntries(Records buffer) {
+        List<LogEntry> entries = new ArrayList<>();
+        Iterator<? extends LogEntry> iterator = buffer.shallowIterator();
+        while (iterator.hasNext())
+            entries.add(iterator.next());
+        return entries;
+    }
+
+    private static List<LogEntry> deepEntries(Records buffer) {
+        List<LogEntry> entries = new ArrayList<>();
+        Iterator<? extends LogEntry> iterator = buffer.shallowIterator();
+        while (iterator.hasNext()) {
+            for (LogEntry deepEntry : iterator.next())
+                entries.add(deepEntry);
+        }
+        return entries;
+    }
+
+    private FileRecords createFileRecords(Record ... records) throws IOException {
+        FileRecords fileRecords = FileRecords.open(tempFile());
+        fileRecords.append(MemoryRecords.withRecords(records));
+        fileRecords.flush();
+        return fileRecords;
+    }
+
+}
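
For context, a minimal sketch of the guard that the truncate tests above exercise, assuming FileRecords wraps a FileChannel and tracks a logical size (illustration only, not the committed implementation; the helper name and parameters are hypothetical):

    import java.io.IOException;
    import java.nio.channels.FileChannel;
    import org.apache.kafka.common.KafkaException;

    final class TruncateGuardSketch {
        // Only call FileChannel.truncate when the file would actually shrink; some JVMs
        // update the file mtime even for a no-op truncate, which is what the tests guard against.
        static void truncateTo(FileChannel channel, int logicalSize, int targetSize) throws IOException {
            if (targetSize > logicalSize || targetSize < 0)
                throw new KafkaException("Cannot truncate to " + targetSize + " bytes, size is " + logicalSize);
            if (targetSize < (int) channel.size()) {
                channel.truncate(targetSize);   // shrink the file
                channel.position(targetSize);   // move the write position to the new end
            }
        }
    }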

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
new file mode 100644
index 0000000..40fa212
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.common.record;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+@RunWith(value = Parameterized.class)
+public class MemoryRecordsBuilderTest {
+
+    private final CompressionType compressionType;
+    private final int bufferOffset;
+
+    public MemoryRecordsBuilderTest(int bufferOffset, CompressionType compressionType) {
+        this.bufferOffset = bufferOffset;
+        this.compressionType = compressionType;
+    }
+
+    @Test
+    public void testCompressionRateV0() {
+        ByteBuffer buffer = ByteBuffer.allocate(1024);
+        buffer.position(bufferOffset);
+
+        Record[] records = new Record[] {
+                Record.create(Record.MAGIC_VALUE_V0, 0L, "a".getBytes(), "1".getBytes()),
+                Record.create(Record.MAGIC_VALUE_V0, 1L, "b".getBytes(), "2".getBytes()),
+                Record.create(Record.MAGIC_VALUE_V0, 2L, "c".getBytes(), "3".getBytes()),
+        };
+
+        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V0, compressionType,
+                TimestampType.CREATE_TIME, 0L, 0L, buffer.capacity());
+
+        int uncompressedSize = 0;
+        long offset = 0L;
+        for (Record record : records) {
+            uncompressedSize += record.sizeInBytes() + Records.LOG_OVERHEAD;
+            builder.append(offset++, record);
+        }
+
+        MemoryRecords built = builder.build();
+        if (compressionType == CompressionType.NONE) {
+            assertEquals(1.0, builder.compressionRate(), 0.00001);
+        } else {
+            int compressedSize = built.sizeInBytes() - Records.LOG_OVERHEAD - Record.RECORD_OVERHEAD_V0;
+            double computedCompressionRate = (double) compressedSize / uncompressedSize;
+            assertEquals(computedCompressionRate, builder.compressionRate(), 0.00001);
+        }
+    }
+
+    @Test
+    public void testCompressionRateV1() {
+        ByteBuffer buffer = ByteBuffer.allocate(1024);
+        buffer.position(bufferOffset);
+
+        Record[] records = new Record[] {
+                Record.create(Record.MAGIC_VALUE_V1, 0L, "a".getBytes(), "1".getBytes()),
+                Record.create(Record.MAGIC_VALUE_V1, 1L, "b".getBytes(), "2".getBytes()),
+                Record.create(Record.MAGIC_VALUE_V1, 2L, "c".getBytes(), "3".getBytes()),
+        };
+
+        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V1, compressionType,
+                TimestampType.CREATE_TIME, 0L, 0L, buffer.capacity());
+
+        int uncompressedSize = 0;
+        long offset = 0L;
+        for (Record record : records) {
+            uncompressedSize += record.sizeInBytes() + Records.LOG_OVERHEAD;
+            builder.append(offset++, record);
+        }
+
+        MemoryRecords built = builder.build();
+        if (compressionType == CompressionType.NONE) {
+            assertEquals(1.0, builder.compressionRate(), 0.00001);
+        } else {
+            int compressedSize = built.sizeInBytes() - Records.LOG_OVERHEAD - Record.RECORD_OVERHEAD_V1;
+            double computedCompressionRate = (double) compressedSize / uncompressedSize;
+            assertEquals(computedCompressionRate, builder.compressionRate(), 0.00001);
+        }
+    }
+
+    @Test
+    public void buildUsingLogAppendTime() {
+        ByteBuffer buffer = ByteBuffer.allocate(1024);
+        buffer.position(bufferOffset);
+
+        long logAppendTime = System.currentTimeMillis();
+        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V1, compressionType,
+                TimestampType.LOG_APPEND_TIME, 0L, logAppendTime, buffer.capacity());
+        builder.append(0L, 0L, "a".getBytes(), "1".getBytes());
+        builder.append(1L, 0L, "b".getBytes(), "2".getBytes());
+        builder.append(2L, 0L, "c".getBytes(), "3".getBytes());
+        MemoryRecords records = builder.build();
+
+        MemoryRecordsBuilder.RecordsInfo info = builder.info();
+        assertEquals(logAppendTime, info.maxTimestamp);
+
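+        // all records share the same log append time, so the max-timestamp offset is simply the last offset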
+        assertEquals(2L, info.shallowOffsetOfMaxTimestamp);
+
+        Iterator<Record> iterator = records.records();
+        while (iterator.hasNext()) {
+            Record record = iterator.next();
+            assertEquals(TimestampType.LOG_APPEND_TIME, record.timestampType());
+            assertEquals(logAppendTime, record.timestamp());
+        }
+    }
+
+    @Test
+    public void convertUsingLogAppendTime() {
+        ByteBuffer buffer = ByteBuffer.allocate(1024);
+        buffer.position(bufferOffset);
+
+        long logAppendTime = System.currentTimeMillis();
+        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V1, compressionType,
+                TimestampType.LOG_APPEND_TIME, 0L, logAppendTime, buffer.capacity());
+
+        builder.convertAndAppend(0L, Record.create(Record.MAGIC_VALUE_V0, 0L, "a".getBytes(), "1".getBytes()));
+        builder.convertAndAppend(1L, Record.create(Record.MAGIC_VALUE_V0, 0L, "b".getBytes(), "2".getBytes()));
+        builder.convertAndAppend(2L, Record.create(Record.MAGIC_VALUE_V0, 0L, "c".getBytes(), "3".getBytes()));
+        MemoryRecords records = builder.build();
+
+        MemoryRecordsBuilder.RecordsInfo info = builder.info();
+        assertEquals(logAppendTime, info.maxTimestamp);
+
+        assertEquals(2L, info.shallowOffsetOfMaxTimestamp);
+
+        Iterator<Record> iterator = records.records();
+        while (iterator.hasNext()) {
+            Record record = iterator.next();
+            assertEquals(TimestampType.LOG_APPEND_TIME, record.timestampType());
+            assertEquals(logAppendTime, record.timestamp());
+        }
+    }
+
+    @Test
+    public void buildUsingCreateTime() {
+        ByteBuffer buffer = ByteBuffer.allocate(1024);
+        buffer.position(bufferOffset);
+
+        long logAppendTime = System.currentTimeMillis();
+        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V1, compressionType,
+                TimestampType.CREATE_TIME, 0L, logAppendTime, buffer.capacity());
+        builder.append(0L, 0L, "a".getBytes(), "1".getBytes());
+        builder.append(1L, 2L, "b".getBytes(), "2".getBytes());
+        builder.append(2L, 1L, "c".getBytes(), "3".getBytes());
+        MemoryRecords records = builder.build();
+
+        MemoryRecordsBuilder.RecordsInfo info = builder.info();
+        assertEquals(2L, info.maxTimestamp);
+
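+        // uncompressed: the max-timestamp offset is the record that carries it (offset 1);
+        // compressed: it is reported as the offset of the wrapper entry (the last offset, 2)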
+        if (compressionType == CompressionType.NONE)
+            assertEquals(1L, info.shallowOffsetOfMaxTimestamp);
+        else
+            assertEquals(2L, info.shallowOffsetOfMaxTimestamp);
+
+        Iterator<Record> iterator = records.records();
+        int i = 0;
+        long[] expectedTimestamps = new long[] {0L, 2L, 1L};
+        while (iterator.hasNext()) {
+            Record record = iterator.next();
+            assertEquals(TimestampType.CREATE_TIME, record.timestampType());
+            assertEquals(expectedTimestamps[i++], record.timestamp());
+        }
+    }
+
+    @Test
+    public void writePastLimit() {
+        ByteBuffer buffer = ByteBuffer.allocate(64);
+        buffer.position(bufferOffset);
+
+        long logAppendTime = System.currentTimeMillis();
+        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V1, compressionType,
+                TimestampType.CREATE_TIME, 0L, logAppendTime, buffer.capacity());
+        builder.append(0L, 0L, "a".getBytes(), "1".getBytes());
+        builder.append(1L, 1L, "b".getBytes(), "2".getBytes());
+
+        assertFalse(builder.hasRoomFor("c".getBytes(), "3".getBytes()));
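+        // hasRoomFor is only advisory; appending past the write limit still succeeds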
+        builder.append(2L, 2L, "c".getBytes(), "3".getBytes());
+        MemoryRecords records = builder.build();
+
+        MemoryRecordsBuilder.RecordsInfo info = builder.info();
+        assertEquals(2L, info.maxTimestamp);
+        assertEquals(2L, info.shallowOffsetOfMaxTimestamp);
+
+        Iterator<Record> iterator = records.records();
+        long i = 0L;
+        while (iterator.hasNext()) {
+            Record record = iterator.next();
+            assertEquals(TimestampType.CREATE_TIME, record.timestampType());
+            assertEquals(i++, record.timestamp());
+        }
+    }
+
+    @Test
+    public void convertUsingCreateTime() {
+        ByteBuffer buffer = ByteBuffer.allocate(1024);
+        buffer.position(bufferOffset);
+
+        long logAppendTime = System.currentTimeMillis();
+        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V1, compressionType,
+                TimestampType.CREATE_TIME, 0L, logAppendTime, buffer.capacity());
+
+        builder.convertAndAppend(0L, Record.create(Record.MAGIC_VALUE_V0, 0L, "a".getBytes(), "1".getBytes()));
+        builder.convertAndAppend(0L, Record.create(Record.MAGIC_VALUE_V0, 0L, "b".getBytes(), "2".getBytes()));
+        builder.convertAndAppend(0L, Record.create(Record.MAGIC_VALUE_V0, 0L, "c".getBytes(), "3".getBytes()));
+        MemoryRecords records = builder.build();
+
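+        // the V0 source records carry no timestamp, so the converted records report NO_TIMESTAMP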
+        MemoryRecordsBuilder.RecordsInfo info = builder.info();
+        assertEquals(Record.NO_TIMESTAMP, info.maxTimestamp);
+        assertEquals(0L, info.shallowOffsetOfMaxTimestamp);
+
+        Iterator<Record> iterator = records.records();
+        while (iterator.hasNext()) {
+            Record record = iterator.next();
+            assertEquals(TimestampType.CREATE_TIME, record.timestampType());
+            assertEquals(Record.NO_TIMESTAMP, record.timestamp());
+        }
+    }
+
+    @Parameterized.Parameters
+    public static Collection<Object[]> data() {
+        List<Object[]> values = new ArrayList<>();
+        for (int bufferOffset : Arrays.asList(0, 15))
+            for (CompressionType compressionType : CompressionType.values())
+                values.add(new Object[] {bufferOffset, compressionType});
+        return values;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index b1117f1..ef0fbeb 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -16,53 +16,64 @@
  */
 package org.apache.kafka.common.record;
 
-import static org.apache.kafka.common.utils.Utils.toArray;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import static java.util.Arrays.asList;
+import static org.apache.kafka.common.utils.Utils.toNullableArray;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 @RunWith(value = Parameterized.class)
 public class MemoryRecordsTest {
 
     private CompressionType compression;
+    private byte magic;
+    private long firstOffset;
 
-    public MemoryRecordsTest(CompressionType compression) {
+    public MemoryRecordsTest(byte magic, long firstOffset, CompressionType compression) {
+        this.magic = magic;
         this.compression = compression;
+        this.firstOffset = firstOffset;
     }
 
     @Test
     public void testIterator() {
-        MemoryRecords recs1 = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), compression);
-        MemoryRecords recs2 = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), compression);
-        List<Record> list = Arrays.asList(new Record(0L, "a".getBytes(), "1".getBytes()),
-                                          new Record(0L, "b".getBytes(), "2".getBytes()),
-                                          new Record(0L, "c".getBytes(), "3".getBytes()));
+        MemoryRecordsBuilder builder1 = MemoryRecords.builder(ByteBuffer.allocate(1024), magic, compression, TimestampType.CREATE_TIME, firstOffset);
+        MemoryRecordsBuilder builder2 = MemoryRecords.builder(ByteBuffer.allocate(1024), magic, compression, TimestampType.CREATE_TIME, firstOffset);
+        List<Record> list = asList(
+                Record.create(magic, 1L, "a".getBytes(), "1".getBytes()),
+                Record.create(magic, 2L, "b".getBytes(), "2".getBytes()),
+                Record.create(magic, 3L, "c".getBytes(), "3".getBytes()),
+                Record.create(magic, 4L, null, "4".getBytes()),
+                Record.create(magic, 5L, "e".getBytes(), null),
+                Record.create(magic, 6L, null, null));
+
         for (int i = 0; i < list.size(); i++) {
             Record r = list.get(i);
-            recs1.append(i, r);
-            recs2.append(i, 0L, toArray(r.key()), toArray(r.value()));
+            builder1.append(firstOffset + i, r);
+            builder2.append(firstOffset + i, i + 1, toNullableArray(r.key()), toNullableArray(r.value()));
         }
-        recs1.close();
-        recs2.close();
+
+        MemoryRecords recs1 = builder1.build();
+        MemoryRecords recs2 = builder2.build();
 
         for (int iteration = 0; iteration < 2; iteration++) {
-            for (MemoryRecords recs : Arrays.asList(recs1, recs2)) {
-                Iterator<LogEntry> iter = recs.iterator();
+            for (MemoryRecords recs : asList(recs1, recs2)) {
+                Iterator<LogEntry> iter = recs.deepIterator();
                 for (int i = 0; i < list.size(); i++) {
                     assertTrue(iter.hasNext());
                     LogEntry entry = iter.next();
-                    assertEquals((long) i, entry.offset());
+                    assertEquals(firstOffset + i, entry.offset());
                     assertEquals(list.get(i), entry.record());
                     entry.record().ensureValid();
                 }
@@ -73,20 +84,145 @@ public class MemoryRecordsTest {
 
     @Test
     public void testHasRoomForMethod() {
-        MemoryRecords recs1 = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), compression);
-        recs1.append(0, new Record(0L, "a".getBytes(), "1".getBytes()));
+        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), magic, compression, TimestampType.CREATE_TIME);
+        builder.append(0, Record.create(magic, 0L, "a".getBytes(), "1".getBytes()));
+
+        assertTrue(builder.hasRoomFor("b".getBytes(), "2".getBytes()));
+        builder.close();
+        assertFalse(builder.hasRoomFor("b".getBytes(), "2".getBytes()));
+    }
+
+    @Test
+    public void testFilterTo() {
+        ByteBuffer buffer = ByteBuffer.allocate(2048);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME);
+        builder.append(0L, 10L, null, "a".getBytes());
+        builder.close();
+
+        builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 1L);
+        builder.append(1L, 11L, "1".getBytes(), "b".getBytes());
+        builder.append(2L, 12L, null, "c".getBytes());
+        builder.close();
 
-        assertTrue(recs1.hasRoomFor("b".getBytes(), "2".getBytes()));
-        recs1.close();
-        assertFalse(recs1.hasRoomFor("b".getBytes(), "2".getBytes()));
+        builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 3L);
+        builder.append(3L, 13L, null, "d".getBytes());
+        builder.append(4L, 20L, "4".getBytes(), "e".getBytes());
+        builder.append(5L, 15L, "5".getBytes(), "f".getBytes());
+        builder.close();
+
+        builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 6L);
+        builder.append(6L, 16L, "6".getBytes(), "g".getBytes());
+        builder.close();
+
+        buffer.flip();
+
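+        // retain only entries with non-null keys: offsets 1, 4, 5 and 6 out of the 7 appended above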
+        ByteBuffer filtered = ByteBuffer.allocate(2048);
+        MemoryRecords.FilterResult result = MemoryRecords.readableRecords(buffer).filterTo(new RetainNonNullKeysFilter(), filtered);
+
+        filtered.flip();
+
+        assertEquals(7, result.messagesRead);
+        assertEquals(4, result.messagesRetained);
+        assertEquals(buffer.limit(), result.bytesRead);
+        assertEquals(filtered.limit(), result.bytesRetained);
+        if (magic > 0) {
+            assertEquals(20L, result.maxTimestamp);
+            if (compression == CompressionType.NONE)
+                assertEquals(4L, result.shallowOffsetOfMaxTimestamp);
+            else
+                assertEquals(5L, result.shallowOffsetOfMaxTimestamp);
+        }
 
+        MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
+
+        List<ByteBufferLogInputStream.ByteBufferLogEntry> shallowEntries = TestUtils.toList(filteredRecords.shallowIterator());
+        List<Long> expectedOffsets = compression == CompressionType.NONE ? asList(1L, 4L, 5L, 6L) : asList(1L, 5L, 6L);
+        assertEquals(expectedOffsets.size(), shallowEntries.size());
+
+        for (int i = 0; i < expectedOffsets.size(); i++) {
+            LogEntry shallowEntry = shallowEntries.get(i);
+            assertEquals(expectedOffsets.get(i).longValue(), shallowEntry.offset());
+            assertEquals(magic, shallowEntry.record().magic());
+            assertEquals(compression, shallowEntry.record().compressionType());
+            assertEquals(magic == Record.MAGIC_VALUE_V0 ? TimestampType.NO_TIMESTAMP_TYPE : TimestampType.CREATE_TIME,
+                    shallowEntry.record().timestampType());
+        }
+
+        List<LogEntry> deepEntries = TestUtils.toList(filteredRecords.deepIterator());
+        assertEquals(4, deepEntries.size());
+
+        LogEntry first = deepEntries.get(0);
+        assertEquals(1L, first.offset());
+        assertEquals(Record.create(magic, 11L, "1".getBytes(), "b".getBytes()), first.record());
+
+        LogEntry second = deepEntries.get(1);
+        assertEquals(4L, second.offset());
+        assertEquals(Record.create(magic, 20L, "4".getBytes(), "e".getBytes()), second.record());
+
+        LogEntry third = deepEntries.get(2);
+        assertEquals(5L, third.offset());
+        assertEquals(Record.create(magic, 15L, "5".getBytes(), "f".getBytes()), third.record());
+
+        LogEntry fourth = deepEntries.get(3);
+        assertEquals(6L, fourth.offset());
+        assertEquals(Record.create(magic, 16L, "6".getBytes(), "g".getBytes()), fourth.record());
+    }
+
+    @Test
+    public void testFilterToPreservesLogAppendTime() {
+        long logAppendTime = System.currentTimeMillis();
+
+        ByteBuffer buffer = ByteBuffer.allocate(2048);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression,
+                TimestampType.LOG_APPEND_TIME, 0L, logAppendTime);
+        builder.append(0L, 10L, null, "a".getBytes());
+        builder.close();
+
+        builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.LOG_APPEND_TIME, 1L, logAppendTime);
+        builder.append(1L, 11L, "1".getBytes(), "b".getBytes());
+        builder.append(2L, 12L, null, "c".getBytes());
+        builder.close();
+
+        builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.LOG_APPEND_TIME, 3L, logAppendTime);
+        builder.append(3L, 13L, null, "d".getBytes());
+        builder.append(4L, 14L, "4".getBytes(), "e".getBytes());
+        builder.append(5L, 15L, "5".getBytes(), "f".getBytes());
+        builder.close();
+
+        buffer.flip();
+
+        ByteBuffer filtered = ByteBuffer.allocate(2048);
+        MemoryRecords.readableRecords(buffer).filterTo(new RetainNonNullKeysFilter(), filtered);
+
+        filtered.flip();
+        MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
+
+        List<ByteBufferLogInputStream.ByteBufferLogEntry> shallowEntries = TestUtils.toList(filteredRecords.shallowIterator());
+        assertEquals(compression == CompressionType.NONE ? 3 : 2, shallowEntries.size());
+
+        for (LogEntry shallowEntry : shallowEntries) {
+            assertEquals(compression, shallowEntry.record().compressionType());
+            if (magic > Record.MAGIC_VALUE_V0) {
+                assertEquals(TimestampType.LOG_APPEND_TIME, shallowEntry.record().timestampType());
+                assertEquals(logAppendTime, shallowEntry.record().timestamp());
+            }
+        }
     }
 
     @Parameterized.Parameters
     public static Collection<Object[]> data() {
-        List<Object[]> values = new ArrayList<Object[]>();
-        for (CompressionType type: CompressionType.values())
-            values.add(new Object[] {type});
+        List<Object[]> values = new ArrayList<>();
+        for (long firstOffset : asList(0L, 57L))
+            for (byte magic : asList(Record.MAGIC_VALUE_V0, Record.MAGIC_VALUE_V1))
+                for (CompressionType type: CompressionType.values())
+                    values.add(new Object[] {magic, firstOffset, type});
         return values;
     }
+
+    private static class RetainNonNullKeysFilter implements MemoryRecords.LogEntryFilter {
+        @Override
+        public boolean shouldRetain(LogEntry entry) {
+            return entry.record().hasKey();
+        }
+    }
 }