Posted to commits@kafka.apache.org by jg...@apache.org on 2016/12/13 18:41:35 UTC

[8/9] kafka git commit: KAFKA-4390; Replace MessageSet usage with client-side alternatives

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/common/record/LogInputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/LogInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/LogInputStream.java
index 4a4d569..a9af651 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/LogInputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/LogInputStream.java
@@ -20,10 +20,14 @@ import java.io.IOException;
 
 /**
  * An abstraction between an underlying input stream and record iterators, a LogInputStream
- * returns only the shallow log entries, depending on {@link org.apache.kafka.common.record.RecordsIterator.DeepRecordsIterator}
- * for the deep iteration.
+ * returns only the shallow log entries, depending on {@link RecordsIterator.DeepRecordsIterator}
+ * for the deep iteration. The generic typing allows for implementations which present only
+ * a view of the log entries, which enables more efficient iteration when the record data is
+ * not actually needed. See for example {@link org.apache.kafka.common.record.FileLogInputStream.FileChannelLogEntry}
+ * in which the record is not brought into memory until needed.
+ * @param <T> Type parameter of the log entry
  */
-interface LogInputStream {
+interface LogInputStream<T extends LogEntry> {
 
     /**
      * Get the next log entry from the underlying input stream.
@@ -31,5 +35,5 @@ interface LogInputStream {
      * @return The next log entry or null if there is none
      * @throws IOException for any IO errors
      */
-    LogEntry nextEntry() throws IOException;
+    T nextEntry() throws IOException;
 }
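
(For reference: the generic parameter lets callers of nextEntry() keep the
concrete shallow-entry type. The drain() helper below is a hypothetical sketch,
not part of this patch; since LogInputStream is package-private it would have
to live in org.apache.kafka.common.record.)

    package org.apache.kafka.common.record;

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical helper: pull shallow entries until the stream signals
    // end-of-input by returning null, preserving the concrete entry type T
    // (e.g. ByteBufferLogEntry or FileChannelLogEntry).
    class LogInputStreamDrainSketch {
        static <T extends LogEntry> List<T> drain(LogInputStream<T> stream) throws IOException {
            List<T> entries = new ArrayList<>();
            T entry;
            while ((entry = stream.nextEntry()) != null)
                entries.add(entry);
            return entries;
        }
    }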

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 65ccf98..b945062 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -12,197 +12,185 @@
  */
 package org.apache.kafka.common.record;
 
-import java.io.DataInputStream;
+import org.apache.kafka.common.record.ByteBufferLogInputStream.ByteBufferLogEntry;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.GatheringByteChannel;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Iterator;
+import java.util.List;
 
 /**
- * A {@link Records} implementation backed by a ByteBuffer.
+ * A {@link Records} implementation backed by a ByteBuffer. This is used only for reading or
+ * modifying in-place an existing buffer of log entries. To create a new buffer see {@link MemoryRecordsBuilder},
+ * or one of the {@link #builder(ByteBuffer, byte, CompressionType, TimestampType) builder} variants.
  */
-public class MemoryRecords implements Records {
+public class MemoryRecords extends AbstractRecords {
 
     public final static MemoryRecords EMPTY = MemoryRecords.readableRecords(ByteBuffer.allocate(0));
 
-    private final static int WRITE_LIMIT_FOR_READABLE_ONLY = -1;
-
-    // the compressor used for appends-only
-    private final Compressor compressor;
-
-    // the write limit for writable buffer, which may be smaller than the buffer capacity
-    private final int writeLimit;
-
-    // the capacity of the initial buffer, which is only used for de-allocation of writable records
-    private final int initialCapacity;
-
     // the underlying buffer used for read; while the records are still writable it is null
     private ByteBuffer buffer;
-
-    // indicate if the memory records is writable or not (i.e. used for appends or read-only)
-    private boolean writable;
+    private int validBytes = -1;
 
     // Construct a writable memory records
-    private MemoryRecords(ByteBuffer buffer, CompressionType type, boolean writable, int writeLimit) {
-        this.writable = writable;
-        this.writeLimit = writeLimit;
-        this.initialCapacity = buffer.capacity();
-        if (this.writable) {
-            this.buffer = null;
-            this.compressor = new Compressor(buffer, type);
-        } else {
-            this.buffer = buffer;
-            this.compressor = null;
-        }
-    }
-
-    public static MemoryRecords emptyRecords(ByteBuffer buffer, CompressionType type, int writeLimit) {
-        return new MemoryRecords(buffer, type, true, writeLimit);
-    }
-
-    public static MemoryRecords emptyRecords(ByteBuffer buffer, CompressionType type) {
-        // use the buffer capacity as the default write limit
-        return emptyRecords(buffer, type, buffer.capacity());
+    private MemoryRecords(ByteBuffer buffer) {
+        this.buffer = buffer;
     }
 
-    public static MemoryRecords readableRecords(ByteBuffer buffer) {
-        return new MemoryRecords(buffer, CompressionType.NONE, false, WRITE_LIMIT_FOR_READABLE_ONLY);
+    @Override
+    public int sizeInBytes() {
+        return buffer.limit();
     }
 
-    /**
-     * Append the given record and offset to the buffer
-     */
-    public void append(long offset, Record record) {
-        if (!writable)
-            throw new IllegalStateException("Memory records is not writable");
-
-        int size = record.size();
-        compressor.putLong(offset);
-        compressor.putInt(size);
-        compressor.put(record.buffer());
-        compressor.recordWritten(size + Records.LOG_OVERHEAD);
-        record.buffer().rewind();
+    @Override
+    public long writeTo(GatheringByteChannel channel, long position, int length) throws IOException {
+        ByteBuffer dup = buffer.duplicate();
+        int pos = (int) position;
+        dup.position(pos);
+        dup.limit(pos + length);
+        return channel.write(dup);
     }
 
     /**
-     * Append a new record and offset to the buffer
-     * @return crc of the record
+     * Write all records to the given channel (including partial records).
+     * @param channel The channel to write to
+     * @return The number of bytes written
+     * @throws IOException For any IO errors writing to the channel
      */
-    public long append(long offset, long timestamp, byte[] key, byte[] value) {
-        if (!writable)
-            throw new IllegalStateException("Memory records is not writable");
-
-        int size = Record.recordSize(key, value);
-        compressor.putLong(offset);
-        compressor.putInt(size);
-        long crc = compressor.putRecord(timestamp, key, value);
-        compressor.recordWritten(size + Records.LOG_OVERHEAD);
-        return crc;
+    public int writeFullyTo(GatheringByteChannel channel) throws IOException {
+        buffer.mark();
+        int written = 0;
+        while (written < sizeInBytes())
+            written += channel.write(buffer);
+        buffer.reset();
+        return written;
     }
 
     /**
-     * Check if we have room for a new record containing the given key/value pair
-     *
-     * Note that the return value is based on the estimate of the bytes written to the compressor, which may not be
-     * accurate if compression is really used. When this happens, the following append may cause dynamic buffer
-     * re-allocation in the underlying byte buffer stream.
-     *
-     * There is an exceptional case when appending a single message whose size is larger than the batch size, the
-     * capacity will be the message size which is larger than the write limit, i.e. the batch size. In this case
-     * the checking should be based on the capacity of the initialized buffer rather than the write limit in order
-     * to accept this single record.
+     * The total number of bytes in this message set not including any partial, trailing messages. This
+     * may be smaller than what is returned by {@link #sizeInBytes()}.
+     * @return The number of valid bytes
      */
-    public boolean hasRoomFor(byte[] key, byte[] value) {
-        if (!this.writable)
-            return false;
+    public int validBytes() {
+        if (validBytes >= 0)
+            return validBytes;
 
-        return this.compressor.numRecordsWritten() == 0 ?
-            this.initialCapacity >= Records.LOG_OVERHEAD + Record.recordSize(key, value) :
-            this.writeLimit >= this.compressor.estimatedBytesWritten() + Records.LOG_OVERHEAD + Record.recordSize(key, value);
-    }
+        int bytes = 0;
+        Iterator<ByteBufferLogEntry> iterator = shallowIterator();
+        while (iterator.hasNext())
+            bytes += iterator.next().sizeInBytes();
 
-    public boolean isFull() {
-        return !this.writable || this.writeLimit <= this.compressor.estimatedBytesWritten();
+        this.validBytes = bytes;
+        return bytes;
     }
 
     /**
-     * Close this batch for no more appends
+     * Filter the records into the provided ByteBuffer.
+     * @param filter The filter function
+     * @param buffer The byte buffer to write the filtered records to
+     * @return A FilterResult with a summary of the output (for metrics)
      */
-    public void close() {
-        if (writable) {
-            // close the compressor to fill-in wrapper message metadata if necessary
-            compressor.close();
-
-            // flip the underlying buffer to be ready for reads
-            buffer = compressor.buffer();
-            buffer.flip();
-
-            // reset the writable flag
-            writable = false;
+    public FilterResult filterTo(LogEntryFilter filter, ByteBuffer buffer) {
+        long maxTimestamp = Record.NO_TIMESTAMP;
+        long shallowOffsetOfMaxTimestamp = -1L;
+        int messagesRead = 0;
+        int bytesRead = 0;
+        int messagesRetained = 0;
+        int bytesRetained = 0;
+
+        Iterator<ByteBufferLogEntry> shallowIterator = shallowIterator();
+        while (shallowIterator.hasNext()) {
+            ByteBufferLogEntry shallowEntry = shallowIterator.next();
+            bytesRead += shallowEntry.sizeInBytes();
+
+            // We use the absolute offset to decide whether to retain the message or not (this is handled by the
+            // deep iterator). Because of KAFKA-4298, we have to allow for the possibility that a previous version
+            // corrupted the log by writing a compressed message set with a wrapper magic value not matching the magic
+            // of the inner messages. This will be fixed as we recopy the messages to the destination buffer.
+
+            Record shallowRecord = shallowEntry.record();
+            byte shallowMagic = shallowRecord.magic();
+            boolean writeOriginalEntry = true;
+            List<LogEntry> retainedEntries = new ArrayList<>();
+
+            for (LogEntry deepEntry : shallowEntry) {
+                Record deepRecord = deepEntry.record();
+                messagesRead += 1;
+
+                if (filter.shouldRetain(deepEntry)) {
+                    // Check for log corruption due to KAFKA-4298. If we find it, make sure that we overwrite
+                    // the corrupted entry with correct data.
+                    if (shallowMagic != deepRecord.magic())
+                        writeOriginalEntry = false;
+
+                    retainedEntries.add(deepEntry);
+                } else {
+                    writeOriginalEntry = false;
+                }
+            }
+
+            if (writeOriginalEntry) {
+                // No messages were removed and no message format conversion is needed, so write the original message set back
+                shallowEntry.writeTo(buffer);
+                messagesRetained += retainedEntries.size();
+                bytesRetained += shallowEntry.sizeInBytes();
+
+                if (shallowRecord.timestamp() > maxTimestamp) {
+                    maxTimestamp = shallowRecord.timestamp();
+                    shallowOffsetOfMaxTimestamp = shallowEntry.offset();
+                }
+            } else if (!retainedEntries.isEmpty()) {
+                ByteBuffer slice = buffer.slice();
+                MemoryRecordsBuilder builder = builderWithEntries(slice, shallowRecord.timestampType(), shallowRecord.compressionType(),
+                        shallowRecord.timestamp(), retainedEntries);
+                MemoryRecords records = builder.build();
+                buffer.position(buffer.position() + slice.position());
+                messagesRetained += retainedEntries.size();
+                bytesRetained += records.sizeInBytes();
+
+                MemoryRecordsBuilder.RecordsInfo info = builder.info();
+                if (info.maxTimestamp > maxTimestamp) {
+                    maxTimestamp = info.maxTimestamp;
+                    shallowOffsetOfMaxTimestamp = info.shallowOffsetOfMaxTimestamp;
+                }
+            }
         }
+
+        return new FilterResult(messagesRead, bytesRead, messagesRetained, bytesRetained, maxTimestamp, shallowOffsetOfMaxTimestamp);
     }
 
     /**
-     * The size of this record set
+     * Get the byte buffer that backs this instance for reading.
      */
-    @Override
-    public int sizeInBytes() {
-        if (writable) {
-            return compressor.buffer().position();
-        } else {
-            return buffer.limit();
-        }
+    public ByteBuffer buffer() {
+        return buffer.duplicate();
     }
 
     @Override
-    public long writeTo(GatheringByteChannel channel, long offset, int length) throws IOException {
-        ByteBuffer dup = buffer.duplicate();
-        int position = (int) offset;
-        dup.position(position);
-        dup.limit(position + length);
-        return channel.write(dup);
+    public Iterator<ByteBufferLogEntry> shallowIterator() {
+        return RecordsIterator.shallowIterator(new ByteBufferLogInputStream(buffer.duplicate(), Integer.MAX_VALUE));
     }
 
-    /**
-     * The compression rate of this record set
-     */
-    public double compressionRate() {
-        if (compressor == null)
-            return 1.0;
-        else
-            return compressor.compressionRate();
+    @Override
+    public Iterator<LogEntry> deepIterator() {
+        return deepIterator(false);
     }
 
-    /**
-     * Return the capacity of the initial buffer, for writable records
-     * it may be different from the current buffer's capacity
-     */
-    public int initialCapacity() {
-        return this.initialCapacity;
+    public Iterator<LogEntry> deepIterator(boolean ensureMatchingMagic) {
+        return deepIterator(ensureMatchingMagic, Integer.MAX_VALUE);
     }
 
-    /**
-     * Get the byte buffer that backs this records instance for reading
-     */
-    public ByteBuffer buffer() {
-        if (writable)
-            throw new IllegalStateException("The memory records must not be writable any more before getting its underlying buffer");
-
-        return buffer.duplicate();
+    public Iterator<LogEntry> deepIterator(boolean ensureMatchingMagic, int maxMessageSize) {
+        return new RecordsIterator(new ByteBufferLogInputStream(buffer.duplicate(), maxMessageSize), false,
+                ensureMatchingMagic, maxMessageSize);
     }
 
     @Override
-    public Iterator<LogEntry> iterator() {
-        ByteBuffer input = this.buffer.duplicate();
-        if (writable)
-            // flip on a duplicate buffer for reading
-            input.flip();
-        return new RecordsIterator(new ByteBufferLogInputStream(input), false);
-    }
-    
-    @Override
     public String toString() {
-        Iterator<LogEntry> iter = iterator();
+        Iterator<LogEntry> iter = deepIterator();
         StringBuilder builder = new StringBuilder();
         builder.append('[');
         while (iter.hasNext()) {
@@ -214,16 +202,13 @@ public class MemoryRecords implements Records {
             builder.append("record=");
             builder.append(entry.record());
             builder.append(")");
+            if (iter.hasNext())
+                builder.append(", ");
         }
         builder.append(']');
         return builder.toString();
     }
 
-    /** Visible for testing */
-    public boolean isWritable() {
-        return writable;
-    }
-
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;
@@ -232,7 +217,6 @@ public class MemoryRecords implements Records {
         MemoryRecords that = (MemoryRecords) o;
 
         return buffer.equals(that.buffer);
-
     }
 
     @Override
@@ -240,28 +224,153 @@ public class MemoryRecords implements Records {
         return buffer.hashCode();
     }
 
-    private static class ByteBufferLogInputStream implements LogInputStream {
-        private final DataInputStream stream;
-        private final ByteBuffer buffer;
+    public interface LogEntryFilter {
+        boolean shouldRetain(LogEntry entry);
+    }
 
-        private ByteBufferLogInputStream(ByteBuffer buffer) {
-            this.stream = new DataInputStream(new ByteBufferInputStream(buffer));
-            this.buffer = buffer;
+    public static class FilterResult {
+        public final int messagesRead;
+        public final int bytesRead;
+        public final int messagesRetained;
+        public final int bytesRetained;
+        public final long maxTimestamp;
+        public final long shallowOffsetOfMaxTimestamp;
+
+        public FilterResult(int messagesRead,
+                            int bytesRead,
+                            int messagesRetained,
+                            int bytesRetained,
+                            long maxTimestamp,
+                            long shallowOffsetOfMaxTimestamp) {
+            this.messagesRead = messagesRead;
+            this.bytesRead = bytesRead;
+            this.messagesRetained = messagesRetained;
+            this.bytesRetained = bytesRetained;
+            this.maxTimestamp = maxTimestamp;
+            this.shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp;
         }
+    }
 
-        public LogEntry nextEntry() throws IOException {
-            long offset = stream.readLong();
-            int size = stream.readInt();
-            if (size < 0)
-                throw new IllegalStateException("Record with size " + size);
-
-            ByteBuffer slice = buffer.slice();
-            int newPos = buffer.position() + size;
-            if (newPos > buffer.limit())
-                return null;
-            buffer.position(newPos);
-            slice.limit(size);
-            return new LogEntry(offset, new Record(slice));
-        }
+    public static MemoryRecordsBuilder builder(ByteBuffer buffer,
+                                               CompressionType compressionType,
+                                               TimestampType timestampType,
+                                               int writeLimit) {
+        return new MemoryRecordsBuilder(buffer, Record.CURRENT_MAGIC_VALUE, compressionType, timestampType, 0L, System.currentTimeMillis(), writeLimit);
+    }
+
+    public static MemoryRecordsBuilder builder(ByteBuffer buffer,
+                                               byte magic,
+                                               CompressionType compressionType,
+                                               TimestampType timestampType,
+                                               long baseOffset,
+                                               long logAppendTime) {
+        return new MemoryRecordsBuilder(buffer, magic, compressionType, timestampType, baseOffset, logAppendTime, buffer.capacity());
+    }
+
+    public static MemoryRecordsBuilder builder(ByteBuffer buffer,
+                                               CompressionType compressionType,
+                                               TimestampType timestampType) {
+        // use the buffer capacity as the default write limit
+        return builder(buffer, compressionType, timestampType, buffer.capacity());
+    }
+
+    public static MemoryRecordsBuilder builder(ByteBuffer buffer,
+                                               byte magic,
+                                               CompressionType compressionType,
+                                               TimestampType timestampType) {
+        return builder(buffer, magic, compressionType, timestampType, 0L);
+    }
+
+    public static MemoryRecordsBuilder builder(ByteBuffer buffer,
+                                               byte magic,
+                                               CompressionType compressionType,
+                                               TimestampType timestampType,
+                                               long baseOffset) {
+        return builder(buffer, magic, compressionType, timestampType, baseOffset, System.currentTimeMillis());
+    }
+
+    public static MemoryRecords readableRecords(ByteBuffer buffer) {
+        return new MemoryRecords(buffer);
+    }
+
+    public static MemoryRecords withLogEntries(CompressionType compressionType, List<LogEntry> entries) {
+        return withLogEntries(TimestampType.CREATE_TIME, compressionType, System.currentTimeMillis(), entries);
+    }
+
+    public static MemoryRecords withLogEntries(LogEntry ... entries) {
+        return withLogEntries(CompressionType.NONE, Arrays.asList(entries));
+    }
+
+    public static MemoryRecords withRecords(CompressionType compressionType, long initialOffset, List<Record> records) {
+        return withRecords(initialOffset, TimestampType.CREATE_TIME, compressionType, System.currentTimeMillis(), records);
     }
+
+    public static MemoryRecords withRecords(Record ... records) {
+        return withRecords(CompressionType.NONE, 0L, Arrays.asList(records));
+    }
+
+    public static MemoryRecords withRecords(long initialOffset, Record ... records) {
+        return withRecords(CompressionType.NONE, initialOffset, Arrays.asList(records));
+    }
+
+    public static MemoryRecords withRecords(CompressionType compressionType, Record ... records) {
+        return withRecords(compressionType, 0L, Arrays.asList(records));
+    }
+
+    public static MemoryRecords withRecords(TimestampType timestampType, CompressionType compressionType, Record ... records) {
+        return withRecords(0L, timestampType, compressionType, System.currentTimeMillis(), Arrays.asList(records));
+    }
+
+    public static MemoryRecords withRecords(long initialOffset,
+                                            TimestampType timestampType,
+                                            CompressionType compressionType,
+                                            long logAppendTime,
+                                            List<Record> records) {
+        return withLogEntries(timestampType, compressionType, logAppendTime, buildLogEntries(initialOffset, records));
+    }
+
+    private static MemoryRecords withLogEntries(TimestampType timestampType,
+                                                CompressionType compressionType,
+                                                long logAppendTime,
+                                                List<LogEntry> entries) {
+        if (entries.isEmpty())
+            return MemoryRecords.EMPTY;
+        return builderWithEntries(timestampType, compressionType, logAppendTime, entries).build();
+    }
+
+    private static List<LogEntry> buildLogEntries(long initialOffset, List<Record> records) {
+        List<LogEntry> entries = new ArrayList<>();
+        for (Record record : records)
+            entries.add(LogEntry.create(initialOffset++, record));
+        return entries;
+    }
+
+    public static MemoryRecordsBuilder builderWithEntries(TimestampType timestampType,
+                                                          CompressionType compressionType,
+                                                          long logAppendTime,
+                                                          List<LogEntry> entries) {
+        ByteBuffer buffer = ByteBuffer.allocate(estimatedSize(compressionType, entries));
+        return builderWithEntries(buffer, timestampType, compressionType, logAppendTime, entries);
+    }
+
+    private static MemoryRecordsBuilder builderWithEntries(ByteBuffer buffer,
+                                                           TimestampType timestampType,
+                                                           CompressionType compressionType,
+                                                           long logAppendTime,
+                                                           List<LogEntry> entries) {
+        if (entries.isEmpty())
+            throw new IllegalArgumentException();
+
+        LogEntry firstEntry = entries.iterator().next();
+        long firstOffset = firstEntry.offset();
+        byte magic = firstEntry.record().magic();
+
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compressionType, timestampType,
+                firstOffset, logAppendTime);
+        for (LogEntry entry : entries)
+            builder.append(entry);
+
+        return builder;
+    }
+
 }
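
(Usage sketch, not part of this patch: MemoryRecords is now read-only, so new
data goes through a MemoryRecordsBuilder obtained from one of the builder(...)
overloads above, and filtering copies retained entries into a caller-supplied
buffer. Buffer sizes, payloads and the filter predicate below are made up for
illustration.)

    import java.nio.ByteBuffer;

    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.LogEntry;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.MemoryRecordsBuilder;
    import org.apache.kafka.common.record.TimestampType;

    public class MemoryRecordsUsageSketch {
        public static void main(String[] args) {
            // Write path: append records through the builder, then seal it with build().
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, CompressionType.GZIP,
                    TimestampType.CREATE_TIME);
            builder.append(0L, System.currentTimeMillis(), "k1".getBytes(), "v1".getBytes());
            builder.append(1L, System.currentTimeMillis(), null, "v2".getBytes());
            MemoryRecords records = builder.build();

            // Compaction-style path: copy only the entries the filter retains into a new buffer.
            ByteBuffer out = ByteBuffer.allocate(1024);
            MemoryRecords.FilterResult result = records.filterTo(new MemoryRecords.LogEntryFilter() {
                @Override
                public boolean shouldRetain(LogEntry entry) {
                    return entry.record().hasKey(); // keep only keyed messages
                }
            }, out);
            System.out.println("retained " + result.messagesRetained + " of " + result.messagesRead);
        }
    }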

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
new file mode 100644
index 0000000..b90a9e6
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -0,0 +1,461 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import org.apache.kafka.common.KafkaException;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.reflect.Constructor;
+import java.nio.ByteBuffer;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+/**
+ * This class is used to write new log data in memory, i.e. this is the write path for {@link MemoryRecords}.
+ * It transparently handles compression and exposes methods for appending new entries, possibly with message
+ * format conversion.
+ */
+public class MemoryRecordsBuilder {
+
+    static private final float COMPRESSION_RATE_DAMPING_FACTOR = 0.9f;
+    static private final float COMPRESSION_RATE_ESTIMATION_FACTOR = 1.05f;
+    static private final int COMPRESSION_DEFAULT_BUFFER_SIZE = 1024;
+
+    private static final float[] TYPE_TO_RATE;
+
+    static {
+        int maxTypeId = -1;
+        for (CompressionType type : CompressionType.values())
+            maxTypeId = Math.max(maxTypeId, type.id);
+        TYPE_TO_RATE = new float[maxTypeId + 1];
+        for (CompressionType type : CompressionType.values()) {
+            TYPE_TO_RATE[type.id] = type.rate;
+        }
+    }
+
+    // dynamically load the snappy and lz4 classes to avoid runtime dependency if we are not using compression
+    // caching constructors to avoid invoking of Class.forName method for each batch
+    private static MemoizingConstructorSupplier snappyOutputStreamSupplier = new MemoizingConstructorSupplier(new ConstructorSupplier() {
+        @Override
+        public Constructor get() throws ClassNotFoundException, NoSuchMethodException {
+            return Class.forName("org.xerial.snappy.SnappyOutputStream")
+                .getConstructor(OutputStream.class, Integer.TYPE);
+        }
+    });
+
+    private static MemoizingConstructorSupplier lz4OutputStreamSupplier = new MemoizingConstructorSupplier(new ConstructorSupplier() {
+        @Override
+        public Constructor get() throws ClassNotFoundException, NoSuchMethodException {
+            return Class.forName("org.apache.kafka.common.record.KafkaLZ4BlockOutputStream")
+                .getConstructor(OutputStream.class, Boolean.TYPE);
+        }
+    });
+
+    private static MemoizingConstructorSupplier snappyInputStreamSupplier = new MemoizingConstructorSupplier(new ConstructorSupplier() {
+        @Override
+        public Constructor get() throws ClassNotFoundException, NoSuchMethodException {
+            return Class.forName("org.xerial.snappy.SnappyInputStream")
+                .getConstructor(InputStream.class);
+        }
+    });
+
+    private static MemoizingConstructorSupplier lz4InputStreamSupplier = new MemoizingConstructorSupplier(new ConstructorSupplier() {
+        @Override
+        public Constructor get() throws ClassNotFoundException, NoSuchMethodException {
+            return Class.forName("org.apache.kafka.common.record.KafkaLZ4BlockInputStream")
+                .getConstructor(InputStream.class, Boolean.TYPE);
+        }
+    });
+
+    private final TimestampType timestampType;
+    private final CompressionType compressionType;
+    private final DataOutputStream appendStream;
+    private final ByteBufferOutputStream bufferStream;
+    private final byte magic;
+    private final int initPos;
+    private final long baseOffset;
+    private final long logAppendTime;
+    private final int writeLimit;
+    private final int initialCapacity;
+
+    private MemoryRecords builtRecords;
+    private long writtenUncompressed;
+    private long numRecords;
+    private float compressionRate;
+    private long maxTimestamp;
+    private long offsetOfMaxTimestamp;
+    private long lastOffset = -1;
+
+    public MemoryRecordsBuilder(ByteBuffer buffer,
+                                byte magic,
+                                CompressionType compressionType,
+                                TimestampType timestampType,
+                                long baseOffset,
+                                long logAppendTime,
+                                int writeLimit) {
+        this.magic = magic;
+        this.timestampType = timestampType;
+        this.compressionType = compressionType;
+        this.baseOffset = baseOffset;
+        this.logAppendTime = logAppendTime;
+        this.initPos = buffer.position();
+        this.numRecords = 0;
+        this.writtenUncompressed = 0;
+        this.compressionRate = 1;
+        this.maxTimestamp = Record.NO_TIMESTAMP;
+        this.writeLimit = writeLimit;
+        this.initialCapacity = buffer.capacity();
+
+        if (compressionType != CompressionType.NONE) {
+            // for compressed records, leave space for the header and the shallow message metadata
+            // and move the starting position to the value payload offset
+            buffer.position(initPos + Records.LOG_OVERHEAD + Record.recordOverhead(magic));
+        }
+
+        // create the stream
+        bufferStream = new ByteBufferOutputStream(buffer);
+        appendStream = wrapForOutput(bufferStream, compressionType, magic, COMPRESSION_DEFAULT_BUFFER_SIZE);
+    }
+
+    public ByteBuffer buffer() {
+        return bufferStream.buffer();
+    }
+
+    public int initialCapacity() {
+        return initialCapacity;
+    }
+
+    public double compressionRate() {
+        return compressionRate;
+    }
+
+    /**
+     * Close this builder and return the resulting buffer.
+     * @return The built log buffer
+     */
+    public MemoryRecords build() {
+        close();
+        return builtRecords;
+    }
+
+    /**
+     * Get the max timestamp and its offset. If the log append time is used, then the offset will
+     * be either the first offset in the set if no compression is used or the last offset otherwise.
+     * @return The max timestamp and its offset
+     */
+    public RecordsInfo info() {
+        if (timestampType == TimestampType.LOG_APPEND_TIME)
+            return new RecordsInfo(logAppendTime, lastOffset);
+        else
+            return new RecordsInfo(maxTimestamp, compressionType == CompressionType.NONE ? offsetOfMaxTimestamp : lastOffset);
+    }
+
+    public void close() {
+        if (builtRecords != null)
+            return;
+
+        try {
+            appendStream.close();
+        } catch (IOException e) {
+            throw new KafkaException(e);
+        }
+
+        if (compressionType != CompressionType.NONE)
+            writerCompressedWrapperHeader();
+
+        ByteBuffer buffer = buffer().duplicate();
+        buffer.flip();
+        buffer.position(initPos);
+        builtRecords = MemoryRecords.readableRecords(buffer.slice());
+    }
+
+    private void writerCompressedWrapperHeader() {
+        ByteBuffer buffer = bufferStream.buffer();
+        int pos = buffer.position();
+        buffer.position(initPos);
+
+        int wrapperSize = pos - initPos - Records.LOG_OVERHEAD;
+        int writtenCompressed = wrapperSize - Record.recordOverhead(magic);
+        LogEntry.writeHeader(buffer, lastOffset, wrapperSize);
+
+        long timestamp = timestampType == TimestampType.LOG_APPEND_TIME ? logAppendTime : maxTimestamp;
+        Record.writeCompressedRecordHeader(buffer, magic, wrapperSize, timestamp, compressionType, timestampType);
+
+        buffer.position(pos);
+
+        // update the compression ratio
+        this.compressionRate = (float) writtenCompressed / this.writtenUncompressed;
+        TYPE_TO_RATE[compressionType.id] = TYPE_TO_RATE[compressionType.id] * COMPRESSION_RATE_DAMPING_FACTOR +
+            compressionRate * (1 - COMPRESSION_RATE_DAMPING_FACTOR);
+    }
+
+    /**
+     * Append a new record and offset to the buffer
+     * @param offset The absolute offset of the record in the log buffer
+     * @param timestamp The record timestamp
+     * @param key The record key
+     * @param value The record value
+     * @return crc of the record
+     */
+    public long append(long offset, long timestamp, byte[] key, byte[] value) {
+        try {
+            if (lastOffset > 0 && offset <= lastOffset)
+                throw new IllegalArgumentException(String.format("Illegal offset %s following previous offset %s (Offsets must increase monotonically).", offset, lastOffset));
+
+            int size = Record.recordSize(magic, key, value);
+            LogEntry.writeHeader(appendStream, toInnerOffset(offset), size);
+
+            if (timestampType == TimestampType.LOG_APPEND_TIME)
+                timestamp = logAppendTime;
+            long crc = Record.write(appendStream, magic, timestamp, key, value, CompressionType.NONE, timestampType);
+            recordWritten(offset, timestamp, size + Records.LOG_OVERHEAD);
+            return crc;
+        } catch (IOException e) {
+            throw new KafkaException("I/O exception when writing to the append stream, closing", e);
+        }
+    }
+
+    /**
+     * Add the record, converting to the desired magic value if necessary.
+     * @param offset The offset of the record
+     * @param record The record to add
+     */
+    public void convertAndAppend(long offset, Record record) {
+        if (magic == record.magic()) {
+            append(offset, record);
+            return;
+        }
+
+        if (lastOffset > 0 && offset <= lastOffset)
+            throw new IllegalArgumentException(String.format("Illegal offset %s following previous offset %s (Offsets must increase monotonically).", offset, lastOffset));
+
+        try {
+            int size = record.convertedSize(magic);
+            LogEntry.writeHeader(appendStream, toInnerOffset(offset), size);
+            long timestamp = timestampType == TimestampType.LOG_APPEND_TIME ? logAppendTime : record.timestamp();
+            record.convertTo(appendStream, magic, timestamp, timestampType);
+            recordWritten(offset, timestamp, size + Records.LOG_OVERHEAD);
+        } catch (IOException e) {
+            throw new KafkaException("I/O exception when writing to the append stream, closing", e);
+        }
+    }
+
+    /**
+     * Add a record without doing offset/magic validation (this should only be used in testing).
+     * @param offset The offset of the record
+     * @param record The record to add
+     */
+    public void appendUnchecked(long offset, Record record) {
+        try {
+            int size = record.sizeInBytes();
+            LogEntry.writeHeader(appendStream, toInnerOffset(offset), size);
+
+            ByteBuffer buffer = record.buffer().duplicate();
+            appendStream.write(buffer.array(), buffer.arrayOffset(), buffer.limit());
+
+            recordWritten(offset, record.timestamp(), size + Records.LOG_OVERHEAD);
+        } catch (IOException e) {
+            throw new KafkaException("I/O exception when writing to the append stream, closing", e);
+        }
+    }
+
+    /**
+     * Append the given log entry. The entry's record must have a magic value matching the magic used to
+     * construct this builder, and its offset must be greater than that of the last appended entry.
+     * @param entry The entry to append
+     */
+    public void append(LogEntry entry) {
+        append(entry.offset(), entry.record());
+    }
+
+    /**
+     * Add a record with a given offset. The record must have a magic value matching the magic used to
+     * construct this builder, and the offset must be greater than that of the last appended entry.
+     * @param offset The offset of the record
+     * @param record The record to add
+     */
+    public void append(long offset, Record record) {
+        if (record.magic() != magic)
+            throw new IllegalArgumentException("Inner log entries must have matching magic values as the wrapper");
+        if (lastOffset > 0 && offset <= lastOffset)
+            throw new IllegalArgumentException(String.format("Illegal offset %s following previous offset %s (Offsets must increase monotonically).", offset, lastOffset));
+        appendUnchecked(offset, record);
+    }
+
+    private long toInnerOffset(long offset) {
+        // use relative offsets for compressed messages with magic v1
+        if (magic > 0 && compressionType != CompressionType.NONE)
+            return offset - baseOffset;
+        return offset;
+    }
+
+    private void recordWritten(long offset, long timestamp, int size) {
+        numRecords += 1;
+        writtenUncompressed += size;
+        lastOffset = offset;
+
+        if (timestamp > maxTimestamp) {
+            maxTimestamp = timestamp;
+            offsetOfMaxTimestamp = offset;
+        }
+    }
+
+    /**
+     * Get an estimate of the number of bytes written (based on the estimation factor hard-coded in {@link CompressionType}).
+     * @return The estimated number of bytes written
+     */
+    private int estimatedBytesWritten() {
+        if (compressionType == CompressionType.NONE) {
+            return buffer().position();
+        } else {
+            // estimate the written bytes to the underlying byte buffer based on uncompressed written bytes
+            return (int) (writtenUncompressed * TYPE_TO_RATE[compressionType.id] * COMPRESSION_RATE_ESTIMATION_FACTOR);
+        }
+    }
+
+    /**
+     * Check if we have room for a new record containing the given key/value pair
+     *
+     * Note that the return value is based on an estimate of the bytes written to the compressor, which may not be
+     * accurate when compression is actually used. When this happens, the following append may cause dynamic buffer
+     * re-allocation in the underlying byte buffer stream.
+     *
+     * There is an exceptional case when appending a single message whose size is larger than the batch size: the
+     * required capacity is the message size, which is larger than the write limit (i.e. the batch size). In this case
+     * the check is based on the capacity of the initial buffer rather than the write limit, so that this single
+     * record can still be accepted.
+     */
+    public boolean hasRoomFor(byte[] key, byte[] value) {
+        return !isFull() && (numRecords == 0 ?
+                this.initialCapacity >= Records.LOG_OVERHEAD + Record.recordSize(magic, key, value) :
+                this.writeLimit >= estimatedBytesWritten() + Records.LOG_OVERHEAD + Record.recordSize(magic, key, value));
+    }
+
+    public boolean isClosed() {
+        return builtRecords != null;
+    }
+
+    public boolean isFull() {
+        return isClosed() || this.writeLimit <= estimatedBytesWritten();
+    }
+
+    public int sizeInBytes() {
+        return builtRecords != null ? builtRecords.sizeInBytes() : estimatedBytesWritten();
+    }
+
+    private static DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type, byte messageVersion, int bufferSize) {
+        try {
+            switch (type) {
+                case NONE:
+                    return buffer;
+                case GZIP:
+                    return new DataOutputStream(new GZIPOutputStream(buffer, bufferSize));
+                case SNAPPY:
+                    try {
+                        OutputStream stream = (OutputStream) snappyOutputStreamSupplier.get().newInstance(buffer, bufferSize);
+                        return new DataOutputStream(stream);
+                    } catch (Exception e) {
+                        throw new KafkaException(e);
+                    }
+                case LZ4:
+                    try {
+                        OutputStream stream = (OutputStream) lz4OutputStreamSupplier.get().newInstance(buffer,
+                                messageVersion == Record.MAGIC_VALUE_V0);
+                        return new DataOutputStream(stream);
+                    } catch (Exception e) {
+                        throw new KafkaException(e);
+                    }
+                default:
+                    throw new IllegalArgumentException("Unknown compression type: " + type);
+            }
+        } catch (IOException e) {
+            throw new KafkaException(e);
+        }
+    }
+
+    public static DataInputStream wrapForInput(ByteBufferInputStream buffer, CompressionType type, byte messageVersion) {
+        try {
+            switch (type) {
+                case NONE:
+                    return buffer;
+                case GZIP:
+                    return new DataInputStream(new GZIPInputStream(buffer));
+                case SNAPPY:
+                    try {
+                        InputStream stream = (InputStream) snappyInputStreamSupplier.get().newInstance(buffer);
+                        return new DataInputStream(stream);
+                    } catch (Exception e) {
+                        throw new KafkaException(e);
+                    }
+                case LZ4:
+                    try {
+                        InputStream stream = (InputStream) lz4InputStreamSupplier.get().newInstance(buffer,
+                                messageVersion == Record.MAGIC_VALUE_V0);
+                        return new DataInputStream(stream);
+                    } catch (Exception e) {
+                        throw new KafkaException(e);
+                    }
+                default:
+                    throw new IllegalArgumentException("Unknown compression type: " + type);
+            }
+        } catch (IOException e) {
+            throw new KafkaException(e);
+        }
+    }
+
+    private interface ConstructorSupplier {
+        Constructor get() throws ClassNotFoundException, NoSuchMethodException;
+    }
+
+    // this code is based on Guava's @see{com.google.common.base.Suppliers.MemoizingSupplier}
+    private static class MemoizingConstructorSupplier {
+        final ConstructorSupplier delegate;
+        transient volatile boolean initialized;
+        transient Constructor value;
+
+        public MemoizingConstructorSupplier(ConstructorSupplier delegate) {
+            this.delegate = delegate;
+        }
+
+        public Constructor get() throws NoSuchMethodException, ClassNotFoundException {
+            if (!initialized) {
+                synchronized (this) {
+                    if (!initialized) {
+                        value = delegate.get();
+                        initialized = true;
+                    }
+                }
+            }
+            return value;
+        }
+    }
+
+    public static class RecordsInfo {
+        public final long maxTimestamp;
+        public final long shallowOffsetOfMaxTimestamp;
+
+        public RecordsInfo(long maxTimestamp,
+                           long shallowOffsetOfMaxTimestamp) {
+            this.maxTimestamp = maxTimestamp;
+            this.shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp;
+        }
+    }
+}
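
(Usage sketch, not part of this patch: the batching logic that used to live in
MemoryRecords (hasRoomFor, isFull, compressionRate) now sits on the builder.
The 4 KB buffer, 512-byte write limit and 100-byte values below are arbitrary
illustration values.)

    import java.nio.ByteBuffer;

    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.MemoryRecordsBuilder;
    import org.apache.kafka.common.record.TimestampType;

    public class BatchFillSketch {
        public static void main(String[] args) {
            ByteBuffer buffer = ByteBuffer.allocate(4096);
            // The write limit (512) caps the estimated batch size; the buffer itself may be larger.
            MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, CompressionType.GZIP,
                    TimestampType.CREATE_TIME, 512);

            byte[] value = new byte[100];
            long offset = 0L;
            // Under compression hasRoomFor works from an estimate, so the final batch may
            // overshoot the limit slightly; a single over-sized first record is still accepted.
            while (builder.hasRoomFor(null, value))
                builder.append(offset++, System.currentTimeMillis(), null, value);

            MemoryRecords batch = builder.build();
            System.out.println(offset + " records, " + batch.sizeInBytes() + " bytes, "
                    + "compression rate " + builder.compressionRate());
        }
    }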

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/common/record/Record.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Record.java b/clients/src/main/java/org/apache/kafka/common/record/Record.java
index 09cb80d..0c0fa3c 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Record.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Record.java
@@ -16,11 +16,15 @@
  */
 package org.apache.kafka.common.record;
 
-import java.nio.ByteBuffer;
-
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.utils.Crc32;
 import org.apache.kafka.common.utils.Utils;
 
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import static org.apache.kafka.common.utils.Utils.wrapNullable;
 
 /**
  * A record: a serialized key and value along with the associated CRC and other fields
@@ -53,7 +57,12 @@ public final class Record {
     /**
      * The amount of overhead bytes in a record
      */
-    public static final int RECORD_OVERHEAD = HEADER_SIZE + TIMESTAMP_LENGTH + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH;
+    public static final int RECORD_OVERHEAD_V0 = HEADER_SIZE + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH;
+
+    /**
+     * The amount of overhead bytes in a record
+     */
+    public static final int RECORD_OVERHEAD_V1 = HEADER_SIZE + TIMESTAMP_LENGTH + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH;
 
     /**
      * The "magic" values
@@ -80,11 +89,6 @@ public final class Record {
     public static final int TIMESTAMP_TYPE_ATTRIBUTE_OFFSET = 3;
 
     /**
-     * Compression code for uncompressed records
-     */
-    public static final int NO_COMPRESSION = 0;
-
-    /**
      * Timestamp value for records without a timestamp
      */
     public static final long NO_TIMESTAMP = -1L;
@@ -94,155 +98,20 @@ public final class Record {
     private final TimestampType wrapperRecordTimestampType;
 
     public Record(ByteBuffer buffer) {
-        this.buffer = buffer;
-        this.wrapperRecordTimestamp = null;
-        this.wrapperRecordTimestampType = null;
+        this(buffer, null, null);
     }
 
-    // Package private constructor for inner iteration.
-    Record(ByteBuffer buffer, Long wrapperRecordTimestamp, TimestampType wrapperRecordTimestampType) {
+    public Record(ByteBuffer buffer, Long wrapperRecordTimestamp, TimestampType wrapperRecordTimestampType) {
         this.buffer = buffer;
         this.wrapperRecordTimestamp = wrapperRecordTimestamp;
         this.wrapperRecordTimestampType = wrapperRecordTimestampType;
     }
 
     /**
-     * A constructor to create a LogRecord. If the record's compression type is not none, then
-     * its value payload should be already compressed with the specified type; the constructor
-     * would always write the value payload as is and will not do the compression itself.
-     *
-     * @param timestamp The timestamp of the record
-     * @param key The key of the record (null, if none)
-     * @param value The record value
-     * @param type The compression type used on the contents of the record (if any)
-     * @param valueOffset The offset into the payload array used to extract payload
-     * @param valueSize The size of the payload to use
-     */
-    public Record(long timestamp, byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize) {
-        this(ByteBuffer.allocate(recordSize(key == null ? 0 : key.length,
-            value == null ? 0 : valueSize >= 0 ? valueSize : value.length - valueOffset)));
-        write(this.buffer, timestamp, key, value, type, valueOffset, valueSize);
-        this.buffer.rewind();
-    }
-
-    public Record(long timestamp, byte[] key, byte[] value, CompressionType type) {
-        this(timestamp, key, value, type, 0, -1);
-    }
-
-    public Record(long timestamp, byte[] value, CompressionType type) {
-        this(timestamp, null, value, type);
-    }
-
-    public Record(long timestamp, byte[] key, byte[] value) {
-        this(timestamp, key, value, CompressionType.NONE);
-    }
-
-    public Record(long timestamp, byte[] value) {
-        this(timestamp, null, value, CompressionType.NONE);
-    }
-
-    // Write a record to the buffer, if the record's compression type is none, then
-    // its value payload should be already compressed with the specified type
-    public static void write(ByteBuffer buffer, long timestamp, byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize) {
-        // construct the compressor with compression type none since this function will not do any
-        //compression according to the input type, it will just write the record's payload as is
-        Compressor compressor = new Compressor(buffer, CompressionType.NONE);
-        try {
-            compressor.putRecord(timestamp, key, value, type, valueOffset, valueSize);
-        } finally {
-            compressor.close();
-        }
-    }
-
-    public static void write(Compressor compressor, long crc, byte attributes, long timestamp, byte[] key, byte[] value, int valueOffset, int valueSize) {
-        // write crc
-        compressor.putInt((int) (crc & 0xffffffffL));
-        // write magic value
-        compressor.putByte(CURRENT_MAGIC_VALUE);
-        // write attributes
-        compressor.putByte(attributes);
-        // write timestamp
-        compressor.putLong(timestamp);
-        // write the key
-        if (key == null) {
-            compressor.putInt(-1);
-        } else {
-            compressor.putInt(key.length);
-            compressor.put(key, 0, key.length);
-        }
-        // write the value
-        if (value == null) {
-            compressor.putInt(-1);
-        } else {
-            int size = valueSize >= 0 ? valueSize : (value.length - valueOffset);
-            compressor.putInt(size);
-            compressor.put(value, valueOffset, size);
-        }
-    }
-
-    public static int recordSize(byte[] key, byte[] value) {
-        return recordSize(key == null ? 0 : key.length, value == null ? 0 : value.length);
-    }
-
-    public static int recordSize(int keySize, int valueSize) {
-        return CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTE_LENGTH + TIMESTAMP_LENGTH + KEY_SIZE_LENGTH + keySize + VALUE_SIZE_LENGTH + valueSize;
-    }
-
-    public ByteBuffer buffer() {
-        return this.buffer;
-    }
-
-    public static byte computeAttributes(CompressionType type) {
-        byte attributes = 0;
-        if (type.id > 0)
-            attributes = (byte) (attributes | (COMPRESSION_CODEC_MASK & type.id));
-        return attributes;
-    }
-
-    /**
-     * Compute the checksum of the record from the record contents
-     */
-    public static long computeChecksum(ByteBuffer buffer, int position, int size) {
-        Crc32 crc = new Crc32();
-        crc.update(buffer.array(), buffer.arrayOffset() + position, size);
-        return crc.getValue();
-    }
-
-    /**
-     * Compute the checksum of the record from the attributes, key and value payloads
-     */
-    public static long computeChecksum(long timestamp, byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize) {
-        Crc32 crc = new Crc32();
-        crc.update(CURRENT_MAGIC_VALUE);
-        byte attributes = 0;
-        if (type.id > 0)
-            attributes = (byte) (attributes | (COMPRESSION_CODEC_MASK & type.id));
-        crc.update(attributes);
-        crc.updateLong(timestamp);
-        // update for the key
-        if (key == null) {
-            crc.updateInt(-1);
-        } else {
-            crc.updateInt(key.length);
-            crc.update(key, 0, key.length);
-        }
-        // update for the value
-        if (value == null) {
-            crc.updateInt(-1);
-        } else {
-            int size = valueSize >= 0 ? valueSize : (value.length - valueOffset);
-            crc.updateInt(size);
-            crc.update(value, valueOffset, size);
-        }
-        return crc.getValue();
-    }
-
-
-    /**
      * Compute the checksum of the record from the record contents
      */
     public long computeChecksum() {
-        return computeChecksum(buffer, MAGIC_OFFSET, buffer.limit() - MAGIC_OFFSET);
+        return Utils.computeChecksum(buffer, MAGIC_OFFSET, buffer.limit() - MAGIC_OFFSET);
     }
 
     /**
@@ -256,7 +125,15 @@ public final class Record {
      * Returns true if the crc stored with the record matches the crc computed off the record contents
      */
     public boolean isValid() {
-        return size() >= CRC_LENGTH && checksum() == computeChecksum();
+        return sizeInBytes() >= CRC_LENGTH && checksum() == computeChecksum();
+    }
+
+    public Long wrapperRecordTimestamp() {
+        return wrapperRecordTimestamp;
+    }
+
+    public TimestampType wrapperRecordTimestampType() {
+        return wrapperRecordTimestampType;
     }
 
     /**
@@ -264,9 +141,9 @@ public final class Record {
      */
     public void ensureValid() {
         if (!isValid()) {
-            if (size() < CRC_LENGTH)
+            if (sizeInBytes() < CRC_LENGTH)
                 throw new InvalidRecordException("Record is corrupt (crc could not be retrieved as the record is too "
-                        + "small, size = " + size() + ")");
+                        + "small, size = " + sizeInBytes() + ")");
             else
                 throw new InvalidRecordException("Record is corrupt (stored crc = " + checksum()
                         + ", computed crc = " + computeChecksum() + ")");
@@ -274,14 +151,17 @@ public final class Record {
     }
 
     /**
-     * The complete serialized size of this record in bytes (including crc, header attributes, etc)
+     * The complete serialized size of this record in bytes (including crc, header attributes, etc), but
+     * excluding the log overhead (offset and record size).
+     * @return the size in bytes
      */
-    public int size() {
+    public int sizeInBytes() {
         return buffer.limit();
     }
 
     /**
      * The length of the key in bytes
+     * @return the size in bytes of the key (or -1 if the key is null)
      */
     public int keySize() {
         if (magic() == MAGIC_VALUE_V0)
@@ -292,6 +172,7 @@ public final class Record {
 
     /**
      * Does the record have a key?
+     * @return true if so, false otherwise
      */
     public boolean hasKey() {
         return keySize() >= 0;
@@ -309,13 +190,23 @@ public final class Record {
 
     /**
      * The length of the value in bytes
+     * @return the size in bytes of the value (or -1 if the value is null)
      */
     public int valueSize() {
         return buffer.getInt(valueSizeOffset());
     }
 
     /**
-     * The magic version of this record
+     * Check whether the value field of this record is null.
+     * @return true if the value is null, false otherwise
+     */
+    public boolean hasNullValue() {
+        return valueSize() < 0;
+    }
+
+    /**
+     * The magic value (i.e. message format version) of this record
+     * @return the magic value
      */
     public byte magic() {
         return buffer.get(MAGIC_OFFSET);
@@ -323,6 +214,7 @@ public final class Record {
 
     /**
      * The attributes stored with this record
+     * @return the attributes
      */
     public byte attributes() {
         return buffer.get(ATTRIBUTES_OFFSET);
@@ -333,6 +225,8 @@ public final class Record {
      * 1. wrapperRecordTimestampType = null and wrapperRecordTimestamp is null - Uncompressed message, timestamp is in the message.
      * 2. wrapperRecordTimestampType = LOG_APPEND_TIME and wrapperRecordTimestamp is not null - Compressed message using LOG_APPEND_TIME
      * 3. wrapperRecordTimestampType = CREATE_TIME and wrapperRecordTimestamp is not null - Compressed message using CREATE_TIME
+     *
+     * @return the timestamp as determined above
      */
     public long timestamp() {
         if (magic() == MAGIC_VALUE_V0)
@@ -349,6 +243,8 @@ public final class Record {
 
     /**
      * The timestamp of the message.
+     * @return the timestamp type or {@link TimestampType#NO_TIMESTAMP_TYPE} if the magic is 0 or the message has
+     *   been up-converted.
      */
     public TimestampType timestampType() {
         if (magic() == 0)
@@ -366,36 +262,30 @@ public final class Record {
 
     /**
      * A ByteBuffer containing the value of this record
+     * @return the value or null if the value for this record is null
      */
     public ByteBuffer value() {
-        return sliceDelimited(valueSizeOffset());
+        return Utils.sizeDelimited(buffer, valueSizeOffset());
     }
 
     /**
      * A ByteBuffer containing the message key
+     * @return the buffer or null if the key for this record is null
      */
     public ByteBuffer key() {
         if (magic() == MAGIC_VALUE_V0)
-            return sliceDelimited(KEY_SIZE_OFFSET_V0);
+            return Utils.sizeDelimited(buffer, KEY_SIZE_OFFSET_V0);
         else
-            return sliceDelimited(KEY_SIZE_OFFSET_V1);
+            return Utils.sizeDelimited(buffer, KEY_SIZE_OFFSET_V1);
     }
 
     /**
-     * Read a size-delimited byte buffer starting at the given offset
+     * Get the underlying buffer backing this record instance.
+     *
+     * @return the buffer
      */
-    private ByteBuffer sliceDelimited(int start) {
-        int size = buffer.getInt(start);
-        if (size < 0) {
-            return null;
-        } else {
-            ByteBuffer b = buffer.duplicate();
-            b.position(start + 4);
-            b = b.slice();
-            b.limit(size);
-            b.rewind();
-            return b;
-        }
+    public ByteBuffer buffer() {
+        return this.buffer;
     }
 
     public String toString() {
@@ -434,4 +324,316 @@ public final class Record {
         return buffer.hashCode();
     }
 
+    /**
+     * Get the size of this record if converted to the given format.
+     *
+     * @param toMagic The target magic version to convert to
+     * @return The size in bytes after conversion
+     */
+    public int convertedSize(byte toMagic) {
+        return recordSize(toMagic, Math.max(0, keySize()), Math.max(0, valueSize()));
+    }
+
+    /**
+     * Convert this record to another message format.
+     *
+     * @param toMagic The target magic version to convert to
+     * @return A new record instance with a freshly allocated ByteBuffer.
+     */
+    public Record convert(byte toMagic) {
+        if (toMagic == magic())
+            return this;
+
+        ByteBuffer buffer = ByteBuffer.allocate(convertedSize(toMagic));
+        TimestampType timestampType = wrapperRecordTimestampType != null ?
+                wrapperRecordTimestampType : TimestampType.forAttributes(attributes());
+        convertTo(buffer, toMagic, timestamp(), timestampType);
+        buffer.rewind();
+        return new Record(buffer);
+    }
+
+    private void convertTo(ByteBuffer buffer, byte toMagic, long timestamp, TimestampType timestampType) {
+        if (compressionType() != CompressionType.NONE)
+            throw new IllegalArgumentException("Cannot use convertTo for deep conversion");
+
+        write(buffer, toMagic, timestamp, key(), value(), CompressionType.NONE, timestampType);
+    }
+
+    /**
+     * Convert this record to another message format and write the converted data to the provided output stream.
+     *
+     * @param out The output stream to write the converted data to
+     * @param toMagic The target magic version for conversion
+     * @param timestamp The timestamp to use in the converted record (for up-conversion)
+     * @param timestampType The timestamp type to use in the converted record (for up-conversion)
+     * @throws IOException for any IO errors writing the converted record.
+     */
+    public void convertTo(DataOutputStream out, byte toMagic, long timestamp, TimestampType timestampType) throws IOException {
+        if (compressionType() != CompressionType.NONE)
+            throw new IllegalArgumentException("Cannot use convertTo for deep conversion");
+
+        write(out, toMagic, timestamp, key(), value(), CompressionType.NONE, timestampType);
+    }
+
+    /**
+     * Create a new record instance. If the record's compression type is not none, then
+     * its value payload should already be compressed with the specified type; the constructor
+     * always writes the value payload as-is and does not perform the compression itself.
+     *
+     * @param magic The magic value to use
+     * @param timestamp The timestamp of the record
+     * @param key The key of the record (null, if none)
+     * @param value The record value
+     * @param compressionType The compression type used on the contents of the record (if any)
+     * @param timestampType The timestamp type to be used for this record
+     */
+    public static Record create(byte magic,
+                                long timestamp,
+                                byte[] key,
+                                byte[] value,
+                                CompressionType compressionType,
+                                TimestampType timestampType) {
+        int keySize = key == null ? 0 : key.length;
+        int valueSize = value == null ? 0 : value.length;
+        ByteBuffer buffer = ByteBuffer.allocate(recordSize(magic, keySize, valueSize));
+        write(buffer, magic, timestamp, wrapNullable(key), wrapNullable(value), compressionType, timestampType);
+        buffer.rewind();
+        return new Record(buffer);
+    }
+
+    public static Record create(long timestamp, byte[] key, byte[] value) {
+        return create(CURRENT_MAGIC_VALUE, timestamp, key, value, CompressionType.NONE, TimestampType.CREATE_TIME);
+    }
+
+    public static Record create(byte magic, long timestamp, byte[] key, byte[] value) {
+        return create(magic, timestamp, key, value, CompressionType.NONE, TimestampType.CREATE_TIME);
+    }
+
+    public static Record create(byte magic, TimestampType timestampType, long timestamp, byte[] key, byte[] value) {
+        return create(magic, timestamp, key, value, CompressionType.NONE, timestampType);
+    }
+
+    public static Record create(byte magic, long timestamp, byte[] value) {
+        return create(magic, timestamp, null, value, CompressionType.NONE, TimestampType.CREATE_TIME);
+    }
+
+    public static Record create(byte magic, byte[] key, byte[] value) {
+        return create(magic, NO_TIMESTAMP, key, value);
+    }
+
+    public static Record create(byte[] key, byte[] value) {
+        return create(NO_TIMESTAMP, key, value);
+    }
+
+    public static Record create(byte[] value) {
+        return create(CURRENT_MAGIC_VALUE, NO_TIMESTAMP, null, value, CompressionType.NONE, TimestampType.CREATE_TIME);
+    }
+
+    /**
+     * Write the header for a compressed record set in-place (i.e. assuming the compressed record data has already
+     * been written at the value offset in a wrapped record). This lets you dynamically create a compressed message
+     * set, and then go back later and fill in its size and CRC, which avoids copying to another buffer.
+     *
+     * @param buffer The buffer containing the compressed record data, positioned at the start of the wrapper record
+     * @param magic The magic value of the record set
+     * @param recordSize The size of the record (including record overhead)
+     * @param timestamp The timestamp of the wrapper record
+     * @param compressionType The compression type used
+     * @param timestampType The timestamp type of the wrapper record
+     */
+    public static void writeCompressedRecordHeader(ByteBuffer buffer,
+                                                   byte magic,
+                                                   int recordSize,
+                                                   long timestamp,
+                                                   CompressionType compressionType,
+                                                   TimestampType timestampType) {
+        int recordPosition = buffer.position();
+        int valueSize = recordSize - recordOverhead(magic);
+
+        // write the record header with a null value (the key is always null for the wrapper)
+        write(buffer, magic, timestamp, null, null, compressionType, timestampType);
+
+        // now fill in the value size
+        buffer.putInt(recordPosition + keyOffset(magic), valueSize);
+
+        // compute and fill the crc from the beginning of the message
+        long crc = Utils.computeChecksum(buffer, recordPosition + MAGIC_OFFSET, recordSize - MAGIC_OFFSET);
+        Utils.writeUnsignedInt(buffer, recordPosition + CRC_OFFSET, crc);
+    }
+
+    private static void write(ByteBuffer buffer,
+                              byte magic,
+                              long timestamp,
+                              ByteBuffer key,
+                              ByteBuffer value,
+                              CompressionType compressionType,
+                              TimestampType timestampType) {
+        try {
+            ByteBufferOutputStream out = new ByteBufferOutputStream(buffer);
+            write(out, magic, timestamp, key, value, compressionType, timestampType);
+        } catch (IOException e) {
+            throw new KafkaException(e);
+        }
+    }
+
+    /**
+     * Write the record data with the given compression type and return the computed crc.
+     *
+     * @param out The output stream to write to
+     * @param magic The magic value to be used
+     * @param timestamp The timestamp of the record
+     * @param key The record key
+     * @param value The record value
+     * @param compressionType The compression type
+     * @param timestampType The timestamp type
+     * @return the computed CRC for this record.
+     * @throws IOException for any IO errors writing to the output stream.
+     */
+    public static long write(DataOutputStream out,
+                             byte magic,
+                             long timestamp,
+                             byte[] key,
+                             byte[] value,
+                             CompressionType compressionType,
+                             TimestampType timestampType) throws IOException {
+        return write(out, magic, timestamp, wrapNullable(key), wrapNullable(value), compressionType, timestampType);
+    }
+
+    private static long write(DataOutputStream out,
+                              byte magic,
+                              long timestamp,
+                              ByteBuffer key,
+                              ByteBuffer value,
+                              CompressionType compressionType,
+                              TimestampType timestampType) throws IOException {
+        byte attributes = computeAttributes(magic, compressionType, timestampType);
+        long crc = computeChecksum(magic, attributes, timestamp, key, value);
+        write(out, magic, crc, attributes, timestamp, key, value);
+        return crc;
+    }
+
+
+    /**
+     * Write a record using raw fields (without validation). This should only be used in testing.
+     */
+    public static void write(DataOutputStream out,
+                             byte magic,
+                             long crc,
+                             byte attributes,
+                             long timestamp,
+                             byte[] key,
+                             byte[] value) throws IOException {
+        write(out, magic, crc, attributes, timestamp, wrapNullable(key), wrapNullable(value));
+    }
+
+    // Write a record to the output stream. If the record's compression type is not none, then
+    // its value payload should already be compressed with the specified type.
+    private static void write(DataOutputStream out,
+                              byte magic,
+                              long crc,
+                              byte attributes,
+                              long timestamp,
+                              ByteBuffer key,
+                              ByteBuffer value) throws IOException {
+        if (magic != MAGIC_VALUE_V0 && magic != MAGIC_VALUE_V1)
+            throw new IllegalArgumentException("Invalid magic value " + magic);
+        if (timestamp < 0 && timestamp != NO_TIMESTAMP)
+            throw new IllegalArgumentException("Invalid message timestamp " + timestamp);
+
+        // write crc
+        out.writeInt((int) (crc & 0xffffffffL));
+        // write magic value
+        out.writeByte(magic);
+        // write attributes
+        out.writeByte(attributes);
+
+        // maybe write timestamp
+        if (magic > 0)
+            out.writeLong(timestamp);
+
+        // write the key
+        if (key == null) {
+            out.writeInt(-1);
+        } else {
+            int size = key.remaining();
+            out.writeInt(size);
+            out.write(key.array(), key.arrayOffset(), size);
+        }
+        // write the value
+        if (value == null) {
+            out.writeInt(-1);
+        } else {
+            int size = value.remaining();
+            out.writeInt(size);
+            out.write(value.array(), value.arrayOffset(), size);
+        }
+    }
+
+    public static int recordSize(byte[] key, byte[] value) {
+        return recordSize(CURRENT_MAGIC_VALUE, key, value);
+    }
+
+    public static int recordSize(byte magic, byte[] key, byte[] value) {
+        return recordSize(magic, key == null ? 0 : key.length, value == null ? 0 : value.length);
+    }
+
+    private static int recordSize(byte magic, int keySize, int valueSize) {
+        return recordOverhead(magic) + keySize + valueSize;
+    }
+
+    // visible only for testing
+    public static byte computeAttributes(byte magic, CompressionType type, TimestampType timestampType) {
+        byte attributes = 0;
+        if (type.id > 0)
+            attributes = (byte) (attributes | (COMPRESSION_CODEC_MASK & type.id));
+        if (magic > 0)
+            return timestampType.updateAttributes(attributes);
+        return attributes;
+    }
+
+    // visible only for testing
+    public static long computeChecksum(byte magic, byte attributes, long timestamp, byte[] key, byte[] value) {
+        return computeChecksum(magic, attributes, timestamp, wrapNullable(key), wrapNullable(value));
+    }
+
+    /**
+     * Compute the checksum of the record from the attributes, key and value payloads
+     */
+    private static long computeChecksum(byte magic, byte attributes, long timestamp, ByteBuffer key, ByteBuffer value) {
+        Crc32 crc = new Crc32();
+        crc.update(magic);
+        crc.update(attributes);
+        if (magic > 0)
+            crc.updateLong(timestamp);
+        // update for the key
+        if (key == null) {
+            crc.updateInt(-1);
+        } else {
+            int size = key.remaining();
+            crc.updateInt(size);
+            crc.update(key.array(), key.arrayOffset(), size);
+        }
+        // update for the value
+        if (value == null) {
+            crc.updateInt(-1);
+        } else {
+            int size = value.remaining();
+            crc.updateInt(size);
+            crc.update(value.array(), value.arrayOffset(), size);
+        }
+        return crc.getValue();
+    }
+
+    public static int recordOverhead(byte magic) {
+        if (magic == 0)
+            return RECORD_OVERHEAD_V0;
+        return RECORD_OVERHEAD_V1;
+    }
+
+    private static int keyOffset(byte magic) {
+        if (magic == 0)
+            return KEY_OFFSET_V0;
+        return KEY_OFFSET_V1;
+    }
+
 }
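
The hunk above adds static factories (create), format conversion (convert/convertTo), and a streaming write() to Record. A minimal usage sketch, not part of the patch, assuming Record.CURRENT_MAGIC_VALUE is publicly visible as the create() overloads above suggest:

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.nio.charset.StandardCharsets;

    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.Record;
    import org.apache.kafka.common.record.TimestampType;

    public class RecordUsageSketch {
        public static void main(String[] args) throws Exception {
            // build an uncompressed magic v1 record with an explicit create-time timestamp
            Record record = Record.create(Record.CURRENT_MAGIC_VALUE,
                    System.currentTimeMillis(),
                    "key".getBytes(StandardCharsets.UTF_8),
                    "value".getBytes(StandardCharsets.UTF_8),
                    CompressionType.NONE,
                    TimestampType.CREATE_TIME);

            // the crc is stored with the record, so validity can be checked directly
            record.ensureValid();

            // sizeInBytes() excludes the log overhead (8-byte offset + 4-byte size)
            System.out.println("v1 size = " + record.sizeInBytes());

            // down-convert to magic v0, which drops the timestamp field
            Record v0 = record.convert((byte) 0);
            System.out.println("v0 size = " + v0.sizeInBytes());

            // the static write() variant streams the record and returns its computed crc
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            long crc = Record.write(new DataOutputStream(bytes),
                    Record.CURRENT_MAGIC_VALUE,
                    System.currentTimeMillis(),
                    "key".getBytes(StandardCharsets.UTF_8),
                    "value".getBytes(StandardCharsets.UTF_8),
                    CompressionType.NONE,
                    TimestampType.CREATE_TIME);
            System.out.println("streamed " + bytes.size() + " bytes, crc = " + crc);
        }
    }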

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/common/record/Records.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Records.java b/clients/src/main/java/org/apache/kafka/common/record/Records.java
index 3bc043f..823d2b7 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Records.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Records.java
@@ -18,32 +18,74 @@ package org.apache.kafka.common.record;
 
 import java.io.IOException;
 import java.nio.channels.GatheringByteChannel;
+import java.util.Iterator;
 
 /**
- * A binary format which consists of a 4 byte size, an 8 byte offset, and the record bytes. See {@link MemoryRecords}
- * for the in-memory representation.
+ * Interface for accessing the records contained in a log. The log itself is represented as a sequence of log entries.
+ * Each log entry consists of an 8 byte offset, a 4 byte record size, and a "shallow" {@link Record record}.
+ * If the entry is not compressed, then each entry will have only the shallow record contained inside it. If it is
+ * compressed, the entry contains "deep" records, which are packed into the value field of the shallow record. To iterate
+ * over the shallow records, use {@link #shallowIterator()}; for the deep records, use {@link #deepIterator()}. Note
+ * that the deep iterator handles both compressed and non-compressed entries: if the entry is not compressed, the
+ * shallow record is returned; otherwise, the shallow record is decompressed and the deep entries are returned.
+ * See {@link MemoryRecords} for the in-memory representation and {@link FileRecords} for the on-disk representation.
  */
-public interface Records extends Iterable<LogEntry> {
+public interface Records {
 
-    int SIZE_LENGTH = 4;
+    int OFFSET_OFFSET = 0;
     int OFFSET_LENGTH = 8;
-    int LOG_OVERHEAD = SIZE_LENGTH + OFFSET_LENGTH;
+    int SIZE_OFFSET = OFFSET_OFFSET + OFFSET_LENGTH;
+    int SIZE_LENGTH = 4;
+    int LOG_OVERHEAD = SIZE_OFFSET + SIZE_LENGTH;
 
     /**
-     * The size of these records in bytes
-     * @return The size in bytes
+     * The size of these records in bytes.
+     * @return The size in bytes of the records
      */
     int sizeInBytes();
 
     /**
-     * Write the messages in this set to the given channel starting at the given offset byte.
+     * Write the contents of this buffer to a channel.
      * @param channel The channel to write to
-     * @param position The position within this record set to begin writing from
+     * @param position The position in the buffer to write from
      * @param length The number of bytes to write
-     * @return The number of bytes written to the channel (which may be fewer than requested)
-     * @throws IOException For any IO errors copying the
+     * @return The number of bytes written
+     * @throws IOException For any IO errors
      */
     long writeTo(GatheringByteChannel channel, long position, int length) throws IOException;
 
+    /**
+     * Get the shallow log entries in this log buffer. Note that the signature allows subclasses
+     * to return a more specific log entry type. This enables optimizations such as in-place offset
+     * assignment (see {@link ByteBufferLogInputStream.ByteBufferLogEntry}), and partial reading of
+     * record data (see {@link FileLogInputStream.FileChannelLogEntry#magic()}).
+     * @return An iterator over the shallow entries of the log
+     */
+    Iterator<? extends LogEntry> shallowIterator();
+
+    /**
+     * Get the deep log entries (i.e. descend into compressed message sets). For the deep records,
+     * there are fewer options for optimization since the data must be decompressed before it can be
+     * returned. Hence there is little advantage in allowing subclasses to return a more specific type
+     * as we do for {@link #shallowIterator()}.
+     * @return An iterator over the deep entries of the log
+     */
+    Iterator<LogEntry> deepIterator();
+
+    /**
+     * Check whether all shallow entries in this buffer have a certain magic value.
+     * @param magic The magic value to check
+     * @return true if all shallow entries have a matching magic value, false otherwise
+     */
+    boolean hasMatchingShallowMagic(byte magic);
+
+
+    /**
+     * Convert all entries in this buffer to the format passed as a parameter. Note that this requires
+     * deep iteration since all of the deep records must also be converted to the desired format.
+     * @param toMagic The magic value to convert to
+     * @return A Records instance (which may or may not be the same underlying instance)
+     */
+    Records toMessageFormat(byte toMagic);
 
 }
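
To make the shallow/deep contract above concrete, a short sketch of how a caller might walk a log buffer through this interface, assuming LogEntry (defined elsewhere in this patch) exposes offset() and record() accessors:

    import java.util.Iterator;

    import org.apache.kafka.common.record.LogEntry;
    import org.apache.kafka.common.record.Record;
    import org.apache.kafka.common.record.Records;

    public class IterationSketch {
        // walk the buffer twice: once entry-by-entry, once descending into compressed wrappers
        static void dump(Records records) {
            Iterator<? extends LogEntry> shallow = records.shallowIterator();
            while (shallow.hasNext()) {
                LogEntry entry = shallow.next();
                System.out.println("shallow offset = " + entry.offset());
            }

            Iterator<LogEntry> deep = records.deepIterator();
            while (deep.hasNext()) {
                LogEntry entry = deep.next();
                Record record = entry.record();
                System.out.println("deep offset = " + entry.offset()
                        + ", valid = " + record.isValid());
            }
        }
    }

For an uncompressed buffer the two passes yield the same entries; for a compressed buffer the deep pass returns the inner records packed into each wrapper's value field, as described in the interface javadoc.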