You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/05/07 18:36:21 UTC

kafka git commit: KAFKA-3160; Fix LZ4 Framing

Repository: kafka
Updated Branches:
  refs/heads/trunk c4bbf3424 -> 8fe255223


KAFKA-3160; Fix LZ4 Framing

This contribution is my original work and I license the work under Apache 2.0.

Author: Dana Powers <da...@gmail.com>
Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Jun Rao <ju...@gmail.com>, Ismael Juma <is...@juma.me.uk>

Closes #1212 from dpkp/KAFKA-3160


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8fe25522
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8fe25522
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8fe25522

Branch: refs/heads/trunk
Commit: 8fe2552239863f3a01d01708d55edf3c7082ff92
Parents: c4bbf34
Author: Dana Powers <da...@gmail.com>
Authored: Sat May 7 19:35:55 2016 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Sat May 7 19:35:55 2016 +0100

----------------------------------------------------------------------
 .../apache/kafka/common/record/Compressor.java  |   7 +-
 .../common/record/KafkaLZ4BlockInputStream.java |  68 ++++++---
 .../record/KafkaLZ4BlockOutputStream.java       |  89 +++++++-----
 .../kafka/common/record/MemoryRecords.java      |  12 +-
 .../kafka/common/record/KafkaLZ4Test.java       | 137 +++++++++++++++++++
 core/src/main/scala/kafka/log/LogCleaner.scala  |   2 +-
 .../kafka/message/ByteBufferMessageSet.scala    |   7 +-
 .../kafka/message/CompressionFactory.scala      |   8 +-
 .../kafka/message/MessageCompressionTest.scala  |  21 +++
 .../unit/kafka/message/MessageWriterTest.scala  |   6 +-
 docs/upgrade.html                               |   5 +
 11 files changed, 289 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8fe25522/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
index afa85a4..37d53b8 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
@@ -77,7 +77,7 @@ public class Compressor {
         @Override
         public Constructor get() throws ClassNotFoundException, NoSuchMethodException {
             return Class.forName("org.apache.kafka.common.record.KafkaLZ4BlockInputStream")
-                .getConstructor(InputStream.class);
+                .getConstructor(InputStream.class, Boolean.TYPE);
         }
     });
 
@@ -275,7 +275,7 @@ public class Compressor {
         }
     }
 
-    static public DataInputStream wrapForInput(ByteBufferInputStream buffer, CompressionType type) {
+    static public DataInputStream wrapForInput(ByteBufferInputStream buffer, CompressionType type, byte messageVersion) {
         try {
             switch (type) {
                 case NONE:
@@ -291,7 +291,8 @@ public class Compressor {
                     }
                 case LZ4:
                     try {
-                        InputStream stream = (InputStream) lz4InputStreamSupplier.get().newInstance(buffer);
+                        InputStream stream = (InputStream) lz4InputStreamSupplier.get().newInstance(buffer,
+                                messageVersion == Record.MAGIC_VALUE_V0);
                         return new DataInputStream(stream);
                     } catch (Exception e) {
                         throw new KafkaException(e);

http://git-wip-us.apache.org/repos/asf/kafka/blob/8fe25522/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
index 372d4f4..92718d8 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
@@ -36,15 +36,14 @@ import net.jpountz.xxhash.XXHash32;
 import net.jpountz.xxhash.XXHashFactory;
 
 /**
- * A partial implementation of the v1.4.1 LZ4 Frame format.
+ * A partial implementation of the v1.5.1 LZ4 Frame format.
  *
- * @see <a href="https://docs.google.com/document/d/1Tdxmn5_2e5p1y4PtXkatLndWVb0R8QARJFe6JI4Keuo/edit">LZ4 Framing
- *      Format Spec</a>
+ * @see <a href="http://cyan4973.github.io/lz4/lz4_Frame_format.html">LZ4 Frame Format</a>
  */
 public final class KafkaLZ4BlockInputStream extends FilterInputStream {
 
     public static final String PREMATURE_EOS = "Stream ended prematurely";
-    public static final String NOT_SUPPORTED = "Stream unsupported";
+    public static final String NOT_SUPPORTED = "Stream unsupported (invalid magic bytes)";
     public static final String BLOCK_HASH_MISMATCH = "Block checksum mismatch";
     public static final String DESCRIPTOR_HASH_MISMATCH = "Stream frame descriptor corrupted";
 
@@ -53,6 +52,7 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream {
     private final byte[] buffer;
     private final byte[] compressedBuffer;
     private final int maxBlockSize;
+    private final boolean ignoreFlagDescriptorChecksum;
     private FLG flg;
     private BD bd;
     private int bufferOffset;
@@ -63,12 +63,14 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream {
      * Create a new {@link InputStream} that will decompress data using the LZ4 algorithm.
      *
      * @param in The stream to decompress
+     * @param ignoreFlagDescriptorChecksum for compatibility with old kafka clients, ignore incorrect HC byte
      * @throws IOException
      */
-    public KafkaLZ4BlockInputStream(InputStream in) throws IOException {
+    public KafkaLZ4BlockInputStream(InputStream in, boolean ignoreFlagDescriptorChecksum) throws IOException {
         super(in);
         decompressor = LZ4Factory.fastestInstance().safeDecompressor();
         checksum = XXHashFactory.fastestInstance().hash32();
+        this.ignoreFlagDescriptorChecksum = ignoreFlagDescriptorChecksum;
         readHeader();
         maxBlockSize = bd.getBlockMaximumSize();
         buffer = new byte[maxBlockSize];
@@ -79,6 +81,25 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream {
     }
 
     /**
+     * Create a new {@link InputStream} that will decompress data using the LZ4 algorithm.
+     *
+     * @param in The stream to decompress
+     * @throws IOException
+     */
+    public KafkaLZ4BlockInputStream(InputStream in) throws IOException {
+        this(in, false);
+    }
+
+    /**
+     * Check whether KafkaLZ4BlockInputStream is configured to ignore the
+     * Frame Descriptor checksum, which is useful for compatibility with
+     * old client implementations that use incorrect checksum calculations.
+     */
+    public boolean ignoreFlagDescriptorChecksum() {
+        return this.ignoreFlagDescriptorChecksum;
+    }
+
+    /**
      * Reads the magic number and frame descriptor from the underlying {@link InputStream}.
      *
      * @throws IOException
@@ -87,25 +108,35 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream {
         byte[] header = new byte[LZ4_MAX_HEADER_LENGTH];
 
         // read first 6 bytes into buffer to check magic and FLG/BD descriptor flags
-        bufferOffset = 6;
-        if (in.read(header, 0, bufferOffset) != bufferOffset) {
+        int headerOffset = 6;
+        if (in.read(header, 0, headerOffset) != headerOffset) {
             throw new IOException(PREMATURE_EOS);
         }
 
-        if (MAGIC != Utils.readUnsignedIntLE(header, bufferOffset - 6)) {
+        if (MAGIC != Utils.readUnsignedIntLE(header, headerOffset - 6)) {
             throw new IOException(NOT_SUPPORTED);
         }
-        flg = FLG.fromByte(header[bufferOffset - 2]);
-        bd = BD.fromByte(header[bufferOffset - 1]);
-        // TODO read uncompressed content size, update flg.validate()
-        // TODO read dictionary id, update flg.validate()
+        flg = FLG.fromByte(header[headerOffset - 2]);
+        bd = BD.fromByte(header[headerOffset - 1]);
 
-        // check stream descriptor hash
-        byte hash = (byte) ((checksum.hash(header, 0, bufferOffset, 0) >> 8) & 0xFF);
-        header[bufferOffset++] = (byte) in.read();
-        if (hash != header[bufferOffset - 1]) {
-            throw new IOException(DESCRIPTOR_HASH_MISMATCH);
+        if (flg.isContentSizeSet()) {
+            if (in.read(header, headerOffset, 8) != 8)
+                throw new IOException(PREMATURE_EOS);
+            headerOffset += 8;
         }
+
+        // Final byte of Frame Descriptor is HC checksum
+        header[headerOffset++] = (byte) in.read();
+
+        // Old implementations produced incorrect HC checksums
+        if (ignoreFlagDescriptorChecksum)
+            return;
+
+        int offset = 4;
+        int len = headerOffset - offset - 1; // don't include magic bytes or HC
+        byte hash = (byte) ((checksum.hash(header, offset, len, 0) >> 8) & 0xFF);
+        if (hash != header[headerOffset - 1])
+            throw new IOException(DESCRIPTOR_HASH_MISMATCH);
     }
 
     /**
@@ -120,7 +151,8 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream {
         // Check for EndMark
         if (blockSize == 0) {
             finished = true;
-            // TODO implement content checksum, update flg.validate()
+            if (flg.isContentChecksumSet())
+                Utils.readUnsignedIntLE(in); // TODO: verify this content checksum
             return;
         } else if (blockSize > maxBlockSize) {
             throw new IOException(String.format("Block size %s exceeded max: %s", blockSize, maxBlockSize));

http://git-wip-us.apache.org/repos/asf/kafka/blob/8fe25522/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
index 7d23f4a..933b2cf 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
@@ -29,10 +29,9 @@ import net.jpountz.xxhash.XXHash32;
 import net.jpountz.xxhash.XXHashFactory;
 
 /**
- * A partial implementation of the v1.4.1 LZ4 Frame format.
+ * A partial implementation of the v1.5.1 LZ4 Frame format.
  *
- * @see <a href="https://docs.google.com/document/d/1Tdxmn5_2e5p1y4PtXkatLndWVb0R8QARJFe6JI4Keuo/edit">LZ4 Framing
- *      Format Spec</a>
+ * @see <a href="http://cyan4973.github.io/lz4/lz4_Frame_format.html">LZ4 Frame Format</a>
  */
 public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
 
@@ -49,6 +48,7 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
 
     private final LZ4Compressor compressor;
     private final XXHash32 checksum;
+    private final boolean useBrokenFlagDescriptorChecksum;
     private final FLG flg;
     private final BD bd;
     private final byte[] buffer;
@@ -65,12 +65,15 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
      *            values will generate an exception
      * @param blockChecksum Default: false. When true, a XXHash32 checksum is computed and appended to the stream for
      *            every block of data
+     * @param useBrokenFlagDescriptorChecksum Default: false. When true, writes an incorrect FrameDescriptor checksum
+     *            compatible with older kafka clients.
      * @throws IOException
      */
-    public KafkaLZ4BlockOutputStream(OutputStream out, int blockSize, boolean blockChecksum) throws IOException {
+    public KafkaLZ4BlockOutputStream(OutputStream out, int blockSize, boolean blockChecksum, boolean useBrokenFlagDescriptorChecksum) throws IOException {
         super(out);
         compressor = LZ4Factory.fastestInstance().fastCompressor();
         checksum = XXHashFactory.fastestInstance().hash32();
+        this.useBrokenFlagDescriptorChecksum = useBrokenFlagDescriptorChecksum;
         bd = new BD(blockSize);
         flg = new FLG(blockChecksum);
         bufferOffset = 0;
@@ -84,13 +87,27 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
     /**
      * Create a new {@link OutputStream} that will compress data using the LZ4 algorithm.
      *
+     * @param out The output stream to compress
+     * @param blockSize Default: 4. The block size used during compression. 4=64kb, 5=256kb, 6=1mb, 7=4mb. All other
+     *            values will generate an exception
+     * @param blockChecksum Default: false. When true, a XXHash32 checksum is computed and appended to the stream for
+     *            every block of data
+     * @throws IOException
+     */
+    public KafkaLZ4BlockOutputStream(OutputStream out, int blockSize, boolean blockChecksum) throws IOException {
+        this(out, blockSize, blockChecksum, false);
+    }
+
+    /**
+     * Create a new {@link OutputStream} that will compress data using the LZ4 algorithm.
+     *
      * @param out The stream to compress
      * @param blockSize Default: 4. The block size used during compression. 4=64kb, 5=256kb, 6=1mb, 7=4mb. All other
      *            values will generate an exception
      * @throws IOException
      */
     public KafkaLZ4BlockOutputStream(OutputStream out, int blockSize) throws IOException {
-        this(out, blockSize, false);
+        this(out, blockSize, false, false);
     }
 
     /**
@@ -103,6 +120,19 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
         this(out, BLOCKSIZE_64KB);
     }
 
+    public KafkaLZ4BlockOutputStream(OutputStream out, boolean useBrokenHC) throws IOException {
+        this(out, BLOCKSIZE_64KB, false, useBrokenHC);
+    }
+
+    /**
+     * Check whether KafkaLZ4BlockOutputStream is configured to write an
+     * incorrect Frame Descriptor checksum, which is useful for
+     * compatibility with old client implementations.
+     */
+    public boolean useBrokenFlagDescriptorChecksum() {
+        return this.useBrokenFlagDescriptorChecksum;
+    }
+
     /**
      * Writes the magic number and frame descriptor to the underlying {@link OutputStream}.
      *
@@ -114,10 +144,17 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
         buffer[bufferOffset++] = flg.toByte();
         buffer[bufferOffset++] = bd.toByte();
         // TODO write uncompressed content size, update flg.validate()
-        // TODO write dictionary id, update flg.validate()
+
         // compute checksum on all descriptor fields
-        int hash = (checksum.hash(buffer, 0, bufferOffset, 0) >> 8) & 0xFF;
-        buffer[bufferOffset++] = (byte) hash;
+        int offset = 4;
+        int len = bufferOffset - offset;
+        if (this.useBrokenFlagDescriptorChecksum) {
+            len += offset;
+            offset = 0;
+        }
+        byte hash = (byte) ((checksum.hash(buffer, offset, len, 0) >> 8) & 0xFF);
+        buffer[bufferOffset++] = hash;
+
         // write out frame descriptor
         out.write(buffer, 0, bufferOffset);
         bufferOffset = 0;
@@ -236,8 +273,7 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
 
         private static final int VERSION = 1;
 
-        private final int presetDictionary;
-        private final int reserved1;
+        private final int reserved;
         private final int contentChecksum;
         private final int contentSize;
         private final int blockChecksum;
@@ -249,18 +285,16 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
         }
 
         public FLG(boolean blockChecksum) {
-            this(0, 0, 0, 0, blockChecksum ? 1 : 0, 1, VERSION);
+            this(0, 0, 0, blockChecksum ? 1 : 0, 1, VERSION);
         }
 
-        private FLG(int presetDictionary,
-                    int reserved1,
+        private FLG(int reserved,
                     int contentChecksum,
                     int contentSize,
                     int blockChecksum,
                     int blockIndependence,
                     int version) {
-            this.presetDictionary = presetDictionary;
-            this.reserved1 = reserved1;
+            this.reserved = reserved;
             this.contentChecksum = contentChecksum;
             this.contentSize = contentSize;
             this.blockChecksum = blockChecksum;
@@ -270,16 +304,14 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
         }
 
         public static FLG fromByte(byte flg) {
-            int presetDictionary = (flg >>> 0) & 1;
-            int reserved1 = (flg >>> 1) & 1;
+            int reserved = (flg >>> 0) & 3;
             int contentChecksum = (flg >>> 2) & 1;
             int contentSize = (flg >>> 3) & 1;
             int blockChecksum = (flg >>> 4) & 1;
             int blockIndependence = (flg >>> 5) & 1;
             int version = (flg >>> 6) & 3;
 
-            return new FLG(presetDictionary,
-                           reserved1,
+            return new FLG(reserved,
                            contentChecksum,
                            contentSize,
                            blockChecksum,
@@ -288,22 +320,13 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
         }
 
         public byte toByte() {
-            return (byte) (((presetDictionary & 1) << 0) | ((reserved1 & 1) << 1) | ((contentChecksum & 1) << 2)
+            return (byte) (((reserved & 3) << 0) | ((contentChecksum & 1) << 2)
                     | ((contentSize & 1) << 3) | ((blockChecksum & 1) << 4) | ((blockIndependence & 1) << 5) | ((version & 3) << 6));
         }
 
         private void validate() {
-            if (presetDictionary != 0) {
-                throw new RuntimeException("Preset dictionary is unsupported");
-            }
-            if (reserved1 != 0) {
-                throw new RuntimeException("Reserved1 field must be 0");
-            }
-            if (contentChecksum != 0) {
-                throw new RuntimeException("Content checksum is unsupported");
-            }
-            if (contentSize != 0) {
-                throw new RuntimeException("Content size is unsupported");
+            if (reserved != 0) {
+                throw new RuntimeException("Reserved bits must be 0");
             }
             if (blockIndependence != 1) {
                 throw new RuntimeException("Dependent block stream is unsupported");
@@ -313,10 +336,6 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
             }
         }
 
-        public boolean isPresetDictionarySet() {
-            return presetDictionary == 1;
-        }
-
         public boolean isContentChecksumSet() {
             return contentChecksum == 1;
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8fe25522/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 7175953..fcf7f44 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -187,10 +187,10 @@ public class MemoryRecords implements Records {
     public Iterator<LogEntry> iterator() {
         if (writable) {
             // flip on a duplicate buffer for reading
-            return new RecordsIterator((ByteBuffer) this.buffer.duplicate().flip(), CompressionType.NONE, false);
+            return new RecordsIterator((ByteBuffer) this.buffer.duplicate().flip(), false);
         } else {
             // do not need to flip for non-writable buffer
-            return new RecordsIterator(this.buffer.duplicate(), CompressionType.NONE, false);
+            return new RecordsIterator(this.buffer.duplicate(), false);
         }
     }
     
@@ -224,11 +224,11 @@ public class MemoryRecords implements Records {
         private final ArrayDeque<LogEntry> logEntries;
         private final long absoluteBaseOffset;
 
-        public RecordsIterator(ByteBuffer buffer, CompressionType type, boolean shallow) {
-            this.type = type;
+        public RecordsIterator(ByteBuffer buffer, boolean shallow) {
+            this.type = CompressionType.NONE;
             this.buffer = buffer;
             this.shallow = shallow;
-            this.stream = Compressor.wrapForInput(new ByteBufferInputStream(this.buffer), type);
+            this.stream = new DataInputStream(new ByteBufferInputStream(buffer));
             this.logEntries = null;
             this.absoluteBaseOffset = -1;
         }
@@ -238,7 +238,7 @@ public class MemoryRecords implements Records {
             this.type = entry.record().compressionType();
             this.buffer = entry.record().value();
             this.shallow = true;
-            this.stream = Compressor.wrapForInput(new ByteBufferInputStream(this.buffer), type);
+            this.stream = Compressor.wrapForInput(new ByteBufferInputStream(this.buffer), type, entry.record().magic());
             long wrapperRecordOffset = entry.offset();
             // If relative offset is used, we need to decompress the entire message first to compute
             // the absolute offset.

http://git-wip-us.apache.org/repos/asf/kafka/blob/8fe25522/clients/src/test/java/org/apache/kafka/common/record/KafkaLZ4Test.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/KafkaLZ4Test.java b/clients/src/test/java/org/apache/kafka/common/record/KafkaLZ4Test.java
new file mode 100644
index 0000000..37877ef
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/record/KafkaLZ4Test.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import net.jpountz.xxhash.XXHashFactory;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(value = Parameterized.class)
+public class KafkaLZ4Test {
+
+    private final boolean useBrokenFlagDescriptorChecksum;
+    private final boolean ignoreFlagDescriptorChecksum;
+    private final byte[] payload;
+
+    public KafkaLZ4Test(boolean useBrokenFlagDescriptorChecksum, boolean ignoreFlagDescriptorChecksum, byte[] payload) {
+        this.useBrokenFlagDescriptorChecksum = useBrokenFlagDescriptorChecksum;
+        this.ignoreFlagDescriptorChecksum = ignoreFlagDescriptorChecksum;
+        this.payload = payload;
+    }
+
+    @Parameters
+    public static Collection<Object[]> data() {
+        byte[] payload = new byte[1000];
+        Arrays.fill(payload, (byte) 1);
+        List<Object[]> values = new ArrayList<Object[]>();
+        for (boolean broken : Arrays.asList(false, true))
+            for (boolean ignore : Arrays.asList(false, true))
+                values.add(new Object[] {broken, ignore, payload});
+        return values;
+    }
+
+    @Test
+    public void testKafkaLZ4() throws IOException {
+        ByteArrayOutputStream output = new ByteArrayOutputStream();
+        KafkaLZ4BlockOutputStream lz4 = new KafkaLZ4BlockOutputStream(output, this.useBrokenFlagDescriptorChecksum);
+        lz4.write(this.payload, 0, this.payload.length);
+        lz4.flush();
+        byte[] compressed = output.toByteArray();
+
+        // Check magic bytes stored as little-endian
+        int offset = 0;
+        assertEquals(compressed[offset++], 0x04);
+        assertEquals(compressed[offset++], 0x22);
+        assertEquals(compressed[offset++], 0x4D);
+        assertEquals(compressed[offset++], 0x18);
+
+        // Check flg descriptor
+        byte flg = compressed[offset++];
+
+        // 2-bit version must be 01
+        int version = (flg >>> 6) & 3;
+        assertEquals(version, 1);
+
+        // Reserved bits should always be 0
+        int reserved = flg & 3;
+        assertEquals(reserved, 0);
+
+        // Check block descriptor
+        byte bd = compressed[offset++];
+
+        // Block max-size
+        int blockMaxSize = (bd >>> 4) & 7;
+        // Only supported values are 4 (64KB), 5 (256KB), 6 (1MB), 7 (4MB)
+        assertTrue(blockMaxSize >= 4);
+        assertTrue(blockMaxSize <= 7);
+
+        // Multiple reserved bit ranges in block descriptor
+        reserved = bd & 15;
+        assertEquals(reserved, 0);
+        reserved = (bd >>> 7) & 1;
+        assertEquals(reserved, 0);
+
+        // If flg descriptor sets content size flag
+        // there are 8 additional bytes before checksum
+        boolean contentSize = ((flg >>> 3) & 1) != 0;
+        if (contentSize)
+            offset += 8;
+
+        // Checksum applies to frame descriptor: flg, bd, and optional contentsize
+        // so initial offset should be 4 (for magic bytes)
+        int off = 4;
+        int len = offset - 4;
+
+        // Initial implementation of checksum incorrectly applied to full header
+        // including magic bytes
+        if (this.useBrokenFlagDescriptorChecksum) {
+            off = 0;
+            len = offset;
+        }
+
+        int hash = XXHashFactory.fastestInstance().hash32().hash(compressed, off, len, 0);
+
+        byte hc = compressed[offset++];
+        assertEquals(hc, (byte) ((hash >> 8) & 0xFF));
+
+        ByteArrayInputStream input = new ByteArrayInputStream(compressed);
+        try {
+            KafkaLZ4BlockInputStream decompressed = new KafkaLZ4BlockInputStream(input, this.ignoreFlagDescriptorChecksum);
+            byte[] testPayload = new byte[this.payload.length];
+            int ret = decompressed.read(testPayload, 0, this.payload.length);
+            assertEquals(ret, this.payload.length);
+            assertArrayEquals(this.payload, testPayload);
+        } catch (IOException e) {
+            assertTrue(this.useBrokenFlagDescriptorChecksum && !this.ignoreFlagDescriptorChecksum);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/8fe25522/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index e23234b..0f742f9 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -499,7 +499,7 @@ private[log] class Cleaner(val id: Int,
       val timestampType = firstMessageOffset.message.timestampType
       val messageWriter = new MessageWriter(math.min(math.max(MessageSet.messageSetSize(messages) / 2, 1024), 1 << 16))
       messageWriter.write(codec = compressionCodec, timestamp = magicAndTimestamp.timestamp, timestampType = timestampType, magicValue = messageFormatVersion) { outputStream =>
-        val output = new DataOutputStream(CompressionFactory(compressionCodec, outputStream))
+        val output = new DataOutputStream(CompressionFactory(compressionCodec, messageFormatVersion, outputStream))
         try {
           for (messageOffset <- messageAndOffsets) {
             val message = messageOffset.message

http://git-wip-us.apache.org/repos/asf/kafka/blob/8fe25522/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
index 6f38715..677355a 100644
--- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.JavaConverters._
 
+import org.apache.kafka.common.errors.CorruptRecordException
 import org.apache.kafka.common.errors.InvalidTimestampException
 import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.utils.Utils
@@ -55,7 +56,7 @@ object ByteBufferMessageSet {
       var offset = -1L
       val messageWriter = new MessageWriter(math.min(math.max(MessageSet.messageSetSize(messages) / 2, 1024), 1 << 16))
       messageWriter.write(codec = compressionCodec, timestamp = magicAndTimestamp.timestamp, timestampType = timestampType, magicValue = magicAndTimestamp.magic) { outputStream =>
-        val output = new DataOutputStream(CompressionFactory(compressionCodec, outputStream))
+        val output = new DataOutputStream(CompressionFactory(compressionCodec, magicAndTimestamp.magic, outputStream))
         try {
           for (message <- messages) {
             offset = offsetAssigner.nextAbsoluteOffset()
@@ -95,7 +96,7 @@ object ByteBufferMessageSet {
       if (wrapperMessage.payload == null)
         throw new KafkaException(s"Message payload is null: $wrapperMessage")
       val inputStream = new ByteBufferBackedInputStream(wrapperMessage.payload)
-      val compressed = new DataInputStream(CompressionFactory(wrapperMessage.compressionCodec, inputStream))
+      val compressed = new DataInputStream(CompressionFactory(wrapperMessage.compressionCodec, wrapperMessage.magic, inputStream))
       var lastInnerOffset = -1L
 
       val messageAndOffsets = if (wrapperMessageAndOffset.message.magic > MagicValue_V0) {
@@ -107,7 +108,7 @@ object ByteBufferMessageSet {
           case eofe: EOFException =>
             compressed.close()
           case ioe: IOException =>
-            throw new KafkaException(ioe)
+            throw new CorruptRecordException(ioe)
         }
         Some(innerMessageAndOffsets)
       } else None

http://git-wip-us.apache.org/repos/asf/kafka/blob/8fe25522/core/src/main/scala/kafka/message/CompressionFactory.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/CompressionFactory.scala b/core/src/main/scala/kafka/message/CompressionFactory.scala
index b047f68..e02ed63 100644
--- a/core/src/main/scala/kafka/message/CompressionFactory.scala
+++ b/core/src/main/scala/kafka/message/CompressionFactory.scala
@@ -26,7 +26,7 @@ import org.apache.kafka.common.record.{KafkaLZ4BlockInputStream, KafkaLZ4BlockOu
 
 object CompressionFactory {
   
-  def apply(compressionCodec: CompressionCodec, stream: OutputStream): OutputStream = {
+  def apply(compressionCodec: CompressionCodec, messageVersion: Byte, stream: OutputStream): OutputStream = {
     compressionCodec match {
       case DefaultCompressionCodec => new GZIPOutputStream(stream)
       case GZIPCompressionCodec => new GZIPOutputStream(stream)
@@ -34,13 +34,13 @@ object CompressionFactory {
         import org.xerial.snappy.SnappyOutputStream
         new SnappyOutputStream(stream)
       case LZ4CompressionCodec =>
-        new KafkaLZ4BlockOutputStream(stream)
+        new KafkaLZ4BlockOutputStream(stream, messageVersion == Message.MagicValue_V0)
       case _ =>
         throw new kafka.common.UnknownCodecException("Unknown Codec: " + compressionCodec)
     }
   }
   
-  def apply(compressionCodec: CompressionCodec, stream: InputStream): InputStream = {
+  def apply(compressionCodec: CompressionCodec, messageVersion: Byte, stream: InputStream): InputStream = {
     compressionCodec match {
       case DefaultCompressionCodec => new GZIPInputStream(stream)
       case GZIPCompressionCodec => new GZIPInputStream(stream)
@@ -48,7 +48,7 @@ object CompressionFactory {
         import org.xerial.snappy.SnappyInputStream
         new SnappyInputStream(stream)
       case LZ4CompressionCodec =>
-        new KafkaLZ4BlockInputStream(stream)
+        new KafkaLZ4BlockInputStream(stream, messageVersion == Message.MagicValue_V0)
       case _ =>
         throw new kafka.common.UnknownCodecException("Unknown Codec: " + compressionCodec)
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8fe25522/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala b/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
index 53b85ef..1438523 100644
--- a/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
+++ b/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
@@ -17,6 +17,9 @@
 
 package kafka.message
 
+import org.apache.kafka.common.record._
+
+import java.io.ByteArrayInputStream
 import java.io.ByteArrayOutputStream
 import scala.collection._
 import org.scalatest.junit.JUnitSuite
@@ -26,6 +29,24 @@ import org.junit.Assert._
 class MessageCompressionTest extends JUnitSuite {
 
   @Test
+  def testLZ4FramingV0() {
+    val output = CompressionFactory(LZ4CompressionCodec, Message.MagicValue_V0, new ByteArrayOutputStream())
+    assertTrue(output.asInstanceOf[KafkaLZ4BlockOutputStream].useBrokenFlagDescriptorChecksum())
+
+    val input = CompressionFactory(LZ4CompressionCodec, Message.MagicValue_V0, new ByteArrayInputStream(Array[Byte](0x04, 0x22, 0x4D, 0x18, 0x60, 0x40, 0x1A)))
+    assertTrue(input.asInstanceOf[KafkaLZ4BlockInputStream].ignoreFlagDescriptorChecksum())
+  }
+
+  @Test
+  def testLZ4FramingV1() {
+    val output = CompressionFactory(LZ4CompressionCodec, Message.MagicValue_V1, new ByteArrayOutputStream())
+    assertFalse(output.asInstanceOf[KafkaLZ4BlockOutputStream].useBrokenFlagDescriptorChecksum())
+
+    val input = CompressionFactory(LZ4CompressionCodec, Message.MagicValue_V1, new ByteArrayInputStream(Array[Byte](0x04, 0x22, 0x4D, 0x18, 0x60, 0x40, -126)))
+    assertFalse(input.asInstanceOf[KafkaLZ4BlockInputStream].ignoreFlagDescriptorChecksum())
+  }
+
+  @Test
   def testSimpleCompressDecompress() {
     val codecs = mutable.ArrayBuffer[CompressionCodec](GZIPCompressionCodec)
     if(isSnappyAvailable)

http://git-wip-us.apache.org/repos/asf/kafka/blob/8fe25522/core/src/test/scala/unit/kafka/message/MessageWriterTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/message/MessageWriterTest.scala b/core/src/test/scala/unit/kafka/message/MessageWriterTest.scala
index 6f0ee1d..a82a553 100644
--- a/core/src/test/scala/unit/kafka/message/MessageWriterTest.scala
+++ b/core/src/test/scala/unit/kafka/message/MessageWriterTest.scala
@@ -36,7 +36,7 @@ class MessageWriterTest extends JUnitSuite {
   private def mkMessageWithWriter(key: Array[Byte] = null, bytes: Array[Byte], codec: CompressionCodec): Message = {
     val writer = new MessageWriter(100)
     writer.write(key = key, codec = codec, timestamp = Message.NoTimestamp, timestampType = TimestampType.CREATE_TIME, magicValue = Message.MagicValue_V1) { output =>
-      val out = if (codec == NoCompressionCodec) output else CompressionFactory(codec, output)
+      val out = if (codec == NoCompressionCodec) output else CompressionFactory(codec, Message.MagicValue_V1, output)
       try {
         val p = rnd.nextInt(bytes.length)
         out.write(bytes, 0, p)
@@ -53,14 +53,14 @@ class MessageWriterTest extends JUnitSuite {
 
   private def compress(bytes: Array[Byte], codec: CompressionCodec): Array[Byte] = {
     val baos = new ByteArrayOutputStream()
-    val out = CompressionFactory(codec, baos)
+    val out = CompressionFactory(codec, Message.MagicValue_V1, baos)
     out.write(bytes)
     out.close()
     baos.toByteArray
   }
 
   private def decompress(compressed: Array[Byte], codec: CompressionCodec): Array[Byte] = {
-    toArray(CompressionFactory(codec, new ByteArrayInputStream(compressed)))
+    toArray(CompressionFactory(codec, Message.MagicValue_V1, new ByteArrayInputStream(compressed)))
   }
 
   private def toArray(in: InputStream): Array[Byte] = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/8fe25522/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index b9c4bec..486954c 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -80,6 +80,11 @@ work with 0.10.0.x brokers. Therefore, 0.9.0.0 clients should be upgraded to 0.9
     <li> MirrorMakerMessageHandler no longer exposes the <code>handle(record: MessageAndMetadata[Array[Byte], Array[Byte]])</code> method as it was never called. </li>
     <li> The 0.7 KafkaMigrationTool is no longer packaged with Kafka. If you need to migrate from 0.7 to 0.10.0, please migrate to 0.8 first and then follow the documented upgrade process to upgrade from 0.8 to 0.10.0. </li>
     <li> The new consumer has standardized its APIs to accept <code>java.util.Collection</code> as the sequence type for method parameters. Existing code may have to be updated to work with the 0.10.0 client library. </li>
+    <li> LZ4-compressed message handling was changed to use an interoperable framing specification (LZ4f v1.5.1).
+         To maintain compatibility with old clients, this change only applies to Message format 0.10.0 and later.
+         Clients that Produce/Fetch LZ4-compressed messages using v0/v1 (Message format 0.9.0) should continue
+         to use the 0.9.0 framing implementation. Clients that use Produce/Fetch protocols v2 or later
+         should use interoperable LZ4f framing. A list of interoperable LZ4 libraries is available at http://www.lz4.org/ </li>
 </ul>
 
 <h5><a id="upgrade_10_notable" href="#upgrade_10_notable">Notable changes in 0.10.0.0</a></h5>