Posted to commits@kafka.apache.org by ij...@apache.org on 2017/05/31 01:22:11 UTC

kafka git commit: KAFKA-5150; Reduce LZ4 decompression overhead [Forced Update!]

Repository: kafka
Updated Branches:
  refs/heads/trunk f328f8f7d -> c060c4828 (forced update)


KAFKA-5150; Reduce LZ4 decompression overhead

- reuses decompression buffers in the consumer Fetcher
- switches the LZ4 input stream to operate directly on ByteBuffers
- avoids the performance impact of catching exceptions when reaching the end of legacy record batches
- adds more tests with both compressible and incompressible data, multiple
  blocks, and various other combinations to increase code coverage
- fixes a bug that caused an exception instead of an invalid block size error
  for invalid incompressible blocks
- fixes a bug when the incompressible flag is set on the end frame block size

Overall this improves LZ4 decompression performance by up to 40x for small batches.
Most improvements are seen for batches of size 1 with messages on the order of ~100B.
We see at least 2x improvements for batch sizes of < 10 messages, containing messages < 10 kB.

This patch also yields 2-4x improvements on v1 small single message batches for other compression types.

Full benchmark results can be found here:
https://gist.github.com/xvrl/05132e0643513df4adf842288be86efd
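
For context, a minimal sketch of the consumer-side pattern described above, using the
BufferSupplier and streamingIterator(BufferSupplier) APIs introduced by this patch (the
batch source and record handling are hypothetical):

    BufferSupplier decompressionBufferSupplier = BufferSupplier.create();   // one per consumer/Fetcher

    for (RecordBatch batch : fetchedBatches) {                              // hypothetical batch source
        try (CloseableIterator<Record> records = batch.streamingIterator(decompressionBufferSupplier)) {
            while (records.hasNext())
                handle(records.next());                                     // hypothetical record handling
        }
    }

    decompressionBufferSupplier.close();   // drop any cached buffers when the consumer is closed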

Author: Xavier Léauté <xa...@confluent.io>
Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Jason Gustafson <ja...@confluent.io>, Ismael Juma <is...@juma.me.uk>

Closes #2967 from xvrl/kafka-5150


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c060c482
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c060c482
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c060c482

Branch: refs/heads/trunk
Commit: c060c482855bb9f51e19dde51911645413d85260
Parents: 6021618
Author: Xavier Léauté <xa...@confluent.io>
Authored: Wed May 31 02:20:15 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Wed May 31 02:22:07 2017 +0100

----------------------------------------------------------------------
 checkstyle/import-control.xml                   |   1 +
 .../clients/consumer/internals/Fetcher.java     |   5 +-
 .../record/AbstractLegacyRecordBatch.java       |  58 ++--
 .../kafka/common/record/BufferSupplier.java     |  96 +++++++
 .../kafka/common/record/CompressionType.java    |  68 +++--
 .../kafka/common/record/DefaultRecord.java      |   6 +-
 .../kafka/common/record/DefaultRecordBatch.java |  17 +-
 .../kafka/common/record/FileLogInputStream.java |   4 +-
 .../common/record/KafkaLZ4BlockInputStream.java | 204 ++++++++------
 .../record/KafkaLZ4BlockOutputStream.java       |   4 +-
 .../apache/kafka/common/record/RecordBatch.java |   7 +-
 .../org/apache/kafka/common/utils/Utils.java    |  25 ++
 .../common/record/CompressionTypeTest.java      |   5 +-
 .../common/record/DefaultRecordBatchTest.java   |   2 +-
 .../kafka/common/record/KafkaLZ4Test.java       | 279 ++++++++++++++++---
 .../kafka/common/record/MemoryRecordsTest.java  |   2 +-
 gradle/findbugs-exclude.xml                     |   7 +-
 .../record/RecordBatchIterationBenchmark.java   | 145 ++++++++++
 18 files changed, 731 insertions(+), 204 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c060c482/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 7f51979..fdedef8 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -202,6 +202,7 @@
     <allow pkg="org.openjdk.jmh.annotations" />
     <allow pkg="org.openjdk.jmh.runner" />
     <allow pkg="org.openjdk.jmh.runner.options" />
+    <allow pkg="org.openjdk.jmh.infra" />
     <allow pkg="org.apache.kafka.common" />
     <allow pkg="org.apache.kafka.streams" />
     <allow pkg="org.github.jamm" />

http://git-wip-us.apache.org/repos/asf/kafka/blob/c060c482/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 440ca6e..5287b4e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -46,6 +46,7 @@ import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.metrics.stats.Value;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.BufferSupplier;
 import org.apache.kafka.common.record.ControlRecordType;
 import org.apache.kafka.common.record.InvalidRecordException;
 import org.apache.kafka.common.record.Record;
@@ -105,6 +106,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
     private final FetchManagerMetrics sensors;
     private final SubscriptionState subscriptions;
     private final ConcurrentLinkedQueue<CompletedFetch> completedFetches;
+    private final BufferSupplier decompressionBufferSupplier = BufferSupplier.create();
 
     private final ExtendedDeserializer<K> keyDeserializer;
     private final ExtendedDeserializer<V> valueDeserializer;
@@ -1039,7 +1041,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
                         }
                     }
 
-                    records = currentBatch.streamingIterator();
+                    records = currentBatch.streamingIterator(decompressionBufferSupplier);
                 }
 
                 Record record = records.next();
@@ -1326,6 +1328,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
         if (nextInLineRecords != null)
             nextInLineRecords.drain();
         nextInLineExceptionMetadata = null;
+        decompressionBufferSupplier.close();
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c060c482/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
index be69686..e4938be 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
@@ -20,16 +20,14 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.errors.CorruptRecordException;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.utils.AbstractIterator;
-import org.apache.kafka.common.utils.ByteBufferInputStream;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;
 import org.apache.kafka.common.utils.ByteUtils;
 import org.apache.kafka.common.utils.CloseableIterator;
 import org.apache.kafka.common.utils.Utils;
 
-import java.io.DataInputStream;
 import java.io.DataOutputStream;
-import java.io.EOFException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayDeque;
 import java.util.NoSuchElementException;
@@ -224,8 +222,12 @@ public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch impl
      */
     @Override
     public CloseableIterator<Record> iterator() {
+        return iterator(BufferSupplier.NO_CACHING);
+    }
+
+    private CloseableIterator<Record> iterator(BufferSupplier bufferSupplier) {
         if (isCompressed())
-            return new DeepRecordsIterator(this, false, Integer.MAX_VALUE);
+            return new DeepRecordsIterator(this, false, Integer.MAX_VALUE, bufferSupplier);
 
         return new CloseableIterator<Record>() {
             private boolean hasNext = true;
@@ -254,9 +256,9 @@ public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch impl
     }
 
     @Override
-    public CloseableIterator<Record> streamingIterator() {
+    public CloseableIterator<Record> streamingIterator(BufferSupplier bufferSupplier) {
         // the older message format versions do not support streaming, so we return the normal iterator
-        return iterator();
+        return iterator(bufferSupplier);
     }
 
     static void writeHeader(ByteBuffer buffer, long offset, int size) {
@@ -270,30 +272,36 @@ public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch impl
     }
 
     private static final class DataLogInputStream implements LogInputStream<AbstractLegacyRecordBatch> {
-        private final DataInputStream stream;
+        private final InputStream stream;
         protected final int maxMessageSize;
+        private final ByteBuffer offsetAndSizeBuffer;
 
-        DataLogInputStream(DataInputStream stream, int maxMessageSize) {
+        DataLogInputStream(InputStream stream, int maxMessageSize) {
             this.stream = stream;
             this.maxMessageSize = maxMessageSize;
+            this.offsetAndSizeBuffer = ByteBuffer.allocate(Records.LOG_OVERHEAD);
         }
 
         public AbstractLegacyRecordBatch nextBatch() throws IOException {
-            try {
-                long offset = stream.readLong();
-                int size = stream.readInt();
-                if (size < LegacyRecord.RECORD_OVERHEAD_V0)
-                    throw new CorruptRecordException(String.format("Record size is less than the minimum record overhead (%d)", LegacyRecord.RECORD_OVERHEAD_V0));
-                if (size > maxMessageSize)
-                    throw new CorruptRecordException(String.format("Record size exceeds the largest allowable message size (%d).", maxMessageSize));
-
-                byte[] recordBuffer = new byte[size];
-                stream.readFully(recordBuffer, 0, size);
-                ByteBuffer buf = ByteBuffer.wrap(recordBuffer);
-                return new BasicLegacyRecordBatch(offset, new LegacyRecord(buf));
-            } catch (EOFException e) {
+            offsetAndSizeBuffer.clear();
+            Utils.readFully(stream, offsetAndSizeBuffer);
+            if (offsetAndSizeBuffer.hasRemaining())
                 return null;
-            }
+
+            long offset = offsetAndSizeBuffer.getLong(Records.OFFSET_OFFSET);
+            int size = offsetAndSizeBuffer.getInt(Records.SIZE_OFFSET);
+            if (size < LegacyRecord.RECORD_OVERHEAD_V0)
+                throw new CorruptRecordException(String.format("Record size is less than the minimum record overhead (%d)", LegacyRecord.RECORD_OVERHEAD_V0));
+            if (size > maxMessageSize)
+                throw new CorruptRecordException(String.format("Record size exceeds the largest allowable message size (%d).", maxMessageSize));
+
+            ByteBuffer batchBuffer = ByteBuffer.allocate(size);
+            Utils.readFully(stream, batchBuffer);
+            if (batchBuffer.hasRemaining())
+                return null;
+            batchBuffer.flip();
+
+            return new BasicLegacyRecordBatch(offset, new LegacyRecord(batchBuffer));
         }
     }
 
@@ -302,7 +310,8 @@ public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch impl
         private final long absoluteBaseOffset;
         private final byte wrapperMagic;
 
-        private DeepRecordsIterator(AbstractLegacyRecordBatch wrapperEntry, boolean ensureMatchingMagic, int maxMessageSize) {
+        private DeepRecordsIterator(AbstractLegacyRecordBatch wrapperEntry, boolean ensureMatchingMagic,
+                                    int maxMessageSize, BufferSupplier bufferSupplier) {
             LegacyRecord wrapperRecord = wrapperEntry.outerRecord();
             this.wrapperMagic = wrapperRecord.magic();
 
@@ -312,8 +321,7 @@ public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch impl
                 throw new InvalidRecordException("Found invalid compressed record set with null value (magic = " +
                         wrapperMagic + ")");
 
-            DataInputStream stream = new DataInputStream(compressionType.wrapForInput(
-                    new ByteBufferInputStream(wrapperValue), wrapperRecord.magic()));
+            InputStream stream = compressionType.wrapForInput(wrapperValue, wrapperRecord.magic(), bufferSupplier);
             LogInputStream<AbstractLegacyRecordBatch> logStream = new DataLogInputStream(stream, maxMessageSize);
 
             long wrapperRecordOffset = wrapperEntry.lastOffset();

http://git-wip-us.apache.org/repos/asf/kafka/blob/c060c482/clients/src/main/java/org/apache/kafka/common/record/BufferSupplier.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/BufferSupplier.java b/clients/src/main/java/org/apache/kafka/common/record/BufferSupplier.java
new file mode 100644
index 0000000..2e09f7d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/BufferSupplier.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.record;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Simple non-threadsafe interface for caching byte buffers. This is suitable for simple cases like ensuring that
+ * a given KafkaConsumer reuses the same decompression buffer when iterating over fetched records. For small record
+ * batches, allocating a potentially large buffer (64 KB for LZ4) will dominate the cost of decompressing and
+ * iterating over the records in the batch.
+ */
+public abstract class BufferSupplier implements AutoCloseable {
+
+    public static final BufferSupplier NO_CACHING = new BufferSupplier() {
+        @Override
+        public ByteBuffer get(int capacity) {
+            return ByteBuffer.allocate(capacity);
+        }
+
+        @Override
+        public void release(ByteBuffer buffer) {}
+
+        @Override
+        public void close() {}
+    };
+
+    public static BufferSupplier create() {
+        return new DefaultSupplier();
+    }
+
+    /**
+     * Supply a buffer with the required capacity. This may return a cached buffer or allocate a new instance.
+     */
+    public abstract ByteBuffer get(int capacity);
+
+    /**
+     * Return the provided buffer to be reused by a subsequent call to `get`.
+     */
+    public abstract void release(ByteBuffer buffer);
+
+    /**
+     * Release all resources associated with this supplier.
+     */
+    public abstract void close();
+
+    private static class DefaultSupplier extends BufferSupplier {
+        // We currently use a single block size, so optimise for that case
+        private final Map<Integer, Deque<ByteBuffer>> bufferMap = new HashMap<>(1);
+
+        @Override
+        public ByteBuffer get(int size) {
+            Deque<ByteBuffer> bufferQueue = bufferMap.get(size);
+            if (bufferQueue == null || bufferQueue.isEmpty())
+                return ByteBuffer.allocate(size);
+            else
+                return bufferQueue.pollFirst();
+        }
+
+        @Override
+        public void release(ByteBuffer buffer) {
+            buffer.clear();
+            Deque<ByteBuffer> bufferQueue = bufferMap.get(buffer.capacity());
+            if (bufferQueue == null) {
+                // We currently keep a single buffer in flight, so optimise for that case
+                bufferQueue = new ArrayDeque<>(1);
+                bufferMap.put(buffer.capacity(), bufferQueue);
+            }
+            bufferQueue.addLast(buffer);
+        }
+
+        @Override
+        public void close() {
+            bufferMap.clear();
+        }
+    }
+}
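
A brief usage sketch for the supplier above (capacities chosen to match the 64 KB LZ4 block
size; not part of the patch itself):

    BufferSupplier supplier = BufferSupplier.create();

    ByteBuffer first = supplier.get(64 * 1024);    // no cached buffer yet, so this allocates
    supplier.release(first);                       // cleared and cached for reuse

    ByteBuffer second = supplier.get(64 * 1024);   // returns the cached buffer (same instance as `first`)
    supplier.release(second);

    supplier.close();                              // discards all cached buffers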

http://git-wip-us.apache.org/repos/asf/kafka/blob/c060c482/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
index 15b5958..742493b 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
@@ -25,6 +25,7 @@ import java.io.OutputStream;
 import java.lang.invoke.MethodHandle;
 import java.lang.invoke.MethodHandles;
 import java.lang.invoke.MethodType;
+import java.nio.ByteBuffer;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
 
@@ -39,8 +40,8 @@ public enum CompressionType {
         }
 
         @Override
-        public InputStream wrapForInput(ByteBufferInputStream buffer, byte messageVersion) {
-            return buffer;
+        public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
+            return new ByteBufferInputStream(buffer);
         }
     },
 
@@ -55,9 +56,9 @@ public enum CompressionType {
         }
 
         @Override
-        public InputStream wrapForInput(ByteBufferInputStream buffer, byte messageVersion) {
+        public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
             try {
-                return new GZIPInputStream(buffer);
+                return new GZIPInputStream(new ByteBufferInputStream(buffer));
             } catch (Exception e) {
                 throw new KafkaException(e);
             }
@@ -75,9 +76,9 @@ public enum CompressionType {
         }
 
         @Override
-        public InputStream wrapForInput(ByteBufferInputStream buffer, byte messageVersion) {
+        public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
             try {
-                return (InputStream) SnappyConstructors.INPUT.invoke(buffer);
+                return (InputStream) SnappyConstructors.INPUT.invoke(new ByteBufferInputStream(buffer));
             } catch (Throwable e) {
                 throw new KafkaException(e);
             }
@@ -88,18 +89,17 @@ public enum CompressionType {
         @Override
         public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion, int bufferSize) {
             try {
-                return (OutputStream) LZ4Constructors.OUTPUT.invoke(buffer,
-                        messageVersion == RecordBatch.MAGIC_VALUE_V0);
+                return new KafkaLZ4BlockOutputStream(buffer, messageVersion == RecordBatch.MAGIC_VALUE_V0);
             } catch (Throwable e) {
                 throw new KafkaException(e);
             }
         }
 
         @Override
-        public InputStream wrapForInput(ByteBufferInputStream buffer, byte messageVersion) {
+        public InputStream wrapForInput(ByteBuffer inputBuffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
             try {
-                return (InputStream) LZ4Constructors.INPUT.invoke(buffer,
-                        messageVersion == RecordBatch.MAGIC_VALUE_V0);
+                return new KafkaLZ4BlockInputStream(inputBuffer, decompressionBufferSupplier,
+                                                    messageVersion == RecordBatch.MAGIC_VALUE_V0);
             } catch (Throwable e) {
                 throw new KafkaException(e);
             }
@@ -116,9 +116,26 @@ public enum CompressionType {
         this.rate = rate;
     }
 
-    public abstract OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion, int bufferSize);
-
-    public abstract InputStream wrapForInput(ByteBufferInputStream buffer, byte messageVersion);
+    /**
+     * Wrap bufferStream with an OutputStream that will compress data with this CompressionType.
+     *
+     * Note: Unlike {@link #wrapForInput}, {@link #wrapForOutput} cannot take {@link ByteBuffer}s directly.
+     * Currently, {@link MemoryRecordsBuilder#writeDefaultBatchHeader()} and {@link MemoryRecordsBuilder#writeLegacyCompressedWrapperHeader()}
+     * write to the underlying buffer in the given {@link ByteBufferOutputStream} after the compressed data has been written.
+     * In the event that the buffer needs to be expanded while writing the data, access to the underlying buffer needs to be preserved.
+     */
+    public abstract OutputStream wrapForOutput(ByteBufferOutputStream bufferStream, byte messageVersion, int bufferSize);
+
+    /**
+     * Wrap buffer with an InputStream that will decompress data with this CompressionType.
+     *
+     * @param decompressionBufferSupplier The supplier of ByteBuffer(s) used for decompression if supported.
+     *                                    For small record batches, allocating a potentially large buffer (64 KB for LZ4)
+     *                                    will dominate the cost of decompressing and iterating over the records in the
+     *                                    batch. As such, a supplier that reuses buffers will have a significant
+     *                                    performance impact.
+     */
+    public abstract InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier);
 
     public static CompressionType forId(int id) {
         switch (id) {
@@ -148,21 +165,14 @@ public enum CompressionType {
             throw new IllegalArgumentException("Unknown compression name: " + name);
     }
 
-    // Dynamically load the Snappy and LZ4 classes so that we only have a runtime dependency on compression algorithms
-    // that are used. This is important for platforms that are not supported by the underlying libraries.
-    // Note that we are using the initialization-on-demand holder idiom, so it's important that the initialisation
-    // is done in separate classes (one per compression type).
-
-    private static class LZ4Constructors {
-        static final MethodHandle INPUT = findConstructor(
-                "org.apache.kafka.common.record.KafkaLZ4BlockInputStream",
-                MethodType.methodType(void.class, InputStream.class, Boolean.TYPE));
-
-        static final MethodHandle OUTPUT = findConstructor(
-                "org.apache.kafka.common.record.KafkaLZ4BlockOutputStream",
-                MethodType.methodType(void.class, OutputStream.class, Boolean.TYPE));
-
-    }
+    // We should only have a runtime dependency on compression algorithms in case the native libraries don't support
+    // some platforms.
+    //
+    // For Snappy, we dynamically load the classes and rely on the initialization-on-demand holder idiom to ensure
+    // they're only loaded if used.
+    //
+    // For LZ4 we are using org.apache.kafka classes, which should always be in the classpath, and would not trigger
+    // an error until KafkaLZ4BlockInputStream is initialized, which only happens if LZ4 is actually used.
 
     private static class SnappyConstructors {
         static final MethodHandle INPUT = findConstructor("org.xerial.snappy.SnappyInputStream",
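
For reference, a hedged sketch of how the new wrapForInput signature is invoked by the record
iterators; `lz4FrameBuffer` stands in for the value of a compressed wrapper record, and the
usual java.io/java.nio/org.apache.kafka.common.record imports are assumed:

    static void drainLz4(ByteBuffer lz4FrameBuffer, BufferSupplier supplier) throws IOException {
        try (InputStream stream =
                 CompressionType.LZ4.wrapForInput(lz4FrameBuffer, RecordBatch.MAGIC_VALUE_V1, supplier)) {
            byte[] chunk = new byte[4096];
            int n;
            while ((n = stream.read(chunk)) != -1) {
                // consume `n` decompressed bytes; the 64 KB block buffer is borrowed from `supplier`
            }
        }
    }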

http://git-wip-us.apache.org/repos/asf/kafka/blob/c060c482/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
index e61bbc9..5972b42 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
@@ -23,7 +23,7 @@ import org.apache.kafka.common.utils.Checksums;
 import org.apache.kafka.common.utils.Crc32C;
 import org.apache.kafka.common.utils.Utils;
 
-import java.io.DataInputStream;
+import java.io.DataInput;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -284,14 +284,14 @@ public class DefaultRecord implements Record {
         return result;
     }
 
-    public static DefaultRecord readFrom(DataInputStream input,
+    public static DefaultRecord readFrom(DataInput input,
                                          long baseOffset,
                                          long baseTimestamp,
                                          int baseSequence,
                                          Long logAppendTime) throws IOException {
         int sizeOfBodyInBytes = ByteUtils.readVarint(input);
         ByteBuffer recordBuffer = ByteBuffer.allocate(sizeOfBodyInBytes);
-        input.readFully(recordBuffer.array(), recordBuffer.arrayOffset(), sizeOfBodyInBytes);
+        input.readFully(recordBuffer.array(), 0, sizeOfBodyInBytes);
         int totalSizeInBytes = ByteUtils.sizeOfVarint(sizeOfBodyInBytes) + sizeOfBodyInBytes;
         return readFrom(recordBuffer, totalSizeInBytes, baseOffset, baseTimestamp, baseSequence, logAppendTime);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c060c482/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
index f01116e..2bf889f 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
@@ -18,7 +18,6 @@ package org.apache.kafka.common.record;
 
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.header.Header;
-import org.apache.kafka.common.utils.ByteBufferInputStream;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;
 import org.apache.kafka.common.utils.ByteUtils;
 import org.apache.kafka.common.utils.CloseableIterator;
@@ -226,17 +225,17 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
         return buffer.getInt(PARTITION_LEADER_EPOCH_OFFSET);
     }
 
-    private CloseableIterator<Record> compressedIterator() {
+    private CloseableIterator<Record> compressedIterator(BufferSupplier bufferSupplier) {
         ByteBuffer buffer = this.buffer.duplicate();
         buffer.position(RECORDS_OFFSET);
-        final DataInputStream stream = new DataInputStream(compressionType().wrapForInput(
-                new ByteBufferInputStream(buffer), magic()));
+        final DataInputStream inputStream = new DataInputStream(compressionType().wrapForInput(buffer, magic(),
+                bufferSupplier));
 
         return new RecordIterator() {
             @Override
             protected Record readNext(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime) {
                 try {
-                    return DefaultRecord.readFrom(stream, baseOffset, baseTimestamp, baseSequence, logAppendTime);
+                    return DefaultRecord.readFrom(inputStream, baseOffset, baseTimestamp, baseSequence, logAppendTime);
                 } catch (IOException e) {
                     throw new KafkaException("Failed to decompress record stream", e);
                 }
@@ -245,7 +244,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
             @Override
             public void close() {
                 try {
-                    stream.close();
+                    inputStream.close();
                 } catch (IOException e) {
                     throw new KafkaException("Failed to close record stream", e);
                 }
@@ -274,7 +273,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
         // for a normal iterator, we cannot ensure that the underlying compression stream is closed,
         // so we decompress the full record set here. Use cases which call for a lower memory footprint
         // can use `streamingIterator` at the cost of additional complexity
-        try (CloseableIterator<Record> iterator = compressedIterator()) {
+        try (CloseableIterator<Record> iterator = compressedIterator(BufferSupplier.NO_CACHING)) {
             List<Record> records = new ArrayList<>(count());
             while (iterator.hasNext())
                 records.add(iterator.next());
@@ -284,9 +283,9 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
 
 
     @Override
-    public CloseableIterator<Record> streamingIterator() {
+    public CloseableIterator<Record> streamingIterator(BufferSupplier bufferSupplier) {
         if (isCompressed())
-            return compressedIterator();
+            return compressedIterator(bufferSupplier);
         else
             return uncompressedIterator();
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c060c482/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
index 5fe1cef..57fec4f 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
@@ -218,9 +218,9 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil
         }
 
         @Override
-        public CloseableIterator<Record> streamingIterator() {
+        public CloseableIterator<Record> streamingIterator(BufferSupplier bufferSupplier) {
             loadUnderlyingRecordBatch();
-            return underlying.streamingIterator();
+            return underlying.streamingIterator(bufferSupplier);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/c060c482/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
index a53690c..56f1058 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
@@ -16,80 +16,76 @@
  */
 package org.apache.kafka.common.record;
 
-import static org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.LZ4_FRAME_INCOMPRESSIBLE_MASK;
-import static org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.LZ4_MAX_HEADER_LENGTH;
-import static org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.MAGIC;
-
-import java.io.FilterInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.BD;
-import org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.FLG;
-import org.apache.kafka.common.utils.ByteUtils;
-
 import net.jpountz.lz4.LZ4Exception;
 import net.jpountz.lz4.LZ4Factory;
 import net.jpountz.lz4.LZ4SafeDecompressor;
 import net.jpountz.xxhash.XXHash32;
 import net.jpountz.xxhash.XXHashFactory;
 
+import org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.BD;
+import org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.FLG;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.LZ4_FRAME_INCOMPRESSIBLE_MASK;
+import static org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.MAGIC;
+
 /**
  * A partial implementation of the v1.5.1 LZ4 Frame format.
  *
- * @see <a href="http://cyan4973.github.io/lz4/lz4_Frame_format.html">LZ4 Frame Format</a>
+ * @see <a href="https://github.com/lz4/lz4/wiki/lz4_Frame_format.md">LZ4 Frame Format</a>
+ *
+ * This class is not thread-safe.
  */
-public final class KafkaLZ4BlockInputStream extends FilterInputStream {
+public final class KafkaLZ4BlockInputStream extends InputStream {
 
     public static final String PREMATURE_EOS = "Stream ended prematurely";
     public static final String NOT_SUPPORTED = "Stream unsupported (invalid magic bytes)";
     public static final String BLOCK_HASH_MISMATCH = "Block checksum mismatch";
     public static final String DESCRIPTOR_HASH_MISMATCH = "Stream frame descriptor corrupted";
 
-    private final LZ4SafeDecompressor decompressor;
-    private final XXHash32 checksum;
-    private final byte[] buffer;
-    private final byte[] compressedBuffer;
-    private final int maxBlockSize;
+    private static final LZ4SafeDecompressor DECOMPRESSOR = LZ4Factory.fastestInstance().safeDecompressor();
+    private static final XXHash32 CHECKSUM = XXHashFactory.fastestInstance().hash32();
+
+    private final ByteBuffer in;
     private final boolean ignoreFlagDescriptorChecksum;
+    private final BufferSupplier bufferSupplier;
+    private final ByteBuffer decompressionBuffer;
+    // `flg` and `maxBlockSize` are effectively final, they are initialised in the `readHeader` method that is only
+    // invoked from the constructor
     private FLG flg;
-    private BD bd;
-    private int bufferOffset;
-    private int bufferSize;
+    private int maxBlockSize;
+
+    // If a block is compressed, this is the same as `decompressionBuffer`. If a block is not compressed, this is
+    // a slice of `in` to avoid unnecessary copies.
+    private ByteBuffer decompressedBuffer;
     private boolean finished;
 
     /**
      * Create a new {@link InputStream} that will decompress data using the LZ4 algorithm.
      *
-     * @param in The stream to decompress
+     * @param in The byte buffer to decompress
      * @param ignoreFlagDescriptorChecksum for compatibility with old kafka clients, ignore incorrect HC byte
      * @throws IOException
      */
-    public KafkaLZ4BlockInputStream(InputStream in, boolean ignoreFlagDescriptorChecksum) throws IOException {
-        super(in);
-        decompressor = LZ4Factory.fastestInstance().safeDecompressor();
-        checksum = XXHashFactory.fastestInstance().hash32();
+    public KafkaLZ4BlockInputStream(ByteBuffer in, BufferSupplier bufferSupplier, boolean ignoreFlagDescriptorChecksum) throws IOException {
         this.ignoreFlagDescriptorChecksum = ignoreFlagDescriptorChecksum;
+        this.in = in.duplicate().order(ByteOrder.LITTLE_ENDIAN);
+        this.bufferSupplier = bufferSupplier;
         readHeader();
-        maxBlockSize = bd.getBlockMaximumSize();
-        buffer = new byte[maxBlockSize];
-        compressedBuffer = new byte[maxBlockSize];
-        bufferOffset = 0;
-        bufferSize = 0;
+        decompressionBuffer = bufferSupplier.get(maxBlockSize);
+        if (!decompressionBuffer.hasArray() || decompressionBuffer.arrayOffset() != 0) {
+            // require array backed decompression buffer with zero offset
+            // to simplify workaround for https://github.com/lz4/lz4-java/pull/65
+            throw new RuntimeException("decompression buffer must have backing array with zero array offset");
+        }
         finished = false;
     }
 
     /**
-     * Create a new {@link InputStream} that will decompress data using the LZ4 algorithm.
-     *
-     * @param in The stream to decompress
-     * @throws IOException
-     */
-    public KafkaLZ4BlockInputStream(InputStream in) throws IOException {
-        this(in, false);
-    }
-
-    /**
      * Check whether KafkaLZ4BlockInputStream is configured to ignore the
      * Frame Descriptor checksum, which is useful for compatibility with
      * old client implementations that use incorrect checksum calculations.
@@ -99,43 +95,50 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream {
     }
 
     /**
-     * Reads the magic number and frame descriptor from the underlying {@link InputStream}.
+     * Reads the magic number and frame descriptor from input buffer.
      *
      * @throws IOException
      */
     private void readHeader() throws IOException {
-        byte[] header = new byte[LZ4_MAX_HEADER_LENGTH];
-
         // read first 6 bytes into buffer to check magic and FLG/BD descriptor flags
-        int headerOffset = 6;
-        if (in.read(header, 0, headerOffset) != headerOffset) {
+        if (in.remaining() < 6) {
             throw new IOException(PREMATURE_EOS);
         }
 
-        if (MAGIC != ByteUtils.readUnsignedIntLE(header, headerOffset - 6)) {
+        if (MAGIC != in.getInt()) {
             throw new IOException(NOT_SUPPORTED);
         }
-        flg = FLG.fromByte(header[headerOffset - 2]);
-        bd = BD.fromByte(header[headerOffset - 1]);
+        // mark start of data to checksum
+        in.mark();
+
+        flg = FLG.fromByte(in.get());
+        maxBlockSize = BD.fromByte(in.get()).getBlockMaximumSize();
 
         if (flg.isContentSizeSet()) {
-            if (in.read(header, headerOffset, 8) != 8)
+            if (in.remaining() < 8) {
                 throw new IOException(PREMATURE_EOS);
-            headerOffset += 8;
+            }
+            in.position(in.position() + 8);
         }
 
         // Final byte of Frame Descriptor is HC checksum
-        header[headerOffset++] = (byte) in.read();
 
         // Old implementations produced incorrect HC checksums
-        if (ignoreFlagDescriptorChecksum)
+        if (ignoreFlagDescriptorChecksum) {
+            in.position(in.position() + 1);
             return;
+        }
+
+        int len = in.position() - in.reset().position();
 
-        int offset = 4;
-        int len = headerOffset - offset - 1; // dont include magic bytes or HC
-        byte hash = (byte) ((checksum.hash(header, offset, len, 0) >> 8) & 0xFF);
-        if (hash != header[headerOffset - 1])
+        int hash = in.hasArray() ?
+                       // workaround for https://github.com/lz4/lz4-java/pull/65
+                       CHECKSUM.hash(in.array(), in.arrayOffset() + in.position(), len, 0) :
+                       CHECKSUM.hash(in, in.position(), len, 0);
+        in.position(in.position() + len);
+        if (in.get() != (byte) ((hash >> 8) & 0xFF)) {
             throw new IOException(DESCRIPTOR_HASH_MISMATCH);
+        }
     }
 
     /**
@@ -145,46 +148,70 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream {
      * @throws IOException
      */
     private void readBlock() throws IOException {
-        int blockSize = ByteUtils.readUnsignedIntLE(in);
+        if (in.remaining() < 4) {
+            throw new IOException(PREMATURE_EOS);
+        }
+
+        int blockSize = in.getInt();
+        boolean compressed = (blockSize & LZ4_FRAME_INCOMPRESSIBLE_MASK) == 0;
+        blockSize &= ~LZ4_FRAME_INCOMPRESSIBLE_MASK;
 
         // Check for EndMark
         if (blockSize == 0) {
             finished = true;
             if (flg.isContentChecksumSet())
-                ByteUtils.readUnsignedIntLE(in); // TODO: verify this content checksum
+                in.getInt(); // TODO: verify this content checksum
             return;
         } else if (blockSize > maxBlockSize) {
             throw new IOException(String.format("Block size %s exceeded max: %s", blockSize, maxBlockSize));
         }
 
-        boolean compressed = (blockSize & LZ4_FRAME_INCOMPRESSIBLE_MASK) == 0;
-        byte[] bufferToRead;
-        if (compressed) {
-            bufferToRead = compressedBuffer;
-        } else {
-            blockSize &= ~LZ4_FRAME_INCOMPRESSIBLE_MASK;
-            bufferToRead = buffer;
-            bufferSize = blockSize;
-        }
-
-        if (in.read(bufferToRead, 0, blockSize) != blockSize) {
+        if (in.remaining() < blockSize) {
             throw new IOException(PREMATURE_EOS);
         }
 
-        // verify checksum
-        if (flg.isBlockChecksumSet() && ByteUtils.readUnsignedIntLE(in) != checksum.hash(bufferToRead, 0, blockSize, 0)) {
-            throw new IOException(BLOCK_HASH_MISMATCH);
-        }
-
         if (compressed) {
             try {
-                bufferSize = decompressor.decompress(compressedBuffer, 0, blockSize, buffer, 0, maxBlockSize);
+                // workaround for https://github.com/lz4/lz4-java/pull/65
+                final int bufferSize;
+                if (in.hasArray()) {
+                    bufferSize = DECOMPRESSOR.decompress(
+                        in.array(),
+                        in.position() + in.arrayOffset(),
+                        blockSize,
+                        decompressionBuffer.array(),
+                        0,
+                        maxBlockSize
+                    );
+                } else {
+                    // decompressionBuffer has zero arrayOffset, so we don't need to worry about
+                    // https://github.com/lz4/lz4-java/pull/65
+                    bufferSize = DECOMPRESSOR.decompress(in, in.position(), blockSize, decompressionBuffer, 0, maxBlockSize);
+                }
+                decompressionBuffer.position(0);
+                decompressionBuffer.limit(bufferSize);
+                decompressedBuffer = decompressionBuffer;
             } catch (LZ4Exception e) {
                 throw new IOException(e);
             }
+        } else {
+            decompressedBuffer = in.slice();
+            decompressedBuffer.limit(blockSize);
         }
 
-        bufferOffset = 0;
+        // verify checksum
+        if (flg.isBlockChecksumSet()) {
+            // workaround for https://github.com/lz4/lz4-java/pull/65
+            int hash = in.hasArray() ?
+                       CHECKSUM.hash(in.array(), in.arrayOffset() + in.position(), blockSize, 0) :
+                       CHECKSUM.hash(in, in.position(), blockSize, 0);
+            in.position(in.position() + blockSize);
+            if (hash != in.getInt()) {
+                throw new IOException(BLOCK_HASH_MISMATCH);
+            }
+        } else {
+            in.position(in.position() + blockSize);
+        }
     }
 
     @Override
@@ -199,7 +226,7 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream {
             return -1;
         }
 
-        return buffer[bufferOffset++] & 0xFF;
+        return decompressedBuffer.get() & 0xFF;
     }
 
     @Override
@@ -215,8 +242,8 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream {
             return -1;
         }
         len = Math.min(len, available());
-        System.arraycopy(buffer, bufferOffset, b, off, len);
-        bufferOffset += len;
+
+        decompressedBuffer.get(b, off, len);
         return len;
     }
 
@@ -231,28 +258,28 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream {
         if (finished) {
             return 0;
         }
-        n = Math.min(n, available());
-        bufferOffset += n;
-        return n;
+        int skipped = (int) Math.min(n, available());
+        decompressedBuffer.position(decompressedBuffer.position() + skipped);
+        return skipped;
     }
 
     @Override
     public int available() throws IOException {
-        return bufferSize - bufferOffset;
+        return decompressedBuffer == null ? 0 : decompressedBuffer.remaining();
     }
 
     @Override
     public void close() throws IOException {
-        in.close();
+        bufferSupplier.release(decompressionBuffer);
     }
 
     @Override
-    public synchronized void mark(int readlimit) {
+    public void mark(int readlimit) {
         throw new RuntimeException("mark not supported");
     }
 
     @Override
-    public synchronized void reset() throws IOException {
+    public void reset() throws IOException {
         throw new RuntimeException("reset not supported");
     }
 
@@ -260,5 +287,4 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream {
     public boolean markSupported() {
         return false;
     }
-
 }
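
A hedged round-trip sketch combining the rewritten input stream with the existing output stream
and the Utils.readFully helper added later in this patch (payload and flags are illustrative):

    byte[] payload = "hello, lz4".getBytes(StandardCharsets.UTF_8);

    // Compress into an in-memory LZ4 frame; `false` keeps the correct FLG descriptor checksum.
    ByteArrayOutputStream frame = new ByteArrayOutputStream();
    KafkaLZ4BlockOutputStream out = new KafkaLZ4BlockOutputStream(frame, false);
    out.write(payload);
    out.close();

    // Decompress directly from a ByteBuffer, borrowing the 64 KB block buffer from the supplier.
    BufferSupplier supplier = BufferSupplier.create();
    KafkaLZ4BlockInputStream in = new KafkaLZ4BlockInputStream(
            ByteBuffer.wrap(frame.toByteArray()), supplier, false);
    ByteBuffer decompressed = ByteBuffer.allocate(payload.length);
    Utils.readFully(in, decompressed);
    in.close();          // returns the decompression buffer to the supplier
    supplier.close();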

http://git-wip-us.apache.org/repos/asf/kafka/blob/c060c482/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
index d029489..8cfc37b 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
@@ -30,7 +30,9 @@ import net.jpountz.xxhash.XXHashFactory;
 /**
  * A partial implementation of the v1.5.1 LZ4 Frame format.
  *
- * @see <a href="http://cyan4973.github.io/lz4/lz4_Frame_format.html">LZ4 Frame Format</a>
+ * @see <a href="https://github.com/lz4/lz4/wiki/lz4_Frame_format.md">LZ4 Frame Format</a>
+ *
+ * This class is not thread-safe.
  */
 public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c060c482/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
index db75105..ef773da 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
@@ -219,9 +219,14 @@ public interface RecordBatch extends Iterable<Record> {
      * are actually asked for using {@link Iterator#next()}. If the message format does not support streaming
      * iteration, then the normal iterator is returned. Either way, callers should ensure that the iterator is closed.
      *
+     * @param decompressionBufferSupplier The supplier of ByteBuffer(s) used for decompression if supported.
+     *                                    For small record batches, allocating a potentially large buffer (64 KB for LZ4)
+     *                                    will dominate the cost of decompressing and iterating over the records in the
+     *                                    batch. As such, a supplier that reuses buffers will have a significant
+     *                                    performance impact.
      * @return The closeable iterator
      */
-    CloseableIterator<Record> streamingIterator();
+    CloseableIterator<Record> streamingIterator(BufferSupplier decompressionBufferSupplier);
 
     /**
      * Check whether this is a control batch (i.e. whether the control bit is set in the batch attributes).

http://git-wip-us.apache.org/repos/asf/kafka/blob/c060c482/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 24ee788..9fbc387 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -762,6 +762,31 @@ public class Utils {
         } while (bytesRead != -1 && destinationBuffer.hasRemaining());
     }
 
+    /**
+     * Read data from the input stream to the given byte buffer until there are no bytes remaining in the buffer or the
+     * end of the stream has been reached.
+     *
+     * @param inputStream Input stream to read from
+     * @param destinationBuffer The buffer into which bytes are to be transferred (it must be backed by an array)
+     *
+     * @throws IOException If an I/O error occurs
+     */
+    public static final void readFully(InputStream inputStream, ByteBuffer destinationBuffer) throws IOException {
+        if (!destinationBuffer.hasArray())
+            throw new IllegalArgumentException("destinationBuffer must be backed by an array");
+        int initialOffset = destinationBuffer.arrayOffset() + destinationBuffer.position();
+        byte[] array = destinationBuffer.array();
+        int length = destinationBuffer.remaining();
+        int totalBytesRead = 0;
+        do {
+            int bytesRead = inputStream.read(array, initialOffset + totalBytesRead, length - totalBytesRead);
+            if (bytesRead == -1)
+                break;
+            totalBytesRead += bytesRead;
+        } while (length > totalBytesRead);
+        destinationBuffer.position(destinationBuffer.position() + totalBytesRead);
+    }
+
     public static void writeFully(FileChannel channel, ByteBuffer sourceBuffer) throws IOException {
         while (sourceBuffer.hasRemaining())
             channel.write(sourceBuffer);
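
A small sketch of the new readFully helper in use (assuming an array-backed destination buffer,
as the method requires):

    byte[] source = new byte[]{1, 2, 3, 4, 5, 6, 7, 8};
    InputStream stream = new ByteArrayInputStream(source);

    // Reads until the buffer is full or the stream ends, whichever comes first.
    ByteBuffer destination = ByteBuffer.allocate(12);
    Utils.readFully(stream, destination);

    // 8 bytes were read, 4 slots remain; callers detect a short read via hasRemaining().
    assert destination.position() == 8 && destination.hasRemaining();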

http://git-wip-us.apache.org/repos/asf/kafka/blob/c060c482/clients/src/test/java/org/apache/kafka/common/record/CompressionTypeTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/CompressionTypeTest.java b/clients/src/test/java/org/apache/kafka/common/record/CompressionTypeTest.java
index 98dc591..fe196c8 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/CompressionTypeTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/CompressionTypeTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.common.record;
 
-import org.apache.kafka.common.utils.ByteBufferInputStream;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;
 import org.junit.Test;
 
@@ -37,7 +36,7 @@ public class CompressionTypeTest {
         buffer.rewind();
 
         KafkaLZ4BlockInputStream in = (KafkaLZ4BlockInputStream) CompressionType.LZ4.wrapForInput(
-                new ByteBufferInputStream(buffer), RecordBatch.MAGIC_VALUE_V0);
+                buffer, RecordBatch.MAGIC_VALUE_V0, BufferSupplier.NO_CACHING);
         assertTrue(in.ignoreFlagDescriptorChecksum());
     }
 
@@ -51,7 +50,7 @@ public class CompressionTypeTest {
         buffer.rewind();
 
         KafkaLZ4BlockInputStream in = (KafkaLZ4BlockInputStream) CompressionType.LZ4.wrapForInput(
-                new ByteBufferInputStream(buffer), RecordBatch.MAGIC_VALUE_V1);
+                buffer, RecordBatch.MAGIC_VALUE_V1, BufferSupplier.create());
         assertFalse(in.ignoreFlagDescriptorChecksum());
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c060c482/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
index ec858aa..3db1159 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
@@ -260,7 +260,7 @@ public class DefaultRecordBatchTest {
                 new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
                 new SimpleRecord(3L, "c".getBytes(), "3".getBytes()));
         DefaultRecordBatch batch = new DefaultRecordBatch(records.buffer());
-        try (CloseableIterator<Record> streamingIterator = batch.streamingIterator()) {
+        try (CloseableIterator<Record> streamingIterator = batch.streamingIterator(BufferSupplier.create())) {
             TestUtils.checkEquals(streamingIterator, batch.iterator());
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c060c482/clients/src/test/java/org/apache/kafka/common/record/KafkaLZ4Test.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/KafkaLZ4Test.java b/clients/src/test/java/org/apache/kafka/common/record/KafkaLZ4Test.java
index 2c3b2fc..222599b 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/KafkaLZ4Test.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/KafkaLZ4Test.java
@@ -16,63 +16,159 @@
  */
 package org.apache.kafka.common.record;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import net.jpountz.xxhash.XXHashFactory;
+
+import org.hamcrest.CoreMatchers;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
-import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.Random;
 
-import net.jpountz.xxhash.XXHashFactory;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
+import static org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.LZ4_FRAME_INCOMPRESSIBLE_MASK;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 @RunWith(value = Parameterized.class)
 public class KafkaLZ4Test {
 
+    private final static Random RANDOM = new Random(0);
+
     private final boolean useBrokenFlagDescriptorChecksum;
     private final boolean ignoreFlagDescriptorChecksum;
     private final byte[] payload;
     private final boolean close;
+    private final boolean blockChecksum;
 
-    public KafkaLZ4Test(boolean useBrokenFlagDescriptorChecksum, boolean ignoreFlagDescriptorChecksum, byte[] payload, boolean close) {
-        this.useBrokenFlagDescriptorChecksum = useBrokenFlagDescriptorChecksum;
-        this.ignoreFlagDescriptorChecksum = ignoreFlagDescriptorChecksum;
-        this.payload = payload;
-        this.close = close;
+    static class Payload {
+        String name;
+        byte[] payload;
+
+        Payload(String name, byte[] payload) {
+            this.name = name;
+            this.payload = payload;
+        }
+
+        @Override
+        public String toString() {
+            return "Payload{" +
+                   "size=" + payload.length +
+                   ", name='" + name + '\'' +
+                   '}';
+        }
     }
 
-    @Parameters
+    @Rule
+    public ExpectedException thrown = ExpectedException.none();
+
+    @Parameters(name = "{index} useBrokenFlagDescriptorChecksum={0}, ignoreFlagDescriptorChecksum={1}, blockChecksum={2}, close={3}, payload={4}")
     public static Collection<Object[]> data() {
-        byte[] payload = new byte[1000];
-        Arrays.fill(payload, (byte) 1);
+        List<Payload> payloads = new ArrayList<>();
+
+        payloads.add(new Payload("empty", new byte[]{}));
+        payloads.add(new Payload("onebyte", new byte[]{1}));
+
+        for (int size : Arrays.asList(1000, 1 << 16, (1 << 10) * 96)) {
+            byte[] random = new byte[size];
+            RANDOM.nextBytes(random);
+            payloads.add(new Payload("random", random));
+
+            byte[] ones = new byte[size];
+            Arrays.fill(ones, (byte) 1);
+            payloads.add(new Payload("ones", ones));
+        }
+
         List<Object[]> values = new ArrayList<>();
-        for (boolean broken : Arrays.asList(false, true))
-            for (boolean ignore : Arrays.asList(false, true))
-                for (boolean close : Arrays.asList(false, true))
-                    values.add(new Object[] {broken, ignore, payload, close});
+        for (Payload payload : payloads)
+            for (boolean broken : Arrays.asList(false, true))
+                for (boolean ignore : Arrays.asList(false, true))
+                    for (boolean blockChecksum : Arrays.asList(false, true))
+                        for (boolean close : Arrays.asList(false, true))
+                            values.add(new Object[]{broken, ignore, blockChecksum, close, payload});
         return values;
     }
 
+    public KafkaLZ4Test(boolean useBrokenFlagDescriptorChecksum, boolean ignoreFlagDescriptorChecksum,
+                        boolean blockChecksum, boolean close, Payload payload) {
+        this.useBrokenFlagDescriptorChecksum = useBrokenFlagDescriptorChecksum;
+        this.ignoreFlagDescriptorChecksum = ignoreFlagDescriptorChecksum;
+        this.payload = payload.payload;
+        this.close = close;
+        this.blockChecksum = blockChecksum;
+    }
+
     @Test
-    public void testKafkaLZ4() throws IOException {
-        ByteArrayOutputStream output = new ByteArrayOutputStream();
-        KafkaLZ4BlockOutputStream lz4 = new KafkaLZ4BlockOutputStream(output, this.useBrokenFlagDescriptorChecksum);
-        lz4.write(this.payload, 0, this.payload.length);
-        if (this.close) {
-            lz4.close();
-        } else {
-            lz4.flush();
+    public void testHeaderPrematureEnd() throws Exception {
+        thrown.expect(IOException.class);
+        thrown.expectMessage(KafkaLZ4BlockInputStream.PREMATURE_EOS);
+
+        final ByteBuffer buffer = ByteBuffer.allocate(2);
+        makeInputStream(buffer);
+    }
+
+    private KafkaLZ4BlockInputStream makeInputStream(ByteBuffer buffer) throws IOException {
+        return new KafkaLZ4BlockInputStream(buffer, BufferSupplier.create(), ignoreFlagDescriptorChecksum);
+    }
+
+    @Test
+    public void testNotSupported() throws Exception {
+        thrown.expect(IOException.class);
+        thrown.expectMessage(KafkaLZ4BlockInputStream.NOT_SUPPORTED);
+
+        byte[] compressed = compressedBytes();
+        compressed[0] = 0x00;
+
+        makeInputStream(ByteBuffer.wrap(compressed));
+    }
+
+    @Test
+    public void testBadFrameChecksum() throws Exception {
+        if (!ignoreFlagDescriptorChecksum) {
+            thrown.expect(IOException.class);
+            thrown.expectMessage(KafkaLZ4BlockInputStream.DESCRIPTOR_HASH_MISMATCH);
         }
-        byte[] compressed = output.toByteArray();
+
+        byte[] compressed = compressedBytes();
+        compressed[6] = (byte) 0xFF;
+
+        makeInputStream(ByteBuffer.wrap(compressed));
+    }
+
+    @Test
+    public void testBadBlockSize() throws Exception {
+        if (!close || (useBrokenFlagDescriptorChecksum && !ignoreFlagDescriptorChecksum)) return;
+
+        thrown.expect(IOException.class);
+        thrown.expectMessage(CoreMatchers.containsString("exceeded max"));
+
+        byte[] compressed = compressedBytes();
+        final ByteBuffer buffer = ByteBuffer.wrap(compressed).order(ByteOrder.LITTLE_ENDIAN);
+
+        int blockSize = buffer.getInt(7);
+        blockSize = (blockSize & LZ4_FRAME_INCOMPRESSIBLE_MASK) | (1 << 24 & ~LZ4_FRAME_INCOMPRESSIBLE_MASK);
+        buffer.putInt(7, blockSize);
+
+        testDecompression(buffer);
+    }
+
+    @Test
+    public void testCompression() throws Exception {
+        byte[] compressed = compressedBytes();
 
         // Check magic bytes stored as little-endian
         int offset = 0;
@@ -138,16 +234,125 @@ public class KafkaLZ4Test {
             assertEquals(0, compressed[offset++]);
             assertEquals(0, compressed[offset++]);
         }
+    }
+
+    @Test
+    public void testArrayBackedBuffer() throws IOException {
+        byte[] compressed = compressedBytes();
+        testDecompression(ByteBuffer.wrap(compressed));
+    }
+
+    @Test
+    public void testArrayBackedBufferSlice() throws IOException {
+        byte[] compressed = compressedBytes();
+
+        int sliceOffset = 12;
+
+        ByteBuffer buffer = ByteBuffer.allocate(compressed.length + sliceOffset + 123);
+        buffer.position(sliceOffset);
+        buffer.put(compressed).flip();
+        buffer.position(sliceOffset);
+
+        ByteBuffer slice = buffer.slice();
+        testDecompression(slice);
+
+        int offset = 42;
+        buffer = ByteBuffer.allocate(compressed.length + sliceOffset + offset);
+        buffer.position(sliceOffset + offset);
+        buffer.put(compressed).flip();
+        buffer.position(sliceOffset);
 
-        ByteArrayInputStream input = new ByteArrayInputStream(compressed);
+        slice = buffer.slice();
+        slice.position(offset);
+        testDecompression(slice);
+    }
+
+    @Test
+    public void testDirectBuffer() throws IOException {
+        byte[] compressed = compressedBytes();
+        ByteBuffer buffer;
+
+        buffer = ByteBuffer.allocateDirect(compressed.length);
+        buffer.put(compressed).flip();
+        testDecompression(buffer);
+
+        int offset = 42;
+        buffer = ByteBuffer.allocateDirect(compressed.length + offset + 123);
+        buffer.position(offset);
+        buffer.put(compressed).flip();
+        buffer.position(offset);
+        testDecompression(buffer);
+    }
+
+    @Test
+    public void testSkip() throws Exception {
+        if (!close || (useBrokenFlagDescriptorChecksum && !ignoreFlagDescriptorChecksum)) return;
+
+        final KafkaLZ4BlockInputStream in = makeInputStream(ByteBuffer.wrap(compressedBytes()));
+
+        int n = 100;
+        int remaining = payload.length;
+        long skipped = in.skip(n);
+        assertEquals(Math.min(n, remaining), skipped);
+
+        n = 10000;
+        remaining -= skipped;
+        skipped = in.skip(n);
+        assertEquals(Math.min(n, remaining), skipped);
+    }
+
+    private void testDecompression(ByteBuffer buffer) throws IOException {
+        IOException error = null;
         try {
-            KafkaLZ4BlockInputStream decompressed = new KafkaLZ4BlockInputStream(input, this.ignoreFlagDescriptorChecksum);
-            byte[] testPayload = new byte[this.payload.length];
-            int ret = decompressed.read(testPayload, 0, this.payload.length);
-            assertEquals(ret, this.payload.length);
+            KafkaLZ4BlockInputStream decompressed = makeInputStream(buffer);
+
+            byte[] testPayload = new byte[payload.length];
+
+            byte[] tmp = new byte[1024];
+            int n, pos = 0, i = 0;
+            while ((n = decompressed.read(tmp, i, tmp.length - i)) != -1) {
+                i += n;
+                if (i == tmp.length) {
+                    System.arraycopy(tmp, 0, testPayload, pos, i);
+                    pos += i;
+                    i = 0;
+                }
+            }
+            System.arraycopy(tmp, 0, testPayload, pos, i);
+            pos += i;
+
+            assertEquals(-1, decompressed.read(tmp, 0, tmp.length));
+            assertEquals(this.payload.length, pos);
             assertArrayEquals(this.payload, testPayload);
         } catch (IOException e) {
-            assertTrue(this.useBrokenFlagDescriptorChecksum && !this.ignoreFlagDescriptorChecksum);
+            if (!ignoreFlagDescriptorChecksum && useBrokenFlagDescriptorChecksum) {
+                assertEquals(KafkaLZ4BlockInputStream.DESCRIPTOR_HASH_MISMATCH, e.getMessage());
+                error = e;
+            } else if (!close) {
+                assertEquals(KafkaLZ4BlockInputStream.PREMATURE_EOS, e.getMessage());
+                error = e;
+            } else {
+                throw e;
+            }
+        }
+        if (!ignoreFlagDescriptorChecksum && useBrokenFlagDescriptorChecksum) assertNotNull(error);
+        if (!close) assertNotNull(error);
+    }
+
+    private byte[] compressedBytes() throws IOException {
+        ByteArrayOutputStream output = new ByteArrayOutputStream();
+        KafkaLZ4BlockOutputStream lz4 = new KafkaLZ4BlockOutputStream(
+            output,
+            KafkaLZ4BlockOutputStream.BLOCKSIZE_64KB,
+            blockChecksum,
+            useBrokenFlagDescriptorChecksum
+        );
+        lz4.write(this.payload, 0, this.payload.length);
+        if (this.close) {
+            lz4.close();
+        } else {
+            lz4.flush();
         }
+        return output.toByteArray();
     }
 }
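
As a usage note for the ByteBuffer-based API exercised by this test, the following is a minimal round-trip sketch. It relies only on the constructors already used above (the four-argument KafkaLZ4BlockOutputStream constructor and the ByteBuffer/BufferSupplier form of KafkaLZ4BlockInputStream); the class name Lz4RoundTripSketch and the sample payload are hypothetical, and the sketch assumes it is compiled in the same org.apache.kafka.common.record package so it sees the same visibility the test does.

package org.apache.kafka.common.record;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class Lz4RoundTripSketch {
    public static void main(String[] args) throws IOException {
        byte[] payload = "hello, lz4 frame".getBytes(StandardCharsets.UTF_8);

        // Compress into an LZ4 frame, mirroring compressedBytes() above:
        // 64KB blocks, no block checksum, spec-compliant frame descriptor checksum.
        ByteArrayOutputStream output = new ByteArrayOutputStream();
        KafkaLZ4BlockOutputStream out = new KafkaLZ4BlockOutputStream(
            output, KafkaLZ4BlockOutputStream.BLOCKSIZE_64KB, false, false);
        out.write(payload, 0, payload.length);
        out.close();

        // Decompress straight from a ByteBuffer; BufferSupplier.create() lets the
        // stream reuse its decompression buffers instead of allocating per batch.
        KafkaLZ4BlockInputStream in = new KafkaLZ4BlockInputStream(
            ByteBuffer.wrap(output.toByteArray()), BufferSupplier.create(), false);
        byte[] decompressed = new byte[payload.length];
        // The payload fits in a single block, so one read returns it all; larger
        // inputs would need a read loop like the one in testDecompression above.
        int read = in.read(decompressed, 0, decompressed.length);
        System.out.println(read + " bytes: " + new String(decompressed, 0, read, StandardCharsets.UTF_8));
    }
}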

http://git-wip-us.apache.org/repos/asf/kafka/blob/c060c482/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index a7058a3..619fbbd 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -649,7 +649,7 @@ public class MemoryRecordsTest {
         }
     }
 
-    @Parameterized.Parameters
+    @Parameterized.Parameters(name = "{index} magic={0}, firstOffset={1}, compressionType={2}")
     public static Collection<Object[]> data() {
         List<Object[]> values = new ArrayList<>();
         for (long firstOffset : asList(0L, 57L))

http://git-wip-us.apache.org/repos/asf/kafka/blob/c060c482/gradle/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/gradle/findbugs-exclude.xml b/gradle/findbugs-exclude.xml
index b0616a8..eec9268 100644
--- a/gradle/findbugs-exclude.xml
+++ b/gradle/findbugs-exclude.xml
@@ -160,9 +160,12 @@ For a detailed description of findbugs bug categories, see http://findbugs.sourc
     </Match>
 
     <Match>
-        <!-- Suppress some minor warnings about machine-generated code for LRU cache
+        <!-- Suppress some minor warnings about machine-generated code for
              benchmarking. -->
-        <Package name="org.apache.kafka.jmh.cache.generated"/>
+        <Or>
+            <Package name="org.apache.kafka.jmh.cache.generated"/>
+            <Package name="org.apache.kafka.jmh.record.generated"/>
+        </Or>
     </Match>
 
     <Match>

http://git-wip-us.apache.org/repos/asf/kafka/blob/c060c482/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/RecordBatchIterationBenchmark.java
----------------------------------------------------------------------
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/RecordBatchIterationBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/RecordBatchIterationBenchmark.java
new file mode 100644
index 0000000..cfb72e6
--- /dev/null
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/RecordBatchIterationBenchmark.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.record;
+
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.record.AbstractRecords;
+import org.apache.kafka.common.record.BufferSupplier;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.utils.CloseableIterator;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.OperationsPerInvocation;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Random;
+
+import static org.apache.kafka.common.record.RecordBatch.CURRENT_MAGIC_VALUE;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 15)
+public class RecordBatchIterationBenchmark {
+
+    private final Random random = new Random(0);
+    private final int batchCount = 5000;
+    private final int maxBatchSize = 10;
+
+    public enum Bytes {
+        RANDOM, ONES
+    }
+
+    @Param(value = {"LZ4", "SNAPPY", "NONE"})
+    private CompressionType type = CompressionType.NONE;
+
+    @Param(value = {"1", "2"})
+    private byte messageVersion = CURRENT_MAGIC_VALUE;
+
+    @Param(value = {"100", "1000", "10000", "100000"})
+    private int messageSize = 100;
+
+    @Param(value = {"RANDOM", "ONES"})
+    private Bytes bytes = Bytes.RANDOM;
+
+    // zero starting offset is much faster for v1 batches, but that will almost never happen
+    private final int startingOffset = 42;
+
+    // Used by measureSingleMessage
+    private ByteBuffer singleBatchBuffer;
+
+    // Used by measureVariableBatchSize
+    private ByteBuffer[] batchBuffers;
+    private int[] batchSizes;
+    private BufferSupplier bufferSupplier;
+
+    @Setup
+    public void init() {
+        bufferSupplier = BufferSupplier.create();
+        singleBatchBuffer = createBatch(1);
+
+        batchBuffers = new ByteBuffer[batchCount];
+        batchSizes = new int[batchCount];
+        for (int i = 0; i < batchCount; ++i) {
+            int size = random.nextInt(maxBatchSize) + 1;
+            batchBuffers[i] = createBatch(size);
+            batchSizes[i] = size;
+        }
+    }
+
+    private ByteBuffer createBatch(int batchSize) {
+        byte[] value = new byte[messageSize];
+        final ByteBuffer buf = ByteBuffer.allocate(
+            AbstractRecords.sizeInBytesUpperBound(messageVersion, new byte[0], value, new Header[0]) * batchSize
+        );
+
+        final MemoryRecordsBuilder builder =
+            MemoryRecords.builder(buf, messageVersion, type, TimestampType.CREATE_TIME, startingOffset);
+
+        for (int i = 0; i < batchSize; ++i) {
+            switch (bytes) {
+                case ONES:
+                    Arrays.fill(value, (byte) 1);
+                    break;
+                case RANDOM:
+                    random.nextBytes(value);
+                    break;
+            }
+
+            builder.append(0, null, value);
+        }
+        return builder.build().buffer();
+    }
+
+    @Benchmark
+    public void measureIteratorForBatchWithSingleMessage(Blackhole bh) throws IOException {
+        for (RecordBatch batch : MemoryRecords.readableRecords(singleBatchBuffer.duplicate()).batches()) {
+            try (CloseableIterator<Record> iterator = batch.streamingIterator(bufferSupplier)) {
+                while (iterator.hasNext())
+                    bh.consume(iterator.next());
+            }
+        }
+    }
+
+    @OperationsPerInvocation(value = batchCount)
+    @Benchmark
+    public void measureStreamingIteratorForVariableBatchSize(Blackhole bh) throws IOException {
+        for (int i = 0; i < batchCount; ++i) {
+            for (RecordBatch batch : MemoryRecords.readableRecords(batchBuffers[i].duplicate()).batches()) {
+                try (CloseableIterator<Record> iterator = batch.streamingIterator(bufferSupplier)) {
+                    while (iterator.hasNext())
+                        bh.consume(iterator.next());
+                }
+            }
+        }
+    }
+
+}
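
For anyone wanting to run the new benchmark outside the project's usual Gradle-driven JMH setup, a minimal sketch using the standard JMH Runner API is shown below. The class name RecordBatchIterationBenchmarkRunner and the chosen parameter values are illustrative assumptions; the sketch only presumes that JMH and the jmh-benchmarks classes are on the classpath.

package org.apache.kafka.jmh.record;

import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

public class RecordBatchIterationBenchmarkRunner {
    public static void main(String[] args) throws RunnerException {
        // Restrict the run to one combination (LZ4, small messages, v2 batches),
        // the case where decompression-buffer reuse matters most; omitting the
        // param() calls would sweep every @Param combination instead.
        Options opts = new OptionsBuilder()
            .include(RecordBatchIterationBenchmark.class.getSimpleName())
            .param("type", "LZ4")
            .param("messageSize", "100")
            .param("bytes", "RANDOM")
            .param("messageVersion", "2")
            .build();
        new Runner(opts).run();
    }
}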