You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/06/01 19:05:39 UTC

kafka git commit: KAFKA-5150; reduce lz4 decompression overhead

Repository: kafka
Updated Branches:
  refs/heads/0.10.2 386a8d041 -> e944956a3


KAFKA-5150; reduce lz4 decompression overhead

- reuse decompression buffers in consumer Fetcher
- switch lz4 input stream to operate directly on ByteBuffers
- avoids performance impact of catching exceptions when reaching the end of legacy record batches
- more tests with both compressible / incompressible data, multiple
  blocks, and various other combinations to increase code coverage
- fixes bug that would cause exception instead of invalid block size
  for invalid incompressible blocks
- fixes bug if incompressible flag is set on end frame block size

Author: Xavier Léauté <xa...@confluent.io>
Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Jason Gustafson <ja...@confluent.io>, Ismael Juma <is...@juma.me.uk>

Closes #3090 from xvrl/kafka-5150-0.10


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e944956a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e944956a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e944956a

Branch: refs/heads/0.10.2
Commit: e944956a35d30ba4e393be887f873f12106efe75
Parents: 386a8d0
Author: Xavier Léauté <xa...@confluent.io>
Authored: Thu Jun 1 20:05:04 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Thu Jun 1 20:05:11 2017 +0100

----------------------------------------------------------------------
 .../clients/consumer/internals/Fetcher.java     |   4 +-
 .../kafka/common/record/AbstractRecords.java    |   4 +-
 .../kafka/common/record/BufferSupplier.java     |  95 ++++++
 .../apache/kafka/common/record/FileRecords.java |  23 +-
 .../common/record/KafkaLZ4BlockInputStream.java | 205 +++++++------
 .../apache/kafka/common/record/LogEntry.java    |   2 +-
 .../kafka/common/record/MemoryRecords.java      |  19 +-
 .../common/record/MemoryRecordsBuilder.java     |  22 +-
 .../org/apache/kafka/common/record/Records.java |  18 +-
 .../kafka/common/record/RecordsIterator.java    |  51 ++--
 .../org/apache/kafka/common/utils/Utils.java    |  24 ++
 .../internals/RecordAccumulatorTest.java        |   7 +-
 .../kafka/common/record/FileRecordsTest.java    |   2 +-
 .../kafka/common/record/KafkaLZ4Test.java       | 286 ++++++++++++++++---
 .../kafka/common/record/MemoryRecordsTest.java  |   6 +-
 .../src/main/scala/kafka/log/LogValidator.scala |   2 +-
 .../kafka/message/CompressionFactory.scala      |  19 +-
 .../kafka/message/MessageCompressionTest.scala  |   7 +-
 .../unit/kafka/message/MessageWriterTest.scala  |   2 +-
 19 files changed, 593 insertions(+), 205 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e944956a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index e2631b5..a3d4a90 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -41,6 +41,7 @@ import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.metrics.stats.Value;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.BufferSupplier;
 import org.apache.kafka.common.record.InvalidRecordException;
 import org.apache.kafka.common.record.LogEntry;
 import org.apache.kafka.common.record.Record;
@@ -93,6 +94,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
     private final ConcurrentLinkedQueue<CompletedFetch> completedFetches;
     private final Deserializer<K> keyDeserializer;
     private final Deserializer<V> valueDeserializer;
+    private final BufferSupplier decompressionBufferSupplier = BufferSupplier.create();
 
     private PartitionRecords<K, V> nextInLineRecords = null;
     private ExceptionMetadata nextInLineExceptionMetadata = null;
@@ -782,7 +784,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
 
                 List<ConsumerRecord<K, V>> parsed = new ArrayList<>();
                 boolean skippedRecords = false;
-                for (LogEntry logEntry : partition.records.deepEntries()) {
+                for (LogEntry logEntry : partition.records.deepEntries(decompressionBufferSupplier)) {
                     // Skip the messages earlier than current position.
                     if (logEntry.offset() >= position) {
                         parsed.add(parseRecord(tp, logEntry));

http://git-wip-us.apache.org/repos/asf/kafka/blob/e944956a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
index 3a96d88..ebfc7ef 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
@@ -26,7 +26,7 @@ public abstract class AbstractRecords implements Records {
         @Override
         public Iterator<Record> iterator() {
             return new Iterator<Record>() {
-                private final Iterator<? extends LogEntry> deepEntries = deepEntries().iterator();
+                private final Iterator<? extends LogEntry> deepEntries = deepEntries(BufferSupplier.NO_CACHING).iterator();
                 @Override
                 public boolean hasNext() {
                     return deepEntries.hasNext();
@@ -57,7 +57,7 @@ public abstract class AbstractRecords implements Records {
     @Override
     public Records toMessageFormat(byte toMagic) {
         List<LogEntry> converted = new ArrayList<>();
-        for (LogEntry entry : deepEntries())
+        for (LogEntry entry : deepEntries(BufferSupplier.NO_CACHING))
             converted.add(LogEntry.create(entry.offset(), entry.record().convert(toMagic)));
 
         if (converted.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/e944956a/clients/src/main/java/org/apache/kafka/common/record/BufferSupplier.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/BufferSupplier.java b/clients/src/main/java/org/apache/kafka/common/record/BufferSupplier.java
new file mode 100644
index 0000000..df7ba0b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/BufferSupplier.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Simple non-threadsafe interface for caching byte buffers. This is suitable for simple cases like ensuring that
+ * a given KafkaConsumer reuses the same decompression buffer when iterating over fetched records. For small record
+ * batches, allocating a potentially large buffer (64 KB for LZ4) will dominate the cost of decompressing and
+ * iterating over the records in the batch.
+ */
+public abstract class BufferSupplier implements AutoCloseable {
+
+    public static final BufferSupplier NO_CACHING = new BufferSupplier() {
+        @Override
+        public ByteBuffer get(int capacity) {
+            return ByteBuffer.allocate(capacity);
+        }
+
+        @Override
+        public void release(ByteBuffer buffer) {}
+
+        @Override
+        public void close() {}
+    };
+
+    public static BufferSupplier create() {
+        return new DefaultSupplier();
+    }
+
+    /**
+     * Supply a buffer with the required capacity. This may return a cached buffer or allocate a new instance.
+     */
+    public abstract ByteBuffer get(int capacity);
+
+    /**
+     * Return the provided buffer to be reused by a subsequent call to `get`.
+     */
+    public abstract void release(ByteBuffer buffer);
+
+    /**
+     * Release all resources associated with this supplier.
+     */
+    public abstract void close();
+
+    private static class DefaultSupplier extends BufferSupplier {
+        // We currently use a single block size, so optimise for that case
+        private final Map<Integer, Deque<ByteBuffer>> bufferMap = new HashMap<>(1);
+
+        @Override
+        public ByteBuffer get(int size) {
+            Deque<ByteBuffer> bufferQueue = bufferMap.get(size);
+            if (bufferQueue == null || bufferQueue.isEmpty())
+                return ByteBuffer.allocate(size);
+            else
+                return bufferQueue.pollFirst();
+        }
+
+        @Override
+        public void release(ByteBuffer buffer) {
+            buffer.clear();
+            Deque<ByteBuffer> bufferQueue = bufferMap.get(buffer.capacity());
+            if (bufferQueue == null) {
+                // We currently keep a single buffer in flight, so optimise for that case
+                bufferQueue = new ArrayDeque<>(1);
+                bufferMap.put(buffer.capacity(), bufferQueue);
+            }
+            bufferQueue.addLast(buffer);
+        }
+
+        @Override
+        public void close() {
+            bufferMap.clear();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e944956a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
index 960b716..fd8df3e 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -43,13 +43,6 @@ public class FileRecords extends AbstractRecords implements Closeable {
 
     private final Iterable<FileChannelLogEntry> shallowEntries;
 
-    private final Iterable<LogEntry> deepEntries = new Iterable<LogEntry>() {
-        @Override
-        public Iterator<LogEntry> iterator() {
-            return deepIterator();
-        }
-    };
-
     // mutable state
     private final AtomicInteger size;
     private final FileChannel channel;
@@ -362,18 +355,28 @@ public class FileRecords extends AbstractRecords implements Closeable {
     }
 
     @Override
+    public Iterable<LogEntry> deepEntries(final BufferSupplier bufferSupplier) {
+        return new Iterable<LogEntry>() {
+            @Override
+            public Iterator<LogEntry> iterator() {
+                return deepIterator(bufferSupplier);
+            }
+        };
+    }
+
+    @Override
     public Iterable<LogEntry> deepEntries() {
-        return deepEntries;
+        return deepEntries(BufferSupplier.NO_CACHING);
     }
 
-    private Iterator<LogEntry> deepIterator() {
+    private Iterator<LogEntry> deepIterator(BufferSupplier bufferSupplier) {
         final int end;
         if (isSlice)
             end = this.end;
         else
             end = this.sizeInBytes();
         FileLogInputStream inputStream = new FileLogInputStream(channel, Integer.MAX_VALUE, start, end);
-        return new RecordsIterator(inputStream, false, false, Integer.MAX_VALUE);
+        return new RecordsIterator(inputStream, false, false, Integer.MAX_VALUE, bufferSupplier);
     }
 
     public static FileRecords open(File file,

http://git-wip-us.apache.org/repos/asf/kafka/blob/e944956a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
index a408580..3d9d86b 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
@@ -14,83 +14,78 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.common.record;
 
-import static org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.LZ4_FRAME_INCOMPRESSIBLE_MASK;
-import static org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.LZ4_MAX_HEADER_LENGTH;
-import static org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.MAGIC;
-
-import java.io.FilterInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.BD;
-import org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.FLG;
-import org.apache.kafka.common.utils.Utils;
-
 import net.jpountz.lz4.LZ4Exception;
 import net.jpountz.lz4.LZ4Factory;
 import net.jpountz.lz4.LZ4SafeDecompressor;
 import net.jpountz.xxhash.XXHash32;
 import net.jpountz.xxhash.XXHashFactory;
 
+import org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.BD;
+import org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.FLG;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.LZ4_FRAME_INCOMPRESSIBLE_MASK;
+import static org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.MAGIC;
+
 /**
  * A partial implementation of the v1.5.1 LZ4 Frame format.
  *
- * @see <a href="http://cyan4973.github.io/lz4/lz4_Frame_format.html">LZ4 Frame Format</a>
+ * @see <a href="https://github.com/lz4/lz4/wiki/lz4_Frame_format.md">LZ4 Frame Format</a>
+ *
+ * This class is not thread-safe.
  */
-public final class KafkaLZ4BlockInputStream extends FilterInputStream {
+public final class KafkaLZ4BlockInputStream extends InputStream {
 
     public static final String PREMATURE_EOS = "Stream ended prematurely";
     public static final String NOT_SUPPORTED = "Stream unsupported (invalid magic bytes)";
     public static final String BLOCK_HASH_MISMATCH = "Block checksum mismatch";
     public static final String DESCRIPTOR_HASH_MISMATCH = "Stream frame descriptor corrupted";
 
-    private final LZ4SafeDecompressor decompressor;
-    private final XXHash32 checksum;
-    private final byte[] buffer;
-    private final byte[] compressedBuffer;
-    private final int maxBlockSize;
+    private static final LZ4SafeDecompressor DECOMPRESSOR = LZ4Factory.fastestInstance().safeDecompressor();
+    private static final XXHash32 CHECKSUM = XXHashFactory.fastestInstance().hash32();
+
+    private final ByteBuffer in;
     private final boolean ignoreFlagDescriptorChecksum;
+    private final BufferSupplier bufferSupplier;
+    private final ByteBuffer decompressionBuffer;
+    // `flg` and `maxBlockSize` are effectively final, they are initialised in the `readHeader` method that is only
+    // invoked from the constructor
     private FLG flg;
-    private BD bd;
-    private int bufferOffset;
-    private int bufferSize;
+    private int maxBlockSize;
+
+    // If a block is compressed, this is the same as `decompressionBuffer`. If a block is not compressed, this is
+    // a slice of `in` to avoid unnecessary copies.
+    private ByteBuffer decompressedBuffer;
     private boolean finished;
 
     /**
      * Create a new {@link InputStream} that will decompress data using the LZ4 algorithm.
      *
-     * @param in The stream to decompress
+     * @param in The byte buffer to decompress
      * @param ignoreFlagDescriptorChecksum for compatibility with old kafka clients, ignore incorrect HC byte
      * @throws IOException
      */
-    public KafkaLZ4BlockInputStream(InputStream in, boolean ignoreFlagDescriptorChecksum) throws IOException {
-        super(in);
-        decompressor = LZ4Factory.fastestInstance().safeDecompressor();
-        checksum = XXHashFactory.fastestInstance().hash32();
+    public KafkaLZ4BlockInputStream(ByteBuffer in, BufferSupplier bufferSupplier, boolean ignoreFlagDescriptorChecksum) throws IOException {
         this.ignoreFlagDescriptorChecksum = ignoreFlagDescriptorChecksum;
+        this.in = in.duplicate().order(ByteOrder.LITTLE_ENDIAN);
+        this.bufferSupplier = bufferSupplier;
         readHeader();
-        maxBlockSize = bd.getBlockMaximumSize();
-        buffer = new byte[maxBlockSize];
-        compressedBuffer = new byte[maxBlockSize];
-        bufferOffset = 0;
-        bufferSize = 0;
+        decompressionBuffer = bufferSupplier.get(maxBlockSize);
+        if (!decompressionBuffer.hasArray() || decompressionBuffer.arrayOffset() != 0) {
+            // require array backed decompression buffer with zero offset
+            // to simplify workaround for https://github.com/lz4/lz4-java/pull/65
+            throw new RuntimeException("decompression buffer must have backing array with zero array offset");
+        }
         finished = false;
     }
 
     /**
-     * Create a new {@link InputStream} that will decompress data using the LZ4 algorithm.
-     *
-     * @param in The stream to decompress
-     * @throws IOException
-     */
-    public KafkaLZ4BlockInputStream(InputStream in) throws IOException {
-        this(in, false);
-    }
-
-    /**
      * Check whether KafkaLZ4BlockInputStream is configured to ignore the
      * Frame Descriptor checksum, which is useful for compatibility with
      * old client implementations that use incorrect checksum calculations.
@@ -100,43 +95,50 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream {
     }
 
     /**
-     * Reads the magic number and frame descriptor from the underlying {@link InputStream}.
+     * Reads the magic number and frame descriptor from input buffer.
      *
      * @throws IOException
      */
     private void readHeader() throws IOException {
-        byte[] header = new byte[LZ4_MAX_HEADER_LENGTH];
-
         // read first 6 bytes into buffer to check magic and FLG/BD descriptor flags
-        int headerOffset = 6;
-        if (in.read(header, 0, headerOffset) != headerOffset) {
+        if (in.remaining() < 6) {
             throw new IOException(PREMATURE_EOS);
         }
 
-        if (MAGIC != Utils.readUnsignedIntLE(header, headerOffset - 6)) {
+        if (MAGIC != in.getInt()) {
             throw new IOException(NOT_SUPPORTED);
         }
-        flg = FLG.fromByte(header[headerOffset - 2]);
-        bd = BD.fromByte(header[headerOffset - 1]);
+        // mark start of data to checksum
+        in.mark();
+
+        flg = FLG.fromByte(in.get());
+        maxBlockSize = BD.fromByte(in.get()).getBlockMaximumSize();
 
         if (flg.isContentSizeSet()) {
-            if (in.read(header, headerOffset, 8) != 8)
+            if (in.remaining() < 8) {
                 throw new IOException(PREMATURE_EOS);
-            headerOffset += 8;
+            }
+            in.position(in.position() + 8);
         }
 
         // Final byte of Frame Descriptor is HC checksum
-        header[headerOffset++] = (byte) in.read();
 
         // Old implementations produced incorrect HC checksums
-        if (ignoreFlagDescriptorChecksum)
+        if (ignoreFlagDescriptorChecksum) {
+            in.position(in.position() + 1);
             return;
+        }
+
+        int len = in.position() - in.reset().position();
 
-        int offset = 4;
-        int len = headerOffset - offset - 1; // dont include magic bytes or HC
-        byte hash = (byte) ((checksum.hash(header, offset, len, 0) >> 8) & 0xFF);
-        if (hash != header[headerOffset - 1])
+        int hash = in.hasArray() ?
+                       // workaround for https://github.com/lz4/lz4-java/pull/65
+                       CHECKSUM.hash(in.array(), in.arrayOffset() + in.position(), len, 0) :
+                       CHECKSUM.hash(in, in.position(), len, 0);
+        in.position(in.position() + len);
+        if (in.get() != (byte) ((hash >> 8) & 0xFF)) {
             throw new IOException(DESCRIPTOR_HASH_MISMATCH);
+        }
     }
 
     /**
@@ -146,46 +148,70 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream {
      * @throws IOException
      */
     private void readBlock() throws IOException {
-        int blockSize = Utils.readUnsignedIntLE(in);
+        if (in.remaining() < 4) {
+            throw new IOException(PREMATURE_EOS);
+        }
+
+        int blockSize = in.getInt();
+        boolean compressed = (blockSize & LZ4_FRAME_INCOMPRESSIBLE_MASK) == 0;
+        blockSize &= ~LZ4_FRAME_INCOMPRESSIBLE_MASK;
 
         // Check for EndMark
         if (blockSize == 0) {
             finished = true;
             if (flg.isContentChecksumSet())
-                Utils.readUnsignedIntLE(in); // TODO: verify this content checksum
+                in.getInt(); // TODO: verify this content checksum
             return;
         } else if (blockSize > maxBlockSize) {
             throw new IOException(String.format("Block size %s exceeded max: %s", blockSize, maxBlockSize));
         }
 
-        boolean compressed = (blockSize & LZ4_FRAME_INCOMPRESSIBLE_MASK) == 0;
-        byte[] bufferToRead;
-        if (compressed) {
-            bufferToRead = compressedBuffer;
-        } else {
-            blockSize &= ~LZ4_FRAME_INCOMPRESSIBLE_MASK;
-            bufferToRead = buffer;
-            bufferSize = blockSize;
-        }
-
-        if (in.read(bufferToRead, 0, blockSize) != blockSize) {
+        if (in.remaining() < blockSize) {
             throw new IOException(PREMATURE_EOS);
         }
 
-        // verify checksum
-        if (flg.isBlockChecksumSet() && Utils.readUnsignedIntLE(in) != checksum.hash(bufferToRead, 0, blockSize, 0)) {
-            throw new IOException(BLOCK_HASH_MISMATCH);
-        }
-
         if (compressed) {
             try {
-                bufferSize = decompressor.decompress(compressedBuffer, 0, blockSize, buffer, 0, maxBlockSize);
+                // workaround for https://github.com/lz4/lz4-java/pull/65
+                final int bufferSize;
+                if (in.hasArray()) {
+                    bufferSize = DECOMPRESSOR.decompress(
+                        in.array(),
+                        in.position() + in.arrayOffset(),
+                        blockSize,
+                        decompressionBuffer.array(),
+                        0,
+                        maxBlockSize
+                    );
+                } else {
+                    // decompressionBuffer has zero arrayOffset, so we don't need to worry about
+                    // https://github.com/lz4/lz4-java/pull/65
+                    bufferSize = DECOMPRESSOR.decompress(in, in.position(), blockSize, decompressionBuffer, 0, maxBlockSize);
+                }
+                decompressionBuffer.position(0);
+                decompressionBuffer.limit(bufferSize);
+                decompressedBuffer = decompressionBuffer;
             } catch (LZ4Exception e) {
                 throw new IOException(e);
             }
+        } else {
+            decompressedBuffer = in.slice();
+            decompressedBuffer.limit(blockSize);
         }
 
-        bufferOffset = 0;
+        // verify checksum
+        if (flg.isBlockChecksumSet()) {
+            // workaround for https://github.com/lz4/lz4-java/pull/65
+            int hash = in.hasArray() ?
+                       CHECKSUM.hash(in.array(), in.arrayOffset() + in.position(), blockSize, 0) :
+                       CHECKSUM.hash(in, in.position(), blockSize, 0);
+            in.position(in.position() + blockSize);
+            if (hash != in.getInt()) {
+                throw new IOException(BLOCK_HASH_MISMATCH);
+            }
+        } else {
+            in.position(in.position() + blockSize);
+        }
     }
 
     @Override
@@ -200,7 +226,7 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream {
             return -1;
         }
 
-        return buffer[bufferOffset++] & 0xFF;
+        return decompressedBuffer.get() & 0xFF;
     }
 
     @Override
@@ -216,8 +242,8 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream {
             return -1;
         }
         len = Math.min(len, available());
-        System.arraycopy(buffer, bufferOffset, b, off, len);
-        bufferOffset += len;
+
+        decompressedBuffer.get(b, off, len);
         return len;
     }
 
@@ -232,28 +258,28 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream {
         if (finished) {
             return 0;
         }
-        n = Math.min(n, available());
-        bufferOffset += n;
-        return n;
+        int skipped = (int) Math.min(n, available());
+        decompressedBuffer.position(decompressedBuffer.position() + skipped);
+        return skipped;
     }
 
     @Override
     public int available() throws IOException {
-        return bufferSize - bufferOffset;
+        return decompressedBuffer == null ? 0 : decompressedBuffer.remaining();
     }
 
     @Override
     public void close() throws IOException {
-        in.close();
+        bufferSupplier.release(decompressionBuffer);
     }
 
     @Override
-    public synchronized void mark(int readlimit) {
+    public void mark(int readlimit) {
         throw new RuntimeException("mark not supported");
     }
 
     @Override
-    public synchronized void reset() throws IOException {
+    public void reset() throws IOException {
         throw new RuntimeException("reset not supported");
     }
 
@@ -261,5 +287,4 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream {
     public boolean markSupported() {
         return false;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e944956a/clients/src/main/java/org/apache/kafka/common/record/LogEntry.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/LogEntry.java b/clients/src/main/java/org/apache/kafka/common/record/LogEntry.java
index d2db356..51c9ebb 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/LogEntry.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/LogEntry.java
@@ -107,7 +107,7 @@ public abstract class LogEntry implements Iterable<LogEntry> {
     @Override
     public Iterator<LogEntry> iterator() {
         if (isCompressed())
-            return new RecordsIterator.DeepRecordsIterator(this, false, Integer.MAX_VALUE);
+            return new RecordsIterator.DeepRecordsIterator(this, false, Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
         return Collections.singletonList(this).iterator();
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e944956a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index f1a6e43..6cc70be 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -43,8 +43,6 @@ public class MemoryRecords extends AbstractRecords {
         }
     };
 
-    private final Iterable<LogEntry> deepEntries = deepEntries(false);
-
     private int validBytes = -1;
 
     // Construct a writable memory records
@@ -231,27 +229,32 @@ public class MemoryRecords extends AbstractRecords {
     }
 
     @Override
+    public Iterable<LogEntry> deepEntries(BufferSupplier bufferSupplier) {
+        return deepEntries(false, bufferSupplier);
+    }
+
+    @Override
     public Iterable<LogEntry> deepEntries() {
-        return deepEntries;
+        return deepEntries(false, BufferSupplier.NO_CACHING);
     }
 
-    public Iterable<LogEntry> deepEntries(final boolean ensureMatchingMagic) {
+    public Iterable<LogEntry> deepEntries(final boolean ensureMatchingMagic, final BufferSupplier bufferSupplier) {
         return new Iterable<LogEntry>() {
             @Override
             public Iterator<LogEntry> iterator() {
-                return deepIterator(ensureMatchingMagic, Integer.MAX_VALUE);
+                return deepIterator(ensureMatchingMagic, Integer.MAX_VALUE, bufferSupplier);
             }
         };
     }
 
-    private Iterator<LogEntry> deepIterator(boolean ensureMatchingMagic, int maxMessageSize) {
+    private Iterator<LogEntry> deepIterator(boolean ensureMatchingMagic, int maxMessageSize, BufferSupplier bufferSupplier) {
         return new RecordsIterator(new ByteBufferLogInputStream(buffer.duplicate(), maxMessageSize), false,
-                ensureMatchingMagic, maxMessageSize);
+                ensureMatchingMagic, maxMessageSize, bufferSupplier);
     }
 
     @Override
     public String toString() {
-        Iterator<LogEntry> iter = deepEntries().iterator();
+        Iterator<LogEntry> iter = deepEntries(BufferSupplier.NO_CACHING).iterator();
         StringBuilder builder = new StringBuilder();
         builder.append('[');
         while (iter.hasNext()) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/e944956a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index a46c1c6..7775140 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -18,7 +18,6 @@ package org.apache.kafka.common.record;
 
 import org.apache.kafka.common.KafkaException;
 
-import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -81,7 +80,7 @@ public class MemoryRecordsBuilder {
         @Override
         public Constructor get() throws ClassNotFoundException, NoSuchMethodException {
             return Class.forName("org.apache.kafka.common.record.KafkaLZ4BlockInputStream")
-                .getConstructor(InputStream.class, Boolean.TYPE);
+                .getConstructor(ByteBuffer.class, BufferSupplier.class, Boolean.TYPE);
         }
     });
 
@@ -408,7 +407,7 @@ public class MemoryRecordsBuilder {
         return builtRecords != null ? builtRecords.sizeInBytes() : estimatedBytesWritten();
     }
 
-    private static DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type, byte messageVersion, int bufferSize) {
+    protected static DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type, byte messageVersion, int bufferSize) {
         try {
             switch (type) {
                 case NONE:
@@ -438,25 +437,26 @@ public class MemoryRecordsBuilder {
         }
     }
 
-    public static DataInputStream wrapForInput(ByteBufferInputStream buffer, CompressionType type, byte messageVersion) {
+    public static InputStream wrapForInput(ByteBuffer buffer, CompressionType type, byte messageVersion, BufferSupplier bufferSupplier) {
         try {
             switch (type) {
                 case NONE:
-                    return buffer;
+                    return new ByteBufferInputStream(buffer);
                 case GZIP:
-                    return new DataInputStream(new GZIPInputStream(buffer));
+                    return new GZIPInputStream(new ByteBufferInputStream(buffer));
                 case SNAPPY:
                     try {
-                        InputStream stream = (InputStream) snappyInputStreamSupplier.get().newInstance(buffer);
-                        return new DataInputStream(stream);
+                        return (InputStream) snappyInputStreamSupplier.get().newInstance(new ByteBufferInputStream(buffer));
                     } catch (Exception e) {
                         throw new KafkaException(e);
                     }
                 case LZ4:
                     try {
-                        InputStream stream = (InputStream) lz4InputStreamSupplier.get().newInstance(buffer,
-                                messageVersion == Record.MAGIC_VALUE_V0);
-                        return new DataInputStream(stream);
+                        return (InputStream) lz4InputStreamSupplier.get().newInstance(
+                            buffer,
+                            bufferSupplier,
+                            messageVersion == Record.MAGIC_VALUE_V0
+                        );
                     } catch (Exception e) {
                         throw new KafkaException(e);
                     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e944956a/clients/src/main/java/org/apache/kafka/common/record/Records.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Records.java b/clients/src/main/java/org/apache/kafka/common/record/Records.java
index 9235f92..2a3e506 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Records.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Records.java
@@ -24,7 +24,7 @@ import java.nio.channels.GatheringByteChannel;
  * Each log entry consists of an 8 byte offset, a 4 byte record size, and a "shallow" {@link Record record}.
  * If the entry is not compressed, then each entry will have only the shallow record contained inside it. If it is
  * compressed, the entry contains "deep" records, which are packed into the value field of the shallow record. To iterate
- * over the shallow records, use {@link #shallowEntries()}; for the deep records, use {@link #deepEntries()}. Note
+ * over the shallow records, use {@link #shallowEntries()}; for the deep records, use {@link #deepEntries(BufferSupplier)}. Note
  * that the deep iterator handles both compressed and non-compressed entries: if the entry is not compressed, the
  * shallow record is returned; otherwise, the shallow record is decompressed and the deep entries are returned.
  * See {@link MemoryRecords} for the in-memory representation and {@link FileRecords} for the on-disk representation.
@@ -67,6 +67,22 @@ public interface Records {
      * there are fewer options for optimization since the data must be decompressed before it can be
      * returned. Hence there is little advantage in allowing subclasses to return a more specific type
      * as we do for {@link #shallowEntries()}.
+     *
+     * @param decompressionBufferSupplier The supplier of ByteBuffer(s) used for decompression if supported.
+     *                                    For small record batches, allocating a potentially large buffer (64 KB for LZ4)
+     *                                    will dominate the cost of decompressing and iterating over the records in the
+     *                                    batch. As such, a supplier that reuses buffers will have a significant
+     *                                    performance impact.
+     * @return An iterator over the deep entries of the log
+     */
+    Iterable<LogEntry> deepEntries(BufferSupplier decompressionBufferSupplier);
+
+    /**
+     * Get the deep log entries (i.e. descend into compressed message sets). For the deep records,
+     * there are fewer options for optimization since the data must be decompressed before it can be
+     * returned. Hence there is little advantage in allowing subclasses to return a more specific type
+     * as we do for {@link #shallowEntries()}.
+     *
      * @return An iterator over the deep entries of the log
      */
     Iterable<LogEntry> deepEntries();

http://git-wip-us.apache.org/repos/asf/kafka/blob/e944956a/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java b/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java
index 792a857..3150be3 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java
@@ -21,9 +21,8 @@ import org.apache.kafka.common.errors.CorruptRecordException;
 import org.apache.kafka.common.utils.AbstractIterator;
 import org.apache.kafka.common.utils.Utils;
 
-import java.io.DataInputStream;
-import java.io.EOFException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayDeque;
 import java.util.Iterator;
@@ -36,16 +35,19 @@ public class RecordsIterator extends AbstractIterator<LogEntry> {
     private final boolean ensureMatchingMagic;
     private final int maxRecordSize;
     private final ShallowRecordsIterator<?> shallowIter;
+    private final BufferSupplier bufferSupplier;
     private DeepRecordsIterator innerIter;
 
     public RecordsIterator(LogInputStream<?> logInputStream,
                            boolean shallow,
                            boolean ensureMatchingMagic,
-                           int maxRecordSize) {
+                           int maxRecordSize,
+                           BufferSupplier bufferSupplier) {
         this.shallowIter = new ShallowRecordsIterator<>(logInputStream);
         this.shallow = shallow;
         this.ensureMatchingMagic = ensureMatchingMagic;
         this.maxRecordSize = maxRecordSize;
+        this.bufferSupplier = bufferSupplier;
     }
 
     /**
@@ -76,7 +78,7 @@ public class RecordsIterator extends AbstractIterator<LogEntry> {
                 // would not try to further decompress underlying messages
                 // There will be at least one element in the inner iterator, so we don't
                 // need to call hasNext() here.
-                innerIter = new DeepRecordsIterator(entry, ensureMatchingMagic, maxRecordSize);
+                innerIter = new DeepRecordsIterator(entry, ensureMatchingMagic, maxRecordSize, bufferSupplier);
                 return innerIter.next();
             }
         } else {
@@ -89,30 +91,35 @@ public class RecordsIterator extends AbstractIterator<LogEntry> {
     }
 
     private static final class DataLogInputStream implements LogInputStream<LogEntry> {
-        private final DataInputStream stream;
+
+        private final InputStream stream;
         protected final int maxMessageSize;
+        private final ByteBuffer offsetAndSizeBuffer;
 
-        DataLogInputStream(DataInputStream stream, int maxMessageSize) {
+        DataLogInputStream(InputStream stream, int maxMessageSize) {
             this.stream = stream;
             this.maxMessageSize = maxMessageSize;
+            this.offsetAndSizeBuffer = ByteBuffer.allocate(Records.LOG_OVERHEAD);
         }
 
         public LogEntry nextEntry() throws IOException {
-            try {
-                long offset = stream.readLong();
-                int size = stream.readInt();
-                if (size < Record.RECORD_OVERHEAD_V0)
-                    throw new CorruptRecordException(String.format("Record size is less than the minimum record overhead (%d)", Record.RECORD_OVERHEAD_V0));
-                if (size > maxMessageSize)
-                    throw new CorruptRecordException(String.format("Record size exceeds the largest allowable message size (%d).", maxMessageSize));
-
-                byte[] recordBuffer = new byte[size];
-                stream.readFully(recordBuffer, 0, size);
-                ByteBuffer buf = ByteBuffer.wrap(recordBuffer);
-                return LogEntry.create(offset, new Record(buf));
-            } catch (EOFException e) {
+            offsetAndSizeBuffer.clear();
+            Utils.readFully(stream, offsetAndSizeBuffer);
+            if (offsetAndSizeBuffer.hasRemaining())
                 return null;
-            }
+            long offset = offsetAndSizeBuffer.getLong(Records.OFFSET_OFFSET);
+            int size = offsetAndSizeBuffer.getInt(Records.SIZE_OFFSET);
+            if (size < Record.RECORD_OVERHEAD_V0)
+                throw new CorruptRecordException(String.format("Record size is less than the minimum record overhead (%d)", Record.RECORD_OVERHEAD_V0));
+            if (size > maxMessageSize)
+                throw new CorruptRecordException(String.format("Record size exceeds the largest allowable message size (%d).", maxMessageSize));
+
+            ByteBuffer batchBuffer = ByteBuffer.allocate(size);
+            Utils.readFully(stream, batchBuffer);
+            if (batchBuffer.hasRemaining())
+                return null;
+            batchBuffer.flip();
+            return LogEntry.create(offset, new Record(batchBuffer));
         }
     }
 
@@ -141,13 +148,13 @@ public class RecordsIterator extends AbstractIterator<LogEntry> {
         private final long absoluteBaseOffset;
         private final byte wrapperMagic;
 
-        public DeepRecordsIterator(LogEntry wrapperEntry, boolean ensureMatchingMagic, int maxMessageSize) {
+        public DeepRecordsIterator(LogEntry wrapperEntry, boolean ensureMatchingMagic, int maxMessageSize, BufferSupplier bufferSupplier) {
             Record wrapperRecord = wrapperEntry.record();
             this.wrapperMagic = wrapperRecord.magic();
 
             CompressionType compressionType = wrapperRecord.compressionType();
             ByteBuffer buffer = wrapperRecord.value();
-            DataInputStream stream = MemoryRecordsBuilder.wrapForInput(new ByteBufferInputStream(buffer), compressionType, wrapperRecord.magic());
+            InputStream stream = MemoryRecordsBuilder.wrapForInput(buffer, compressionType, wrapperRecord.magic(), bufferSupplier);
             LogInputStream logStream = new DataLogInputStream(stream, maxMessageSize);
 
             long wrapperRecordOffset = wrapperEntry.offset();

http://git-wip-us.apache.org/repos/asf/kafka/blob/e944956a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index afa85bd..1751e44 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -855,4 +855,28 @@ public class Utils {
             currentPosition += bytesRead;
         } while (bytesRead != -1 && destinationBuffer.hasRemaining());
     }
+
+    /**
+     * Read data from the input stream to the given byte buffer until there are no bytes remaining in the buffer or the
+     * end of the stream has been reached.
+     *
+     * @param inputStream Input stream to read from
+     * @param destinationBuffer The buffer into which bytes are to be transferred (it must be backed by an array)
+     * @throws IOException If an I/O error occurs
+     */
+    public static final void readFully(InputStream inputStream, ByteBuffer destinationBuffer) throws IOException {
+        if (!destinationBuffer.hasArray())
+            throw new IllegalArgumentException("destinationBuffer must be backed by an array");
+        int initialOffset = destinationBuffer.arrayOffset() + destinationBuffer.position();
+        byte[] array = destinationBuffer.array();
+        int length = destinationBuffer.remaining();
+        int totalBytesRead = 0;
+        do {
+            int bytesRead = inputStream.read(array, initialOffset + totalBytesRead, length - totalBytesRead);
+            if (bytesRead == -1)
+                break;
+            totalBytesRead += bytesRead;
+        } while (length > totalBytesRead);
+        destinationBuffer.position(destinationBuffer.position() + totalBytesRead);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e944956a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index f8bb1e9..5f26c13 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.record.BufferSupplier;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.LogEntry;
 import org.apache.kafka.common.record.Record;
@@ -104,7 +105,7 @@ public class RecordAccumulatorTest {
         assertEquals(1, batches.size());
         RecordBatch batch = batches.get(0);
 
-        Iterator<LogEntry> iter = batch.records().deepEntries().iterator();
+        Iterator<LogEntry> iter = batch.records().deepEntries(BufferSupplier.NO_CACHING).iterator();
         for (int i = 0; i < appends; i++) {
             LogEntry entry = iter.next();
             assertEquals("Keys should match", ByteBuffer.wrap(key), entry.record().key());
@@ -133,7 +134,7 @@ public class RecordAccumulatorTest {
         assertEquals(1, batches.size());
         RecordBatch batch = batches.get(0);
 
-        Iterator<LogEntry> iter = batch.records().deepEntries().iterator();
+        Iterator<LogEntry> iter = batch.records().deepEntries(BufferSupplier.NO_CACHING).iterator();
         LogEntry entry = iter.next();
         assertEquals("Keys should match", ByteBuffer.wrap(key), entry.record().key());
         assertEquals("Values should match", ByteBuffer.wrap(value), entry.record().value());
@@ -185,7 +186,7 @@ public class RecordAccumulatorTest {
             List<RecordBatch> batches = accum.drain(cluster, nodes, 5 * 1024, 0).get(node1.id());
             if (batches != null) {
                 for (RecordBatch batch : batches) {
-                    for (LogEntry entry : batch.records().deepEntries())
+                    for (LogEntry entry : batch.records().deepEntries(BufferSupplier.NO_CACHING))
                         read++;
                     accum.deallocate(batch);
                 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e944956a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
index 274bf9d..dcbadda 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
@@ -385,7 +385,7 @@ public class FileRecordsTest {
     }
 
     private static List<LogEntry> deepEntries(Records buffer) {
-        return TestUtils.toList(buffer.deepEntries());
+        return TestUtils.toList(buffer.deepEntries(BufferSupplier.NO_CACHING));
     }
 
     private FileRecords createFileRecords(Record ... records) throws IOException {

http://git-wip-us.apache.org/repos/asf/kafka/blob/e944956a/clients/src/test/java/org/apache/kafka/common/record/KafkaLZ4Test.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/KafkaLZ4Test.java b/clients/src/test/java/org/apache/kafka/common/record/KafkaLZ4Test.java
index 47aebcb..74e9713 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/KafkaLZ4Test.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/KafkaLZ4Test.java
@@ -16,63 +16,166 @@
  */
 package org.apache.kafka.common.record;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import net.jpountz.xxhash.XXHashFactory;
+
+import org.hamcrest.CoreMatchers;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
-import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.Random;
 
-import net.jpountz.xxhash.XXHashFactory;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
+import static org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.LZ4_FRAME_INCOMPRESSIBLE_MASK;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 @RunWith(value = Parameterized.class)
 public class KafkaLZ4Test {
 
+    private final static Random RANDOM = new Random(0);
+
     private final boolean useBrokenFlagDescriptorChecksum;
     private final boolean ignoreFlagDescriptorChecksum;
     private final byte[] payload;
     private final boolean close;
+    private final boolean blockChecksum;
+
+    static class Payload {
+        String name;
+        byte[] payload;
+
+        Payload(String name, byte[] payload) {
+            this.name = name;
+            this.payload = payload;
+        }
+
+        @Override
+        public String toString() {
+            return "Payload{" +
+                   "size=" + payload.length +
+                   ", name='" + name + '\'' +
+                   '}';
+        }
+    }
+
+    @Rule
+    public ExpectedException thrown = ExpectedException.none();
 
-    public KafkaLZ4Test(boolean useBrokenFlagDescriptorChecksum, boolean ignoreFlagDescriptorChecksum, byte[] payload, boolean close) {
+    @Parameters(name = "{index} useBrokenFlagDescriptorChecksum={0}, ignoreFlagDescriptorChecksum={1}, blockChecksum={2}, close={3}, payload={4}")
+    public static Collection<Object[]> data() {
+        List<Payload> payloads = new ArrayList<>();
+
+        payloads.add(new Payload("empty", new byte[]{}));
+        payloads.add(new Payload("onebyte", new byte[]{1}));
+
+        for (int size : Arrays.asList(1000, 1 << 16, (1 << 10) * 96)) {
+            byte[] random = new byte[size];
+            RANDOM.nextBytes(random);
+            payloads.add(new Payload("random", random));
+
+            byte[] ones = new byte[size];
+            Arrays.fill(ones, (byte) 1);
+            payloads.add(new Payload("ones", ones));
+        }
+
+        List<Object[]> values = new ArrayList<>();
+        for (Payload payload : payloads)
+            for (boolean broken : Arrays.asList(false, true))
+                for (boolean ignore : Arrays.asList(false, true))
+                    for (boolean blockChecksum : Arrays.asList(false, true))
+                        for (boolean close : Arrays.asList(false, true))
+                            values.add(new Object[]{broken, ignore, blockChecksum, close, payload});
+        return values;
+    }
+
+    public KafkaLZ4Test(boolean useBrokenFlagDescriptorChecksum, boolean ignoreFlagDescriptorChecksum,
+                        boolean blockChecksum, boolean close, Payload payload) {
         this.useBrokenFlagDescriptorChecksum = useBrokenFlagDescriptorChecksum;
         this.ignoreFlagDescriptorChecksum = ignoreFlagDescriptorChecksum;
-        this.payload = payload;
+        this.payload = payload.payload;
         this.close = close;
+        this.blockChecksum = blockChecksum;
     }
 
-    @Parameters
-    public static Collection<Object[]> data() {
-        byte[] payload = new byte[1000];
-        Arrays.fill(payload, (byte) 1);
-        List<Object[]> values = new ArrayList<Object[]>();
-        for (boolean broken : Arrays.asList(false, true))
-            for (boolean ignore : Arrays.asList(false, true))
-                for (boolean close : Arrays.asList(false, true))
-                    values.add(new Object[] {broken, ignore, payload, close});
-        return values;
+    @Test
+    public void testHeaderPrematureEnd() throws Exception {
+        thrown.expect(IOException.class);
+        thrown.expectMessage(KafkaLZ4BlockInputStream.PREMATURE_EOS);
+
+        final ByteBuffer buffer = ByteBuffer.allocate(2);
+        makeInputStream(buffer);
+    }
+
+    private KafkaLZ4BlockInputStream makeInputStream(
+        ByteBuffer buffer
+    )
+        throws IOException {
+        return new KafkaLZ4BlockInputStream(
+            buffer,
+            BufferSupplier.create(),
+            ignoreFlagDescriptorChecksum
+        );
     }
 
     @Test
-    public void testKafkaLZ4() throws IOException {
-        ByteArrayOutputStream output = new ByteArrayOutputStream();
-        KafkaLZ4BlockOutputStream lz4 = new KafkaLZ4BlockOutputStream(output, this.useBrokenFlagDescriptorChecksum);
-        lz4.write(this.payload, 0, this.payload.length);
-        if (this.close) {
-            lz4.close();
-        } else {
-            lz4.flush();
+    public void testNotSupported() throws Exception {
+        thrown.expect(IOException.class);
+        thrown.expectMessage(KafkaLZ4BlockInputStream.NOT_SUPPORTED);
+
+        byte[] compressed = compressedBytes();
+        compressed[0] = 0x00;
+
+        makeInputStream(ByteBuffer.wrap(compressed));
+    }
+
+    @Test
+    public void testBadFrameChecksum() throws Exception {
+        if (!ignoreFlagDescriptorChecksum) {
+            thrown.expect(IOException.class);
+            thrown.expectMessage(KafkaLZ4BlockInputStream.DESCRIPTOR_HASH_MISMATCH);
         }
-        byte[] compressed = output.toByteArray();
+
+        byte[] compressed = compressedBytes();
+        compressed[6] = (byte) 0xFF;
+
+        makeInputStream(ByteBuffer.wrap(compressed));
+    }
+
+    @Test
+    public void testBadBlockSize() throws Exception {
+        if (!close || (useBrokenFlagDescriptorChecksum && !ignoreFlagDescriptorChecksum)) return;
+
+        thrown.expect(IOException.class);
+        thrown.expectMessage(CoreMatchers.containsString("exceeded max"));
+
+        byte[] compressed = compressedBytes();
+        final ByteBuffer buffer = ByteBuffer.wrap(compressed).order(ByteOrder.LITTLE_ENDIAN);
+
+        int blockSize = buffer.getInt(7);
+        blockSize = (blockSize & LZ4_FRAME_INCOMPRESSIBLE_MASK) | (1 << 24 & ~LZ4_FRAME_INCOMPRESSIBLE_MASK);
+        buffer.putInt(7, blockSize);
+
+        testDecompression(buffer);
+    }
+
+
+
+    @Test
+    public void testCompression() throws Exception {
+        byte[] compressed = compressedBytes();
 
         // Check magic bytes stored as little-endian
         int offset = 0;
@@ -138,16 +241,125 @@ public class KafkaLZ4Test {
             assertEquals(0, compressed[offset++]);
             assertEquals(0, compressed[offset++]);
         }
+    }
+
+    @Test
+    public void testArrayBackedBuffer() throws IOException {
+        byte[] compressed = compressedBytes();
+        testDecompression(ByteBuffer.wrap(compressed));
+    }
+
+    @Test
+    public void testArrayBackedBufferSlice() throws IOException {
+        byte[] compressed = compressedBytes();
+
+        int sliceOffset = 12;
+
+        ByteBuffer buffer = ByteBuffer.allocate(compressed.length + sliceOffset + 123);
+        buffer.position(sliceOffset);
+        buffer.put(compressed).flip();
+        buffer.position(sliceOffset);
+
+        ByteBuffer slice = buffer.slice();
+        testDecompression(slice);
+
+        int offset = 42;
+        buffer = ByteBuffer.allocate(compressed.length + sliceOffset + offset);
+        buffer.position(sliceOffset + offset);
+        buffer.put(compressed).flip();
+        buffer.position(sliceOffset);
 
-        ByteArrayInputStream input = new ByteArrayInputStream(compressed);
+        slice = buffer.slice();
+        slice.position(offset);
+        testDecompression(slice);
+    }
+
+    @Test
+    public void testDirectBuffer() throws IOException {
+        byte[] compressed = compressedBytes();
+        ByteBuffer buffer;
+
+        buffer = ByteBuffer.allocateDirect(compressed.length);
+        buffer.put(compressed).flip();
+        testDecompression(buffer);
+
+        int offset = 42;
+        buffer = ByteBuffer.allocateDirect(compressed.length + offset + 123);
+        buffer.position(offset);
+        buffer.put(compressed).flip();
+        buffer.position(offset);
+        testDecompression(buffer);
+    }
+
+    @Test
+    public void testSkip() throws Exception {
+        if (!close || (useBrokenFlagDescriptorChecksum && !ignoreFlagDescriptorChecksum)) return;
+
+        final KafkaLZ4BlockInputStream in = makeInputStream(ByteBuffer.wrap(compressedBytes()));
+
+        int n = 100;
+        int remaining = payload.length;
+        long skipped = in.skip(n);
+        assertEquals(Math.min(n, remaining), skipped);
+
+        n = 10000;
+        remaining -= skipped;
+        skipped = in.skip(n);
+        assertEquals(Math.min(n, remaining), skipped);
+    }
+
+    private void testDecompression(ByteBuffer buffer) throws IOException {
+        IOException error = null;
         try {
-            KafkaLZ4BlockInputStream decompressed = new KafkaLZ4BlockInputStream(input, this.ignoreFlagDescriptorChecksum);
-            byte[] testPayload = new byte[this.payload.length];
-            int ret = decompressed.read(testPayload, 0, this.payload.length);
-            assertEquals(ret, this.payload.length);
+            KafkaLZ4BlockInputStream decompressed = makeInputStream(buffer);
+
+            byte[] testPayload = new byte[payload.length];
+
+            byte[] tmp = new byte[1024];
+            int n, pos = 0, i = 0;
+            while ((n = decompressed.read(tmp, i, tmp.length - i)) != -1) {
+                i += n;
+                if (i == tmp.length) {
+                    System.arraycopy(tmp, 0, testPayload, pos, i);
+                    pos += i;
+                    i = 0;
+                }
+            }
+            System.arraycopy(tmp, 0, testPayload, pos, i);
+            pos += i;
+
+            assertEquals(-1, decompressed.read(tmp, 0, tmp.length));
+            assertEquals(this.payload.length, pos);
             assertArrayEquals(this.payload, testPayload);
         } catch (IOException e) {
-            assertTrue(this.useBrokenFlagDescriptorChecksum && !this.ignoreFlagDescriptorChecksum);
+            if (!ignoreFlagDescriptorChecksum && useBrokenFlagDescriptorChecksum) {
+                assertEquals(KafkaLZ4BlockInputStream.DESCRIPTOR_HASH_MISMATCH, e.getMessage());
+                error = e;
+            } else if (!close) {
+                assertEquals(KafkaLZ4BlockInputStream.PREMATURE_EOS, e.getMessage());
+                error = e;
+            } else {
+                throw e;
+            }
+        }
+        if (!ignoreFlagDescriptorChecksum && useBrokenFlagDescriptorChecksum) assertNotNull(error);
+        if (!close) assertNotNull(error);
+    }
+
+    private byte[] compressedBytes() throws IOException {
+        ByteArrayOutputStream output = new ByteArrayOutputStream();
+        KafkaLZ4BlockOutputStream lz4 = new KafkaLZ4BlockOutputStream(
+            output,
+            KafkaLZ4BlockOutputStream.BLOCKSIZE_64KB,
+            blockChecksum,
+            useBrokenFlagDescriptorChecksum
+        );
+        lz4.write(this.payload, 0, this.payload.length);
+        if (this.close) {
+            lz4.close();
+        } else {
+            lz4.flush();
         }
+        return output.toByteArray();
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e944956a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index 5f668de..c2ef38b 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -71,7 +71,7 @@ public class MemoryRecordsTest {
 
         for (int iteration = 0; iteration < 2; iteration++) {
             for (MemoryRecords recs : asList(recs1, recs2)) {
-                Iterator<LogEntry> iter = recs.deepEntries().iterator();
+                Iterator<LogEntry> iter = recs.deepEntries(BufferSupplier.NO_CACHING).iterator();
                 for (int i = 0; i < list.size(); i++) {
                     assertTrue(iter.hasNext());
                     LogEntry entry = iter.next();
@@ -201,7 +201,7 @@ public class MemoryRecordsTest {
                     shallowEntry.record().timestampType());
         }
 
-        List<LogEntry> deepEntries = TestUtils.toList(filteredRecords.deepEntries());
+        List<LogEntry> deepEntries = TestUtils.toList(filteredRecords.deepEntries(BufferSupplier.NO_CACHING));
         assertEquals(4, deepEntries.size());
 
         LogEntry first = deepEntries.get(0);
@@ -263,7 +263,7 @@ public class MemoryRecordsTest {
         }
     }
 
-    @Parameterized.Parameters
+    @Parameterized.Parameters(name = "{index} magic={0}, firstOffset={1}, compressionType={2}")
     public static Collection<Object[]> data() {
         List<Object[]> values = new ArrayList<>();
         for (long firstOffset : asList(0L, 57L))

http://git-wip-us.apache.org/repos/asf/kafka/blob/e944956a/core/src/main/scala/kafka/log/LogValidator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index 224a792..47d4bd4 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -164,7 +164,7 @@ private[kafka] object LogValidator {
     val expectedInnerOffset = new LongRef(0)
     val validatedRecords = new mutable.ArrayBuffer[Record]
 
-    records.deepEntries(true).asScala.foreach { logEntry =>
+    records.deepEntries(true, BufferSupplier.NO_CACHING).asScala.foreach { logEntry =>
       val record = logEntry.record
       validateKey(record, compactedTopic)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e944956a/core/src/main/scala/kafka/message/CompressionFactory.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/CompressionFactory.scala b/core/src/main/scala/kafka/message/CompressionFactory.scala
index e02ed63..592d09e 100644
--- a/core/src/main/scala/kafka/message/CompressionFactory.scala
+++ b/core/src/main/scala/kafka/message/CompressionFactory.scala
@@ -17,12 +17,11 @@
 
 package kafka.message
 
-import java.io.OutputStream
-import java.util.zip.GZIPOutputStream
-import java.util.zip.GZIPInputStream
-import java.io.InputStream
+import java.io.{InputStream, OutputStream}
+import java.nio.ByteBuffer
+import java.util.zip.{GZIPInputStream, GZIPOutputStream}
 
-import org.apache.kafka.common.record.{KafkaLZ4BlockInputStream, KafkaLZ4BlockOutputStream}
+import org.apache.kafka.common.record.{BufferSupplier, KafkaLZ4BlockInputStream, KafkaLZ4BlockOutputStream}
 
 object CompressionFactory {
   
@@ -40,15 +39,15 @@ object CompressionFactory {
     }
   }
   
-  def apply(compressionCodec: CompressionCodec, messageVersion: Byte, stream: InputStream): InputStream = {
+  def apply(compressionCodec: CompressionCodec, messageVersion: Byte, buffer: ByteBuffer): InputStream = {
     compressionCodec match {
-      case DefaultCompressionCodec => new GZIPInputStream(stream)
-      case GZIPCompressionCodec => new GZIPInputStream(stream)
+      case DefaultCompressionCodec => new GZIPInputStream(new ByteBufferBackedInputStream(buffer))
+      case GZIPCompressionCodec => new GZIPInputStream(new ByteBufferBackedInputStream(buffer))
       case SnappyCompressionCodec => 
         import org.xerial.snappy.SnappyInputStream
-        new SnappyInputStream(stream)
+        new SnappyInputStream(new ByteBufferBackedInputStream(buffer))
       case LZ4CompressionCodec =>
-        new KafkaLZ4BlockInputStream(stream, messageVersion == Message.MagicValue_V0)
+        new KafkaLZ4BlockInputStream(buffer, BufferSupplier.NO_CACHING, messageVersion == Message.MagicValue_V0)
       case _ =>
         throw new kafka.common.UnknownCodecException("Unknown Codec: " + compressionCodec)
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e944956a/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala b/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
index 5d2c8fb..9f63604 100644
--- a/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
+++ b/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
@@ -18,9 +18,10 @@
 package kafka.message
 
 import org.apache.kafka.common.record._
-
 import java.io.ByteArrayInputStream
 import java.io.ByteArrayOutputStream
+import java.nio.ByteBuffer
+
 import scala.collection._
 import org.scalatest.junit.JUnitSuite
 import org.junit._
@@ -33,7 +34,7 @@ class MessageCompressionTest extends JUnitSuite {
     val output = CompressionFactory(LZ4CompressionCodec, Message.MagicValue_V0, new ByteArrayOutputStream())
     assertTrue(output.asInstanceOf[KafkaLZ4BlockOutputStream].useBrokenFlagDescriptorChecksum())
 
-    val input = CompressionFactory(LZ4CompressionCodec, Message.MagicValue_V0, new ByteArrayInputStream(Array[Byte](0x04, 0x22, 0x4D, 0x18, 0x60, 0x40, 0x1A)))
+    val input = CompressionFactory(LZ4CompressionCodec, Message.MagicValue_V0, ByteBuffer.wrap(Array[Byte](0x04, 0x22, 0x4D, 0x18, 0x60, 0x40, 0x1A)))
     assertTrue(input.asInstanceOf[KafkaLZ4BlockInputStream].ignoreFlagDescriptorChecksum())
   }
 
@@ -42,7 +43,7 @@ class MessageCompressionTest extends JUnitSuite {
     val output = CompressionFactory(LZ4CompressionCodec, Message.MagicValue_V1, new ByteArrayOutputStream())
     assertFalse(output.asInstanceOf[KafkaLZ4BlockOutputStream].useBrokenFlagDescriptorChecksum())
 
-    val input = CompressionFactory(LZ4CompressionCodec, Message.MagicValue_V1, new ByteArrayInputStream(Array[Byte](0x04, 0x22, 0x4D, 0x18, 0x60, 0x40, -126)))
+    val input = CompressionFactory(LZ4CompressionCodec, Message.MagicValue_V1, ByteBuffer.wrap(Array[Byte](0x04, 0x22, 0x4D, 0x18, 0x60, 0x40, -126)))
     assertFalse(input.asInstanceOf[KafkaLZ4BlockInputStream].ignoreFlagDescriptorChecksum())
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e944956a/core/src/test/scala/unit/kafka/message/MessageWriterTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/message/MessageWriterTest.scala b/core/src/test/scala/unit/kafka/message/MessageWriterTest.scala
index a82a553..467f66c 100644
--- a/core/src/test/scala/unit/kafka/message/MessageWriterTest.scala
+++ b/core/src/test/scala/unit/kafka/message/MessageWriterTest.scala
@@ -60,7 +60,7 @@ class MessageWriterTest extends JUnitSuite {
   }
 
   private def decompress(compressed: Array[Byte], codec: CompressionCodec): Array[Byte] = {
-    toArray(CompressionFactory(codec, Message.MagicValue_V1, new ByteArrayInputStream(compressed)))
+    toArray(CompressionFactory(codec, Message.MagicValue_V1, ByteBuffer.wrap(compressed)))
   }
 
   private def toArray(in: InputStream): Array[Byte] = {