You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bd...@apache.org on 2019/02/22 23:11:07 UTC

[cassandra] branch trunk updated: Add Zstd compressor

This is an automated email from the ASF dual-hosted git repository.

bdeggleston pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new dccf530  Add Zstd compressor
dccf530 is described below

commit dccf53061a61e7c632669c60cd94626e405518e9
Author: Dinesh Joshi <di...@apple.com>
AuthorDate: Mon Nov 19 15:43:50 2018 -0800

    Add Zstd compressor
    
    Patch by Dinesh Joshi; Reviewed by Blake Eggleston for CASSANDRA-14482
---
 CHANGES.txt                                        |   1 +
 build.xml                                          |   4 +-
 doc/source/architecture/storage_engine.rst         |   2 +-
 doc/source/operating/compression.rst               |   5 +-
 lib/licenses/zstd-jni-1.3.8-3.txt                  |  26 +++
 lib/zstd-jni-1.3.8-5.jar                           | Bin 0 -> 3944916 bytes
 .../cassandra/io/compress/ZstdCompressor.java      | 239 +++++++++++++++++++++
 .../apache/cassandra/schema/CompressionParams.java |  12 +-
 ...ression.yaml => commitlog_compression_LZ4.yaml} |   0
 test/conf/commitlog_compression_Zstd.yaml          |   2 +
 .../io/compress/CompressorPerformance.java         |  12 +-
 .../cassandra/db/RecoveryManagerFlushedTest.java   |   4 +-
 .../db/RecoveryManagerMissingHeaderTest.java       |   4 +-
 .../apache/cassandra/db/RecoveryManagerTest.java   |   4 +-
 .../cassandra/db/RecoveryManagerTruncateTest.java  |   4 +-
 .../cassandra/db/commitlog/CommitLogTest.java      |   4 +-
 .../cassandra/db/commitlog/SegmentReaderTest.java  |   7 +
 .../cassandra/hints/HintsCompressionTest.java      |   9 +
 .../cassandra/io/compress/CQLCompressionTest.java  |  25 +++
 .../compress/CompressedSequentialWriterTest.java   |   7 +
 .../cassandra/io/compress/CompressorTest.java      |  10 +-
 .../cassandra/io/compress/ZstdCompressorTest.java  |  53 +++++
 22 files changed, 421 insertions(+), 13 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 87d815b..0283be6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Add Zstd compressor (CASSANDRA-14482)
  * Fix IR prepare anti-compaction race (CASSANDRA-15027)
  * Fix SimpleStrategy option validation (CASSANDRA-15007)
  * Don't try to cancel 2i compactions when starting anticompaction (CASSANDRA-15024)
diff --git a/build.xml b/build.xml
index 5488f38..efcc9a9 100644
--- a/build.xml
+++ b/build.xml
@@ -64,6 +64,7 @@
     <property name="test.burn.src" value="${test.dir}/burn"/>
     <property name="test.microbench.src" value="${test.dir}/microbench"/>
     <property name="test.distributed.src" value="${test.dir}/distributed"/>
+    <property name="test.compression_algo" value="LZ4"/>
     <property name="dist.dir" value="${build.dir}/dist"/>
     <property name="tmp.dir" value="${java.io.tmpdir}"/>
 
@@ -412,6 +413,7 @@
           <dependency groupId="org.xerial.snappy" artifactId="snappy-java" version="1.1.2.6"/>
           <dependency groupId="org.lz4" artifactId="lz4-java" version="1.4.0"/>
           <dependency groupId="com.ning" artifactId="compress-lzf" version="0.8.4"/>
+          <dependency groupId="com.github.luben" artifactId="zstd-jni" version="1.3.8-5"/>
           <dependency groupId="com.google.guava" artifactId="guava" version="23.3-jre"/>
           <dependency groupId="org.hdrhistogram" artifactId="HdrHistogram" version="2.1.9"/>
           <dependency groupId="commons-cli" artifactId="commons-cli" version="1.1"/>
@@ -1483,7 +1485,7 @@
     <property name="compressed_yaml" value="${build.test.dir}/cassandra.compressed.yaml"/>
     <concat destfile="${compressed_yaml}">
       <fileset file="${test.conf}/cassandra.yaml"/>
-      <fileset file="${test.conf}/commitlog_compression.yaml"/>
+      <fileset file="${test.conf}/commitlog_compression_${test.compression_algo}.yaml"/>
     </concat>
     <path id="all-test-classes-path">
       <fileset dir="${test.unit.src}" includes="**/${test.name}.java" />
diff --git a/doc/source/architecture/storage_engine.rst b/doc/source/architecture/storage_engine.rst
index 2a95d93..23b738d 100644
--- a/doc/source/architecture/storage_engine.rst
+++ b/doc/source/architecture/storage_engine.rst
@@ -55,7 +55,7 @@ Commitlogs are an append only log of all mutations local to a Cassandra node. An
 
 *Default Value:* /var/lib/cassandra/commitlog
 
-- ``commitlog_compression``: Compression to apply to the commitlog. If omitted, the commit log will be written uncompressed.  LZ4, Snappy, and Deflate compressors are supported.
+- ``commitlog_compression``: Compression to apply to the commitlog. If omitted, the commit log will be written uncompressed. LZ4, Snappy, Deflate and Zstd compressors are supported.
 
 (Default Value: (complex option)::
 
diff --git a/doc/source/operating/compression.rst b/doc/source/operating/compression.rst
index 01da34b..42a057b 100644
--- a/doc/source/operating/compression.rst
+++ b/doc/source/operating/compression.rst
@@ -33,11 +33,12 @@ Configuring Compression
 Compression is configured on a per-table basis as an optional argument to ``CREATE TABLE`` or ``ALTER TABLE``. By
 default, three options are relevant:
 
-- ``class`` specifies the compression class - Cassandra provides three classes (``LZ4Compressor``,
-  ``SnappyCompressor``, and ``DeflateCompressor`` ). The default is ``LZ4Compressor``.
+- ``class`` specifies the compression class - Cassandra provides four classes (``LZ4Compressor``,
+  ``SnappyCompressor``, ``DeflateCompressor`` and ``ZstdCompressor``). The default is ``LZ4Compressor``.
 - ``chunk_length_in_kb`` specifies the number of kilobytes of data per compression chunk. The default is 64KB.
 - ``crc_check_chance`` determines how likely Cassandra is to verify the checksum on each compression chunk during
   reads. The default is 1.0.
+- ``compression_level`` is only applicable for ``ZstdCompressor`` and accepts values between ``-131072`` and ``2``.
 
 Users can set compression using the following syntax:
 
diff --git a/lib/licenses/zstd-jni-1.3.8-3.txt b/lib/licenses/zstd-jni-1.3.8-3.txt
new file mode 100644
index 0000000..66abb8a
--- /dev/null
+++ b/lib/licenses/zstd-jni-1.3.8-3.txt
@@ -0,0 +1,26 @@
+Zstd-jni: JNI bindings to Zstd Library
+
+Copyright (c) 2015-present, Luben Karavelov/ All rights reserved.
+
+BSD License
+
+Redistribution and use in source and binary forms, with or without modification,
+are permitted provided that the following conditions are met:
+
+* Redistributions of source code must retain the above copyright notice, this
+  list of conditions and the following disclaimer.
+
+* Redistributions in binary form must reproduce the above copyright notice, this
+  list of conditions and the following disclaimer in the documentation and/or
+  other materials provided with the distribution.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
+ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/lib/zstd-jni-1.3.8-5.jar b/lib/zstd-jni-1.3.8-5.jar
new file mode 100644
index 0000000..f68ce66
Binary files /dev/null and b/lib/zstd-jni-1.3.8-5.jar differ
diff --git a/src/java/org/apache/cassandra/io/compress/ZstdCompressor.java b/src/java/org/apache/cassandra/io/compress/ZstdCompressor.java
new file mode 100644
index 0000000..1fd7415
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/compress/ZstdCompressor.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.compress;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.luben.zstd.Zstd;
+
+/**
+ * ZSTD Compressor
+ */
+public class ZstdCompressor implements ICompressor
+{
+    private static final Logger logger = LoggerFactory.getLogger(ZstdCompressor.class);
+
+    // These might change with the version of Zstd we're using
+    public static final int FAST_COMPRESSION_LEVEL = Zstd.minCompressionLevel();
+    public static final int BEST_COMPRESSION_LEVEL = Zstd.maxCompressionLevel();
+
+    // Compressor Defaults
+    public static final int DEFAULT_COMPRESSION_LEVEL = 3;
+    private static final boolean ENABLE_CHECKSUM_FLAG = true;
+
+    @VisibleForTesting
+    public static final String COMPRESSION_LEVEL_OPTION_NAME = "compression_level";
+
+    private static final ConcurrentHashMap<Integer, ZstdCompressor> instances = new ConcurrentHashMap<>();
+
+    private final int compressionLevel;
+
+    /**
+     * Create a Zstd compressor with the given options
+     *
+     * @param options
+     * @return
+     */
+    public static ZstdCompressor create(Map<String, String> options)
+    {
+        int level = getOrDefaultCompressionLevel(options);
+
+        if (!isValid(level))
+            throw new IllegalArgumentException(String.format("%s=%d is invalid", COMPRESSION_LEVEL_OPTION_NAME, level));
+
+        return getOrCreate(level);
+    }
+
+    /**
+     * Private constructor
+     *
+     * @param compressionLevel
+     */
+    private ZstdCompressor(int compressionLevel)
+    {
+        this.compressionLevel = compressionLevel;
+        logger.trace("Creating Zstd Compressor with compression level={}", compressionLevel);
+    }
+
+    /**
+     * Get a cached instance or return a new one
+     *
+     * @param level
+     * @return
+     */
+    public static ZstdCompressor getOrCreate(int level)
+    {
+        return instances.computeIfAbsent(level, l -> new ZstdCompressor(level));
+    }
+
+    /**
+     * Get initial compressed buffer length
+     *
+     * @param chunkLength
+     * @return
+     */
+    @Override
+    public int initialCompressedBufferLength(int chunkLength)
+    {
+        return (int) Zstd.compressBound(chunkLength);
+    }
+
+    /**
+     * Decompress data using arrays
+     *
+     * @param input
+     * @param inputOffset
+     * @param inputLength
+     * @param output
+     * @param outputOffset
+     * @return
+     * @throws IOException
+     */
+    @Override
+    public int uncompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset)
+    throws IOException
+    {
+        long dsz = Zstd.decompressByteArray(output, outputOffset, output.length - outputOffset,
+                                            input, inputOffset, inputLength);
+
+        if (Zstd.isError(dsz))
+            throw new IOException(String.format("Decompression failed due to %s", Zstd.getErrorName(dsz)));
+
+        return (int) dsz;
+    }
+
+    /**
+     * Decompress data via ByteBuffers
+     *
+     * @param input
+     * @param output
+     * @throws IOException
+     */
+    @Override
+    public void uncompress(ByteBuffer input, ByteBuffer output) throws IOException
+    {
+        try
+        {
+            Zstd.decompress(output, input);
+        } catch (Exception e)
+        {
+            throw new IOException("Decompression failed", e);
+        }
+    }
+
+    /**
+     * Compress using ByteBuffers
+     *
+     * @param input
+     * @param output
+     * @throws IOException
+     */
+    @Override
+    public void compress(ByteBuffer input, ByteBuffer output) throws IOException
+    {
+        try
+        {
+            Zstd.compress(output, input, compressionLevel, ENABLE_CHECKSUM_FLAG);
+        } catch (Exception e)
+        {
+            throw new IOException("Compression failed", e);
+        }
+    }
+
+    /**
+     * Check if the given compression level is valid. This can be a negative value as well.
+     *
+     * @param level
+     * @return
+     */
+    private static boolean isValid(int level)
+    {
+        return (level >= FAST_COMPRESSION_LEVEL && level <= BEST_COMPRESSION_LEVEL);
+    }
+
+    /**
+     * Parse the compression options
+     *
+     * @param options
+     * @return
+     */
+    private static int getOrDefaultCompressionLevel(Map<String, String> options)
+    {
+        if (options == null)
+            return DEFAULT_COMPRESSION_LEVEL;
+
+        String val = options.get(COMPRESSION_LEVEL_OPTION_NAME);
+
+        if (val == null)
+            return DEFAULT_COMPRESSION_LEVEL;
+
+        return Integer.valueOf(val);
+    }
+
+    /**
+     * Return the preferred BufferType
+     *
+     * @return
+     */
+    @Override
+    public BufferType preferredBufferType()
+    {
+        return BufferType.OFF_HEAP;
+    }
+
+    /**
+     * Check whether the given BufferType is supported
+     *
+     * @param bufferType
+     * @return
+     */
+    @Override
+    public boolean supports(BufferType bufferType)
+    {
+        return bufferType == BufferType.OFF_HEAP;
+    }
+
+    /**
+     * Lists the supported options by this compressor
+     *
+     * @return
+     */
+    @Override
+    public Set<String> supportedOptions()
+    {
+        return new HashSet<>(Collections.singletonList(COMPRESSION_LEVEL_OPTION_NAME));
+    }
+
+
+    @VisibleForTesting
+    public int getCompressionLevel()
+    {
+        return compressionLevel;
+    }
+}
diff --git a/src/java/org/apache/cassandra/schema/CompressionParams.java b/src/java/org/apache/cassandra/schema/CompressionParams.java
index 2563111..40a4be3 100644
--- a/src/java/org/apache/cassandra/schema/CompressionParams.java
+++ b/src/java/org/apache/cassandra/schema/CompressionParams.java
@@ -29,7 +29,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
 import com.google.common.collect.ImmutableMap;
 
-import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 
 import org.slf4j.Logger;
@@ -179,6 +178,17 @@ public final class CompressionParams
         return new CompressionParams(LZ4Compressor.create(Collections.emptyMap()), chunkLength, maxCompressedLength, calcMinCompressRatio(chunkLength, maxCompressedLength), Collections.emptyMap());
     }
 
+    public static CompressionParams zstd()
+    {
+        return zstd(DEFAULT_CHUNK_LENGTH);
+    }
+
+    public static CompressionParams zstd(Integer chunkLength)
+    {
+        ZstdCompressor compressor = ZstdCompressor.create(Collections.emptyMap());
+        return new CompressionParams(compressor, chunkLength, Integer.MAX_VALUE, DEFAULT_MIN_COMPRESS_RATIO, Collections.emptyMap());
+    }
+
     public CompressionParams(String sstableCompressorClass, Map<String, String> otherOptions, int chunkLength, double minCompressRatio) throws ConfigurationException
     {
         this(createCompressor(parseCompressorClass(sstableCompressorClass), otherOptions), chunkLength, calcMaxCompressedLength(chunkLength, minCompressRatio), minCompressRatio, otherOptions);
diff --git a/test/conf/commitlog_compression.yaml b/test/conf/commitlog_compression_LZ4.yaml
similarity index 100%
rename from test/conf/commitlog_compression.yaml
rename to test/conf/commitlog_compression_LZ4.yaml
diff --git a/test/conf/commitlog_compression_Zstd.yaml b/test/conf/commitlog_compression_Zstd.yaml
new file mode 100644
index 0000000..0c440ae
--- /dev/null
+++ b/test/conf/commitlog_compression_Zstd.yaml
@@ -0,0 +1,2 @@
+commitlog_compression:
+    - class_name: ZstdCompressor
diff --git a/test/long/org/apache/cassandra/io/compress/CompressorPerformance.java b/test/long/org/apache/cassandra/io/compress/CompressorPerformance.java
index e703839..b3cdaa1 100644
--- a/test/long/org/apache/cassandra/io/compress/CompressorPerformance.java
+++ b/test/long/org/apache/cassandra/io/compress/CompressorPerformance.java
@@ -35,7 +35,9 @@ public class CompressorPerformance
                 SnappyCompressor.instance,  // warm up
                 DeflateCompressor.instance,
                 LZ4Compressor.create(Collections.emptyMap()),
-                SnappyCompressor.instance
+                SnappyCompressor.instance,
+                ZstdCompressor.getOrCreate(ZstdCompressor.FAST_COMPRESSION_LEVEL),
+                ZstdCompressor.getOrCreate(ZstdCompressor.DEFAULT_COMPRESSION_LEVEL)
         })
         {
             for (BufferType in: BufferType.values())
@@ -70,10 +72,15 @@ public class CompressorPerformance
         int count = 100;
 
         long time = System.nanoTime();
+        long uncompressedBytes = 0;
+        long compressedBytes = 0;
         for (int i=0; i<count; ++i)
         {
             output.clear();
             compressor.compress(dataSource, output);
+            uncompressedBytes += dataSource.limit();
+            compressedBytes += output.position();
+
             // Make sure not optimized away.
             checksum += output.get(ThreadLocalRandom.current().nextInt(output.position()));
             dataSource.rewind();
@@ -93,7 +100,7 @@ public class CompressorPerformance
             input.rewind();
         }
         long timed = System.nanoTime() - time;
-        System.out.format("Compressor %s %s->%s compress %.3f ns/b %.3f mb/s uncompress %.3f ns/b %.3f mb/s.%s\n",
+        System.out.format("Compressor %s %s->%s compress %.3f ns/b %.3f mb/s uncompress %.3f ns/b %.3f mb/s ratio %.2f:1.%s\n",
                           compressor.getClass().getSimpleName(),
                           in,
                           out,
@@ -101,6 +108,7 @@ public class CompressorPerformance
                           Math.scalb(1.0e9, -20) * count * len / timec,
                           1.0 * timed / (count * len),
                           Math.scalb(1.0e9, -20) * count * len / timed,
+                          ((double) uncompressedBytes) / ((double) compressedBytes),
                           checksum == 0 ? " " : "");
     }
 
diff --git a/test/unit/org/apache/cassandra/db/RecoveryManagerFlushedTest.java b/test/unit/org/apache/cassandra/db/RecoveryManagerFlushedTest.java
index 7225560..fc34942 100644
--- a/test/unit/org/apache/cassandra/db/RecoveryManagerFlushedTest.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManagerFlushedTest.java
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.ParameterizedClass;
+import org.apache.cassandra.io.compress.ZstdCompressor;
 import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.commitlog.CommitLog;
@@ -71,7 +72,8 @@ public class RecoveryManagerFlushedTest
             {null, EncryptionContextGenerator.createContext(true)}, // Encryption
             {new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
             {new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
-            {new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()}});
+            {new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
+            {new ParameterizedClass(ZstdCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()}});
     }
 
     @Before
diff --git a/test/unit/org/apache/cassandra/db/RecoveryManagerMissingHeaderTest.java b/test/unit/org/apache/cassandra/db/RecoveryManagerMissingHeaderTest.java
index d60574f..4044fff 100644
--- a/test/unit/org/apache/cassandra/db/RecoveryManagerMissingHeaderTest.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManagerMissingHeaderTest.java
@@ -42,6 +42,7 @@ import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.compress.DeflateCompressor;
 import org.apache.cassandra.io.compress.LZ4Compressor;
 import org.apache.cassandra.io.compress.SnappyCompressor;
+import org.apache.cassandra.io.compress.ZstdCompressor;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.security.EncryptionContext;
@@ -70,7 +71,8 @@ public class RecoveryManagerMissingHeaderTest
             {null, EncryptionContextGenerator.createContext(true)}, // Encryption
             {new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
             {new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
-            {new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()}});
+            {new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
+            {new ParameterizedClass(ZstdCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()}});
     }
 
     @Before
diff --git a/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java b/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
index 164253d..7e397c1 100644
--- a/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
@@ -55,6 +55,7 @@ import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.compress.DeflateCompressor;
 import org.apache.cassandra.io.compress.LZ4Compressor;
 import org.apache.cassandra.io.compress.SnappyCompressor;
+import org.apache.cassandra.io.compress.ZstdCompressor;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.security.EncryptionContext;
@@ -89,7 +90,8 @@ public class RecoveryManagerTest
             {null, EncryptionContextGenerator.createContext(true)}, // Encryption
             {new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
             {new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
-            {new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()}});
+            {new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
+            {new ParameterizedClass(ZstdCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()}});
     }
 
     @Before
diff --git a/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java b/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
index ef00b22..a51cd21 100644
--- a/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
@@ -32,6 +32,7 @@ import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.compress.DeflateCompressor;
 import org.apache.cassandra.io.compress.LZ4Compressor;
 import org.apache.cassandra.io.compress.SnappyCompressor;
+import org.apache.cassandra.io.compress.ZstdCompressor;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.security.EncryptionContext;
 import org.apache.cassandra.security.EncryptionContextGenerator;
@@ -68,7 +69,8 @@ public class RecoveryManagerTruncateTest
             {null, EncryptionContextGenerator.createContext(true)}, // Encryption
             {new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
             {new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
-            {new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()}});
+            {new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
+            {new ParameterizedClass(ZstdCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()}});
     }
 
     @Before
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index e1bd96c..25e2f30 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@ -38,6 +38,7 @@ import org.junit.runners.Parameterized.Parameters;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
+import org.apache.cassandra.io.compress.ZstdCompressor;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
@@ -98,7 +99,8 @@ public abstract class CommitLogTest
             {null, EncryptionContextGenerator.createContext(true)}, // Encryption
             {new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
             {new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
-            {new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()}});
+            {new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
+            {new ParameterizedClass(ZstdCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()}});
     }
 
     public static void beforeClass() throws ConfigurationException
diff --git a/test/unit/org/apache/cassandra/db/commitlog/SegmentReaderTest.java b/test/unit/org/apache/cassandra/db/commitlog/SegmentReaderTest.java
index 47e1afa..ce20935 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/SegmentReaderTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/SegmentReaderTest.java
@@ -41,6 +41,7 @@ import org.apache.cassandra.io.compress.DeflateCompressor;
 import org.apache.cassandra.io.compress.ICompressor;
 import org.apache.cassandra.io.compress.LZ4Compressor;
 import org.apache.cassandra.io.compress.SnappyCompressor;
+import org.apache.cassandra.io.compress.ZstdCompressor;
 import org.apache.cassandra.io.util.FileDataInput;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.RandomAccessReader;
@@ -78,6 +79,12 @@ public class SegmentReaderTest
         compressedSegmenter(DeflateCompressor.create(null));
     }
 
+    @Test
+    public void compressedSegmenter_Zstd() throws IOException
+    {
+        compressedSegmenter(ZstdCompressor.create(Collections.emptyMap()));
+    }
+
     private void compressedSegmenter(ICompressor compressor) throws IOException
     {
         int rawSize = (1 << 15) - 137;
diff --git a/test/unit/org/apache/cassandra/hints/HintsCompressionTest.java b/test/unit/org/apache/cassandra/hints/HintsCompressionTest.java
index f82db49..faa8d27 100644
--- a/test/unit/org/apache/cassandra/hints/HintsCompressionTest.java
+++ b/test/unit/org/apache/cassandra/hints/HintsCompressionTest.java
@@ -26,6 +26,7 @@ import org.apache.cassandra.io.compress.DeflateCompressor;
 import org.apache.cassandra.io.compress.ICompressor;
 import org.apache.cassandra.io.compress.LZ4Compressor;
 import org.apache.cassandra.io.compress.SnappyCompressor;
+import org.apache.cassandra.io.compress.ZstdCompressor;
 
 public class HintsCompressionTest extends AlteredHints
 {
@@ -77,4 +78,12 @@ public class HintsCompressionTest extends AlteredHints
         compressorClass = DeflateCompressor.class;
         multiFlushAndDeserializeTest();
     }
+
+    @Test
+    public void zstdCompressor() throws Exception
+    {
+        compressorClass = ZstdCompressor.class;
+        multiFlushAndDeserializeTest();
+    }
+
 }
diff --git a/test/unit/org/apache/cassandra/io/compress/CQLCompressionTest.java b/test/unit/org/apache/cassandra/io/compress/CQLCompressionTest.java
index 1efccd3..d4b0b7b 100644
--- a/test/unit/org/apache/cassandra/io/compress/CQLCompressionTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CQLCompressionTest.java
@@ -53,4 +53,29 @@ public class CQLCompressionTest extends CQLTester
             throw e.getCause();
         }
     }
+
+    @Test
+    public void zstdParamsTest()
+    {
+        createTable("create table %s (id int primary key, uh text) with compression = {'class':'ZstdCompressor', 'compression_level':-22}");
+        assertTrue(((ZstdCompressor)getCurrentColumnFamilyStore().metadata().params.compression.getSstableCompressor()).getClass().equals(ZstdCompressor.class));
+        assertEquals(((ZstdCompressor)getCurrentColumnFamilyStore().metadata().params.compression.getSstableCompressor()).getCompressionLevel(), -22);
+
+        createTable("create table %s (id int primary key, uh text) with compression = {'class':'ZstdCompressor'}");
+        assertTrue(((ZstdCompressor)getCurrentColumnFamilyStore().metadata().params.compression.getSstableCompressor()).getClass().equals(ZstdCompressor.class));
+        assertEquals(((ZstdCompressor)getCurrentColumnFamilyStore().metadata().params.compression.getSstableCompressor()).getCompressionLevel(), ZstdCompressor.DEFAULT_COMPRESSION_LEVEL);
+    }
+
+    @Test(expected = ConfigurationException.class)
+    public void zstdBadParamsTest() throws Throwable
+    {
+        try
+        {
+            createTable("create table %s (id int primary key, uh text) with compression = {'class':'ZstdCompressor', 'compression_level':'100'}");
+        }
+        catch (RuntimeException e)
+        {
+            throw e.getCause();
+        }
+    }
 }
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
index 557bc32..a965410 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
@@ -97,6 +97,13 @@ public class CompressedSequentialWriterTest extends SequentialWriterTest
         runTests("Snappy");
     }
 
+    @Test
+    public void testZSTDWriter() throws IOException
+    {
+        compressionParameters = CompressionParams.zstd();
+        runTests("ZSTD");
+    }
+
     private void testWrite(File f, int bytesToTest, boolean useMemmap) throws IOException
     {
         final String filename = f.getAbsolutePath();
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressorTest.java b/test/unit/org/apache/cassandra/io/compress/CompressorTest.java
index 1d37ad4..b649f52 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressorTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressorTest.java
@@ -42,7 +42,8 @@ public class CompressorTest
     ICompressor[] compressors = new ICompressor[] {
             LZ4Compressor.create(Collections.<String, String>emptyMap()),
             DeflateCompressor.create(Collections.<String, String>emptyMap()),
-            SnappyCompressor.create(Collections.<String, String>emptyMap())
+            SnappyCompressor.create(Collections.<String, String>emptyMap()),
+            ZstdCompressor.create(Collections.emptyMap())
     };
 
     @Test
@@ -177,6 +178,13 @@ public class CompressorTest
         testByteBuffers();
     }
 
+    @Test
+    public void testZstdByteBuffers() throws IOException
+    {
+        compressor = ZstdCompressor.create(Collections.<String, String>emptyMap());
+        testByteBuffers();
+    }
+
     private void testByteBuffers() throws IOException
     {
         assert compressor.supports(BufferType.OFF_HEAP);
diff --git a/test/unit/org/apache/cassandra/io/compress/ZstdCompressorTest.java b/test/unit/org/apache/cassandra/io/compress/ZstdCompressorTest.java
new file mode 100644
index 0000000..70e32ad
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/compress/ZstdCompressorTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.compress;
+
+import java.util.Collections;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.Test;
+
+import com.github.luben.zstd.Zstd;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Zstd Compressor specific tests. General compressor tests are in {@link CompressorTest}
+ */
+public class ZstdCompressorTest
+{
+    @Test
+    public void emptyConfigurationUsesDefaultCompressionLevel()
+    {
+        ZstdCompressor compressor = ZstdCompressor.create(Collections.emptyMap());
+        assertEquals(ZstdCompressor.DEFAULT_COMPRESSION_LEVEL, compressor.getCompressionLevel());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void badCompressionLevelParamThrowsExceptionMin()
+    {
+        ZstdCompressor.create(ImmutableMap.of(ZstdCompressor.COMPRESSION_LEVEL_OPTION_NAME, Integer.toString(Zstd.minCompressionLevel() - 1)));
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void badCompressionLevelParamThrowsExceptionMax()
+    {
+        ZstdCompressor.create(ImmutableMap.of(ZstdCompressor.COMPRESSION_LEVEL_OPTION_NAME, Integer.toString(Zstd.maxCompressionLevel() + 1)));
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org