Posted to commits@kafka.apache.org by jg...@apache.org on 2016/12/13 18:41:36 UTC

[9/9] kafka git commit: KAFKA-4390; Replace MessageSet usage with client-side alternatives

KAFKA-4390; Replace MessageSet usage with client-side alternatives

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Ismael Juma <is...@juma.me.uk>, Guozhang Wang <wa...@gmail.com>, Jun Rao <ju...@gmail.com>

Closes #2140 from hachikuji/KAFKA4390
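
For orientation, a minimal sketch of the producer-side shift this patch makes (illustrative only, not part of the commit; the wrapper class and literal values are hypothetical, while the builder calls are the ones introduced in the diff below). RecordBatch no longer exposes a MemoryRecords plus a Compressor; it wraps the new MemoryRecordsBuilder and only materializes MemoryRecords when the batch is drained:

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.MemoryRecordsBuilder;
    import org.apache.kafka.common.record.TimestampType;

    public class BuilderUsageSketch {
        public static void main(String[] args) {
            ByteBuffer buffer = ByteBuffer.allocate(16384);
            // Before: MemoryRecords.emptyRecords(buffer, compression, batchSize) plus an internal Compressor.
            // After: MemoryRecordsBuilder owns the write path until the batch is closed.
            MemoryRecordsBuilder builder = MemoryRecords.builder(
                    buffer, CompressionType.NONE, TimestampType.CREATE_TIME, 16384);
            byte[] key = "key".getBytes();
            byte[] value = "value".getBytes();
            if (builder.hasRoomFor(key, value))
                builder.append(0L, System.currentTimeMillis(), key, value);
            builder.close();                         // what RecordBatch.close() now delegates to
            MemoryRecords records = builder.build(); // what RecordBatch.records() hands to Sender
            System.out.println("batch size in bytes: " + records.sizeInBytes());
        }
    }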


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/67f1e5b9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/67f1e5b9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/67f1e5b9

Branch: refs/heads/trunk
Commit: 67f1e5b91bf073151ff57d5d656693e385726697
Parents: 6626b05
Author: Jason Gustafson <ja...@confluent.io>
Authored: Tue Dec 13 10:26:25 2016 -0800
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Tue Dec 13 10:26:25 2016 -0800

----------------------------------------------------------------------
 checkstyle/import-control.xml                   |   1 +
 .../clients/consumer/internals/Fetcher.java     |   7 +-
 .../clients/producer/internals/BufferPool.java  |   2 +-
 .../producer/internals/RecordAccumulator.java   |  30 +-
 .../clients/producer/internals/RecordBatch.java |  58 +-
 .../clients/producer/internals/Sender.java      |  10 +-
 .../kafka/common/record/AbstractRecords.java    |  92 +++
 .../common/record/ByteBufferInputStream.java    |  37 +-
 .../common/record/ByteBufferLogInputStream.java | 119 ++++
 .../common/record/ByteBufferOutputStream.java   |  54 +-
 .../kafka/common/record/CompressionType.java    |   2 +-
 .../apache/kafka/common/record/Compressor.java  | 332 ----------
 .../kafka/common/record/FileLogInputStream.java | 166 +++++
 .../apache/kafka/common/record/FileRecords.java | 465 ++++++++++++--
 .../common/record/InvalidRecordException.java   |   4 +-
 .../apache/kafka/common/record/LogEntry.java    | 151 ++++-
 .../kafka/common/record/LogInputStream.java     |  12 +-
 .../kafka/common/record/MemoryRecords.java      | 443 +++++++++-----
 .../common/record/MemoryRecordsBuilder.java     | 461 ++++++++++++++
 .../org/apache/kafka/common/record/Record.java  | 538 +++++++++++-----
 .../org/apache/kafka/common/record/Records.java |  64 +-
 .../kafka/common/record/RecordsIterator.java    | 179 ++++--
 .../kafka/common/record/TimestampType.java      |   1 +
 .../org/apache/kafka/common/utils/Utils.java    |  51 ++
 .../clients/consumer/KafkaConsumerTest.java     |   7 +-
 .../clients/consumer/internals/FetcherTest.java | 100 +--
 .../internals/RecordAccumulatorTest.java        |  51 +-
 .../record/ByteBufferLogInputStreamTest.java    | 110 ++++
 .../kafka/common/record/FileRecordsTest.java    | 410 +++++++++++++
 .../common/record/MemoryRecordsBuilderTest.java | 253 ++++++++
 .../kafka/common/record/MemoryRecordsTest.java  | 194 +++++-
 .../apache/kafka/common/record/RecordTest.java  |  46 +-
 .../kafka/common/record/SimpleRecordTest.java   |  52 +-
 .../kafka/common/record/TimestampTypeTest.java  |  37 ++
 .../java/org/apache/kafka/test/TestUtils.java   |  32 +-
 core/src/main/scala/kafka/api/ApiVersion.scala  |  20 +-
 .../main/scala/kafka/cluster/Partition.scala    |  10 +-
 .../kafka/consumer/ConsumerFetcherThread.scala  |  10 +-
 .../kafka/coordinator/GroupCoordinator.scala    |   6 +-
 .../coordinator/GroupMetadataManager.scala      | 170 +++--
 .../main/scala/kafka/log/FileMessageSet.scala   | 445 --------------
 core/src/main/scala/kafka/log/Log.scala         | 128 ++--
 core/src/main/scala/kafka/log/LogCleaner.scala  |  55 +-
 core/src/main/scala/kafka/log/LogManager.scala  |   2 +-
 core/src/main/scala/kafka/log/LogSegment.scala  |  98 +--
 .../src/main/scala/kafka/log/LogValidator.scala | 239 ++++++++
 core/src/main/scala/kafka/log/TimeIndex.scala   |   6 +-
 .../kafka/message/ByteBufferMessageSet.scala    | 613 +------------------
 core/src/main/scala/kafka/message/Message.scala |  64 +-
 .../scala/kafka/message/MessageAndOffset.scala  |  14 +-
 .../main/scala/kafka/message/MessageSet.scala   |   8 +-
 .../kafka/server/AbstractFetcherThread.scala    |  10 +-
 .../main/scala/kafka/server/DelayedFetch.scala  |   5 +-
 .../main/scala/kafka/server/FetchDataInfo.scala |   7 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |  32 +-
 .../kafka/server/ReplicaFetcherThread.scala     |  22 +-
 .../scala/kafka/server/ReplicaManager.scala     |  85 +--
 .../scala/kafka/tools/DumpLogSegments.scala     | 150 ++---
 .../kafka/tools/ReplicaVerificationTool.scala   |  52 +-
 .../api/GroupCoordinatorIntegrationTest.scala   |   6 +-
 .../scala/kafka/tools/TestLogCleaning.scala     |  19 +-
 .../test/scala/other/kafka/StressTestLog.scala  |  22 +-
 .../other/kafka/TestLinearWriteSpeed.scala      |  19 +-
 .../unit/kafka/admin/DeleteTopicTest.scala      |   2 +-
 .../GroupCoordinatorResponseTest.scala          |  30 +-
 .../coordinator/GroupMetadataManagerTest.scala  | 100 ++-
 .../unit/kafka/log/BrokerCompressionTest.scala  |  15 +-
 .../unit/kafka/log/FileMessageSetTest.scala     | 354 -----------
 .../kafka/log/LogCleanerIntegrationTest.scala   |  66 +-
 .../log/LogCleanerLagIntegrationTest.scala      |  24 +-
 .../unit/kafka/log/LogCleanerManagerTest.scala  |  29 +-
 .../scala/unit/kafka/log/LogCleanerTest.scala   | 275 ++++-----
 .../scala/unit/kafka/log/LogManagerTest.scala   |  22 +-
 .../scala/unit/kafka/log/LogSegmentTest.scala   | 136 ++--
 .../src/test/scala/unit/kafka/log/LogTest.scala | 204 +++---
 .../scala/unit/kafka/log/LogValidatorTest.scala | 395 ++++++++++++
 .../kafka/message/BaseMessageSetTestCases.scala |  20 +-
 .../message/ByteBufferMessageSetTest.scala      | 348 -----------
 .../kafka/message/MessageCompressionTest.scala  |   2 +-
 .../scala/unit/kafka/message/MessageTest.scala  |  38 +-
 .../server/AbstractFetcherThreadTest.scala      |  32 +-
 .../unit/kafka/server/FetchRequestTest.scala    |  53 +-
 .../unit/kafka/server/ISRExpirationTest.scala   |  27 +-
 .../scala/unit/kafka/server/LogOffsetTest.scala |  14 +-
 .../unit/kafka/server/ProduceRequestTest.scala  |   8 +-
 .../kafka/server/ReplicaManagerQuotasTest.scala |  33 +-
 .../unit/kafka/server/ReplicaManagerTest.scala  |  28 +-
 .../unit/kafka/server/SimpleFetchTest.scala     |  21 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala |  19 +-
 89 files changed, 5269 insertions(+), 3914 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 8eebdb5..62cd77a 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -89,6 +89,7 @@
       <allow pkg="net.jpountz" />
       <allow pkg="org.apache.kafka.common.record" />
       <allow pkg="org.apache.kafka.common.network" />
+      <allow pkg="org.apache.kafka.common.errors" />
     </subpackage>
 
     <subpackage name="requests">

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 6fb4229..3b9d49c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -61,6 +61,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Locale;
@@ -686,11 +687,13 @@ public class Fetcher<K, V> {
                 }
 
                 List<ConsumerRecord<K, V>> parsed = new ArrayList<>();
-                for (LogEntry logEntry : partition.records) {
+                Iterator<LogEntry> deepIterator = partition.records.deepIterator();
+                while (deepIterator.hasNext()) {
+                    LogEntry logEntry = deepIterator.next();
                     // Skip the messages earlier than current position.
                     if (logEntry.offset() >= position) {
                         parsed.add(parseRecord(tp, logEntry));
-                        bytes += logEntry.size();
+                        bytes += logEntry.sizeInBytes();
                     }
                 }
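
A hedged sketch of the iteration pattern this hunk adopts (the wrapper class and sample records are hypothetical; the deepIterator(), offset() and sizeInBytes() calls are the ones used above): records are no longer iterated directly, so callers drive the deep iterator themselves and skip entries below the fetch position:

    import java.nio.ByteBuffer;
    import java.util.Iterator;
    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.LogEntry;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.MemoryRecordsBuilder;
    import org.apache.kafka.common.record.TimestampType;

    public class DeepIteratorSketch {
        public static void main(String[] args) {
            // Build a tiny two-record batch so there is something to iterate.
            MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024),
                    CompressionType.GZIP, TimestampType.CREATE_TIME, 1024);
            builder.append(0L, 1L, "k0".getBytes(), "v0".getBytes());
            builder.append(1L, 2L, "k1".getBytes(), "v1".getBytes());
            builder.close();
            MemoryRecords records = builder.build();

            long position = 1L;  // skip entries earlier than the current fetch position, as Fetcher does
            int bytes = 0;
            Iterator<LogEntry> deepIterator = records.deepIterator();
            while (deepIterator.hasNext()) {
                LogEntry logEntry = deepIterator.next();
                if (logEntry.offset() >= position)
                    bytes += logEntry.sizeInBytes();  // was logEntry.size() before this patch
            }
            System.out.println("bytes at or past position: " + bytes);
        }
    }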
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
index b42b0ec..077215c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
@@ -198,7 +198,7 @@ public final class BufferPool {
      * memory as free.
      * 
      * @param buffer The buffer to return
-     * @param size The size of the buffer to mark as deallocated, note that this maybe smaller than buffer.capacity
+     * @param size The size of the buffer to mark as deallocated, note that this may be smaller than buffer.capacity
      *             since the buffer may re-allocate itself during in-place compression
      */
     public void deallocate(ByteBuffer buffer, int size) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index fa1e513..06d39ec 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -27,8 +27,10 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.utils.CopyOnWriteMap;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
@@ -49,7 +51,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
- * This class acts as a queue that accumulates records into {@link org.apache.kafka.common.record.MemoryRecords}
+ * This class acts as a queue that accumulates records into {@link MemoryRecords}
  * instances to be sent to the server.
  * <p>
  * The accumulator uses a bounded amount of memory and append calls will block when that memory is exhausted, unless
@@ -77,7 +79,7 @@ public final class RecordAccumulator {
     /**
      * Create a new record accumulator
      * 
-     * @param batchSize The size to use when allocating {@link org.apache.kafka.common.record.MemoryRecords} instances
+     * @param batchSize The size to use when allocating {@link MemoryRecords} instances
      * @param totalSize The maximum memory the record accumulator can use.
      * @param compression The compression codec for the records
      * @param lingerMs An artificial delay time to add before declaring a records instance that isn't full ready for
@@ -190,13 +192,13 @@ public final class RecordAccumulator {
                     free.deallocate(buffer);
                     return appendResult;
                 }
-                MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
-                RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
+                MemoryRecordsBuilder recordsBuilder = MemoryRecords.builder(buffer, compression, TimestampType.CREATE_TIME, this.batchSize);
+                RecordBatch batch = new RecordBatch(tp, recordsBuilder, time.milliseconds());
                 FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));
 
                 dq.addLast(batch);
                 incomplete.add(batch);
-                return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
+                return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
             }
         } finally {
             appendsInProgress.decrementAndGet();
@@ -212,9 +214,9 @@ public final class RecordAccumulator {
         if (last != null) {
             FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
             if (future == null)
-                last.records.close();
+                last.close();
             else
-                return new RecordAppendResult(future, deque.size() > 1 || last.records.isFull(), false);
+                return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false);
         }
         return null;
     }
@@ -240,7 +242,7 @@ public final class RecordAccumulator {
                     Iterator<RecordBatch> batchIterator = dq.iterator();
                     while (batchIterator.hasNext()) {
                         RecordBatch batch = batchIterator.next();
-                        boolean isFull = batch != lastBatch || batch.records.isFull();
+                        boolean isFull = batch != lastBatch || batch.isFull();
                         // check if the batch is expired
                         if (batch.maybeExpire(requestTimeout, retryBackoffMs, now, this.lingerMs, isFull)) {
                             expiredBatches.add(batch);
@@ -319,7 +321,7 @@ public final class RecordAccumulator {
                         long waitedTimeMs = nowMs - batch.lastAttemptMs;
                         long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                         long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
-                        boolean full = deque.size() > 1 || batch.records.isFull();
+                        boolean full = deque.size() > 1 || batch.isFull();
                         boolean expired = waitedTimeMs >= timeToWaitMs;
                         boolean sendable = full || expired || exhausted || closed || flushInProgress();
                         if (sendable && !backingOff) {
@@ -389,15 +391,15 @@ public final class RecordAccumulator {
                                 boolean backoff = first.attempts > 0 && first.lastAttemptMs + retryBackoffMs > now;
                                 // Only drain the batch if it is not during backoff period.
                                 if (!backoff) {
-                                    if (size + first.records.sizeInBytes() > maxSize && !ready.isEmpty()) {
+                                    if (size + first.sizeInBytes() > maxSize && !ready.isEmpty()) {
                                         // there is a rare case that a single batch size is larger than the request size due
                                         // to compression; in this case we will still eventually send this batch in a single
                                         // request
                                         break;
                                     } else {
                                         RecordBatch batch = deque.pollFirst();
-                                        batch.records.close();
-                                        size += batch.records.sizeInBytes();
+                                        batch.close();
+                                        size += batch.sizeInBytes();
                                         ready.add(batch);
                                         batch.drainedMs = now;
                                     }
@@ -437,7 +439,7 @@ public final class RecordAccumulator {
      */
     public void deallocate(RecordBatch batch) {
         incomplete.remove(batch);
-        free.deallocate(batch.records.buffer(), batch.records.initialCapacity());
+        free.deallocate(batch.buffer(), batch.initialCapacity());
     }
     
     /**
@@ -507,7 +509,7 @@ public final class RecordAccumulator {
             Deque<RecordBatch> dq = getDeque(batch.topicPartition);
             // Close the batch before aborting
             synchronized (dq) {
-                batch.records.close();
+                batch.close();
                 dq.remove(batch);
             }
             batch.done(-1L, Record.NO_TIMESTAMP, new IllegalStateException("Producer is closed forcefully."));

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
index 6706bfd..e9ef441 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
@@ -12,18 +12,20 @@
  */
 package org.apache.kafka.clients.producer.internals;
 
-import java.util.ArrayList;
-import java.util.List;
-
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
 import org.apache.kafka.common.record.Record;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
 /**
  * A batch of records that is or will be sent.
  * 
@@ -39,21 +41,21 @@ public final class RecordBatch {
     public final long createdMs;
     public long drainedMs;
     public long lastAttemptMs;
-    public final MemoryRecords records;
     public final TopicPartition topicPartition;
     public final ProduceRequestResult produceFuture;
     public long lastAppendTime;
     private final List<Thunk> thunks;
     private long offsetCounter = 0L;
     private boolean retry;
+    private final MemoryRecordsBuilder recordsBuilder;
 
-    public RecordBatch(TopicPartition tp, MemoryRecords records, long now) {
+    public RecordBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long now) {
         this.createdMs = now;
         this.lastAttemptMs = now;
-        this.records = records;
+        this.recordsBuilder = recordsBuilder;
         this.topicPartition = tp;
         this.produceFuture = new ProduceRequestResult();
-        this.thunks = new ArrayList<Thunk>();
+        this.thunks = new ArrayList<>();
         this.lastAppendTime = createdMs;
         this.retry = false;
     }
@@ -64,10 +66,10 @@ public final class RecordBatch {
      * @return The RecordSend corresponding to this record or null if there isn't sufficient room.
      */
     public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, long now) {
-        if (!this.records.hasRoomFor(key, value)) {
+        if (!recordsBuilder.hasRoomFor(key, value)) {
             return null;
         } else {
-            long checksum = this.records.append(offsetCounter++, timestamp, key, value);
+            long checksum = this.recordsBuilder.append(offsetCounter++, timestamp, key, value);
             this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
             this.lastAppendTime = now;
             FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
@@ -94,9 +96,8 @@ public final class RecordBatch {
                   baseOffset,
                   exception);
         // execute callbacks
-        for (int i = 0; i < this.thunks.size(); i++) {
+        for (Thunk thunk : thunks) {
             try {
-                Thunk thunk = this.thunks.get(i);
                 if (exception == null) {
                     // If the timestamp returned by server is NoTimestamp, that means CreateTime is used. Otherwise LogAppendTime is used.
                     RecordMetadata metadata = new RecordMetadata(this.topicPartition,  baseOffset, thunk.future.relativeOffset(),
@@ -156,7 +157,7 @@ public final class RecordBatch {
         }
 
         if (expire) {
-            this.records.close();
+            close();
             this.done(-1L, Record.NO_TIMESTAMP,
                       new TimeoutException("Expiring " + recordCount + " record(s) for " + topicPartition + " due to " + errorMessage));
         }
@@ -177,4 +178,37 @@ public final class RecordBatch {
     public void setRetry() {
         this.retry = true;
     }
+
+    public MemoryRecords records() {
+        return recordsBuilder.build();
+    }
+
+    public int sizeInBytes() {
+        return recordsBuilder.sizeInBytes();
+    }
+
+    public double compressionRate() {
+        return recordsBuilder.compressionRate();
+    }
+
+    public boolean isFull() {
+        return recordsBuilder.isFull();
+    }
+
+    public void close() {
+        recordsBuilder.close();
+    }
+
+    public ByteBuffer buffer() {
+        return recordsBuilder.buffer();
+    }
+
+    public int initialCapacity() {
+        return recordsBuilder.initialCapacity();
+    }
+
+    public boolean isWritable() {
+        return !recordsBuilder.isClosed();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 7555b71..1f54c0b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -345,7 +345,7 @@ public class Sender implements Runnable {
         final Map<TopicPartition, RecordBatch> recordsByPartition = new HashMap<>(batches.size());
         for (RecordBatch batch : batches) {
             TopicPartition tp = batch.topicPartition;
-            produceRecordsByPartition.put(tp, batch.records);
+            produceRecordsByPartition.put(tp, batch.records());
             recordsByPartition.put(tp, batch);
         }
 
@@ -505,17 +505,17 @@ public class Sender implements Runnable {
                     // per-topic bytes send rate
                     String topicByteRateName = "topic." + topic + ".bytes";
                     Sensor topicByteRate = Utils.notNull(this.metrics.getSensor(topicByteRateName));
-                    topicByteRate.record(batch.records.sizeInBytes());
+                    topicByteRate.record(batch.sizeInBytes());
 
                     // per-topic compression rate
                     String topicCompressionRateName = "topic." + topic + ".compression-rate";
                     Sensor topicCompressionRate = Utils.notNull(this.metrics.getSensor(topicCompressionRateName));
-                    topicCompressionRate.record(batch.records.compressionRate());
+                    topicCompressionRate.record(batch.compressionRate());
 
                     // global metrics
-                    this.batchSizeSensor.record(batch.records.sizeInBytes(), now);
+                    this.batchSizeSensor.record(batch.sizeInBytes(), now);
                     this.queueTimeSensor.record(batch.drainedMs - batch.createdMs, now);
-                    this.compressionRateSensor.record(batch.records.compressionRate());
+                    this.compressionRateSensor.record(batch.compressionRate());
                     this.maxRecordSizeSensor.record(batch.maxRecordSize, now);
                     records += batch.recordCount;
                 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
new file mode 100644
index 0000000..3794dc6
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.common.record;
+
+import org.apache.kafka.common.utils.AbstractIterator;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+public abstract class AbstractRecords implements Records {
+
+    @Override
+    public boolean hasMatchingShallowMagic(byte magic) {
+        Iterator<? extends LogEntry> iterator = shallowIterator();
+        while (iterator.hasNext())
+            if (iterator.next().magic() != magic)
+                return false;
+        return true;
+    }
+
+    /**
+     * Convert this message set to use the specified message format.
+     */
+    @Override
+    public Records toMessageFormat(byte toMagic) {
+        List<LogEntry> converted = new ArrayList<>();
+        Iterator<LogEntry> deepIterator = deepIterator();
+        while (deepIterator.hasNext()) {
+            LogEntry entry = deepIterator.next();
+            converted.add(LogEntry.create(entry.offset(), entry.record().convert(toMagic)));
+        }
+
+        if (converted.isEmpty()) {
+            // This indicates that the message is too large, which indicates that the buffer is not large
+            // enough to hold a full log entry. We just return all the bytes in the file message set.
+            // Even though the message set does not have the right format version, we expect old clients
+            // to raise an error to the user after reading the message size and seeing that there
+            // are not enough available bytes in the response to read the full message.
+            return this;
+        } else {
+            // We use the first message to determine the compression type for the resulting message set.
+            // This could result in message sets which are either larger or smaller than the original size.
+            // For example, it could end up larger if most messages were previously compressed, but
+            // it just so happens that the first one is not. There is also some risk that this can
+            // cause some timestamp information to be lost (e.g. if the timestamp type was changed) since
+            // we are essentially merging multiple message sets. However, currently this method is only
+            // used for down-conversion, so we've ignored the problem.
+            CompressionType compressionType = shallowIterator().next().record().compressionType();
+            return MemoryRecords.withLogEntries(compressionType, converted);
+        }
+    }
+
+    public static int estimatedSize(CompressionType compressionType, Iterable<LogEntry> entries) {
+        int size = 0;
+        for (LogEntry entry : entries)
+            size += entry.sizeInBytes();
+        // NOTE: 1024 is the minimum block size for snappy encoding
+        return compressionType == CompressionType.NONE ? size : Math.min(Math.max(size / 2, 1024), 1 << 16);
+    }
+
+    /**
+     * Get an iterator over the deep records.
+     * @return An iterator over the records
+     */
+    public Iterator<Record> records() {
+        return new AbstractIterator<Record>() {
+            private final Iterator<? extends LogEntry> deepEntries = deepIterator();
+            @Override
+            protected Record makeNext() {
+                if (deepEntries.hasNext())
+                    return deepEntries.next().record();
+                return allDone();
+            }
+        };
+    }
+
+}
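
A short sketch of how the new down-conversion hook is meant to be driven (hypothetical caller; toMessageFormat(), deepIterator() and Record.MAGIC_VALUE_V0 are from this patch and the existing Record class):

    import java.util.Iterator;
    import org.apache.kafka.common.record.LogEntry;
    import org.apache.kafka.common.record.Record;
    import org.apache.kafka.common.record.Records;

    public class DownConvertSketch {
        // Down-convert any Records implementation to the v0 message format and report the deep entries.
        static Records downConvertToV0(Records records) {
            Records converted = records.toMessageFormat(Record.MAGIC_VALUE_V0);
            Iterator<LogEntry> deepIterator = converted.deepIterator();
            while (deepIterator.hasNext()) {
                LogEntry entry = deepIterator.next();
                System.out.println("offset=" + entry.offset() + " sizeInBytes=" + entry.sizeInBytes());
            }
            return converted;
        }
    }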

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java
index 84668a5..b25f949 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java
@@ -16,34 +16,41 @@
  */
 package org.apache.kafka.common.record;
 
+import java.io.DataInputStream;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
 
 /**
  * A byte buffer backed input inputStream
  */
-public class ByteBufferInputStream extends InputStream {
-
-    private ByteBuffer buffer;
+public class ByteBufferInputStream extends DataInputStream {
 
     public ByteBufferInputStream(ByteBuffer buffer) {
-        this.buffer = buffer;
+        super(new UnderlyingInputStream(buffer));
     }
 
-    public int read() {
-        if (!buffer.hasRemaining()) {
-            return -1;
+    private static class UnderlyingInputStream extends InputStream {
+        private ByteBuffer buffer;
+
+        public UnderlyingInputStream(ByteBuffer buffer) {
+            this.buffer = buffer;
         }
-        return buffer.get() & 0xFF;
-    }
 
-    public int read(byte[] bytes, int off, int len) {
-        if (!buffer.hasRemaining()) {
-            return -1;
+        public int read() {
+            if (!buffer.hasRemaining()) {
+                return -1;
+            }
+            return buffer.get() & 0xFF;
         }
 
-        len = Math.min(len, buffer.remaining());
-        buffer.get(bytes, off, len);
-        return len;
+        public int read(byte[] bytes, int off, int len) {
+            if (!buffer.hasRemaining()) {
+                return -1;
+            }
+
+            len = Math.min(len, buffer.remaining());
+            buffer.get(bytes, off, len);
+            return len;
+        }
     }
 }
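
Since ByteBufferInputStream now extends DataInputStream, record-parsing code can read primitives from a buffer-backed stream without an extra wrapper. A small hedged sketch (the class name and sample values are arbitrary):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import org.apache.kafka.common.record.ByteBufferInputStream;

    public class InputStreamSketch {
        public static void main(String[] args) throws IOException {
            ByteBuffer buffer = ByteBuffer.allocate(12);
            buffer.putLong(5L).putInt(1024);
            buffer.flip();
            // Previously callers wrapped this in a separate DataInputStream; now it already is one.
            ByteBufferInputStream in = new ByteBufferInputStream(buffer);
            System.out.println("offset=" + in.readLong() + " size=" + in.readInt());
        }
    }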

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java
new file mode 100644
index 0000000..ae0c91b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.common.record;
+
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;
+import static org.apache.kafka.common.record.Records.OFFSET_OFFSET;
+
+/**
+ * A byte buffer backed log input stream. This class avoids the need to copy records by returning
+ * slices from the underlying byte buffer.
+ */
+class ByteBufferLogInputStream implements LogInputStream<ByteBufferLogInputStream.ByteBufferLogEntry> {
+    private final ByteBuffer buffer;
+    private final int maxMessageSize;
+
+    ByteBufferLogInputStream(ByteBuffer buffer, int maxMessageSize) {
+        this.buffer = buffer;
+        this.maxMessageSize = maxMessageSize;
+    }
+
+    public ByteBufferLogEntry nextEntry() throws IOException {
+        int remaining = buffer.remaining();
+        if (remaining < LOG_OVERHEAD)
+            return null;
+
+        int recordSize = buffer.getInt(buffer.position() + Records.SIZE_OFFSET);
+        if (recordSize < Record.RECORD_OVERHEAD_V0)
+            throw new CorruptRecordException(String.format("Record size is less than the minimum record overhead (%d)", Record.RECORD_OVERHEAD_V0));
+        if (recordSize > maxMessageSize)
+            throw new CorruptRecordException(String.format("Record size exceeds the largest allowable message size (%d).", maxMessageSize));
+
+        int entrySize = recordSize + LOG_OVERHEAD;
+        if (remaining < entrySize)
+            return null;
+
+        ByteBuffer entrySlice = buffer.slice();
+        entrySlice.limit(entrySize);
+        buffer.position(buffer.position() + entrySize);
+        return new ByteBufferLogEntry(entrySlice);
+    }
+
+    public static class ByteBufferLogEntry extends LogEntry {
+        private final ByteBuffer buffer;
+        private final Record record;
+
+        private ByteBufferLogEntry(ByteBuffer buffer) {
+            this.buffer = buffer;
+            buffer.position(LOG_OVERHEAD);
+            this.record = new Record(buffer.slice());
+            buffer.position(OFFSET_OFFSET);
+        }
+
+        @Override
+        public long offset() {
+            return buffer.getLong(OFFSET_OFFSET);
+        }
+
+        @Override
+        public Record record() {
+            return record;
+        }
+
+        public void setOffset(long offset) {
+            buffer.putLong(OFFSET_OFFSET, offset);
+        }
+
+        public void setCreateTime(long timestamp) {
+            if (record.magic() == Record.MAGIC_VALUE_V0)
+                throw new IllegalArgumentException("Cannot set timestamp for a record with magic = 0");
+
+            long currentTimestamp = record.timestamp();
+            // We don't need to recompute crc if the timestamp is not updated.
+            if (record.timestampType() == TimestampType.CREATE_TIME && currentTimestamp == timestamp)
+                return;
+
+            byte attributes = record.attributes();
+            buffer.put(LOG_OVERHEAD + Record.ATTRIBUTES_OFFSET, TimestampType.CREATE_TIME.updateAttributes(attributes));
+            buffer.putLong(LOG_OVERHEAD + Record.TIMESTAMP_OFFSET, timestamp);
+            long crc = record.computeChecksum();
+            Utils.writeUnsignedInt(buffer, LOG_OVERHEAD + Record.CRC_OFFSET, crc);
+        }
+
+        public void setLogAppendTime(long timestamp) {
+            if (record.magic() == Record.MAGIC_VALUE_V0)
+                throw new IllegalArgumentException("Cannot set timestamp for a record with magic = 0");
+
+            byte attributes = record.attributes();
+            buffer.put(LOG_OVERHEAD + Record.ATTRIBUTES_OFFSET, TimestampType.LOG_APPEND_TIME.updateAttributes(attributes));
+            buffer.putLong(LOG_OVERHEAD + Record.TIMESTAMP_OFFSET, timestamp);
+            long crc = record.computeChecksum();
+            Utils.writeUnsignedInt(buffer, LOG_OVERHEAD + Record.CRC_OFFSET, crc);
+        }
+
+        public ByteBuffer buffer() {
+            return buffer;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java
index 1c9fbaa..3fb7f49 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java
@@ -16,42 +16,54 @@
  */
 package org.apache.kafka.common.record;
 
+import java.io.DataOutputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
 /**
  * A byte buffer backed output outputStream
  */
-public class ByteBufferOutputStream extends OutputStream {
+public class ByteBufferOutputStream extends DataOutputStream {
 
     private static final float REALLOCATION_FACTOR = 1.1f;
 
-    private ByteBuffer buffer;
-
     public ByteBufferOutputStream(ByteBuffer buffer) {
-        this.buffer = buffer;
+        super(new UnderlyingOutputStream(buffer));
     }
 
-    public void write(int b) {
-        if (buffer.remaining() < 1)
-            expandBuffer(buffer.capacity() + 1);
-        buffer.put((byte) b);
+    public ByteBuffer buffer() {
+        return ((UnderlyingOutputStream) out).buffer;
     }
 
-    public void write(byte[] bytes, int off, int len) {
-        if (buffer.remaining() < len)
-            expandBuffer(buffer.capacity() + len);
-        buffer.put(bytes, off, len);
-    }
+    public static class UnderlyingOutputStream extends OutputStream {
+        private ByteBuffer buffer;
 
-    public ByteBuffer buffer() {
-        return buffer;
-    }
+        public UnderlyingOutputStream(ByteBuffer buffer) {
+            this.buffer = buffer;
+        }
 
-    private void expandBuffer(int size) {
-        int expandSize = Math.max((int) (buffer.capacity() * REALLOCATION_FACTOR), size);
-        ByteBuffer temp = ByteBuffer.allocate(expandSize);
-        temp.put(buffer.array(), buffer.arrayOffset(), buffer.position());
-        buffer = temp;
+        public void write(int b) {
+            if (buffer.remaining() < 1)
+                expandBuffer(buffer.capacity() + 1);
+            buffer.put((byte) b);
+        }
+
+        public void write(byte[] bytes, int off, int len) {
+            if (buffer.remaining() < len)
+                expandBuffer(buffer.capacity() + len);
+            buffer.put(bytes, off, len);
+        }
+
+        public ByteBuffer buffer() {
+            return buffer;
+        }
+
+        private void expandBuffer(int size) {
+            int expandSize = Math.max((int) (buffer.capacity() * REALLOCATION_FACTOR), size);
+            ByteBuffer temp = ByteBuffer.allocate(expandSize);
+            temp.put(buffer.array(), buffer.arrayOffset(), buffer.position());
+            buffer = temp;
+        }
     }
+
 }
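
The mirror image on the write side, again as a hedged sketch: ByteBufferOutputStream is now a DataOutputStream whose backing buffer grows on demand, so writers no longer need to size the buffer exactly up front (the 8-byte starting size below is deliberately small to force an expansion):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import org.apache.kafka.common.record.ByteBufferOutputStream;

    public class OutputStreamSketch {
        public static void main(String[] args) throws IOException {
            ByteBufferOutputStream out = new ByteBufferOutputStream(ByteBuffer.allocate(8));
            out.writeLong(0L);  // fills the initial 8 bytes
            out.writeInt(64);   // triggers expandBuffer(...) in the underlying stream
            ByteBuffer result = out.buffer();  // may be a different (reallocated) buffer than the one passed in
            System.out.println("bytes written: " + result.position());  // 12
        }
    }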

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
index 65a7e43..e1d4754 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
@@ -26,7 +26,7 @@ public enum CompressionType {
     public final String name;
     public final float rate;
 
-    private CompressionType(int id, String name, float rate) {
+    CompressionType(int id, String name, float rate) {
         this.id = id;
         this.name = name;
         this.rate = rate;

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
deleted file mode 100644
index a806975..0000000
--- a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
+++ /dev/null
@@ -1,332 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.record;
-
-import java.lang.reflect.Constructor;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.utils.Utils;
-
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.zip.GZIPInputStream;
-import java.util.zip.GZIPOutputStream;
-
-public class Compressor {
-
-    static private final float COMPRESSION_RATE_DAMPING_FACTOR = 0.9f;
-    static private final float COMPRESSION_RATE_ESTIMATION_FACTOR = 1.05f;
-    static private final int COMPRESSION_DEFAULT_BUFFER_SIZE = 1024;
-
-    private static final float[] TYPE_TO_RATE;
-
-    static {
-        int maxTypeId = -1;
-        for (CompressionType type : CompressionType.values())
-            maxTypeId = Math.max(maxTypeId, type.id);
-        TYPE_TO_RATE = new float[maxTypeId + 1];
-        for (CompressionType type : CompressionType.values()) {
-            TYPE_TO_RATE[type.id] = type.rate;
-        }
-    }
-
-    // dynamically load the snappy and lz4 classes to avoid runtime dependency if we are not using compression
-    // caching constructors to avoid invoking of Class.forName method for each batch
-    private static MemoizingConstructorSupplier snappyOutputStreamSupplier = new MemoizingConstructorSupplier(new ConstructorSupplier() {
-        @Override
-        public Constructor get() throws ClassNotFoundException, NoSuchMethodException {
-            return Class.forName("org.xerial.snappy.SnappyOutputStream")
-                .getConstructor(OutputStream.class, Integer.TYPE);
-        }
-    });
-
-    private static MemoizingConstructorSupplier lz4OutputStreamSupplier = new MemoizingConstructorSupplier(new ConstructorSupplier() {
-        @Override
-        public Constructor get() throws ClassNotFoundException, NoSuchMethodException {
-            return Class.forName("org.apache.kafka.common.record.KafkaLZ4BlockOutputStream")
-                .getConstructor(OutputStream.class);
-        }
-    });
-
-    private static MemoizingConstructorSupplier snappyInputStreamSupplier = new MemoizingConstructorSupplier(new ConstructorSupplier() {
-        @Override
-        public Constructor get() throws ClassNotFoundException, NoSuchMethodException {
-            return Class.forName("org.xerial.snappy.SnappyInputStream")
-                .getConstructor(InputStream.class);
-        }
-    });
-
-    private static MemoizingConstructorSupplier lz4InputStreamSupplier = new MemoizingConstructorSupplier(new ConstructorSupplier() {
-        @Override
-        public Constructor get() throws ClassNotFoundException, NoSuchMethodException {
-            return Class.forName("org.apache.kafka.common.record.KafkaLZ4BlockInputStream")
-                .getConstructor(InputStream.class, Boolean.TYPE);
-        }
-    });
-
-    private final CompressionType type;
-    private final DataOutputStream appendStream;
-    private final ByteBufferOutputStream bufferStream;
-    private final int initPos;
-
-    public long writtenUncompressed;
-    public long numRecords;
-    public float compressionRate;
-    public long maxTimestamp;
-
-    public Compressor(ByteBuffer buffer, CompressionType type) {
-        this.type = type;
-        this.initPos = buffer.position();
-
-        this.numRecords = 0;
-        this.writtenUncompressed = 0;
-        this.compressionRate = 1;
-        this.maxTimestamp = Record.NO_TIMESTAMP;
-
-        if (type != CompressionType.NONE) {
-            // for compressed records, leave space for the header and the shallow message metadata
-            // and move the starting position to the value payload offset
-            buffer.position(initPos + Records.LOG_OVERHEAD + Record.RECORD_OVERHEAD);
-        }
-
-        // create the stream
-        bufferStream = new ByteBufferOutputStream(buffer);
-        appendStream = wrapForOutput(bufferStream, type, COMPRESSION_DEFAULT_BUFFER_SIZE);
-    }
-
-    public ByteBuffer buffer() {
-        return bufferStream.buffer();
-    }
-
-    public double compressionRate() {
-        return compressionRate;
-    }
-
-    public void close() {
-        try {
-            appendStream.close();
-        } catch (IOException e) {
-            throw new KafkaException(e);
-        }
-
-        if (type != CompressionType.NONE) {
-            ByteBuffer buffer = bufferStream.buffer();
-            int pos = buffer.position();
-            // write the header, for the end offset write as number of records - 1
-            buffer.position(initPos);
-            buffer.putLong(numRecords - 1);
-            buffer.putInt(pos - initPos - Records.LOG_OVERHEAD);
-            // write the shallow message (the crc and value size are not correct yet)
-            Record.write(buffer, maxTimestamp, null, null, type, 0, -1);
-            // compute the fill the value size
-            int valueSize = pos - initPos - Records.LOG_OVERHEAD - Record.RECORD_OVERHEAD;
-            buffer.putInt(initPos + Records.LOG_OVERHEAD + Record.KEY_OFFSET_V1, valueSize);
-            // compute and fill the crc at the beginning of the message
-            long crc = Record.computeChecksum(buffer,
-                initPos + Records.LOG_OVERHEAD + Record.MAGIC_OFFSET,
-                pos - initPos - Records.LOG_OVERHEAD - Record.MAGIC_OFFSET);
-            Utils.writeUnsignedInt(buffer, initPos + Records.LOG_OVERHEAD + Record.CRC_OFFSET, crc);
-            // reset the position
-            buffer.position(pos);
-
-            // update the compression ratio
-            this.compressionRate = (float) buffer.position() / this.writtenUncompressed;
-            TYPE_TO_RATE[type.id] = TYPE_TO_RATE[type.id] * COMPRESSION_RATE_DAMPING_FACTOR +
-                compressionRate * (1 - COMPRESSION_RATE_DAMPING_FACTOR);
-        }
-    }
-
-    // Note that for all the write operations below, IO exceptions should
-    // never be thrown since the underlying ByteBufferOutputStream does not throw IOException;
-    // therefore upon encountering this issue we just close the append stream.
-
-    public void putLong(final long value) {
-        try {
-            appendStream.writeLong(value);
-        } catch (IOException e) {
-            throw new KafkaException("I/O exception when writing to the append stream, closing", e);
-        }
-    }
-
-    public void putInt(final int value) {
-        try {
-            appendStream.writeInt(value);
-        } catch (IOException e) {
-            throw new KafkaException("I/O exception when writing to the append stream, closing", e);
-        }
-    }
-
-    public void put(final ByteBuffer buffer) {
-        try {
-            appendStream.write(buffer.array(), buffer.arrayOffset(), buffer.limit());
-        } catch (IOException e) {
-            throw new KafkaException("I/O exception when writing to the append stream, closing", e);
-        }
-    }
-
-    public void putByte(final byte value) {
-        try {
-            appendStream.write(value);
-        } catch (IOException e) {
-            throw new KafkaException("I/O exception when writing to the append stream, closing", e);
-        }
-    }
-
-    public void put(final byte[] bytes, final int offset, final int len) {
-        try {
-            appendStream.write(bytes, offset, len);
-        } catch (IOException e) {
-            throw new KafkaException("I/O exception when writing to the append stream, closing", e);
-        }
-    }
-
-    /**
-     * @return CRC of the record
-     */
-    public long putRecord(long timestamp, byte[] key, byte[] value, CompressionType type,
-                          int valueOffset, int valueSize) {
-        // put a record as un-compressed into the underlying stream
-        long crc = Record.computeChecksum(timestamp, key, value, type, valueOffset, valueSize);
-        byte attributes = Record.computeAttributes(type);
-        putRecord(crc, attributes, timestamp, key, value, valueOffset, valueSize);
-        return crc;
-    }
-
-    /**
-     * Put a record as uncompressed into the underlying stream
-     * @return CRC of the record
-     */
-    public long putRecord(long timestamp, byte[] key, byte[] value) {
-        return putRecord(timestamp, key, value, CompressionType.NONE, 0, -1);
-    }
-
-    private void putRecord(final long crc, final byte attributes, final long timestamp, final byte[] key, final byte[] value, final int valueOffset, final int valueSize) {
-        maxTimestamp = Math.max(maxTimestamp, timestamp);
-        Record.write(this, crc, attributes, timestamp, key, value, valueOffset, valueSize);
-    }
-
-    public void recordWritten(int size) {
-        numRecords += 1;
-        writtenUncompressed += size;
-    }
-
-    public long numRecordsWritten() {
-        return numRecords;
-    }
-
-    public long estimatedBytesWritten() {
-        if (type == CompressionType.NONE) {
-            return bufferStream.buffer().position();
-        } else {
-            // estimate the written bytes to the underlying byte buffer based on uncompressed written bytes
-            return (long) (writtenUncompressed * TYPE_TO_RATE[type.id] * COMPRESSION_RATE_ESTIMATION_FACTOR);
-        }
-    }
-
-    // the following two functions also need to be public since they are used in MemoryRecords.iteration
-
-    public static DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type, int bufferSize) {
-        try {
-            switch (type) {
-                case NONE:
-                    return new DataOutputStream(buffer);
-                case GZIP:
-                    return new DataOutputStream(new GZIPOutputStream(buffer, bufferSize));
-                case SNAPPY:
-                    try {
-                        OutputStream stream = (OutputStream) snappyOutputStreamSupplier.get().newInstance(buffer, bufferSize);
-                        return new DataOutputStream(stream);
-                    } catch (Exception e) {
-                        throw new KafkaException(e);
-                    }
-                case LZ4:
-                    try {
-                        OutputStream stream = (OutputStream) lz4OutputStreamSupplier.get().newInstance(buffer);
-                        return new DataOutputStream(stream);
-                    } catch (Exception e) {
-                        throw new KafkaException(e);
-                    }
-                default:
-                    throw new IllegalArgumentException("Unknown compression type: " + type);
-            }
-        } catch (IOException e) {
-            throw new KafkaException(e);
-        }
-    }
-
-    public static DataInputStream wrapForInput(ByteBufferInputStream buffer, CompressionType type, byte messageVersion) {
-        try {
-            switch (type) {
-                case NONE:
-                    return new DataInputStream(buffer);
-                case GZIP:
-                    return new DataInputStream(new GZIPInputStream(buffer));
-                case SNAPPY:
-                    try {
-                        InputStream stream = (InputStream) snappyInputStreamSupplier.get().newInstance(buffer);
-                        return new DataInputStream(stream);
-                    } catch (Exception e) {
-                        throw new KafkaException(e);
-                    }
-                case LZ4:
-                    try {
-                        InputStream stream = (InputStream) lz4InputStreamSupplier.get().newInstance(buffer,
-                                messageVersion == Record.MAGIC_VALUE_V0);
-                        return new DataInputStream(stream);
-                    } catch (Exception e) {
-                        throw new KafkaException(e);
-                    }
-                default:
-                    throw new IllegalArgumentException("Unknown compression type: " + type);
-            }
-        } catch (IOException e) {
-            throw new KafkaException(e);
-        }
-    }
-
-    private interface ConstructorSupplier {
-        Constructor get() throws ClassNotFoundException, NoSuchMethodException;
-    }
-
-    // this code is based on Guava's @see{com.google.common.base.Suppliers.MemoizingSupplier}
-    private static class MemoizingConstructorSupplier {
-        final ConstructorSupplier delegate;
-        transient volatile boolean initialized;
-        transient Constructor value;
-
-        public MemoizingConstructorSupplier(ConstructorSupplier delegate) {
-            this.delegate = delegate;
-        }
-
-        public Constructor get() throws NoSuchMethodException, ClassNotFoundException {
-            if (!initialized) {
-                synchronized (this) {
-                    if (!initialized) {
-                        Constructor constructor = delegate.get();
-                        value = constructor;
-                        initialized = true;
-                        return constructor;
-                    }
-                }
-            }
-            return value;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
new file mode 100644
index 0000000..ae393b0
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.common.record;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.CorruptRecordException;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+/**
+ * A log input stream which is backed by a {@link FileChannel}.
+ */
+public class FileLogInputStream implements LogInputStream<FileLogInputStream.FileChannelLogEntry> {
+    private int position;
+    private final int end;
+    private final FileChannel channel;
+    private final int maxRecordSize;
+    private final ByteBuffer logHeaderBuffer = ByteBuffer.allocate(Records.LOG_OVERHEAD);
+
+    /**
+     * Create a new log input stream over the FileChannel
+     * @param channel Underlying FileChannel
+     * @param maxRecordSize Maximum size of records
+     * @param start Position in the file channel to start from
+     * @param end Position in the file channel not to read past
+     */
+    public FileLogInputStream(FileChannel channel,
+                              int maxRecordSize,
+                              int start,
+                              int end) {
+        this.channel = channel;
+        this.maxRecordSize = maxRecordSize;
+        this.position = start;
+        this.end = end;
+    }
+
+    @Override
+    public FileChannelLogEntry nextEntry() throws IOException {
+        if (position + Records.LOG_OVERHEAD >= end)
+            return null;
+
+        logHeaderBuffer.rewind();
+        channel.read(logHeaderBuffer, position);
+        if (logHeaderBuffer.hasRemaining())
+            return null;
+
+        logHeaderBuffer.rewind();
+        long offset = logHeaderBuffer.getLong();
+        int size = logHeaderBuffer.getInt();
+
+        if (size < Record.RECORD_OVERHEAD_V0)
+            throw new CorruptRecordException(String.format("Record size is smaller than minimum record overhead (%d).", Record.RECORD_OVERHEAD_V0));
+
+        if (size > maxRecordSize)
+            throw new CorruptRecordException(String.format("Record size exceeds the largest allowable message size (%d).", maxRecordSize));
+
+        if (position + Records.LOG_OVERHEAD + size > end)
+            return null;
+
+        FileChannelLogEntry logEntry = new FileChannelLogEntry(offset, channel, position, size);
+        position += logEntry.sizeInBytes();
+        return logEntry;
+    }
+
+    /**
+     * Log entry backed by an underlying FileChannel. This allows iteration over the shallow log
+     * entries without needing to read the record data into memory until it is needed. The downside
+     * is that entries will generally no longer be readable when the underlying channel is closed.
+     */
+    public static class FileChannelLogEntry extends LogEntry {
+        private final long offset;
+        private final FileChannel channel;
+        private final int position;
+        private final int recordSize;
+        private Record record = null;
+
+        private FileChannelLogEntry(long offset,
+                                   FileChannel channel,
+                                   int position,
+                                   int recordSize) {
+            this.offset = offset;
+            this.channel = channel;
+            this.position = position;
+            this.recordSize = recordSize;
+        }
+
+        @Override
+        public long offset() {
+            return offset;
+        }
+
+        public int position() {
+            return position;
+        }
+
+        @Override
+        public byte magic() {
+            if (record != null)
+                return record.magic();
+
+            try {
+                byte[] magic = new byte[1];
+                ByteBuffer buf = ByteBuffer.wrap(magic);
+                channel.read(buf, position + Records.LOG_OVERHEAD + Record.MAGIC_OFFSET);
+                if (buf.hasRemaining())
+                    throw new KafkaException("Failed to read magic byte from FileChannel " + channel);
+                return magic[0];
+            } catch (IOException e) {
+                throw new KafkaException(e);
+            }
+        }
+
+        /**
+         * Force load the record and its data (key and value) into memory.
+         * @return The resulting record
+         * @throws IOException for any IO errors reading from the underlying file
+         */
+        private Record loadRecord() throws IOException {
+            if (record != null)
+                return record;
+
+            ByteBuffer recordBuffer = ByteBuffer.allocate(recordSize);
+            channel.read(recordBuffer, position + Records.LOG_OVERHEAD);
+            if (recordBuffer.hasRemaining())
+                throw new IOException("Failed to read full record from channel " + channel);
+
+            recordBuffer.rewind();
+            record = new Record(recordBuffer);
+            return record;
+        }
+
+        @Override
+        public Record record() {
+            if (record != null)
+                return record;
+
+            try {
+                return loadRecord();
+            } catch (IOException e) {
+                throw new KafkaException(e);
+            }
+        }
+
+        @Override
+        public int sizeInBytes() {
+            return Records.LOG_OVERHEAD + recordSize;
+        }
+
+    }
+}

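Taken on its own, FileLogInputStream allows walking the shallow entries of a segment file while reading only the 12-byte offset/size header for each entry. Below is a minimal sketch of that usage; the segment file name and the 1 MB record size limit are hypothetical, and the file is assumed to be smaller than 2 GB so its size fits in an int.

    import org.apache.kafka.common.record.FileLogInputStream;
    import org.apache.kafka.common.record.FileLogInputStream.FileChannelLogEntry;

    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.nio.channels.FileChannel;

    public class ShallowScan {
        public static void main(String[] args) throws IOException {
            try (RandomAccessFile raf = new RandomAccessFile("00000000000000000000.log", "r")) {
                FileChannel channel = raf.getChannel();
                // Scan from the start of the file to its current size, rejecting entries larger than 1 MB.
                FileLogInputStream in = new FileLogInputStream(channel, 1024 * 1024, 0, (int) channel.size());
                FileChannelLogEntry entry;
                while ((entry = in.nextEntry()) != null) {
                    // Only the header (and, on demand, the magic byte) is read here; the record body
                    // stays on disk until entry.record() is called.
                    System.out.println("offset=" + entry.offset() + ", position=" + entry.position()
                            + ", sizeInBytes=" + entry.sizeInBytes() + ", magic=" + entry.magic());
                }
            }
        }
    }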
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
index bdae08d..faf61e9 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -18,22 +18,31 @@ package org.apache.kafka.common.record;
 
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.network.TransportLayer;
+import org.apache.kafka.common.record.FileLogInputStream.FileChannelLogEntry;
+import org.apache.kafka.common.utils.Utils;
 
+import java.io.Closeable;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.channels.GatheringByteChannel;
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
- * File-backed record set.
+ * A {@link Records} implementation backed by a file. An optional start and end position can be applied to this
+ * instance to enable slicing a range of the log records.
  */
-public class FileRecords implements Records {
-    private final File file;
+public class FileRecords extends AbstractRecords implements Closeable {
+    private final boolean isSlice;
     private final FileChannel channel;
-    private final long start;
-    private final long end;
-    private final long size;
+    private final int start;
+    private final int end;
+    private volatile File file;
+    private final AtomicInteger size;
 
     public FileRecords(File file,
                        FileChannel channel,
@@ -44,83 +53,435 @@ public class FileRecords implements Records {
         this.channel = channel;
         this.start = start;
         this.end = end;
+        this.isSlice = isSlice;
+        this.size = new AtomicInteger();
 
-        if (isSlice)
-            this.size = end - start;
-        else
-            this.size = Math.min(channel.size(), end) - start;
+        // set the initial size of this record set
+        resize();
+    }
+
+    public void resize() throws IOException {
+        if (isSlice) {
+            size.set(end - start);
+        } else {
+            int limit = Math.min((int) channel.size(), end);
+            size.set(limit - start);
+
+            // if this is not a slice, update the file position to the end of the file
+            channel.position(limit);
+        }
     }
 
     @Override
     public int sizeInBytes() {
-        return (int) size;
+        return size.get();
+    }
+
+    /**
+     * Get the underlying file.
+     * @return The file
+     */
+    public File file() {
+        return file;
+    }
+
+    /**
+     * Get the underlying file channel.
+     * @return The file channel
+     */
+    public FileChannel channel() {
+        return channel;
+    }
+
+    /**
+     * Read log entries into a given buffer.
+     * @param buffer The buffer to write the entries to
+     * @param position The position in the file (relative to the start of this instance) to read from
+     * @return The same buffer
+     * @throws IOException If an I/O error occurs while reading from the channel
+     */
+    public ByteBuffer readInto(ByteBuffer buffer, int position) throws IOException {
+        channel.read(buffer, position + this.start);
+        buffer.flip();
+        return buffer;
+    }
+
+    /**
+     * Return a slice of records from this instance, which is a view into this set starting from the given position
+     * and with the given size limit.
+     *
+     * If the size is beyond the end of the file, the end will be based on the size of the file at the time of the read.
+     *
+     * If this message set is already sliced, the position will be taken relative to that slicing.
+     *
+     * @param position The start position to begin the read from
+     * @param size The number of bytes after the start position to include
+     * @return A sliced wrapper on this message set limited based on the given position and size
+     */
+    public FileRecords read(int position, int size) throws IOException {
+        if (position < 0)
+            throw new IllegalArgumentException("Invalid position: " + position);
+        if (size < 0)
+            throw new IllegalArgumentException("Invalid size: " + size);
+
+        final int end;
+        if (this.start + position + size < 0)
+            end = sizeInBytes();
+        else
+            end = Math.min(this.start + position + size, sizeInBytes());
+        return new FileRecords(file, channel, this.start + position, end, true);
+    }
+
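As a point of reference, here is a minimal sketch of taking such a slice; the method and variable names are illustrative only, and the position/size values are assumed to fall within the parent instance:

    import org.apache.kafka.common.record.FileRecords;

    import java.io.IOException;

    public class SliceExample {
        // Return the number of bytes visible through a bounded view of the given records,
        // starting `position` bytes into this instance and spanning at most `maxBytes`.
        static int sliceSize(FileRecords records, int position, int maxBytes) throws IOException {
            FileRecords slice = records.read(position, maxBytes);
            return slice.sizeInBytes();
        }
    }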
+    /**
+     * Append log entries to the file
+     * @param records The records to append
+     * @return the number of bytes written to the underlying file
+     */
+    public int append(MemoryRecords records) throws IOException {
+        int written = records.writeFullyTo(channel);
+        size.getAndAdd(written);
+        return written;
+    }
+
+    /**
+     * Commit all written data to the physical disk
+     */
+    public void flush() throws IOException {
+        channel.force(true);
+    }
+
+    /**
+     * Close this record set
+     */
+    public void close() throws IOException {
+        flush();
+        trim();
+        channel.close();
+    }
+
+    /**
+     * Delete this message set from the filesystem
+     * @return True iff this message set was deleted.
+     */
+    public boolean delete() {
+        Utils.closeQuietly(channel, "FileChannel");
+        return file.delete();
+    }
+
+    /**
+     * Trim file when close or roll to next file
+     */
+    public void trim() throws IOException {
+        truncateTo(sizeInBytes());
+    }
+
+    /**
+     * Update the file reference (to be used with caution since this does not reopen the file channel)
+     * @param file The new file to use
+     */
+    public void setFile(File file) {
+        this.file = file;
+    }
+
+    /**
+     * Rename the file that backs this message set
+     * @throws IOException if rename fails.
+     */
+    public void renameTo(File f) throws IOException {
+        try {
+            Utils.atomicMoveWithFallback(file.toPath(), f.toPath());
+        }  finally {
+            this.file = f;
+        }
+    }
+
+    /**
+     * Truncate this file message set to the given size in bytes. Note that this API does no checking that the
+     * given size falls on a valid message boundary.
+     * In some versions of the JDK, truncating to the same size as the file message set will cause an
+     * update of the file's mtime, so truncation is only performed if the targetSize is smaller than the
+     * size of the underlying FileChannel.
+     * It is expected that no other threads will do writes to the log when this function is called.
+     * @param targetSize The size to truncate to. Must be between 0 and sizeInBytes.
+     * @return The number of bytes truncated off
+     */
+    public int truncateTo(int targetSize) throws IOException {
+        int originalSize = sizeInBytes();
+        if (targetSize > originalSize || targetSize < 0)
+            throw new KafkaException("Attempt to truncate log segment to " + targetSize + " bytes failed, " +
+                    "size of this log segment is " + originalSize + " bytes.");
+        if (targetSize < (int) channel.size()) {
+            channel.truncate(targetSize);
+            channel.position(targetSize);
+            size.set(targetSize);
+        }
+        return originalSize - targetSize;
     }
 
     @Override
     public long writeTo(GatheringByteChannel destChannel, long offset, int length) throws IOException {
         long newSize = Math.min(channel.size(), end) - start;
-        if (newSize < size)
+        if (newSize < size.get())
             throw new KafkaException(String.format("Size of FileRecords %s has been truncated during write: old size %d, new size %d", file.getAbsolutePath(), size.get(), newSize));
 
-        if (offset > size)
-            throw new KafkaException(String.format("The requested offset %d is out of range. The size of this FileRecords is %d.", offset, size));
-
         long position = start + offset;
-        long count = Math.min(length, this.size - offset);
+        long count = Math.min(length, size.get());
+        final long bytesTransferred;
         if (destChannel instanceof TransportLayer) {
             TransportLayer tl = (TransportLayer) destChannel;
-            return tl.transferFrom(this.channel, position, count);
+            bytesTransferred = tl.transferFrom(channel, position, count);
         } else {
-            return this.channel.transferTo(position, count, destChannel);
+            bytesTransferred = channel.transferTo(position, count, destChannel);
+        }
+        return bytesTransferred;
+    }
+
+    /**
+     * Search forward for the file position of the first shallow entry whose offset is greater than or equal to
+     * the target offset, and return that offset along with its physical position and the size of the message
+     * (including log overhead) at that position. If no such entry is found, return null.
+     *
+     * @param targetOffset The offset to search for.
+     * @param startingPosition The starting position in the file to begin searching from.
+     */
+    public LogEntryPosition searchForOffsetWithSize(long targetOffset, int startingPosition) {
+        Iterator<FileChannelLogEntry> iterator = shallowIteratorFrom(Integer.MAX_VALUE, startingPosition);
+        while (iterator.hasNext()) {
+            FileChannelLogEntry entry = iterator.next();
+            long offset = entry.offset();
+            if (offset >= targetOffset)
+                return new LogEntryPosition(offset, entry.position(), entry.sizeInBytes());
+        }
+        return null;
+    }
+
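For illustration, the returned LogEntryPosition can drive a bounded read of the matching entry; the helper below is a sketch only (it assumes the receiver is not itself a slice, so file positions and read positions coincide):

    import org.apache.kafka.common.record.FileRecords;
    import org.apache.kafka.common.record.FileRecords.LogEntryPosition;

    import java.io.IOException;

    public class OffsetLookup {
        // Locate the shallow entry containing targetOffset, scanning from the beginning of the
        // file, and return a view over just that entry, or null if no such entry exists.
        static FileRecords entryAt(FileRecords records, long targetOffset) throws IOException {
            LogEntryPosition pos = records.searchForOffsetWithSize(targetOffset, 0);
            if (pos == null)
                return null;
            return records.read(pos.position, pos.size);
        }
    }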
+    /**
+     * Search forward for the first message whose timestamp is greater than or equal to the target timestamp.
+     *
+     * @param targetTimestamp The timestamp to search for.
+     * @param startingPosition The starting position to search.
+     * @return The timestamp and offset of the message found, or null if no message is found.
+     */
+    public TimestampAndOffset searchForTimestamp(long targetTimestamp, int startingPosition) {
+        Iterator<FileChannelLogEntry> shallowIterator = shallowIteratorFrom(startingPosition);
+        while (shallowIterator.hasNext()) {
+            LogEntry shallowEntry = shallowIterator.next();
+            Record shallowRecord = shallowEntry.record();
+            if (shallowRecord.timestamp() >= targetTimestamp) {
+                // We found a message
+                for (LogEntry deepLogEntry : shallowEntry) {
+                    long timestamp = deepLogEntry.record().timestamp();
+                    if (timestamp >= targetTimestamp)
+                        return new TimestampAndOffset(timestamp, deepLogEntry.offset());
+                }
+                throw new IllegalStateException(String.format("The message set (max timestamp = %s, max offset = %s)" +
+                        " should contain target timestamp %s, but does not.", shallowRecord.timestamp(),
+                        shallowEntry.offset(), targetTimestamp));
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Return the largest timestamp of the messages stored at or after a given position in this file message set.
+     * @param startingPosition The starting position.
+     * @return The largest timestamp of the messages at or after the given position.
+     */
+    public TimestampAndOffset largestTimestampAfter(int startingPosition) {
+        long maxTimestamp = Record.NO_TIMESTAMP;
+        long offsetOfMaxTimestamp = -1L;
+
+        Iterator<FileChannelLogEntry> shallowIterator = shallowIteratorFrom(startingPosition);
+        while (shallowIterator.hasNext()) {
+            LogEntry shallowEntry = shallowIterator.next();
+            long timestamp = shallowEntry.record().timestamp();
+            if (timestamp > maxTimestamp) {
+                maxTimestamp = timestamp;
+                offsetOfMaxTimestamp = shallowEntry.offset();
+            }
         }
+        return new TimestampAndOffset(maxTimestamp, offsetOfMaxTimestamp);
+    }
+
+    /**
+     * Get an iterator over the shallow entries in the file. Note that the entries are
+     * backed by the open file channel. When the channel is closed (i.e. when this instance
+     * is closed), the entries will generally no longer be readable.
+     * @return An iterator over the shallow entries
+     */
+    @Override
+    public Iterator<FileChannelLogEntry> shallowIterator() {
+        return shallowIteratorFrom(start);
+    }
+
+    /**
+     * Get an iterator over the shallow entries, enforcing a maximum record size
+     * @param maxRecordSize The maximum allowable size of individual records (including compressed record sets)
+     * @return An iterator over the shallow entries
+     */
+    public Iterator<FileChannelLogEntry> shallowIterator(int maxRecordSize) {
+        return shallowIteratorFrom(maxRecordSize, start);
+    }
+
+    private Iterator<FileChannelLogEntry> shallowIteratorFrom(int start) {
+        return shallowIteratorFrom(Integer.MAX_VALUE, start);
+    }
+
+    private Iterator<FileChannelLogEntry> shallowIteratorFrom(int maxRecordSize, int start) {
+        final int end;
+        if (isSlice)
+            end = this.end;
+        else
+            end = this.sizeInBytes();
+        FileLogInputStream inputStream = new FileLogInputStream(channel, maxRecordSize, start, end);
+        return RecordsIterator.shallowIterator(inputStream);
     }
 
     @Override
-    public RecordsIterator iterator() {
-        return new RecordsIterator(new FileLogInputStream(channel, start, end), false);
+    public Iterator<LogEntry> deepIterator() {
+        final int end;
+        if (isSlice)
+            end = this.end;
+        else
+            end = this.sizeInBytes();
+        FileLogInputStream inputStream = new FileLogInputStream(channel, Integer.MAX_VALUE, start, end);
+        return new RecordsIterator(inputStream, false, false, Integer.MAX_VALUE);
+    }
+
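To make the shallow/deep distinction concrete, here is a sketch of both traversals over the same instance (names are illustrative):

    import org.apache.kafka.common.record.FileLogInputStream.FileChannelLogEntry;
    import org.apache.kafka.common.record.FileRecords;
    import org.apache.kafka.common.record.LogEntry;

    import java.util.Iterator;

    public class IterationExample {
        static void dump(FileRecords records) {
            // Shallow: one entry per (possibly compressed) wrapper message; record data is loaded lazily.
            Iterator<FileChannelLogEntry> shallow = records.shallowIterator();
            while (shallow.hasNext())
                System.out.println("shallow offset: " + shallow.next().offset());

            // Deep: compressed wrappers are decompressed and their nested messages are enumerated.
            Iterator<LogEntry> deep = records.deepIterator();
            while (deep.hasNext())
                System.out.println("deep offset: " + deep.next().offset());
        }
    }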
+    public static FileRecords open(File file,
+                                   boolean mutable,
+                                   boolean fileAlreadyExists,
+                                   int initFileSize,
+                                   boolean preallocate) throws IOException {
+        FileChannel channel = openChannel(file, mutable, fileAlreadyExists, initFileSize, preallocate);
+        int end = (!fileAlreadyExists && preallocate) ? 0 : Integer.MAX_VALUE;
+        return new FileRecords(file, channel, 0, end, false);
+    }
+
+    public static FileRecords open(File file,
+                                   boolean fileAlreadyExists,
+                                   int initFileSize,
+                                   boolean preallocate) throws IOException {
+        return open(file, true, fileAlreadyExists, initFileSize, preallocate);
+    }
+
+    public static FileRecords open(File file, boolean mutable) throws IOException {
+        return open(file, mutable, false, 0, false);
+    }
+
+    public static FileRecords open(File file) throws IOException {
+        return open(file, true);
+    }
+
+    /**
+     * Open a channel for the given file
+     * On Windows NTFS and some older Linux filesystems, setting preallocate to true and initFileSize to a
+     * positive value (for example 512 * 1025 * 1024) can improve Kafka produce performance.
+     * @param file File path
+     * @param mutable Whether the channel should be opened for writing
+     * @param fileAlreadyExists Whether the file already exists
+     * @param initFileSize The size used to preallocate the file, for example 512 * 1025 * 1024
+     * @param preallocate Whether to preallocate the file, taken from the configuration.
+     */
+    private static FileChannel openChannel(File file,
+                                           boolean mutable,
+                                           boolean fileAlreadyExists,
+                                           int initFileSize,
+                                           boolean preallocate) throws IOException {
+        if (mutable) {
+            if (fileAlreadyExists) {
+                return new RandomAccessFile(file, "rw").getChannel();
+            } else {
+                if (preallocate) {
+                    RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");
+                    randomAccessFile.setLength(initFileSize);
+                    return randomAccessFile.getChannel();
+                } else {
+                    return new RandomAccessFile(file, "rw").getChannel();
+                }
+            }
+        } else {
+            return new FileInputStream(file).getChannel();
+        }
     }
 
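A sketch of opening a brand-new, preallocated segment through the open(...) factories above; the file name and the 512 MB preallocation size are hypothetical:

    import org.apache.kafka.common.record.FileRecords;

    import java.io.File;
    import java.io.IOException;

    public class OpenExample {
        public static void main(String[] args) throws IOException {
            // New mutable file preallocated to 512 MB; because the file does not already exist and
            // preallocation is requested, the logical end starts at 0, so the preallocated (but
            // unwritten) bytes are not treated as log data.
            FileRecords segment = FileRecords.open(new File("00000000000000000000.log"),
                    false, 512 * 1024 * 1024, true);
            try {
                System.out.println("initial size in bytes: " + segment.sizeInBytes());
            } finally {
                segment.close();
            }
        }
    }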
-    private static class FileLogInputStream implements LogInputStream {
-        private long position;
-        protected final long end;
-        protected final FileChannel channel;
-        private final ByteBuffer logHeaderBuffer = ByteBuffer.allocate(Records.LOG_OVERHEAD);
+    public static class LogEntryPosition {
+        public final long offset;
+        public final int position;
+        public final int size;
 
-        public FileLogInputStream(FileChannel channel, long start, long end) {
-            this.channel = channel;
-            this.position = start;
-            this.end = end;
+        public LogEntryPosition(long offset, int position, int size) {
+            this.offset = offset;
+            this.position = position;
+            this.size = size;
         }
 
         @Override
-        public LogEntry nextEntry() throws IOException {
-            if (position + Records.LOG_OVERHEAD >= end)
-                return null;
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
 
-            logHeaderBuffer.rewind();
-            channel.read(logHeaderBuffer, position);
-            if (logHeaderBuffer.hasRemaining())
-                return null;
+            LogEntryPosition that = (LogEntryPosition) o;
 
-            logHeaderBuffer.rewind();
-            long offset = logHeaderBuffer.getLong();
-            int size = logHeaderBuffer.getInt();
-            if (size < 0)
-                throw new IllegalStateException("Record with size " + size);
+            if (offset != that.offset) return false;
+            if (position != that.position) return false;
+            return size == that.size;
 
-            if (position + Records.LOG_OVERHEAD + size > end)
-                return null;
+        }
 
-            ByteBuffer recordBuffer = ByteBuffer.allocate(size);
-            channel.read(recordBuffer, position + Records.LOG_OVERHEAD);
-            if (recordBuffer.hasRemaining())
-                return null;
-            recordBuffer.rewind();
+        @Override
+        public int hashCode() {
+            int result = (int) (offset ^ (offset >>> 32));
+            result = 31 * result + position;
+            result = 31 * result + size;
+            return result;
+        }
 
-            Record record = new Record(recordBuffer);
-            LogEntry logEntry = new LogEntry(offset, record);
-            position += logEntry.size();
-            return logEntry;
+        @Override
+        public String toString() {
+            return "LogEntryPosition(" +
+                    "offset=" + offset +
+                    ", position=" + position +
+                    ", size=" + size +
+                    ')';
         }
     }
+
+    public static class TimestampAndOffset {
+        public final long timestamp;
+        public final long offset;
+
+        public TimestampAndOffset(long timestamp, long offset) {
+            this.timestamp = timestamp;
+            this.offset = offset;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+
+            TimestampAndOffset that = (TimestampAndOffset) o;
+
+            if (timestamp != that.timestamp) return false;
+            return offset == that.offset;
+        }
+
+        @Override
+        public int hashCode() {
+            int result = (int) (timestamp ^ (timestamp >>> 32));
+            result = 31 * result + (int) (offset ^ (offset >>> 32));
+            return result;
+        }
+
+        @Override
+        public String toString() {
+            return "TimestampAndOffset(" +
+                    "timestamp=" + timestamp +
+                    ", offset=" + offset +
+                    ')';
+        }
+    }
+
 }

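Putting the timestamp-based lookups above together, a minimal sketch (the class and method names are illustrative, and the instance is assumed not to be a slice so scanning can start at position 0):

    import org.apache.kafka.common.record.FileRecords;
    import org.apache.kafka.common.record.FileRecords.TimestampAndOffset;

    public class TimestampLookup {
        // Print the first offset whose timestamp is at or after the target, then the
        // segment's largest timestamp and the offset where it occurs.
        static void report(FileRecords segment, long targetTimestamp) {
            TimestampAndOffset found = segment.searchForTimestamp(targetTimestamp, 0);
            if (found != null)
                System.out.println("first offset at/after target: " + found.offset
                        + " (timestamp " + found.timestamp + ")");

            TimestampAndOffset largest = segment.largestTimestampAfter(0);
            System.out.println("largest timestamp: " + largest.timestamp
                    + " at offset " + largest.offset);
        }
    }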
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/common/record/InvalidRecordException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/InvalidRecordException.java b/clients/src/main/java/org/apache/kafka/common/record/InvalidRecordException.java
index a1009ca..ee60713 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/InvalidRecordException.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/InvalidRecordException.java
@@ -16,9 +16,9 @@
  */
 package org.apache.kafka.common.record;
 
-import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.CorruptRecordException;
 
-public class InvalidRecordException extends KafkaException {
+public class InvalidRecordException extends CorruptRecordException {
 
     private static final long serialVersionUID = 1;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/common/record/LogEntry.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/LogEntry.java b/clients/src/main/java/org/apache/kafka/common/record/LogEntry.java
index 2e54b56..d2db356 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/LogEntry.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/LogEntry.java
@@ -16,33 +16,156 @@
  */
 package org.apache.kafka.common.record;
 
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Iterator;
+
+import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;
+
 /**
  * An offset and record pair
  */
-public final class LogEntry {
+public abstract class LogEntry implements Iterable<LogEntry> {
 
-    private final long offset;
-    private final Record record;
+    /**
+     * Get the offset of this entry. Note that if this entry contains a compressed
+     * message set, then this offset will be the last offset of the nested entries
+     * @return the last offset contained in this entry
+     */
+    public abstract long offset();
 
-    public LogEntry(long offset, Record record) {
-        this.offset = offset;
-        this.record = record;
+    /**
+     * Get the shallow record for this log entry.
+     * @return the shallow record
+     */
+    public abstract Record record();
+
+    /**
+     * Get the first offset of the records contained in this entry. Note that this
+     * generally requires deep iteration, which requires decompression, so this should
+     * be used with caution.
+     * @return The first offset contained in this entry
+     */
+    public long firstOffset() {
+        return iterator().next().offset();
     }
 
-    public long offset() {
-        return this.offset;
+    /**
+     * Get the offset following this entry (i.e. the last offset contained in this entry plus one).
+     * @return the next consecutive offset following this entry
+     */
+    public long nextOffset() {
+        return offset() + 1;
     }
 
-    public Record record() {
-        return this.record;
+    /**
+     * Get the message format version of this entry (i.e. its magic value).
+     * @return the magic byte
+     */
+    public byte magic() {
+        return record().magic();
     }
 
     @Override
     public String toString() {
-        return "LogEntry(" + offset + ", " + record + ")";
+        return "LogEntry(" + offset() + ", " + record() + ")";
+    }
+
+    /**
+     * Get the size in bytes of this entry, including the size of the record and the log overhead.
+     * @return The size in bytes of this entry
+     */
+    public int sizeInBytes() {
+        return record().sizeInBytes() + LOG_OVERHEAD;
+    }
+
+    /**
+     * Check whether this entry contains a compressed message set.
+     * @return true if so, false otherwise
+     */
+    public boolean isCompressed() {
+        return record().compressionType() != CompressionType.NONE;
+    }
+
+    /**
+     * Write this entry into a buffer.
+     * @param buffer The buffer to write the entry to
+     */
+    public void writeTo(ByteBuffer buffer) {
+        writeHeader(buffer, offset(), record().sizeInBytes());
+        buffer.put(record().buffer().duplicate());
+    }
+
+    /**
+     * Get an iterator for the nested entries contained within this log entry. Note that
+     * if the entry is not compressed, then this method will return an iterator over the
+     * shallow entry only (i.e. this object).
+     * @return An iterator over the entries contained within this log entry
+     */
+    @Override
+    public Iterator<LogEntry> iterator() {
+        if (isCompressed())
+            return new RecordsIterator.DeepRecordsIterator(this, false, Integer.MAX_VALUE);
+        return Collections.singletonList(this).iterator();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || !(o instanceof LogEntry)) return false;
+
+        LogEntry that = (LogEntry) o;
+
+        if (offset() != that.offset()) return false;
+        Record thisRecord = record();
+        Record thatRecord = that.record();
+        return thisRecord != null ? thisRecord.equals(thatRecord) : thatRecord == null;
+    }
+
+    @Override
+    public int hashCode() {
+        long offset = offset();
+        Record record = record();
+        int result = (int) (offset ^ (offset >>> 32));
+        result = 31 * result + (record != null ? record.hashCode() : 0);
+        return result;
+    }
+
+    public static void writeHeader(ByteBuffer buffer, long offset, int size) {
+        buffer.putLong(offset);
+        buffer.putInt(size);
+    }
+
+    public static void writeHeader(DataOutputStream out, long offset, int size) throws IOException {
+        out.writeLong(offset);
+        out.writeInt(size);
     }
-    
-    public int size() {
-        return record.size() + Records.LOG_OVERHEAD;
+
+    private static class SimpleLogEntry extends LogEntry {
+        private final long offset;
+        private final Record record;
+
+        public SimpleLogEntry(long offset, Record record) {
+            this.offset = offset;
+            this.record = record;
+        }
+
+        @Override
+        public long offset() {
+            return offset;
+        }
+
+        @Override
+        public Record record() {
+            return record;
+        }
+
+    }
+
+    public static LogEntry create(long offset, Record record) {
+        return new SimpleLogEntry(offset, record);
     }
+
 }
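The static helpers on LogEntry make it easy to serialize an offset/record pair by hand. A small sketch, assuming record is an existing Record instance (for example one obtained from a shallow iterator):

    import org.apache.kafka.common.record.LogEntry;
    import org.apache.kafka.common.record.Record;

    import java.nio.ByteBuffer;

    public class LogEntryExample {
        // Serialize a single record at the given offset into a standalone buffer.
        static ByteBuffer serialize(long offset, Record record) {
            LogEntry entry = LogEntry.create(offset, record);
            ByteBuffer buffer = ByteBuffer.allocate(entry.sizeInBytes());
            // writeTo() writes the 12-byte offset/size header followed by the record bytes,
            // i.e. it is equivalent to writeHeader(buffer, offset, record.sizeInBytes()) plus a copy.
            entry.writeTo(buffer);
            buffer.flip();
            return buffer;
        }
    }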