You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/05/28 17:00:03 UTC

kafka git commit: KAFKA-5316; LogCleaner should account for larger record sets after cleaning

Repository: kafka
Updated Branches:
  refs/heads/trunk b50387eb7 -> dfa3c8a92


KAFKA-5316; LogCleaner should account for larger record sets after cleaning

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Ismael Juma <is...@juma.me.uk>, Jun Rao <ju...@gmail.com>

Closes #3142 from hachikuji/KAFKA-5316


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dfa3c8a9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dfa3c8a9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dfa3c8a9

Branch: refs/heads/trunk
Commit: dfa3c8a92dddd58cab95e12c72669f250bb99683
Parents: b50387e
Author: Jason Gustafson <ja...@confluent.io>
Authored: Sun May 28 09:57:59 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Sun May 28 09:57:59 2017 -0700

----------------------------------------------------------------------
 checkstyle/checkstyle.xml                       |   2 +-
 .../producer/internals/ProducerBatch.java       |  57 +++------
 .../record/AbstractLegacyRecordBatch.java       |   6 +
 .../kafka/common/record/DefaultRecordBatch.java |  14 +--
 .../kafka/common/record/FileLogInputStream.java |   7 --
 .../apache/kafka/common/record/FileRecords.java |  21 +---
 .../kafka/common/record/MemoryRecords.java      | 110 ++++++++++------
 .../common/record/MemoryRecordsBuilder.java     | 124 ++++++++++++-------
 .../kafka/common/record/MutableRecordBatch.java |   9 ++
 .../common/utils/ByteBufferOutputStream.java    |  45 ++++++-
 .../org/apache/kafka/clients/MockClient.java    |  15 ++-
 .../internals/RecordAccumulatorTest.java        |   8 +-
 .../clients/producer/internals/SenderTest.java  |  61 ++++++---
 .../common/record/FileLogInputStreamTest.java   |   2 +-
 .../kafka/common/record/MemoryRecordsTest.java  |  70 +++++++++--
 .../utils/ByteBufferOutputStreamTest.java       | 101 +++++++++++++++
 core/src/main/scala/kafka/log/Log.scala         |   2 +-
 core/src/main/scala/kafka/log/LogCleaner.scala  |  11 +-
 core/src/main/scala/kafka/log/LogSegment.scala  |  14 +--
 .../scala/kafka/tools/DumpLogSegments.scala     |   4 +-
 .../scala/unit/kafka/log/LogSegmentTest.scala   |  10 +-
 .../src/test/scala/unit/kafka/log/LogTest.scala |   9 +-
 22 files changed, 490 insertions(+), 212 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/checkstyle/checkstyle.xml
----------------------------------------------------------------------
diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml
index 743c68d..ccab85c 100644
--- a/checkstyle/checkstyle.xml
+++ b/checkstyle/checkstyle.xml
@@ -105,7 +105,7 @@
     </module>
     <module name="ClassDataAbstractionCoupling">
       <!-- default is 7 -->
-      <property name="max" value="17"/>
+      <property name="max" value="20"/>
     </module>
     <module name="BooleanExpressionComplexity">
       <!-- default is 3 -->

http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
index df79707..974e230 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
@@ -16,9 +16,6 @@
  */
 package org.apache.kafka.clients.producer.internals;
 
-import java.util.ArrayDeque;
-import java.util.Deque;
-import java.util.Iterator;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
@@ -27,23 +24,24 @@ import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.record.AbstractRecords;
 import org.apache.kafka.common.record.CompressionRatioEstimator;
-import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
 import org.apache.kafka.common.record.MutableRecordBatch;
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.RecordBatch;
-import org.apache.kafka.common.record.MemoryRecords;
-import org.apache.kafka.common.record.MemoryRecordsBuilder;
-import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.requests.ProduceResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Deque;
+import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.kafka.common.record.RecordBatch.NO_TIMESTAMP;
 
@@ -119,9 +117,9 @@ public final class ProducerBatch {
     }
 
     /**
-     +     * This method is only used by {@link #split(int)} when splitting a large batch to smaller ones.
-     +     * @return true if the record has been successfully appended, false otherwise.
-     +     */
+     * This method is only used by {@link #split(int)} when splitting a large batch to smaller ones.
+     * @return true if the record has been successfully appended, false otherwise.
+     */
     private boolean tryAppendForSplit(long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers, Thunk thunk) {
         if (!recordsBuilder.hasRoomFor(timestamp, key, value)) {
             return false;
@@ -196,15 +194,13 @@ public final class ProducerBatch {
             assert thunkIter.hasNext();
             Thunk thunk = thunkIter.next();
             if (batch == null) {
-                batch = createBatchOffAccumulatorForRecord(this.topicPartition, this.recordsBuilder.compressionType(),
-                                                           record, splitBatchSize, this.createdMs);
+                batch = createBatchOffAccumulatorForRecord(record, splitBatchSize);
             }
 
             // A newly created batch can always host the first message.
             if (!batch.tryAppendForSplit(record.timestamp(), record.key(), record.value(), record.headers(), thunk)) {
                 batches.add(batch);
-                batch = createBatchOffAccumulatorForRecord(this.topicPartition, this.recordsBuilder.compressionType(),
-                                                           record, splitBatchSize, this.createdMs);
+                batch = createBatchOffAccumulatorForRecord(record, splitBatchSize);
                 batch.tryAppendForSplit(record.timestamp(), record.key(), record.value(), record.headers(), thunk);
             }
         }
@@ -217,30 +213,13 @@ public final class ProducerBatch {
         return batches;
     }
 
-    private ProducerBatch createBatchOffAccumulatorForRecord(TopicPartition tp,
-                                                             CompressionType compressionType,
-                                                             Record record,
-                                                             int batchSize,
-                                                             long createdMs) {
-        int initialSize = Math.max(Records.LOG_OVERHEAD + AbstractRecords.sizeInBytesUpperBound(magic(),
-                                                                                                record.key(),
-                                                                                                record.value(),
-                                                                                                record.headers()),
-                                   batchSize);
-        return createBatchOffAccumulator(tp, compressionType, initialSize, createdMs);
-    }
-
-    // package private for testing purpose.
-    static ProducerBatch createBatchOffAccumulator(TopicPartition tp,
-                                                   CompressionType compressionType,
-                                                   int batchSize,
-                                                   long createdMs) {
-        ByteBuffer buffer = ByteBuffer.allocate(batchSize);
-        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer,
-                                                             compressionType,
-                                                             TimestampType.CREATE_TIME,
-                                                             batchSize);
-        return new ProducerBatch(tp, builder, createdMs, true);
+    private ProducerBatch createBatchOffAccumulatorForRecord(Record record, int batchSize) {
+        int initialSize = Math.max(AbstractRecords.sizeInBytesUpperBound(magic(),
+                record.key(), record.value(), record.headers()), batchSize);
+        ByteBuffer buffer = ByteBuffer.allocate(initialSize);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic(), recordsBuilder.compressionType(),
+                TimestampType.CREATE_TIME, 0L, recordsBuilder.isTransactional());
+        return new ProducerBatch(topicPartition, builder, this.createdMs, true);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
index e028988..be69686 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.errors.CorruptRecordException;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.utils.AbstractIterator;
 import org.apache.kafka.common.utils.ByteBufferInputStream;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
 import org.apache.kafka.common.utils.ByteUtils;
 import org.apache.kafka.common.utils.CloseableIterator;
 import org.apache.kafka.common.utils.Utils;
@@ -479,6 +480,11 @@ public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch impl
         }
 
         @Override
+        public void writeTo(ByteBufferOutputStream outputStream) {
+            outputStream.write(buffer.duplicate());
+        }
+
+        @Override
         public boolean equals(Object o) {
             if (this == o)
                 return true;

http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
index 4e52d61..f01116e 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
@@ -19,10 +19,10 @@ package org.apache.kafka.common.record;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.utils.ByteBufferInputStream;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
 import org.apache.kafka.common.utils.ByteUtils;
 import org.apache.kafka.common.utils.CloseableIterator;
 import org.apache.kafka.common.utils.Crc32C;
-import org.apache.kafka.common.utils.Utils;
 
 import java.io.DataInputStream;
 import java.io.IOException;
@@ -207,6 +207,11 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
     }
 
     @Override
+    public void writeTo(ByteBufferOutputStream outputStream) {
+        outputStream.write(this.buffer.duplicate());
+    }
+
+    @Override
     public boolean isTransactional() {
         return (attributes() & TRANSACTIONAL_FLAG_MASK) > 0;
     }
@@ -447,13 +452,6 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
     /**
      * Get an upper bound on the size of a batch with only a single record using a given key and value.
      */
-    static int batchSizeUpperBound(byte[] key, byte[] value, Header[] headers) {
-        return batchSizeUpperBound(Utils.wrapNullable(key), Utils.wrapNullable(value), headers);
-    }
-
-    /**
-     * Get an upper bound on the size of a batch with only a single record using a given key and value.
-     */
     static int batchSizeUpperBound(ByteBuffer key, ByteBuffer value, Header[] headers) {
         return RECORD_BATCH_OVERHEAD + DefaultRecord.recordSizeUpperBound(key, value, headers);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
index 1af5527..5fe1cef 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
@@ -35,22 +35,18 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil
     private int position;
     private final int end;
     private final FileChannel channel;
-    private final int maxRecordSize;
     private final ByteBuffer logHeaderBuffer = ByteBuffer.allocate(LOG_OVERHEAD);
 
     /**
      * Create a new log input stream over the FileChannel
      * @param channel Underlying FileChannel
-     * @param maxRecordSize Maximum size of records
      * @param start Position in the file channel to start from
      * @param end Position in the file channel not to read past
      */
     FileLogInputStream(FileChannel channel,
-                       int maxRecordSize,
                        int start,
                        int end) {
         this.channel = channel;
-        this.maxRecordSize = maxRecordSize;
         this.position = start;
         this.end = end;
     }
@@ -71,9 +67,6 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil
         if (size < LegacyRecord.RECORD_OVERHEAD_V0)
             throw new CorruptRecordException(String.format("Record size is smaller than minimum record overhead (%d).", LegacyRecord.RECORD_OVERHEAD_V0));
 
-        if (size > maxRecordSize)
-            throw new CorruptRecordException(String.format("Record size exceeds the largest allowable message size (%d).", maxRecordSize));
-
         if (position + LOG_OVERHEAD + size > end)
             return null;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
index 16d3777..a72ba8b 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -339,35 +339,22 @@ public class FileRecords extends AbstractRecords implements Closeable {
         return batches;
     }
 
-    /**
-     * Get an iterator over the record batches, enforcing a maximum record size
-     * @param maxRecordSize The maximum allowable size of individual records (including compressed record sets)
-     * @return An iterator over the batches
-     */
-    public Iterable<FileChannelRecordBatch> batches(int maxRecordSize) {
-        return batches(maxRecordSize, start);
-    }
-
-    private Iterable<FileChannelRecordBatch> batchesFrom(int start) {
-        return batches(Integer.MAX_VALUE, start);
-    }
-
-    private Iterable<FileChannelRecordBatch> batches(final int maxRecordSize, final int start) {
+    private Iterable<FileChannelRecordBatch> batchesFrom(final int start) {
         return new Iterable<FileChannelRecordBatch>() {
             @Override
             public Iterator<FileChannelRecordBatch> iterator() {
-                return batchIterator(maxRecordSize, start);
+                return batchIterator(start);
             }
         };
     }
 
-    private Iterator<FileChannelRecordBatch> batchIterator(int maxRecordSize, int start) {
+    private Iterator<FileChannelRecordBatch> batchIterator(int start) {
         final int end;
         if (isSlice)
             end = this.end;
         else
             end = this.sizeInBytes();
-        FileLogInputStream inputStream = new FileLogInputStream(channel, maxRecordSize, start, end);
+        FileLogInputStream inputStream = new FileLogInputStream(channel, start, end);
         return new RecordBatchIterator<>(inputStream);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index d3bdee2..56d7ed1 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -16,6 +16,11 @@
  */
 package org.apache.kafka.common.record;
 
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.GatheringByteChannel;
@@ -31,8 +36,8 @@ import java.util.Objects;
  * or one of the {@link #builder(ByteBuffer, byte, CompressionType, TimestampType, long)} variants.
  */
 public class MemoryRecords extends AbstractRecords {
-
-    public final static MemoryRecords EMPTY = MemoryRecords.readableRecords(ByteBuffer.allocate(0));
+    private static final Logger log = LoggerFactory.getLogger(MemoryRecords.class);
+    public static final MemoryRecords EMPTY = MemoryRecords.readableRecords(ByteBuffer.allocate(0));
 
     private final ByteBuffer buffer;
 
@@ -110,16 +115,21 @@ public class MemoryRecords extends AbstractRecords {
 
     /**
      * Filter the records into the provided ByteBuffer.
+     * @param partition The partition that is filtered (used only for logging)
      * @param filter The filter function
      * @param destinationBuffer The byte buffer to write the filtered records to
-     * @return A FilterResult with a summary of the output (for metrics)
+     * @param maxRecordBatchSize The maximum record batch size. Note this is not a hard limit: if a batch
+     *                           exceeds this after filtering, we log a warning, but the batch will still be
+     *                           created.
+     * @return A FilterResult with a summary of the output (for metrics) and potentially an overflow buffer
      */
-    public FilterResult filterTo(RecordFilter filter, ByteBuffer destinationBuffer) {
-        return filterTo(batches(), filter, destinationBuffer);
+    public FilterResult filterTo(TopicPartition partition, RecordFilter filter, ByteBuffer destinationBuffer,
+                                 int maxRecordBatchSize) {
+        return filterTo(partition, batches(), filter, destinationBuffer, maxRecordBatchSize);
     }
 
-    private static FilterResult filterTo(Iterable<MutableRecordBatch> batches, RecordFilter filter,
-                                         ByteBuffer destinationBuffer) {
+    private static FilterResult filterTo(TopicPartition partition, Iterable<MutableRecordBatch> batches,
+                                         RecordFilter filter, ByteBuffer destinationBuffer, int maxRecordBatchSize) {
         long maxTimestamp = RecordBatch.NO_TIMESTAMP;
         long maxOffset = -1L;
         long shallowOffsetOfMaxTimestamp = -1L;
@@ -128,6 +138,8 @@ public class MemoryRecords extends AbstractRecords {
         int messagesRetained = 0;
         int bytesRetained = 0;
 
+        ByteBufferOutputStream bufferOutputStream = new ByteBufferOutputStream(destinationBuffer);
+
         for (MutableRecordBatch batch : batches) {
             bytesRead += batch.sizeInBytes();
 
@@ -140,7 +152,7 @@ public class MemoryRecords extends AbstractRecords {
             // recopy the messages to the destination buffer.
 
             byte batchMagic = batch.magic();
-            boolean writeOriginalEntry = true;
+            boolean writeOriginalBatch = true;
             List<Record> retainedRecords = new ArrayList<>();
 
             for (Record record : batch) {
@@ -150,20 +162,19 @@ public class MemoryRecords extends AbstractRecords {
                     // Check for log corruption due to KAFKA-4298. If we find it, make sure that we overwrite
                     // the corrupted batch with correct data.
                     if (!record.hasMagic(batchMagic))
-                        writeOriginalEntry = false;
+                        writeOriginalBatch = false;
 
                     if (record.offset() > maxOffset)
                         maxOffset = record.offset();
 
                     retainedRecords.add(record);
                 } else {
-                    writeOriginalEntry = false;
+                    writeOriginalBatch = false;
                 }
             }
 
-            if (writeOriginalEntry) {
-                // There are no messages compacted out and no message format conversion, write the original message set back
-                batch.writeTo(destinationBuffer);
+            if (writeOriginalBatch) {
+                batch.writeTo(bufferOutputStream);
                 messagesRetained += retainedRecords.size();
                 bytesRetained += batch.sizeInBytes();
                 if (batch.maxTimestamp() > maxTimestamp) {
@@ -171,29 +182,18 @@ public class MemoryRecords extends AbstractRecords {
                     shallowOffsetOfMaxTimestamp = batch.lastOffset();
                 }
             } else if (!retainedRecords.isEmpty()) {
-                ByteBuffer slice = destinationBuffer.slice();
-                TimestampType timestampType = batch.timestampType();
-                long logAppendTime = timestampType == TimestampType.LOG_APPEND_TIME ? batch.maxTimestamp() : RecordBatch.NO_TIMESTAMP;
-                long baseOffset = batchMagic >= RecordBatch.MAGIC_VALUE_V2 ?
-                        batch.baseOffset() : retainedRecords.get(0).offset();
-
-                MemoryRecordsBuilder builder = builder(slice, batch.magic(), batch.compressionType(), timestampType,
-                        baseOffset, logAppendTime, batch.producerId(), batch.producerEpoch(), batch.baseSequence(),
-                        batch.isTransactional(), batch.partitionLeaderEpoch());
-
-                for (Record record : retainedRecords)
-                    builder.append(record);
-
-                if (batch.magic() >= RecordBatch.MAGIC_VALUE_V2)
-                    // we must preserve the last offset from the initial batch in order to ensure that the
-                    // last sequence number from the batch remains even after compaction. Otherwise, the producer
-                    // could incorrectly see an out of sequence error.
-                    builder.overrideLastOffset(batch.lastOffset());
-
+                MemoryRecordsBuilder builder = buildRetainedRecordsInto(batch, retainedRecords, bufferOutputStream);
                 MemoryRecords records = builder.build();
-                destinationBuffer.position(destinationBuffer.position() + slice.position());
+                int filteredBatchSize = records.sizeInBytes();
+
                 messagesRetained += retainedRecords.size();
-                bytesRetained += records.sizeInBytes();
+                bytesRetained += filteredBatchSize;
+
+                if (filteredBatchSize > batch.sizeInBytes() && filteredBatchSize > maxRecordBatchSize)
+                    log.warn("Record batch from {} with last offset {} exceeded max record batch size {} after cleaning " +
+                                    "(new size is {}). Consumers from version 0.10.1 and earlier may need to " +
+                                    "increase their fetch sizes.",
+                            partition, batch.lastOffset(), maxRecordBatchSize, filteredBatchSize);
 
                 MemoryRecordsBuilder.RecordsInfo info = builder.info();
                 if (info.maxTimestamp > maxTimestamp) {
@@ -201,9 +201,44 @@ public class MemoryRecords extends AbstractRecords {
                     shallowOffsetOfMaxTimestamp = info.shallowOffsetOfMaxTimestamp;
                 }
             }
+
+            // If we had to allocate a new buffer to fit the filtered output (see KAFKA-5316), return early to
+            // avoid the need for additional allocations.
+            ByteBuffer outputBuffer = bufferOutputStream.buffer();
+            if (outputBuffer != destinationBuffer)
+                return new FilterResult(outputBuffer, messagesRead, bytesRead, messagesRetained, bytesRetained,
+                        maxOffset, maxTimestamp, shallowOffsetOfMaxTimestamp);
         }
 
-        return new FilterResult(messagesRead, bytesRead, messagesRetained, bytesRetained, maxOffset, maxTimestamp, shallowOffsetOfMaxTimestamp);
+        return new FilterResult(destinationBuffer, messagesRead, bytesRead, messagesRetained, bytesRetained,
+                maxOffset, maxTimestamp, shallowOffsetOfMaxTimestamp);
+    }
+
+    private static MemoryRecordsBuilder buildRetainedRecordsInto(RecordBatch originalBatch,
+                                                                 List<Record> retainedRecords,
+                                                                 ByteBufferOutputStream bufferOutputStream) {
+        byte magic = originalBatch.magic();
+        TimestampType timestampType = originalBatch.timestampType();
+        long logAppendTime = timestampType == TimestampType.LOG_APPEND_TIME ?
+                originalBatch.maxTimestamp() : RecordBatch.NO_TIMESTAMP;
+        long baseOffset = magic >= RecordBatch.MAGIC_VALUE_V2 ?
+                originalBatch.baseOffset() : retainedRecords.get(0).offset();
+
+        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(bufferOutputStream, magic,
+                originalBatch.compressionType(), timestampType, baseOffset, logAppendTime, originalBatch.producerId(),
+                originalBatch.producerEpoch(), originalBatch.baseSequence(), originalBatch.isTransactional(),
+                originalBatch.isControlBatch(), originalBatch.partitionLeaderEpoch(), bufferOutputStream.limit());
+
+        for (Record record : retainedRecords)
+            builder.append(record);
+
+        if (magic >= RecordBatch.MAGIC_VALUE_V2)
+            // we must preserve the last offset from the initial batch in order to ensure that the
+            // last sequence number from the batch remains even after compaction. Otherwise, the producer
+            // could incorrectly see an out of sequence error.
+            builder.overrideLastOffset(originalBatch.lastOffset());
+
+        return builder;
     }
 
     /**
@@ -271,6 +306,7 @@ public class MemoryRecords extends AbstractRecords {
     }
 
     public static class FilterResult {
+        public final ByteBuffer output;
         public final int messagesRead;
         public final int bytesRead;
         public final int messagesRetained;
@@ -279,13 +315,15 @@ public class MemoryRecords extends AbstractRecords {
         public final long maxTimestamp;
         public final long shallowOffsetOfMaxTimestamp;
 
-        public FilterResult(int messagesRead,
+        public FilterResult(ByteBuffer output,
+                            int messagesRead,
                             int bytesRead,
                             int messagesRetained,
                             int bytesRetained,
                             long maxOffset,
                             long maxTimestamp,
                             long shallowOffsetOfMaxTimestamp) {
+            this.output = output;
             this.messagesRead = messagesRead;
             this.bytesRead = bytesRead;
             this.messagesRetained = messagesRetained;

http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index e055aa5..49d70c6 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -49,7 +49,7 @@ public class MemoryRecordsBuilder {
     // so it's not safe to hold a direct reference to the underlying ByteBuffer.
     private final ByteBufferOutputStream bufferStream;
     private final byte magic;
-    private final int initPos;
+    private final int initialPosition;
     private final long baseOffset;
     private final long logAppendTime;
     private final boolean isTransactional;
@@ -75,25 +75,7 @@ public class MemoryRecordsBuilder {
     private MemoryRecords builtRecords;
     private boolean aborted = false;
 
-    /**
-     * Construct a new builder.
-     *
-     * @param buffer The underlying buffer to use (note that this class will allocate a new buffer if necessary
-     *               to fit the records appended)
-     * @param magic The magic value to use
-     * @param compressionType The compression codec to use
-     * @param timestampType The desired timestamp type. For magic > 0, this cannot be {@link TimestampType#NO_TIMESTAMP_TYPE}.
-     * @param baseOffset The initial offset to use for
-     * @param logAppendTime The log append time of this record set. Can be set to NO_TIMESTAMP if CREATE_TIME is used.
-     * @param producerId The producer ID associated with the producer writing this record set
-     * @param producerEpoch The epoch of the producer
-     * @param baseSequence The sequence number of the first record in this set
-     * @param isTransactional Whether or not the records are part of a transaction
-     * @param writeLimit The desired limit on the total bytes for this record set (note that this can be exceeded
-     *                   when compression is used since size estimates are rough, and in the case that the first
-     *                   record added exceeds the size).
-     */
-    public MemoryRecordsBuilder(ByteBuffer buffer,
+    public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream,
                                 byte magic,
                                 CompressionType compressionType,
                                 TimestampType timestampType,
@@ -120,7 +102,6 @@ public class MemoryRecordsBuilder {
         this.compressionType = compressionType;
         this.baseOffset = baseOffset;
         this.logAppendTime = logAppendTime;
-        this.initPos = buffer.position();
         this.numRecords = 0;
         this.writtenUncompressed = 0;
         this.actualCompressionRatio = 1;
@@ -132,22 +113,62 @@ public class MemoryRecordsBuilder {
         this.isControlBatch = isControlBatch;
         this.partitionLeaderEpoch = partitionLeaderEpoch;
         this.writeLimit = writeLimit;
-        this.initialCapacity = buffer.capacity();
+
+        this.initialPosition = bufferStream.position();
+        this.initialCapacity = bufferStream.capacity();
 
         if (magic > RecordBatch.MAGIC_VALUE_V1) {
-            buffer.position(initPos + DefaultRecordBatch.RECORDS_OFFSET);
+            bufferStream.position(initialPosition + DefaultRecordBatch.RECORDS_OFFSET);
         } else if (compressionType != CompressionType.NONE) {
             // for compressed records, leave space for the header and the shallow message metadata
             // and move the starting position to the value payload offset
-            buffer.position(initPos + Records.LOG_OVERHEAD + LegacyRecord.recordOverhead(magic));
+            bufferStream.position(initialPosition + Records.LOG_OVERHEAD + LegacyRecord.recordOverhead(magic));
         }
 
         // create the stream
-        bufferStream = new ByteBufferOutputStream(buffer);
-        appendStream = new DataOutputStream(compressionType.wrapForOutput(bufferStream, magic,
+        this.bufferStream = bufferStream;
+        this.appendStream = new DataOutputStream(compressionType.wrapForOutput(this.bufferStream, magic,
                 COMPRESSION_DEFAULT_BUFFER_SIZE));
     }
 
+    /**
+     * Construct a new builder.
+     *
+     * @param buffer The underlying buffer to use (note that this class will allocate a new buffer if necessary
+     *               to fit the records appended)
+     * @param magic The magic value to use
+     * @param compressionType The compression codec to use
+     * @param timestampType The desired timestamp type. For magic > 0, this cannot be {@link TimestampType#NO_TIMESTAMP_TYPE}.
+     * @param baseOffset The initial offset to use for
+     * @param logAppendTime The log append time of this record set. Can be set to NO_TIMESTAMP if CREATE_TIME is used.
+     * @param producerId The producer ID associated with the producer writing this record set
+     * @param producerEpoch The epoch of the producer
+     * @param baseSequence The sequence number of the first record in this set
+     * @param isTransactional Whether or not the records are part of a transaction
+     * @param isControlBatch Whether or not this is a control batch (e.g. for transaction markers)
+     * @param partitionLeaderEpoch The epoch of the partition leader appending the record set to the log
+     * @param writeLimit The desired limit on the total bytes for this record set (note that this can be exceeded
+     *                   when compression is used since size estimates are rough, and in the case that the first
+     *                   record added exceeds the size).
+     */
+    public MemoryRecordsBuilder(ByteBuffer buffer,
+                                byte magic,
+                                CompressionType compressionType,
+                                TimestampType timestampType,
+                                long baseOffset,
+                                long logAppendTime,
+                                long producerId,
+                                short producerEpoch,
+                                int baseSequence,
+                                boolean isTransactional,
+                                boolean isControlBatch,
+                                int partitionLeaderEpoch,
+                                int writeLimit) {
+        this(new ByteBufferOutputStream(buffer), magic, compressionType, timestampType, baseOffset, logAppendTime,
+                producerId, producerEpoch, baseSequence, isTransactional, isControlBatch, partitionLeaderEpoch,
+                writeLimit);
+    }
+
     public ByteBuffer buffer() {
         return bufferStream.buffer();
     }
@@ -168,6 +189,10 @@ public class MemoryRecordsBuilder {
         return isControlBatch;
     }
 
+    public boolean isTransactional() {
+        return isTransactional;
+    }
+
     /**
      * Close this builder and return the resulting buffer.
      * @return The built log buffer
@@ -249,7 +274,7 @@ public class MemoryRecordsBuilder {
 
     public void abort() {
         closeForRecordAppends();
-        buffer().position(initPos);
+        buffer().position(initialPosition);
         aborted = true;
     }
 
@@ -260,24 +285,12 @@ public class MemoryRecordsBuilder {
         if (builtRecords != null)
             return;
 
-        if (isTransactional && producerId == RecordBatch.NO_PRODUCER_ID)
-            throw new IllegalArgumentException("Cannot write transactional messages without a valid producer ID");
-
-        if (producerId != RecordBatch.NO_PRODUCER_ID) {
-            if (producerEpoch == RecordBatch.NO_PRODUCER_EPOCH)
-                throw new IllegalArgumentException("Invalid negative producer epoch");
-
-            if (baseSequence < 0 && !isControlBatch)
-                throw new IllegalArgumentException("Invalid negative sequence number used");
-
-            if (magic < RecordBatch.MAGIC_VALUE_V2)
-                throw new IllegalArgumentException("Idempotent messages are not supported for magic " + magic);
-        }
+        validateProducerState();
 
         closeForRecordAppends();
 
         if (numRecords == 0L) {
-            buffer().position(initPos);
+            buffer().position(initialPosition);
             builtRecords = MemoryRecords.EMPTY;
         } else {
             if (magic > RecordBatch.MAGIC_VALUE_V1)
@@ -287,11 +300,27 @@ public class MemoryRecordsBuilder {
 
             ByteBuffer buffer = buffer().duplicate();
             buffer.flip();
-            buffer.position(initPos);
+            buffer.position(initialPosition);
             builtRecords = MemoryRecords.readableRecords(buffer.slice());
         }
     }
 
+    private void validateProducerState() {
+        if (isTransactional && producerId == RecordBatch.NO_PRODUCER_ID)
+            throw new IllegalArgumentException("Cannot write transactional messages without a valid producer ID");
+
+        if (producerId != RecordBatch.NO_PRODUCER_ID) {
+            if (producerEpoch == RecordBatch.NO_PRODUCER_EPOCH)
+                throw new IllegalArgumentException("Invalid negative producer epoch");
+
+            if (baseSequence < 0 && !isControlBatch)
+                throw new IllegalArgumentException("Invalid negative sequence number used");
+
+            if (magic < RecordBatch.MAGIC_VALUE_V2)
+                throw new IllegalArgumentException("Idempotent messages are not supported for magic " + magic);
+        }
+    }
+
     /**
      * Write the header to the default batch.
      * @return the written compressed bytes.
@@ -300,8 +329,8 @@ public class MemoryRecordsBuilder {
         ensureOpenForRecordBatchWrite();
         ByteBuffer buffer = bufferStream.buffer();
         int pos = buffer.position();
-        buffer.position(initPos);
-        int size = pos - initPos;
+        buffer.position(initialPosition);
+        int size = pos - initialPosition;
         int writtenCompressed = size - DefaultRecordBatch.RECORD_BATCH_OVERHEAD;
         int offsetDelta = (int) (lastOffset - baseOffset);
 
@@ -331,9 +360,9 @@ public class MemoryRecordsBuilder {
         ensureOpenForRecordBatchWrite();
         ByteBuffer buffer = bufferStream.buffer();
         int pos = buffer.position();
-        buffer.position(initPos);
+        buffer.position(initialPosition);
 
-        int wrapperSize = pos - initPos - Records.LOG_OVERHEAD;
+        int wrapperSize = pos - initialPosition - Records.LOG_OVERHEAD;
         int writtenCompressed = wrapperSize - LegacyRecord.recordOverhead(magic);
         AbstractLegacyRecordBatch.writeHeader(buffer, lastOffset, wrapperSize);
 
@@ -544,7 +573,7 @@ public class MemoryRecordsBuilder {
      * @param record the record to add
      */
     public void append(Record record) {
-        appendWithOffset(record.offset(), record.timestamp(), record.key(), record.value(), record.headers());
+        appendWithOffset(record.offset(), isControlBatch, record.timestamp(), record.key(), record.value(), record.headers());
     }
 
     /**
@@ -736,4 +765,5 @@ public class MemoryRecordsBuilder {
     public short producerEpoch() {
         return this.producerEpoch;
     }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java
index 2f0a96c..728b6eb 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.common.record;
 
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+
 /**
  * A mutable record batch is one that can be modified in place (without copying). This is used by the broker
  * to override certain fields in the batch before appending it to the log.
@@ -42,4 +44,11 @@ public interface MutableRecordBatch extends RecordBatch {
      * @param epoch The partition leader epoch to use
      */
     void setPartitionLeaderEpoch(int epoch);
+
+    /**
+     * Write this record batch into an output stream.
+     * @param outputStream The buffer to write the batch to
+     */
+    void writeTo(ByteBufferOutputStream outputStream);
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferOutputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferOutputStream.java b/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferOutputStream.java
index 9480c6d..79d4d4c 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferOutputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferOutputStream.java
@@ -20,16 +20,18 @@ import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
 /**
- * A byte buffer backed output outputStream
+ * A ByteBuffer-backed OutputStream
  */
 public class ByteBufferOutputStream extends OutputStream {
 
     private static final float REALLOCATION_FACTOR = 1.1f;
 
     private ByteBuffer buffer;
+    private int initialPosition;
 
     public ByteBufferOutputStream(ByteBuffer buffer) {
         this.buffer = buffer;
+        this.initialPosition = buffer.position();
     }
 
     public void write(int b) {
@@ -40,18 +42,55 @@ public class ByteBufferOutputStream extends OutputStream {
 
     public void write(byte[] bytes, int off, int len) {
         if (buffer.remaining() < len)
-            expandBuffer(buffer.capacity() + len);
+            expandBuffer(buffer.position() + len);
         buffer.put(bytes, off, len);
     }
 
+    public void write(ByteBuffer buffer) {
+        if (buffer.hasArray())
+            write(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
+        else {
+            int pos = buffer.position();
+            for (int i = pos, limit = buffer.remaining() + pos; i < limit; i++)
+                write(buffer.get(i));
+        }
+    }
+
     public ByteBuffer buffer() {
         return buffer;
     }
 
+    public int position() {
+        return buffer.position();
+    }
+
+    public int capacity() {
+        return buffer.capacity();
+    }
+
+    public int limit() {
+        return buffer.limit();
+    }
+
+    public void position(int position) {
+        if (position > buffer.limit())
+            expandBuffer(position);
+        buffer.position(position);
+    }
+
     private void expandBuffer(int size) {
         int expandSize = Math.max((int) (buffer.capacity() * REALLOCATION_FACTOR), size);
         ByteBuffer temp = ByteBuffer.allocate(expandSize);
-        temp.put(buffer.array(), buffer.arrayOffset(), buffer.position());
+        if (buffer.hasArray()) {
+            temp.put(buffer.array(), buffer.arrayOffset(), buffer.position());
+        } else {
+            int limit = buffer.position();
+            for (int i = 0; i < limit; i++)
+                temp.put(buffer.get(i));
+        }
+
+        // reset the old buffer's position so that the partial data in the new buffer cannot be mistakenly consumed
+        buffer.position(initialPosition);
         buffer = temp;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/clients/src/test/java/org/apache/kafka/clients/MockClient.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 9ca95e3..ce3c599 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -153,8 +153,7 @@ public class MockClient implements KafkaClient {
             short version = nodeApiVersions.usableVersion(request.apiKey(), builder.desiredVersion());
             AbstractRequest abstractRequest = request.requestBuilder().build(version);
             if (!futureResp.requestMatcher.matches(abstractRequest))
-                throw new IllegalStateException("Next in line response did not match expected request, request: "
-                        + abstractRequest);
+                throw new IllegalStateException("Request matcher did not match next-in-line request " + abstractRequest);
             ClientResponse resp = new ClientResponse(request.makeHeader(version), request.callback(), request.destination(),
                     request.createdTimeMs(), time.milliseconds(), futureResp.disconnected, null, futureResp.responseBody);
             responses.add(resp);
@@ -195,6 +194,18 @@ public class MockClient implements KafkaClient {
         respond(response, false);
     }
 
+    public void respond(RequestMatcher matcher, AbstractResponse response) {
+        ClientRequest nextRequest = requests.peek();
+        if (nextRequest == null)
+            throw new IllegalStateException("No current requests queued");
+
+        AbstractRequest request = nextRequest.requestBuilder().build();
+        if (!matcher.matches(request))
+            throw new IllegalStateException("Request matcher did not match next-in-line request " + request);
+
+        respond(response);
+    }
+
     public void respond(AbstractResponse response, boolean disconnected) {
         ClientRequest request = requests.remove();
         short version = request.requestBuilder().desiredOrLatestVersion();

http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index f48ab33..e079f2a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -32,8 +32,11 @@ import org.apache.kafka.common.record.CompressionRatioEstimator;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.DefaultRecord;
 import org.apache.kafka.common.record.DefaultRecordBatch;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
 import org.apache.kafka.common.record.MutableRecordBatch;
 import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
@@ -563,7 +566,10 @@ public class RecordAccumulatorTest {
         RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.GZIP, 10, 100L, metrics, time,
                                                         new ApiVersions(), null);
         // Create a big batch
-        ProducerBatch batch = ProducerBatch.createBatchOffAccumulator(tp1, CompressionType.NONE, 4096, now);
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
+        ProducerBatch batch = new ProducerBatch(tp1, builder, now, true);
+
         byte[] value = new byte[1024];
         final AtomicInteger acked = new AtomicInteger(0);
         Callback cb = new Callback() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index c08ea57..77b1da8 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -38,14 +38,14 @@ import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.CompressionRatioEstimator;
 import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.MutableRecordBatch;
 import org.apache.kafka.common.record.RecordBatch;
-import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.requests.InitProducerIdRequest;
-import org.apache.kafka.common.requests.ProduceRequest;
 import org.apache.kafka.common.requests.InitProducerIdResponse;
+import org.apache.kafka.common.requests.ProduceRequest;
 import org.apache.kafka.common.requests.ProduceResponse;
 import org.apache.kafka.common.requests.ResponseHeader;
 import org.apache.kafka.common.utils.MockTime;
@@ -61,6 +61,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -554,28 +555,18 @@ public class SenderTest {
         CompressionRatioEstimator.setEstimation(topic, CompressionType.GZIP, 0.2f);
         Metrics m = new Metrics();
         TransactionManager txnManager = new TransactionManager("testSplitBatchAndSend", 0);
-        txnManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(123456L, (short) 0));
+        ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
+        txnManager.setProducerIdAndEpoch(producerIdAndEpoch);
         accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.GZIP, 0L, 0L, m, time,
                                             new ApiVersions(), txnManager);
         try {
-            Sender sender = new Sender(client,
-                                       metadata,
-                                       this.accumulator,
-                                       true,
-                                       MAX_REQUEST_SIZE,
-                                       ACKS_ALL,
-                                       maxRetries,
-                                       m,
-                                       time,
-                                       REQUEST_TIMEOUT,
-                                       1000L,
-                                       txnManager,
-                                       new ApiVersions());
+            Sender sender = new Sender(client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
+                    m, time, REQUEST_TIMEOUT, 1000L, txnManager, new ApiVersions());
             // Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1
             Cluster cluster1 = TestUtils.clusterWith(2, topic, 2);
             metadata.update(cluster1, Collections.<String>emptySet(), time.milliseconds());
             // Send the first message.
-            TopicPartition tp2 = new TopicPartition(topic, 1);
+            final TopicPartition tp2 = new TopicPartition(topic, 1);
             Future<RecordMetadata> f1 =
                     accumulator.append(tp2, 0L, "key1".getBytes(), new byte[batchSize / 2], null, null, MAX_BLOCK_TIMEOUT).future;
             Future<RecordMetadata> f2 =
@@ -607,7 +598,8 @@ public class SenderTest {
             assertTrue("Client ready status should be true", client.isReady(node, 0L));
 
             responseMap.put(tp2, new ProduceResponse.PartitionResponse(Errors.NONE, 0L, 0L));
-            client.respond(new ProduceResponse(responseMap));
+            client.respond(produceRequestMatcher(tp2, producerIdAndEpoch, 0, false), new ProduceResponse(responseMap));
+
             sender.run(time.milliseconds()); // receive
             assertTrue("The future should have been done.", f1.isDone());
             assertEquals("The sequence number should be 1", 1, txnManager.sequenceNumber(tp2).longValue());
@@ -621,7 +613,8 @@ public class SenderTest {
             assertTrue("Client ready status should be true", client.isReady(node, 0L));
 
             responseMap.put(tp2, new ProduceResponse.PartitionResponse(Errors.NONE, 1L, 0L));
-            client.respond(new ProduceResponse(responseMap));
+            client.respond(produceRequestMatcher(tp2, producerIdAndEpoch, 1, false), new ProduceResponse(responseMap));
+
             sender.run(time.milliseconds()); // receive
             assertTrue("The future should have been done.", f2.isDone());
             assertEquals("The sequence number should be 2", 2, txnManager.sequenceNumber(tp2).longValue());
@@ -635,6 +628,36 @@ public class SenderTest {
         }
     }
 
+    private MockClient.RequestMatcher produceRequestMatcher(final TopicPartition tp,
+                                                            final ProducerIdAndEpoch producerIdAndEpoch,
+                                                            final int sequence,
+                                                            final boolean isTransactional) {
+        return new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(AbstractRequest body) {
+                if (!(body instanceof ProduceRequest))
+                    return false;
+
+                ProduceRequest request = (ProduceRequest) body;
+                Map<TopicPartition, MemoryRecords> recordsMap = request.partitionRecordsOrFail();
+                MemoryRecords records = recordsMap.get(tp);
+                if (records == null)
+                    return false;
+
+                List<MutableRecordBatch> batches = TestUtils.toList(records.batches());
+                if (batches.isEmpty() || batches.size() > 1)
+                    return false;
+
+                MutableRecordBatch batch = batches.get(0);
+                return batch.baseOffset() == 0L &&
+                        batch.baseSequence() == sequence &&
+                        batch.producerId() == producerIdAndEpoch.producerId &&
+                        batch.producerEpoch() == producerIdAndEpoch.epoch &&
+                        batch.isTransactional() == isTransactional;
+            }
+        };
+    }
+
     private void completedWithError(Future<RecordMetadata> future, Errors error) throws Exception {
         assertTrue("Request should be completed", future.isDone());
         try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java
index 65de01c..7c37354 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java
@@ -37,7 +37,7 @@ public class FileLogInputStreamTest {
                     new SimpleRecord("bar".getBytes())));
             fileRecords.flush();
 
-            FileLogInputStream logInputStream = new FileLogInputStream(fileRecords.channel(), Integer.MAX_VALUE, 0,
+            FileLogInputStream logInputStream = new FileLogInputStream(fileRecords.channel(), 0,
                     fileRecords.sizeInBytes());
 
             FileLogInputStream.FileChannelRecordBatch batch = logInputStream.nextBatch();

http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index afd0126..a7058a3 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.record;
 
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Test;
@@ -30,6 +31,7 @@ import java.util.List;
 import static java.util.Arrays.asList;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
@@ -202,7 +204,8 @@ public class MemoryRecordsTest {
             builder.append(12L, null, "c".getBytes());
 
             ByteBuffer filtered = ByteBuffer.allocate(2048);
-            builder.build().filterTo(new RetainNonNullKeysFilter(), filtered);
+            builder.build().filterTo(new TopicPartition("foo", 0), new RetainNonNullKeysFilter(), filtered,
+                    Integer.MAX_VALUE);
 
             filtered.flip();
             MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
@@ -278,7 +281,7 @@ public class MemoryRecordsTest {
             buffer.flip();
 
             ByteBuffer filtered = ByteBuffer.allocate(2048);
-            MemoryRecords.readableRecords(buffer).filterTo(new MemoryRecords.RecordFilter() {
+            MemoryRecords.readableRecords(buffer).filterTo(new TopicPartition("foo", 0), new MemoryRecords.RecordFilter() {
                 @Override
                 protected boolean shouldDiscard(RecordBatch batch) {
                     // discard the second and fourth batches
@@ -289,7 +292,7 @@ public class MemoryRecordsTest {
                 protected boolean shouldRetain(RecordBatch recordBatch, Record record) {
                     return true;
                 }
-            }, filtered);
+            }, filtered, Integer.MAX_VALUE);
 
             filtered.flip();
             MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
@@ -316,7 +319,8 @@ public class MemoryRecordsTest {
         buffer.flip();
 
         ByteBuffer filtered = ByteBuffer.allocate(2048);
-        MemoryRecords.readableRecords(buffer).filterTo(new RetainNonNullKeysFilter(), filtered);
+        MemoryRecords.readableRecords(buffer).filterTo(new TopicPartition("foo", 0), new RetainNonNullKeysFilter(),
+                filtered, Integer.MAX_VALUE);
         filtered.flip();
         MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
 
@@ -383,7 +387,8 @@ public class MemoryRecordsTest {
             buffer.flip();
 
             ByteBuffer filtered = ByteBuffer.allocate(2048);
-            MemoryRecords.readableRecords(buffer).filterTo(new RetainNonNullKeysFilter(), filtered);
+            MemoryRecords.readableRecords(buffer).filterTo(new TopicPartition("foo", 0), new RetainNonNullKeysFilter(),
+                    filtered, Integer.MAX_VALUE);
 
             filtered.flip();
             MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
@@ -440,6 +445,55 @@ public class MemoryRecordsTest {
     }
 
     @Test
+    public void testFilterToWithUndersizedBuffer() {
+        ByteBuffer buffer = ByteBuffer.allocate(1024);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 0L);
+        builder.append(10L, null, "a".getBytes());
+        builder.close();
+
+        builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 1L);
+        builder.append(11L, "1".getBytes(), new byte[128]);
+        builder.append(12L, "2".getBytes(), "c".getBytes());
+        builder.append(13L, null, "d".getBytes());
+        builder.close();
+
+        builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 4L);
+        builder.append(14L, null, "e".getBytes());
+        builder.append(15L, "5".getBytes(), "f".getBytes());
+        builder.append(16L, "6".getBytes(), "g".getBytes());
+        builder.close();
+
+        builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 7L);
+        builder.append(17L, "7".getBytes(), new byte[128]);
+        builder.close();
+
+        buffer.flip();
+
+        ByteBuffer output = ByteBuffer.allocate(64);
+
+        List<Record> records = new ArrayList<>();
+        while (buffer.hasRemaining()) {
+            output.rewind();
+
+            MemoryRecords.FilterResult result = MemoryRecords.readableRecords(buffer)
+                    .filterTo(new TopicPartition("foo", 0), new RetainNonNullKeysFilter(), output, Integer.MAX_VALUE);
+
+            buffer.position(buffer.position() + result.bytesRead);
+            result.output.flip();
+
+            if (output != result.output)
+                assertEquals(0, output.position());
+
+            MemoryRecords filtered = MemoryRecords.readableRecords(result.output);
+            records.addAll(TestUtils.toList(filtered.records()));
+        }
+
+        assertEquals(5, records.size());
+        for (Record record : records)
+            assertNotNull(record.key());
+    }
+
+    @Test
     public void testFilterTo() {
         ByteBuffer buffer = ByteBuffer.allocate(2048);
         MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 0L);
@@ -464,7 +518,8 @@ public class MemoryRecordsTest {
         buffer.flip();
 
         ByteBuffer filtered = ByteBuffer.allocate(2048);
-        MemoryRecords.FilterResult result = MemoryRecords.readableRecords(buffer).filterTo(new RetainNonNullKeysFilter(), filtered);
+        MemoryRecords.FilterResult result = MemoryRecords.readableRecords(buffer).filterTo(
+                new TopicPartition("foo", 0), new RetainNonNullKeysFilter(), filtered, Integer.MAX_VALUE);
 
         filtered.flip();
 
@@ -576,7 +631,8 @@ public class MemoryRecordsTest {
         buffer.flip();
 
         ByteBuffer filtered = ByteBuffer.allocate(2048);
-        MemoryRecords.readableRecords(buffer).filterTo(new RetainNonNullKeysFilter(), filtered);
+        MemoryRecords.readableRecords(buffer).filterTo(new TopicPartition("foo", 0), new RetainNonNullKeysFilter(),
+                filtered, Integer.MAX_VALUE);
 
         filtered.flip();
         MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);

http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/clients/src/test/java/org/apache/kafka/common/utils/ByteBufferOutputStreamTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/ByteBufferOutputStreamTest.java b/clients/src/test/java/org/apache/kafka/common/utils/ByteBufferOutputStreamTest.java
new file mode 100644
index 0000000..2ef5672
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/utils/ByteBufferOutputStreamTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+public class ByteBufferOutputStreamTest {
+
+    @Test
+    public void testExpandByteBufferOnPositionIncrease() throws Exception {
+        testExpandByteBufferOnPositionIncrease(ByteBuffer.allocate(16));
+    }
+
+    @Test
+    public void testExpandDirectByteBufferOnPositionIncrease() throws Exception {
+        testExpandByteBufferOnPositionIncrease(ByteBuffer.allocateDirect(16));
+    }
+
+    private void testExpandByteBufferOnPositionIncrease(ByteBuffer initialBuffer) throws Exception {
+        ByteBufferOutputStream output = new ByteBufferOutputStream(initialBuffer);
+        output.write("hello".getBytes());
+        output.position(32);
+        assertEquals(32, output.position());
+        assertEquals(0, initialBuffer.position());
+
+        ByteBuffer buffer = output.buffer();
+        assertEquals(32, buffer.limit());
+        buffer.position(0);
+        buffer.limit(5);
+        byte[] bytes = new byte[5];
+        buffer.get(bytes);
+        assertArrayEquals("hello".getBytes(), bytes);
+    }
+
+    @Test
+    public void testExpandByteBufferOnWrite() throws Exception {
+        testExpandByteBufferOnWrite(ByteBuffer.allocate(16));
+    }
+
+    @Test
+    public void testExpandDirectByteBufferOnWrite() throws Exception {
+        testExpandByteBufferOnWrite(ByteBuffer.allocateDirect(16));
+    }
+
+    private void testExpandByteBufferOnWrite(ByteBuffer initialBuffer) throws Exception {
+        ByteBufferOutputStream output = new ByteBufferOutputStream(initialBuffer);
+        output.write("hello".getBytes());
+        output.write(new byte[27]);
+        assertEquals(32, output.position());
+        assertEquals(0, initialBuffer.position());
+
+        ByteBuffer buffer = output.buffer();
+        assertEquals(32, buffer.limit());
+        buffer.position(0);
+        buffer.limit(5);
+        byte[] bytes = new byte[5];
+        buffer.get(bytes);
+        assertArrayEquals("hello".getBytes(), bytes);
+    }
+
+    @Test
+    public void testWriteByteBuffer() {
+        testWriteByteBuffer(ByteBuffer.allocate(16));
+    }
+
+    @Test
+    public void testWriteDirectByteBuffer() {
+        testWriteByteBuffer(ByteBuffer.allocateDirect(16));
+    }
+
+    private void testWriteByteBuffer(ByteBuffer input) {
+        long value = 234239230L;
+        input.putLong(value);
+        input.flip();
+
+        ByteBufferOutputStream output = new ByteBufferOutputStream(ByteBuffer.allocate(32));
+        output.write(input);
+        assertEquals(8, output.position());
+        assertEquals(value, output.buffer().getLong(0));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 67b9271..c37ea08 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -318,7 +318,7 @@ class Log(@volatile var dir: File,
         loadProducersFromLog(stateManager, fetchDataInfo.records)
     }
     stateManager.updateMapEndOffset(segment.baseOffset)
-    val bytesTruncated = segment.recover(config.maxMessageSize, stateManager, leaderEpochCache)
+    val bytesTruncated = segment.recover(stateManager, leaderEpochCache)
 
     // once we have recovered the segment's data, take a snapshot to ensure that we won't
     // need to reload the same segment again while recovering another segment.

http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 8eda2e1..b05e37f 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -516,22 +516,23 @@ private[log] class Cleaner(val id: Int,
       source.log.readInto(readBuffer, position)
       val records = MemoryRecords.readableRecords(readBuffer)
       throttler.maybeThrottle(records.sizeInBytes)
-      val result = records.filterTo(logCleanerFilter, writeBuffer)
+      val result = records.filterTo(topicPartition, logCleanerFilter, writeBuffer, maxLogMessageSize)
       stats.readMessages(result.messagesRead, result.bytesRead)
       stats.recopyMessages(result.messagesRetained, result.bytesRetained)
 
       position += result.bytesRead
 
       // if any messages are to be retained, write them out
-      if (writeBuffer.position > 0) {
-        writeBuffer.flip()
-        val retained = MemoryRecords.readableRecords(writeBuffer)
+      val outputBuffer = result.output
+      if (outputBuffer.position > 0) {
+        outputBuffer.flip()
+        val retained = MemoryRecords.readableRecords(outputBuffer)
         dest.append(firstOffset = retained.batches.iterator.next().baseOffset,
           largestOffset = result.maxOffset,
           largestTimestamp = result.maxTimestamp,
           shallowOffsetOfMaxTimestamp = result.shallowOffsetOfMaxTimestamp,
           records = retained)
-        throttler.maybeThrottle(writeBuffer.limit)
+        throttler.maybeThrottle(outputBuffer.limit)
       }
 
       // if we read bytes but didn't get even one complete message, our I/O buffer is too small, grow it and try again

http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 70269bb..3e4c47d 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -242,19 +242,16 @@ class LogSegment(val log: FileRecords,
      index.fetchUpperBoundOffset(startOffsetPosition, fetchSize).map(_.offset)
 
   /**
-   * Run recovery on the given segment. This will rebuild the index from the log file and lop off any invalid bytes from the end of the log and index.
+   * Run recovery on the given segment. This will rebuild the index from the log file and lop off any invalid bytes
+   * from the end of the log and index.
    *
-   * @param maxMessageSize A bound the memory allocation in the case of a corrupt message size--we will assume any message larger than this
-   * is corrupt.
    * @param producerStateManager Producer state corresponding to the segment's base offset. This is needed to recover
    *                             the transaction index.
    * @param leaderEpochCache Optionally a cache for updating the leader epoch during recovery.
    * @return The number of bytes truncated from the log
    */
   @nonthreadsafe
-  def recover(maxMessageSize: Int,
-              producerStateManager: ProducerStateManager,
-              leaderEpochCache: Option[LeaderEpochCache] = None): Int = {
+  def recover(producerStateManager: ProducerStateManager, leaderEpochCache: Option[LeaderEpochCache] = None): Int = {
     index.truncate()
     index.resize(index.maxIndexSize)
     timeIndex.truncate()
@@ -264,7 +261,7 @@ class LogSegment(val log: FileRecords,
     var lastIndexEntry = 0
     maxTimestampSoFar = RecordBatch.NO_TIMESTAMP
     try {
-      for (batch <- log.batches(maxMessageSize).asScala) {
+      for (batch <- log.batches.asScala) {
         batch.ensureValid()
 
         // The max timestamp is exposed at the batch level, so no need to iterate the records
@@ -296,6 +293,9 @@ class LogSegment(val log: FileRecords,
           .format(log.file.getAbsolutePath, validBytes, e.getMessage))
     }
     val truncated = log.sizeInBytes - validBytes
+    if (truncated > 0)
+      logger.debug(s"Truncated $truncated invalid bytes at the end of segment ${log.file.getAbsoluteFile} during recovery")
+
     log.truncateTo(validBytes)
     index.trimToValidSize()
     // A normally closed segment always appends the biggest timestamp ever seen into log segment, we do this as well.

http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/core/src/main/scala/kafka/tools/DumpLogSegments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index f0c41c7..4de546f 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -337,8 +337,8 @@ object DumpLogSegments {
     val messageSet = FileRecords.open(file, false)
     var validBytes = 0L
     var lastOffset = -1L
-    val batches = messageSet.batches(maxMessageSize).asScala
-    for (batch <- batches) {
+
+    for (batch <- messageSet.batches.asScala) {
       if (isDeepIteration) {
         for (record <- batch.asScala) {
           if (lastOffset == -1)

http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 5db1ed6..79fe220 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -264,7 +264,7 @@ class LogSegmentTest {
       seg.append(i, i, RecordBatch.NO_TIMESTAMP, -1L, records(i, i.toString))
     val indexFile = seg.index.file
     TestUtils.writeNonsenseToFile(indexFile, 5, indexFile.length.toInt)
-    seg.recover(64*1024,   new ProducerStateManager(topicPartition, logDir))
+    seg.recover(new ProducerStateManager(topicPartition, logDir))
     for(i <- 0 until 100)
       assertEquals(i, seg.read(i, Some(i + 1), 1024).records.records.iterator.next().offset)
   }
@@ -303,7 +303,7 @@ class LogSegmentTest {
       shallowOffsetOfMaxTimestamp = 107L, endTxnRecords(ControlRecordType.COMMIT, pid1, producerEpoch, offset = 107L))
 
     var stateManager = new ProducerStateManager(topicPartition, logDir)
-    segment.recover(64 * 1024, stateManager)
+    segment.recover(stateManager)
     assertEquals(108L, stateManager.mapEndOffset)
 
 
@@ -318,7 +318,7 @@ class LogSegmentTest {
     // recover again, but this time assuming the transaction from pid2 began on a previous segment
     stateManager = new ProducerStateManager(topicPartition, logDir)
     stateManager.loadProducerEntry(ProducerIdEntry(pid2, producerEpoch, 10, 90L, 5, RecordBatch.NO_TIMESTAMP, 0, Some(75L)))
-    segment.recover(64 * 1024, stateManager)
+    segment.recover(stateManager)
     assertEquals(108L, stateManager.mapEndOffset)
 
     abortedTxns = segment.txnIndex.allAbortedTxns
@@ -352,7 +352,7 @@ class LogSegmentTest {
       seg.append(i, i, i * 10, i, records(i, i.toString))
     val timeIndexFile = seg.timeIndex.file
     TestUtils.writeNonsenseToFile(timeIndexFile, 5, timeIndexFile.length.toInt)
-    seg.recover(64*1024, new ProducerStateManager(topicPartition, logDir))
+    seg.recover(new ProducerStateManager(topicPartition, logDir))
     for(i <- 0 until 100) {
       assertEquals(i, seg.findOffsetByTimestamp(i * 10).get.offset)
       if (i < 99)
@@ -376,7 +376,7 @@ class LogSegmentTest {
       val recordPosition = seg.log.searchForOffsetWithSize(offsetToBeginCorruption, 0)
       val position = recordPosition.position + TestUtils.random.nextInt(15)
       TestUtils.writeNonsenseToFile(seg.log.file, position, (seg.log.file.length - position).toInt)
-      seg.recover(64*1024, new ProducerStateManager(topicPartition, logDir))
+      seg.recover(new ProducerStateManager(topicPartition, logDir))
       assertEquals("Should have truncated off bad messages.", (0 until offsetToBeginCorruption).toList,
         seg.log.batches.asScala.map(_.lastOffset).toList)
       seg.delete()

http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 84ff43b..52e9140 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -30,6 +30,7 @@ import org.junit.{After, Before, Test}
 import kafka.utils._
 import kafka.server.{BrokerTopicStats, KafkaConfig}
 import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache}
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
@@ -221,9 +222,9 @@ class LogTest {
     records.batches.asScala.foreach(_.setPartitionLeaderEpoch(0))
 
     val filtered = ByteBuffer.allocate(2048)
-    records.filterTo(new RecordFilter {
+    records.filterTo(new TopicPartition("foo", 0), new RecordFilter {
       override def shouldRetain(recordBatch: RecordBatch, record: Record): Boolean = !record.hasKey
-    }, filtered)
+    }, filtered, Int.MaxValue)
     filtered.flip()
     val filteredRecords = MemoryRecords.readableRecords(filtered)
 
@@ -265,9 +266,9 @@ class LogTest {
     records.batches.asScala.foreach(_.setPartitionLeaderEpoch(0))
 
     val filtered = ByteBuffer.allocate(2048)
-    records.filterTo(new RecordFilter {
+    records.filterTo(new TopicPartition("foo", 0), new RecordFilter {
       override def shouldRetain(recordBatch: RecordBatch, record: Record): Boolean = !record.hasKey
-    }, filtered)
+    }, filtered, Int.MaxValue)
     filtered.flip()
     val filteredRecords = MemoryRecords.readableRecords(filtered)