You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2020/02/11 17:07:28 UTC

[cassandra] branch trunk updated: Prevent the JVM from crashing due to corrupt LZ4 streams Patch by David Capwell; reviewed by Jordan West and brandonwilliams for CASSANDRA-15556

This is an automated email from the ASF dual-hosted git repository.

brandonwilliams pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f80a2ce  Prevent the JVM from crashing due to corrupt LZ4 streams Patch by David Capwell; reviewed by Jordan West and brandonwilliams for CASSANDRA-15556
f80a2ce is described below

commit f80a2ce10ca18e43fc8767cc8801701231e68d29
Author: David Capwell <dc...@gmail.com>
AuthorDate: Thu Feb 6 13:34:37 2020 -0800

    Prevent the JVM from crashing due to corrupt LZ4 streams
    Patch by David Capwell; reviewed by Jordan West and brandonwilliams for
    CASSANDRA-15556
---
 build.xml                                          |   2 +-
 lib/lz4-java-1.4.0.jar                             | Bin 370119 -> 0 bytes
 lib/lz4-java-1.7.1.jar                             | Bin 0 -> 649950 bytes
 .../transport/frame/compress/LZ4Compressor.java    |  10 ++-
 .../data/CASSANDRA-15313/lz4-jvm-crash-failure.txt |   1 +
 .../microbench/ChecksummingTransformerLz4.java     |  95 +++++++++++++++++++++
 .../checksum/ChecksummingTransformerTest.java      |   4 +-
 ...ecksummingWithCorruptedLZ4DoesNotCrashTest.java |  80 +++++++++++++++++
 8 files changed, 185 insertions(+), 7 deletions(-)

diff --git a/build.xml b/build.xml
index 8ea7740..ecc60f3 100644
--- a/build.xml
+++ b/build.xml
@@ -509,7 +509,7 @@
         <scm connection="${scm.connection}" developerConnection="${scm.developerConnection}" url="${scm.url}"/>
         <dependencyManagement>
           <dependency groupId="org.xerial.snappy" artifactId="snappy-java" version="1.1.2.6"/>
-          <dependency groupId="org.lz4" artifactId="lz4-java" version="1.4.0"/>
+          <dependency groupId="org.lz4" artifactId="lz4-java" version="1.7.1"/>
           <dependency groupId="com.ning" artifactId="compress-lzf" version="0.8.4"/>
           <dependency groupId="com.github.luben" artifactId="zstd-jni" version="1.3.8-5"/>
           <dependency groupId="com.google.guava" artifactId="guava" version="27.0-jre"/>
diff --git a/lib/lz4-java-1.4.0.jar b/lib/lz4-java-1.4.0.jar
deleted file mode 100644
index 301908b..0000000
Binary files a/lib/lz4-java-1.4.0.jar and /dev/null differ
diff --git a/lib/lz4-java-1.7.1.jar b/lib/lz4-java-1.7.1.jar
new file mode 100644
index 0000000..95f57ca
Binary files /dev/null and b/lib/lz4-java-1.7.1.jar differ
diff --git a/src/java/org/apache/cassandra/transport/frame/compress/LZ4Compressor.java b/src/java/org/apache/cassandra/transport/frame/compress/LZ4Compressor.java
index 8ac42e2..88633c6 100644
--- a/src/java/org/apache/cassandra/transport/frame/compress/LZ4Compressor.java
+++ b/src/java/org/apache/cassandra/transport/frame/compress/LZ4Compressor.java
@@ -21,20 +21,20 @@ package org.apache.cassandra.transport.frame.compress;
 import java.io.IOException;
 
 import net.jpountz.lz4.LZ4Factory;
-import net.jpountz.lz4.LZ4FastDecompressor;
+import net.jpountz.lz4.LZ4SafeDecompressor;
 
 public class LZ4Compressor implements Compressor
 {
     public static final LZ4Compressor INSTANCE = new LZ4Compressor();
 
     private final net.jpountz.lz4.LZ4Compressor compressor;
-    private final LZ4FastDecompressor decompressor;
+    private final LZ4SafeDecompressor decompressor;
 
     private LZ4Compressor()
     {
         final LZ4Factory lz4Factory = LZ4Factory.fastestInstance();
         compressor = lz4Factory.fastCompressor();
-        decompressor = lz4Factory.fastDecompressor();
+        decompressor = lz4Factory.safeDecompressor();
     }
 
     public int maxCompressedLength(int length)
@@ -58,7 +58,9 @@ public class LZ4Compressor implements Compressor
     {
         try
         {
-            return decompressor.decompress(src, offset, expectedDecompressedLength);
+            byte[] decompressed = new byte[expectedDecompressedLength];
+            decompressor.decompress(src, offset, length, decompressed, 0, expectedDecompressedLength);
+            return decompressed;
         }
         catch (Throwable t)
         {
diff --git a/test/data/CASSANDRA-15313/lz4-jvm-crash-failure.txt b/test/data/CASSANDRA-15313/lz4-jvm-crash-failure.txt
new file mode 100644
index 0000000..0f64d84
--- /dev/null
+++ b/test/data/CASSANDRA-15313/lz4-jvm-crash-failure.txt
@@ -0,0 +1 @@
+656465656364656365656464636364636563646563646365636563646465656564656363646364656565646563636463656465646464646565656365646365636564646465636563656464636563656363646364646465656363636364646364636464656464646565646363656563636365646363646365636563646464646463656564646365656565656365646465656463656564646563646365656563656465656465646463636563656364646364656565656464636563656363646363656363656563636464656365646364656565636463656465646565646563646363636464656563646565646464646563656565636365 [...]
diff --git a/test/microbench/org/apache/cassandra/test/microbench/ChecksummingTransformerLz4.java b/test/microbench/org/apache/cassandra/test/microbench/ChecksummingTransformerLz4.java
new file mode 100644
index 0000000..7ac62fe
--- /dev/null
+++ b/test/microbench/org/apache/cassandra/test/microbench/ChecksummingTransformerLz4.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.test.microbench;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.EnumSet;
+import java.util.concurrent.TimeUnit;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.Unpooled;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.transport.Frame;
+import org.apache.cassandra.transport.frame.checksum.ChecksummingTransformer;
+import org.apache.cassandra.transport.frame.compress.LZ4Compressor;
+import org.apache.cassandra.utils.ChecksumType;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+@Warmup(iterations = 4, time = 1)
+@Measurement(iterations = 8, time = 2)
+@Fork(value = 2)
+@State(Scope.Benchmark)
+public class ChecksummingTransformerLz4
+{
+    private static final EnumSet<Frame.Header.Flag> FLAGS = EnumSet.of(Frame.Header.Flag.COMPRESSED, Frame.Header.Flag.CHECKSUMMED);
+
+    static {
+        DatabaseDescriptor.clientInitialization();
+    }
+
+    private final ChecksummingTransformer transformer = ChecksummingTransformer.getTransformer(ChecksumType.CRC32, LZ4Compressor.INSTANCE);
+    private ByteBuf smallEnglishASCIICompressed;
+    private ByteBuf smallEnglishUtf8Compressed;
+    private ByteBuf largeBlobCompressed;
+
+    @Setup
+    public void setup() throws IOException
+    {
+        byte[] smallEnglishASCII = "this is small".getBytes(StandardCharsets.US_ASCII);
+        this.smallEnglishASCIICompressed = transformer.transformOutbound(Unpooled.wrappedBuffer(smallEnglishASCII));
+        byte[] smallEnglishUtf8 = "this is small".getBytes(StandardCharsets.UTF_8);
+        this.smallEnglishUtf8Compressed = transformer.transformOutbound(Unpooled.wrappedBuffer(smallEnglishUtf8));
+
+        String failureHex;
+        try (BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream("test/data/CASSANDRA-15313/lz4-jvm-crash-failure.txt"), StandardCharsets.UTF_8))) {
+            failureHex = reader.readLine().trim();
+        }
+        byte[] failure = ByteBufUtil.decodeHexDump(failureHex);
+        this.largeBlobCompressed = transformer.transformOutbound(Unpooled.wrappedBuffer(failure));
+    }
+
+    @Benchmark
+    public ByteBuf decompresSsmallEnglishASCII() {
+        smallEnglishASCIICompressed.readerIndex(0);
+        return transformer.transformInbound(smallEnglishASCIICompressed, FLAGS);
+    }
+
+    @Benchmark
+    public ByteBuf decompresSsmallEnglishUtf8() {
+        smallEnglishUtf8Compressed.readerIndex(0);
+        return transformer.transformInbound(smallEnglishUtf8Compressed, FLAGS);
+    }
+
+    @Benchmark
+    public ByteBuf decompresLargeBlob() {
+        largeBlobCompressed.readerIndex(0);
+        return transformer.transformInbound(largeBlobCompressed, FLAGS);
+    }
+}
diff --git a/test/unit/org/apache/cassandra/transport/frame/checksum/ChecksummingTransformerTest.java b/test/unit/org/apache/cassandra/transport/frame/checksum/ChecksummingTransformerTest.java
index d678044..5f5b10d 100644
--- a/test/unit/org/apache/cassandra/transport/frame/checksum/ChecksummingTransformerTest.java
+++ b/test/unit/org/apache/cassandra/transport/frame/checksum/ChecksummingTransformerTest.java
@@ -86,10 +86,10 @@ public class ChecksummingTransformerTest
                     integers().between(0, Byte.MAX_VALUE).map(Integer::byteValue),
                     compressors(),
                     checksumTypes())
-            .checkAssert(this::roundTripWithCorruption);
+            .checkAssert(ChecksummingTransformerTest::roundTripWithCorruption);
     }
 
-    private void roundTripWithCorruption(Pair<ReusableBuffer, Integer> inputAndCorruptablePosition,
+    static void roundTripWithCorruption(Pair<ReusableBuffer, Integer> inputAndCorruptablePosition,
                                          byte corruptionValue,
                                          Compressor compressor,
                                          ChecksumType checksum)
diff --git a/test/unit/org/apache/cassandra/transport/frame/checksum/ChecksummingWithCorruptedLZ4DoesNotCrashTest.java b/test/unit/org/apache/cassandra/transport/frame/checksum/ChecksummingWithCorruptedLZ4DoesNotCrashTest.java
new file mode 100644
index 0000000..4028bfd
--- /dev/null
+++ b/test/unit/org/apache/cassandra/transport/frame/checksum/ChecksummingWithCorruptedLZ4DoesNotCrashTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.transport.frame.checksum;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import io.netty.buffer.ByteBufUtil;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.transport.frame.compress.LZ4Compressor;
+import org.apache.cassandra.utils.ChecksumType;
+import org.apache.cassandra.utils.Pair;
+
+/**
+ * When we use LZ4 with "fast" functions it is unsafe if the stream is corrupt; for networking, checksumming happens
+ * after lz4 decompresses (see CASSANDRA-15299), which means that lz4 can crash the process.
+ *
+ * This test is standalone because it is known to cause the JVM to crash.  Given the way we run tests
+ * in CI this will kill the runner, which means the file will be marked as failed; if this test were embedded into another
+ * test file then all the other tests would be ignored if this crashes.
+ */
+public class ChecksummingWithCorruptedLZ4DoesNotCrashTest
+{
+    @BeforeClass
+    public static void init()
+    {
+        // required as static ChecksummingTransformer instances read default block size from config
+        DatabaseDescriptor.clientInitialization();
+    }
+
+    @Test
+    public void shouldNotCrash() throws IOException
+    {
+        // We found lz4 caused the JVM to crash, so we reuse the input (bytes and byteToCorrupt) of the test which crashed
+        // to reproduce it.
+        // The same input does not cause lz4 to crash by itself; repeated calls with this input are needed to produce
+        // such a failure.
+        String failureHex;
+        try (BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream("test/data/CASSANDRA-15313/lz4-jvm-crash-failure.txt"), StandardCharsets.UTF_8))) {
+            failureHex = reader.readLine().trim();
+        }
+        byte[] failure = ByteBufUtil.decodeHexDump(failureHex);
+        ReusableBuffer buffer = new ReusableBuffer(failure);
+        int byteToCorrupt = 52997;
+        // corrupting these values causes the exception.
+        byte[] corruptionValues = new byte[] { 21, 57, 79, (byte) 179 };
+        // 5k was chosen because it was the largest number of iterations we saw being needed to trigger the crash.
+        for (int i = 0; i < 5_000 ; i++) {
+            for (byte corruptionValue : corruptionValues) {
+                try {
+                    ChecksummingTransformerTest.roundTripWithCorruption(Pair.create(buffer, byteToCorrupt), corruptionValue, LZ4Compressor.INSTANCE, ChecksumType.ADLER32);
+                } catch (AssertionError e) {
+                    // ignore; this test only verifies that the JVM does not crash
+                }
+            }
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org