Posted to commits@cassandra.apache.org by jo...@apache.org on 2020/04/24 06:28:39 UTC

[cassandra] branch trunk updated: Flush with fast compressors by default

This is an automated email from the ASF dual-hosted git repository.

jolynch pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9c1bbf3  Flush with fast compressors by default
9c1bbf3 is described below

commit 9c1bbf3ac913f9bdf7a0e0922106804af42d2c1e
Author: Joseph Lynch <jo...@gmail.com>
AuthorDate: Sun Nov 3 16:37:18 2019 -0800

    Flush with fast compressors by default
    
    Previously Zstd, Deflate and LZ4HC could hold up the flush thread due to
    their slow compression rates. With this patch we always default to
    flushing with a fast compressor (LZ4 by default) and then compress into
    the high-compression strategy during the normal compactions that follow.
    Note that if the existing table compressor is already fast (e.g. LZ4 or
    Snappy) we use that.
    
    Patch by Joey Lynch; reviewed by Dinesh Joshi for CASSANDRA-15379
---
 CHANGES.txt                                        |   1 +
 conf/cassandra.yaml                                |  15 ++
 doc/source/operating/compression.rst               |  95 ++++++++--
 pylib/cqlshlib/cqlhandling.py                      |   1 +
 src/java/org/apache/cassandra/config/Config.java   |   9 +
 .../cassandra/config/DatabaseDescriptor.java       |  10 ++
 .../cassandra/io/compress/DeflateCompressor.java   |  10 ++
 .../apache/cassandra/io/compress/ICompressor.java  |  26 +++
 .../cassandra/io/compress/LZ4Compressor.java       |  16 ++
 .../cassandra/io/compress/NoopCompressor.java      |  80 +++++++++
 .../cassandra/io/compress/ZstdCompressor.java      |   9 +
 .../io/sstable/format/big/BigTableWriter.java      |  46 ++++-
 .../apache/cassandra/schema/CompressionParams.java |  15 ++
 .../config/DatabaseDescriptorRefTest.java          |   4 +-
 .../cassandra/io/compress/CQLCompressionTest.java  | 194 ++++++++++++++++++++-
 .../compress/CompressedSequentialWriterTest.java   |   7 +
 .../cassandra/io/compress/CompressorTest.java      |  10 +-
 17 files changed, 530 insertions(+), 18 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index ab8a7eb..b9c8f8d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-alpha4
+ * Flush with fast compressors by default (CASSANDRA-15379)
  * Fix CqlInputFormat regression from the switch to system.size_estimates (CASSANDRA-15637)
  * Allow sending Entire SSTables over SSL (CASSANDRA-15740)
  * Fix CQLSH UTF-8 encoding issue for Python 2/3 compatibility (CASSANDRA-15739)
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index f1e5864..e1d2d2d 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -425,6 +425,21 @@ commitlog_segment_size_in_mb: 32
 #     parameters:
 #         -
 
+# Compression to apply to SSTables as they flush for compressed tables.
+# Note that tables without compression enabled do not respect this flag.
+#
+# As high ratio compressors like LZ4HC, Zstd, and Deflate can potentially
+# block flushes for too long, the default is to flush with a known fast
+# compressor in those cases. Options are:
+#
+# none : Flush without compressing blocks but still writing block checksums.
+# fast : Flush with a fast compressor. If the table is already using a
+#        fast compressor, that compressor is used.
+# table: Always flush with the same compressor that the table uses. This
+#        was the pre-4.0 behavior.
+#
+# flush_compression: fast
+
 # any class that implements the SeedProvider interface and has a
 # constructor that takes a Map<String, String> of parameters will do.
 seed_provider:
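
For illustration, restoring the pre-4.0 behavior only requires uncommenting the new option and setting it to table. The fragment below is a minimal sketch using the option name and values introduced above; its placement next to the other commitlog/flush settings in cassandra.yaml is assumed:

    # Always flush with the compressor configured on the table itself,
    # even if it is a slow one such as Zstd, Deflate or LZ4HC (pre-4.0 behavior).
    flush_compression: table
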
diff --git a/doc/source/operating/compression.rst b/doc/source/operating/compression.rst
index b4308b3..74c992f 100644
--- a/doc/source/operating/compression.rst
+++ b/doc/source/operating/compression.rst
@@ -20,27 +20,88 @@ Compression
 -----------
 
 Cassandra offers operators the ability to configure compression on a per-table basis. Compression reduces the size of
-data on disk by compressing the SSTable in user-configurable compression ``chunk_length_in_kb``. Because Cassandra
-SSTables are immutable, the CPU cost of compressing is only necessary when the SSTable is written - subsequent updates
+data on disk by compressing the SSTable in user-configurable compression chunks (``chunk_length_in_kb``). As Cassandra SSTables
+are immutable, the CPU cost of compressing is only necessary when the SSTable is written - subsequent updates
 to data will land in different SSTables, so Cassandra will not need to decompress, overwrite, and recompress data when
 UPDATE commands are issued. On reads, Cassandra will locate the relevant compressed chunks on disk, decompress the full
 chunk, and then proceed with the remainder of the read path (merging data from disks and memtables, read repair, and so
 on).
 
+Compression algorithms typically trade off between the following three areas:
+
+- **Compression speed**: How fast the compression algorithm compresses data. This is critical in the flush and
+  compaction paths because data must be compressed before it is written to disk.
+- **Decompression speed**: How fast the compression algorithm decompresses data. This is critical in the read
+  and compaction paths as data must be read off disk in a full chunk and decompressed before it can be returned.
+- **Ratio**: By what ratio the uncompressed data is reduced. Cassandra typically measures this as the size of data
+  on disk relative to the uncompressed size. For example, a ratio of ``0.5`` means that the data on disk is 50% the size
+  of the uncompressed data. Cassandra exposes this ratio per table as the ``SSTable Compression Ratio`` field of
+  ``nodetool tablestats``.
+
+Cassandra offers five compression algorithms by default that make different tradeoffs in these areas. While
+benchmarking compression algorithms depends on many factors (algorithm parameters such as compression level,
+the compressibility of the input data, underlying processor class, etc.), the following table should help you pick
+a starting point based on your application's requirements with an extremely rough grading of the different choices
+by their performance in these areas (A is relatively good, F is relatively bad):
+
++---------------------------------------------+-----------------------+-------------+---------------+-------+-------------+
+| Compression Algorithm                       | Cassandra Class       | Compression | Decompression | Ratio | C* Version  |
++=============================================+=======================+=============+===============+=======+=============+
+| `LZ4 <https://lz4.github.io/lz4/>`_         | ``LZ4Compressor``     |          A+ |            A+ |    C+ | ``>=1.2.2`` |
++---------------------------------------------+-----------------------+-------------+---------------+-------+-------------+
+| `LZ4HC <https://lz4.github.io/lz4/>`_       | ``LZ4Compressor``     |          C+ |            A+ |    B+ | ``>= 3.6``  |
++---------------------------------------------+-----------------------+-------------+---------------+-------+-------------+
+| `Zstd <https://facebook.github.io/zstd/>`_  | ``ZstdCompressor``    |          A- |            A- |    A+ | ``>= 4.0``  |
++---------------------------------------------+-----------------------+-------------+---------------+-------+-------------+
+| `Snappy <http://google.github.io/snappy/>`_ | ``SnappyCompressor``  |          A- |            A  |     C | ``>= 1.0``  |
++---------------------------------------------+-----------------------+-------------+---------------+-------+-------------+
+| `Deflate (zlib) <https://zlib.net>`_        | ``DeflateCompressor`` |          C  |            C  |     A | ``>= 1.0``  |
++---------------------------------------------+-----------------------+-------------+---------------+-------+-------------+
+
+Generally speaking for a performance critical (latency or throughput) application ``LZ4`` is the right choice as it
+gets excellent ratio per CPU cycle spent. This is why it is the default choice in Cassandra.
+
+For storage critical applications (disk footprint), however, ``Zstd`` may be a better choice as it can achieve a
+significantly better ratio than ``LZ4``.
+
+``Snappy`` is kept for backwards compatibility and ``LZ4`` will typically be preferable.
+
+``Deflate`` is kept for backwards compatibility and ``Zstd`` will typically be preferable.
+
 Configuring Compression
 ^^^^^^^^^^^^^^^^^^^^^^^
 
-Compression is configured on a per-table basis as an optional argument to ``CREATE TABLE`` or ``ALTER TABLE``. By
-default, three options are relevant:
+Compression is configured on a per-table basis as an optional argument to ``CREATE TABLE`` or ``ALTER TABLE``. Three
+options are available for all compressors:
+
+- ``class`` (default: ``LZ4Compressor``): specifies the compression class to use. The two "fast"
+  compressors are ``LZ4Compressor`` and ``SnappyCompressor`` and the two "good" ratio compressors are ``ZstdCompressor``
+  and ``DeflateCompressor``.
+- ``chunk_length_in_kb`` (default: ``16KiB``): specifies the number of kilobytes of data per compression chunk. The main
+  tradeoff here is that larger chunk sizes give compression algorithms more context and improve their ratio, but
+  require reads to deserialize and read more off disk.
+- ``crc_check_chance`` (default: ``1.0``): determines how likely Cassandra is to verify the checksum on each compression
+  chunk during reads to protect against data corruption. Unless you have profiles indicating this is a performance
+  problem it is highly encouraged not to turn this off as it is Cassandra's only protection against bitrot.
+
+The ``LZ4Compressor`` supports the following additional options:
+
+- ``lz4_compressor_type`` (default ``fast``): specifies whether to use the ``high`` (a.k.a. ``LZ4HC``) ratio version
+  or the ``fast`` (a.k.a. ``LZ4``) version of ``LZ4``. The ``high`` mode supports a configurable level, which can allow
+  operators to tune the performance <-> ratio tradeoff via the ``lz4_high_compressor_level`` option. Note that in
+  ``4.0`` and above it may be preferable to use the ``Zstd`` compressor.
+- ``lz4_high_compressor_level`` (default ``9``): A number between ``1`` and ``17`` inclusive that represents how much
+  CPU time to spend trying to get more compression ratio. Generally lower levels are "faster" but they get less ratio
+  and higher levels are slower but get more compression ratio.
+
+The ``ZstdCompressor`` supports the following additional options:
+
+- ``compression_level`` (default ``3``): A number between ``-131072`` and ``22`` inclusive that represents how much CPU
+  time to spend trying to get more compression ratio. The lower the level, the faster the speed (at the cost of ratio).
+  Values from 20 to 22 are called "ultra levels" and should be used with caution, as they require more memory.
+  The default of ``3`` is a good choice for competing with ``Deflate`` ratios and ``1`` is a good choice for competing
+  with ``LZ4``.
 
-- ``class`` specifies the compression class - Cassandra provides four classes (``LZ4Compressor``,
-  ``SnappyCompressor``, ``DeflateCompressor`` and ``ZstdCompressor``). The default is ``LZ4Compressor``.
-- ``chunk_length_in_kb`` specifies the number of kilobytes of data per compression chunk. The default is 64KB.
-- ``crc_check_chance`` determines how likely Cassandra is to verify the checksum on each compression chunk during
-  reads. The default is 1.0.
-- ``compression_level`` is only applicable for ``ZstdCompressor`` and accepts values between ``-131072`` and ``22``.
-    The lower the level, the faster the speed (at the cost of compression). Values from 20 to 22 are called
-    "ultra levels" and should be used with caution, as they require more memory. The default is 3.
 
 Users can set compression using the following syntax:
 
@@ -52,7 +113,7 @@ Or
 
 ::
 
-    ALTER TABLE keyspace.table WITH compression = {'class': 'SnappyCompressor', 'chunk_length_in_kb': 128, 'crc_check_chance': 0.5};
+    ALTER TABLE keyspace.table WITH compression = {'class': 'LZ4Compressor', 'chunk_length_in_kb': 64, 'crc_check_chance': 0.5};
 
 Once enabled, compression can be disabled with ``ALTER TABLE`` setting ``enabled`` to ``false``:
 
@@ -75,7 +136,8 @@ in storage requirements, it often increases read and write throughput, as the CP
 than the time it would take to read or write the larger volume of uncompressed data from disk.
 
 Compression is most useful in tables comprised of many rows, where the rows are similar in nature. Tables containing
-similar text columns (such as repeated JSON blobs) often compress very well.
+similar text columns (such as repeated JSON blobs) often compress very well. Tables containing data that has already
+been compressed or random data (e.g. benchmark datasets) do not typically compress well.
 
 Operational Impact
 ^^^^^^^^^^^^^^^^^^
@@ -86,6 +148,11 @@ Operational Impact
 - Streaming operations involve compressing and decompressing data on compressed tables - in some code paths (such as
   non-vnode bootstrap), the CPU overhead of compression can be a limiting factor.
 
+- To prevent slow compressors (``Zstd``, ``Deflate``, ``LZ4HC``) from blocking flushes for too long, all three
+  flush with the default fast ``LZ4`` compressor and then rely on normal compaction to re-compress the data into the
+  desired compression strategy. See `CASSANDRA-15379 <https://issues.apache.org/jira/browse/CASSANDRA-15379>`_ for more
+  details.
+
 - The compression path checksums data to ensure correctness - while the traditional Cassandra read path does not have a
   way to ensure correctness of data on disk, compressed tables allow the user to set ``crc_check_chance`` (a float from
   0.0 to 1.0) to allow Cassandra to probabilistically validate chunks on read to verify bits on disk are not corrupt.
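
Tying the documentation above together, the Zstd-specific option is set with the same CQL compression map syntax this file already uses. The statement below is only an illustration (keyspace.table is a placeholder, and level 3 is simply the documented default, not a recommendation):

    ALTER TABLE keyspace.table
        WITH compression = {'class': 'ZstdCompressor', 'compression_level': 3, 'chunk_length_in_kb': 16};
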
diff --git a/pylib/cqlshlib/cqlhandling.py b/pylib/cqlshlib/cqlhandling.py
index 9305dfa..08d5828 100644
--- a/pylib/cqlshlib/cqlhandling.py
+++ b/pylib/cqlshlib/cqlhandling.py
@@ -31,6 +31,7 @@ class CqlParsingRuleSet(pylexotron.ParsingRuleSet):
         'DeflateCompressor',
         'SnappyCompressor',
         'LZ4Compressor',
+        'ZstdCompressor',
     )
 
     available_compaction_classes = (
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 957662a..3525715 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -245,6 +245,7 @@ public class Config
     public int commitlog_sync_period_in_ms;
     public int commitlog_segment_size_in_mb = 32;
     public ParameterizedClass commitlog_compression;
+    public FlushCompression flush_compression = FlushCompression.fast;
     public int commitlog_max_compression_buffers_in_pool = 3;
     public Integer periodic_commitlog_sync_lag_block_in_ms;
     public TransparentDataEncryptionOptions transparent_data_encryption_options = new TransparentDataEncryptionOptions();
@@ -521,6 +522,14 @@ public class Config
         batch,
         group
     }
+
+    public enum FlushCompression
+    {
+        none,
+        fast,
+        table
+    }
+
     public enum InternodeCompression
     {
         all, none, dc
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index d4ee34b..698fb45 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1822,6 +1822,16 @@ public class DatabaseDescriptor
         conf.commitlog_compression = compressor;
     }
 
+    public static Config.FlushCompression getFlushCompression()
+    {
+        return conf.flush_compression;
+    }
+
+    public static void setFlushCompression(Config.FlushCompression compression)
+    {
+        conf.flush_compression = compression;
+    }
+
    /**
     * Maximum number of buffers in the compression pool. The default value is 3, it should not be set lower than that
     * (one segment in compression, one written to, one in reserve); delays in compression may cause the log to use
diff --git a/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java b/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
index 8557f5f..d3d7090 100644
--- a/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
+++ b/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
@@ -29,6 +29,8 @@ import java.util.zip.DataFormatException;
 import java.util.zip.Deflater;
 import java.util.zip.Inflater;
 
+import com.google.common.collect.ImmutableSet;
+
 public class DeflateCompressor implements ICompressor
 {
     public static final DeflateCompressor instance = new DeflateCompressor();
@@ -49,6 +51,7 @@ public class DeflateCompressor implements ICompressor
 
     private final FastThreadLocal<Deflater> deflater;
     private final FastThreadLocal<Inflater> inflater;
+    private final Set<Uses> recommendedUses;
 
     public static DeflateCompressor create(Map<String, String> compressionOptions)
     {
@@ -74,6 +77,7 @@ public class DeflateCompressor implements ICompressor
                 return new Inflater();
             }
         };
+        recommendedUses = ImmutableSet.of(Uses.GENERAL);
     }
 
     public Set<String> supportedOptions()
@@ -226,4 +230,10 @@ public class DeflateCompressor implements ICompressor
         // Prefer array-backed buffers.
         return BufferType.ON_HEAP;
     }
+
+    @Override
+    public Set<Uses> recommendedUses()
+    {
+        return recommendedUses;
+    }
 }
diff --git a/src/java/org/apache/cassandra/io/compress/ICompressor.java b/src/java/org/apache/cassandra/io/compress/ICompressor.java
index 40dc7c2..fd6a104 100644
--- a/src/java/org/apache/cassandra/io/compress/ICompressor.java
+++ b/src/java/org/apache/cassandra/io/compress/ICompressor.java
@@ -19,10 +19,24 @@ package org.apache.cassandra.io.compress;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.EnumSet;
 import java.util.Set;
 
+import com.google.common.collect.ImmutableSet;
+
 public interface ICompressor
 {
+    /**
+     * Ways that a particular instance of ICompressor should be used internally in Cassandra.
+     *
+     * GENERAL: Suitable for general use
+     * FAST_COMPRESSION: Suitable for use in particularly latency sensitive compression situations (flushes).
+     */
+    enum Uses {
+        GENERAL,
+        FAST_COMPRESSION
+    }
+
     public int initialCompressedBufferLength(int chunkLength);
 
     public int uncompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset) throws IOException;
@@ -57,4 +71,16 @@ public interface ICompressor
     public boolean supports(BufferType bufferType);
 
     public Set<String> supportedOptions();
+
+    /**
+     * Hints to Cassandra about the uses for which this compressor is recommended. For example, a compression
+     * algorithm which gets a good compression ratio may trade off too much compression speed to be useful in
+     * certain compression-heavy use cases such as flushes or mutation hints.
+     *
+     * Note that Cassandra may ignore these recommendations; this is not a strict contract.
+     */
+    default Set<Uses> recommendedUses()
+    {
+        return ImmutableSet.copyOf(EnumSet.allOf(Uses.class));
+    }
 }
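
As a rough sketch of how this hint surfaces to callers (this snippet is illustrative and not part of the commit; the class name RecommendedUsesDemo is made up), any compressor can be asked what it recommends itself for. LZ4 in its default fast mode advertises both GENERAL and FAST_COMPRESSION, while Zstd, Deflate and LZ4HC advertise only GENERAL, which is what lets the flush path substitute a fast compressor:

    import java.util.Collections;

    import org.apache.cassandra.io.compress.ICompressor;
    import org.apache.cassandra.io.compress.LZ4Compressor;
    import org.apache.cassandra.io.compress.ZstdCompressor;

    public class RecommendedUsesDemo
    {
        public static void main(String[] args)
        {
            // LZ4 in fast mode recommends itself for all uses, including
            // FAST_COMPRESSION, so it is kept as-is on the flush path.
            ICompressor lz4 = LZ4Compressor.create(Collections.emptyMap());

            // Zstd recommends itself only for GENERAL use, so with
            // flush_compression: fast the flush path falls back to LZ4.
            ICompressor zstd = ZstdCompressor.create(Collections.emptyMap());

            System.out.println("LZ4  recommends: " + lz4.recommendedUses());
            System.out.println("Zstd recommends: " + zstd.recommendedUses());
        }
    }
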
diff --git a/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java b/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java
index 1b3844d..30ec8ba 100644
--- a/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java
+++ b/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java
@@ -20,12 +20,14 @@ package org.apache.cassandra.io.compress;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -63,6 +65,10 @@ public class LZ4Compressor implements ICompressor
         {
             if (compressorType.equals(LZ4_FAST_COMPRESSOR) && args.get(LZ4_HIGH_COMPRESSION_LEVEL) != null)
                 logger.warn("'{}' parameter is ignored when '{}' is '{}'", LZ4_HIGH_COMPRESSION_LEVEL, LZ4_COMPRESSOR_TYPE, LZ4_FAST_COMPRESSOR);
+            if (compressorType.equals(LZ4_HIGH_COMPRESSOR))
+                logger.info("The ZstdCompressor may be preferable to LZ4 in 'high' mode. Zstd will typically " +
+                            "compress much faster while achieving better ratio, but it may decompress more slowly,");
+
             instance = new LZ4Compressor(compressorType, compressionLevel);
             LZ4Compressor instanceFromMap = instances.putIfAbsent(compressorTypeAndLevel, instance);
             if(instanceFromMap != null)
@@ -77,6 +83,7 @@ public class LZ4Compressor implements ICompressor
     final String compressorType;
     @VisibleForTesting
     final Integer compressionLevel;
+    private final Set<Uses> recommendedUses;
 
     private LZ4Compressor(String type, Integer compressionLevel)
     {
@@ -88,12 +95,15 @@ public class LZ4Compressor implements ICompressor
             case LZ4_HIGH_COMPRESSOR:
             {
                 compressor = lz4Factory.highCompressor(compressionLevel);
+                // LZ4HC can be _extremely_ slow to compress, up to 10x slower than the fast mode
+                this.recommendedUses = ImmutableSet.of(Uses.GENERAL);
                 break;
             }
             case LZ4_FAST_COMPRESSOR:
             default:
             {
                 compressor = lz4Factory.fastCompressor();
+                this.recommendedUses = ImmutableSet.copyOf(EnumSet.allOf(Uses.class));
             }
         }
 
@@ -231,4 +241,10 @@ public class LZ4Compressor implements ICompressor
     {
         return true;
     }
+
+    @Override
+    public Set<Uses> recommendedUses()
+    {
+        return recommendedUses;
+    }
 }
diff --git a/src/java/org/apache/cassandra/io/compress/NoopCompressor.java b/src/java/org/apache/cassandra/io/compress/NoopCompressor.java
new file mode 100644
index 0000000..b6307ea
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/compress/NoopCompressor.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.compress;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A Compressor which doesn't actually compress any data. This is useful either for non-compressible data
+ * (typically already compressed or encrypted) where you still want block checksums, or for fast writing.
+ * Some relevant tickets:
+ * <p>
+ *     <ul>
+ *         <li>CASSANDRA-12682: Non compressed SSTables can silently corrupt data</li>
+ *         <li>CASSANDRA-9264: Non compressed SSTables are written without checksums</li>
+ *     </ul>
+ * </p>
+ */
+public class NoopCompressor implements ICompressor
+{
+    public static NoopCompressor create(Map<String, String> ignored)
+    {
+        return new NoopCompressor();
+    }
+
+    public int initialCompressedBufferLength(int chunkLength)
+    {
+        return chunkLength;
+    }
+
+    public int uncompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset) throws IOException
+    {
+        System.arraycopy(input, inputOffset, output, outputOffset, inputLength);
+        return inputLength;
+    }
+
+    public void compress(ByteBuffer input, ByteBuffer output) throws IOException
+    {
+        output.put(input);
+    }
+
+    public void uncompress(ByteBuffer input, ByteBuffer output) throws IOException
+    {
+        output.put(input);
+    }
+
+    public BufferType preferredBufferType()
+    {
+        return BufferType.ON_HEAP;
+    }
+
+    public boolean supports(BufferType bufferType)
+    {
+        return true;
+    }
+
+    public Set<String> supportedOptions()
+    {
+        return Collections.emptySet();
+    }
+}
diff --git a/src/java/org/apache/cassandra/io/compress/ZstdCompressor.java b/src/java/org/apache/cassandra/io/compress/ZstdCompressor.java
index 1fd7415..c86db26 100644
--- a/src/java/org/apache/cassandra/io/compress/ZstdCompressor.java
+++ b/src/java/org/apache/cassandra/io/compress/ZstdCompressor.java
@@ -27,6 +27,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,6 +54,7 @@ public class ZstdCompressor implements ICompressor
     private static final ConcurrentHashMap<Integer, ZstdCompressor> instances = new ConcurrentHashMap<>();
 
     private final int compressionLevel;
+    private final Set<Uses> recommendedUses;
 
     /**
      * Create a Zstd compressor with the given options
@@ -78,6 +80,7 @@ public class ZstdCompressor implements ICompressor
     private ZstdCompressor(int compressionLevel)
     {
         this.compressionLevel = compressionLevel;
+        this.recommendedUses = ImmutableSet.of(Uses.GENERAL);
         logger.trace("Creating Zstd Compressor with compression level={}", compressionLevel);
     }
 
@@ -236,4 +239,10 @@ public class ZstdCompressor implements ICompressor
     {
         return compressionLevel;
     }
+
+    @Override
+    public Set<Uses> recommendedUses()
+    {
+        return recommendedUses;
+    }
 }
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index 70f568d..04ab08a 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@ -21,6 +21,7 @@ import java.io.*;
 import java.nio.ByteBuffer;
 import java.util.*;
 
+import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,6 +34,7 @@ import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.transform.Transformation;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.compress.CompressedSequentialWriter;
+import org.apache.cassandra.io.compress.ICompressor;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.io.sstable.format.SSTableFlushObserver;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
@@ -42,6 +44,7 @@ import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
 import org.apache.cassandra.io.sstable.metadata.MetadataType;
 import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 import org.apache.cassandra.io.util.*;
+import org.apache.cassandra.schema.CompressionParams;
 import org.apache.cassandra.schema.TableMetadataRef;
 import org.apache.cassandra.utils.*;
 import org.apache.cassandra.utils.concurrent.Transactional;
@@ -80,11 +83,13 @@ public class BigTableWriter extends SSTableWriter
 
         if (compression)
         {
+            final CompressionParams compressionParams = compressionFor(lifecycleNewTracker.opType());
+
             dataFile = new CompressedSequentialWriter(new File(getFilename()),
                                              descriptor.filenameFor(Component.COMPRESSION_INFO),
                                              new File(descriptor.filenameFor(Component.DIGEST)),
                                              writerOption,
-                                             metadata().params.compression,
+                                             compressionParams,
                                              metadataCollector);
         }
         else
@@ -102,6 +107,45 @@ public class BigTableWriter extends SSTableWriter
         columnIndexWriter = new ColumnIndex(this.header, dataFile, descriptor.version, this.observers, getRowIndexEntrySerializer().indexInfoSerializer());
     }
 
+    /**
+     * Given an OperationType, determine the correct CompressionParams to write with.
+     * @param opType the type of operation (e.g. flush or compaction) producing this SSTable
+     * @return the {@link org.apache.cassandra.schema.CompressionParams} to use
+     */
+    private CompressionParams compressionFor(final OperationType opType)
+    {
+        CompressionParams compressionParams = metadata().params.compression;
+        final ICompressor compressor = compressionParams.getSstableCompressor();
+
+        if (null != compressor && opType == OperationType.FLUSH)
+        {
+            // When we are flushing out of the memtable throughput of the compressor is critical as flushes,
+            // especially of large tables, can queue up and potentially block writes.
+            // This optimization allows us to fall back to a faster compressor if a particular
+            // compression algorithm indicates we should. See CASSANDRA-15379 for more details.
+            switch (DatabaseDescriptor.getFlushCompression())
+            {
+                // It is easier to insert a Noop compressor than to disable compressed writing
+                // entirely, as the "compression" member field is provided outside the scope of this class.
+                // It may make sense in the future to refactor the ownership of the compression flag so that
+                // we can bypass the CompressedSequentialWriter in this case entirely.
+                case none:
+                    compressionParams = CompressionParams.NOOP;
+                    break;
+                case fast:
+                    if (!compressor.recommendedUses().contains(ICompressor.Uses.FAST_COMPRESSION))
+                    {
+                        // The default compressor is generally fast (LZ4 with 16KiB block size)
+                        compressionParams = CompressionParams.DEFAULT;
+                        break;
+                    }
+                case table:
+                default:
+            }
+        }
+        return compressionParams;
+    }
+
     public void mark()
     {
         dataMark = dataFile.mark();
diff --git a/src/java/org/apache/cassandra/schema/CompressionParams.java b/src/java/org/apache/cassandra/schema/CompressionParams.java
index 21bea74..53760a9 100644
--- a/src/java/org/apache/cassandra/schema/CompressionParams.java
+++ b/src/java/org/apache/cassandra/schema/CompressionParams.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.schema;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -71,6 +72,13 @@ public final class CompressionParams
                                                                           DEFAULT_MIN_COMPRESS_RATIO,
                                                                           Collections.emptyMap());
 
+    public static final CompressionParams NOOP = new CompressionParams(NoopCompressor.create(Collections.emptyMap()),
+                                                                       // 4 KiB is often the underlying disk block size
+                                                                       1024 * 4,
+                                                                       Integer.MAX_VALUE,
+                                                                       DEFAULT_MIN_COMPRESS_RATIO,
+                                                                       Collections.emptyMap());
+
     private static final String CRC_CHECK_CHANCE_WARNING = "The option crc_check_chance was deprecated as a compression option. " +
                                                            "You should specify it as a top-level table option instead";
 
@@ -189,6 +197,13 @@ public final class CompressionParams
         return new CompressionParams(compressor, chunkLength, Integer.MAX_VALUE, DEFAULT_MIN_COMPRESS_RATIO, Collections.emptyMap());
     }
 
+    @VisibleForTesting
+    public static CompressionParams noop()
+    {
+        NoopCompressor compressor = NoopCompressor.create(Collections.emptyMap());
+        return new CompressionParams(compressor, DEFAULT_CHUNK_LENGTH, Integer.MAX_VALUE, DEFAULT_MIN_COMPRESS_RATIO, Collections.emptyMap());
+    }
+
     public CompressionParams(String sstableCompressorClass, Map<String, String> otherOptions, int chunkLength, double minCompressRatio) throws ConfigurationException
     {
         this(createCompressor(parseCompressorClass(sstableCompressorClass), otherOptions), chunkLength, calcMaxCompressedLength(chunkLength, minCompressRatio), minCompressRatio, otherOptions);
diff --git a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
index e19ec53..c43d622 100644
--- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
+++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
@@ -69,10 +69,11 @@ public class DatabaseDescriptorRefTest
     "org.apache.cassandra.config.Config",
     "org.apache.cassandra.config.Config$1",
     "org.apache.cassandra.config.Config$CommitLogSync",
+    "org.apache.cassandra.config.Config$CommitFailurePolicy",
     "org.apache.cassandra.config.Config$DiskAccessMode",
     "org.apache.cassandra.config.Config$DiskFailurePolicy",
-    "org.apache.cassandra.config.Config$CommitFailurePolicy",
     "org.apache.cassandra.config.Config$DiskOptimizationStrategy",
+    "org.apache.cassandra.config.Config$FlushCompression",
     "org.apache.cassandra.config.Config$InternodeCompression",
     "org.apache.cassandra.config.Config$MemtableAllocationType",
     "org.apache.cassandra.config.Config$RepairCommandPoolFullStrategy",
@@ -126,6 +127,7 @@ public class DatabaseDescriptorRefTest
     "org.apache.cassandra.io.FSWriteError",
     "org.apache.cassandra.io.FSError",
     "org.apache.cassandra.io.compress.ICompressor",
+    "org.apache.cassandra.io.compress.ICompressor$Uses",
     "org.apache.cassandra.io.compress.LZ4Compressor",
     "org.apache.cassandra.io.sstable.metadata.MetadataType",
     "org.apache.cassandra.io.util.BufferedDataOutputStreamPlus",
diff --git a/test/unit/org/apache/cassandra/io/compress/CQLCompressionTest.java b/test/unit/org/apache/cassandra/io/compress/CQLCompressionTest.java
index d4b0b7b..108c70f 100644
--- a/test/unit/org/apache/cassandra/io/compress/CQLCompressionTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CQLCompressionTest.java
@@ -18,21 +18,45 @@
 
 package org.apache.cassandra.io.compress;
 
+import java.util.Set;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 public class CQLCompressionTest extends CQLTester
 {
+    private static Config.FlushCompression defaultFlush;
+
+    @BeforeClass
+    public static void setUpClass()
+    {
+        CQLTester.setUpClass();
+
+        defaultFlush = DatabaseDescriptor.getFlushCompression();
+    }
+
+    @Before
+    public void resetDefaultFlush()
+    {
+        DatabaseDescriptor.setFlushCompression(defaultFlush);
+    }
+
     @Test
     public void lz4ParamsTest()
     {
         createTable("create table %s (id int primary key, uh text) with compression = {'class':'LZ4Compressor', 'lz4_high_compressor_level':3}");
-        assertTrue(((LZ4Compressor)getCurrentColumnFamilyStore().metadata().params.compression.getSstableCompressor()).compressorType.equals(LZ4Compressor.LZ4_FAST_COMPRESSOR));
+        assertEquals(((LZ4Compressor) getCurrentColumnFamilyStore().metadata().params.compression.getSstableCompressor()).compressorType, LZ4Compressor.LZ4_FAST_COMPRESSOR);
         createTable("create table %s (id int primary key, uh text) with compression = {'class':'LZ4Compressor', 'lz4_compressor_type':'high', 'lz4_high_compressor_level':13}");
         assertEquals(((LZ4Compressor)getCurrentColumnFamilyStore().metadata().params.compression.getSstableCompressor()).compressorType, LZ4Compressor.LZ4_HIGH_COMPRESSOR);
         assertEquals(((LZ4Compressor)getCurrentColumnFamilyStore().metadata().params.compression.getSstableCompressor()).compressionLevel, (Integer)13);
@@ -78,4 +102,172 @@ public class CQLCompressionTest extends CQLTester
             throw e.getCause();
         }
     }
+
+    @Test
+    public void lz4FlushTest() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k text PRIMARY KEY, v text) WITH compression = {'sstable_compression': 'LZ4Compressor'};");
+        ColumnFamilyStore store = flushTwice();
+
+        // Should flush as LZ4 "fast"
+        Set<SSTableReader> sstables = store.getLiveSSTables();
+        sstables.forEach(sstable -> {
+            LZ4Compressor compressor = (LZ4Compressor) sstable.getCompressionMetadata().parameters.getSstableCompressor();
+            assertEquals(LZ4Compressor.LZ4_FAST_COMPRESSOR, compressor.compressorType);
+        });
+
+        // Should compact to LZ4 "fast"
+        compact();
+
+        sstables = store.getLiveSSTables();
+        assertEquals(sstables.size(), 1);
+        store.getLiveSSTables().forEach(sstable -> {
+            LZ4Compressor compressor = (LZ4Compressor) sstable.getCompressionMetadata().parameters.getSstableCompressor();
+            assertEquals(LZ4Compressor.LZ4_FAST_COMPRESSOR, compressor.compressorType);
+        });
+    }
+
+    @Test
+    public void lz4hcFlushTest() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k text PRIMARY KEY, v text) WITH compression = " +
+                    "{'sstable_compression': 'LZ4Compressor', 'lz4_compressor_type': 'high'};");
+        ColumnFamilyStore store = flushTwice();
+
+        // Should flush as LZ4 "fast" mode
+        Set<SSTableReader> sstables = store.getLiveSSTables();
+        sstables.forEach(sstable -> {
+            LZ4Compressor compressor = (LZ4Compressor) sstable.getCompressionMetadata().parameters.getSstableCompressor();
+            assertEquals(LZ4Compressor.LZ4_FAST_COMPRESSOR, compressor.compressorType);
+        });
+
+        // Should compact to LZ4 "high" mode
+        compact();
+
+        sstables = store.getLiveSSTables();
+        assertEquals(sstables.size(), 1);
+        store.getLiveSSTables().forEach(sstable -> {
+            LZ4Compressor compressor = (LZ4Compressor) sstable.getCompressionMetadata().parameters.getSstableCompressor();
+            assertEquals(LZ4Compressor.LZ4_HIGH_COMPRESSOR, compressor.compressorType);
+        });
+    }
+
+    @Test
+    public void zstdFlushTest() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k text PRIMARY KEY, v text) WITH compression = {'sstable_compression': 'ZstdCompressor'};");
+        ColumnFamilyStore store = flushTwice();
+
+        // Should flush as LZ4
+        Set<SSTableReader> sstables = store.getLiveSSTables();
+        sstables.forEach(sstable -> {
+            assertTrue(sstable.getCompressionMetadata().parameters.getSstableCompressor() instanceof LZ4Compressor);
+        });
+
+        // Should compact to Zstd
+        compact();
+
+        sstables = store.getLiveSSTables();
+        assertEquals(sstables.size(), 1);
+        store.getLiveSSTables().forEach(sstable -> {
+            assertTrue(sstable.getCompressionMetadata().parameters.getSstableCompressor() instanceof ZstdCompressor);
+        });
+    }
+
+    @Test
+    public void deflateFlushTest() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k text PRIMARY KEY, v text) WITH compression = {'sstable_compression': 'DeflateCompressor'};");
+        ColumnFamilyStore store = flushTwice();
+
+        // Should flush as LZ4
+        Set<SSTableReader> sstables = store.getLiveSSTables();
+        sstables.forEach(sstable -> {
+            assertTrue(sstable.getCompressionMetadata().parameters.getSstableCompressor() instanceof LZ4Compressor);
+        });
+
+        // Should compact to Deflate
+        compact();
+
+        sstables = store.getLiveSSTables();
+        assertEquals(sstables.size(), 1);
+        store.getLiveSSTables().forEach(sstable -> {
+            assertTrue(sstable.getCompressionMetadata().parameters.getSstableCompressor() instanceof DeflateCompressor);
+        });
+    }
+
+    @Test
+    public void useNoCompressorOnFlushTest() throws Throwable
+    {
+        DatabaseDescriptor.setFlushCompression(Config.FlushCompression.none);
+        createTable("CREATE TABLE %s (k text PRIMARY KEY, v text) WITH compression = {'sstable_compression': 'LZ4Compressor'};");
+        ColumnFamilyStore store = flushTwice();
+
+        // Should flush as Noop compressor
+        Set<SSTableReader> sstables = store.getLiveSSTables();
+        sstables.forEach(sstable -> {
+            assertTrue(sstable.getCompressionMetadata().parameters.getSstableCompressor() instanceof NoopCompressor);
+        });
+
+        // Should compact to LZ4
+        compact();
+
+        sstables = store.getLiveSSTables();
+        assertEquals(sstables.size(), 1);
+        store.getLiveSSTables().forEach(sstable -> {
+            assertTrue(sstable.getCompressionMetadata().parameters.getSstableCompressor() instanceof LZ4Compressor);
+        });
+    }
+
+    @Test
+    public void useTableCompressorOnFlushTest() throws Throwable
+    {
+        DatabaseDescriptor.setFlushCompression(Config.FlushCompression.table);
+
+        createTable("CREATE TABLE %s (k text PRIMARY KEY, v text) WITH compression = {'sstable_compression': 'ZstdCompressor'};");
+        ColumnFamilyStore store = flushTwice();
+
+        // Should flush as Zstd
+        Set<SSTableReader> sstables = store.getLiveSSTables();
+        sstables.forEach(sstable -> {
+            assertTrue(sstable.getCompressionMetadata().parameters.getSstableCompressor() instanceof ZstdCompressor);
+        });
+    }
+
+    @Test
+    public void zstdTableFlushTest() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k text PRIMARY KEY, v text) WITH compression = {'sstable_compression': 'ZstdCompressor'};");
+        ColumnFamilyStore store = flushTwice();
+
+        // Should flush as LZ4
+        Set<SSTableReader> sstables = store.getLiveSSTables();
+        sstables.forEach(sstable -> {
+            assertTrue(sstable.getCompressionMetadata().parameters.getSstableCompressor() instanceof LZ4Compressor);
+        });
+
+        // Should compact to Zstd
+        compact();
+
+        sstables = store.getLiveSSTables();
+        assertEquals(sstables.size(), 1);
+        store.getLiveSSTables().forEach(sstable -> {
+            assertTrue(sstable.getCompressionMetadata().parameters.getSstableCompressor() instanceof ZstdCompressor);
+        });
+    }
+
+    private ColumnFamilyStore flushTwice() throws Throwable
+    {
+        ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+
+        execute("INSERT INTO %s (k, v) values (?, ?)", "k1", "v1");
+        flush();
+        assertEquals(1, cfs.getLiveSSTables().size());
+
+        execute("INSERT INTO %s (k, v) values (?, ?)", "k2", "v2");
+        flush();
+        assertEquals(2, cfs.getLiveSSTables().size());
+
+        return cfs;
+    }
 }
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
index a965410..57802cb 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
@@ -104,6 +104,13 @@ public class CompressedSequentialWriterTest extends SequentialWriterTest
         runTests("ZSTD");
     }
 
+    @Test
+    public void testNoopWriter() throws IOException
+    {
+        compressionParameters = CompressionParams.noop();
+        runTests("Noop");
+    }
+
     private void testWrite(File f, int bytesToTest, boolean useMemmap) throws IOException
     {
         final String filename = f.getAbsolutePath();
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressorTest.java b/test/unit/org/apache/cassandra/io/compress/CompressorTest.java
index d45d941..29e8453 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressorTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressorTest.java
@@ -43,7 +43,8 @@ public class CompressorTest
             LZ4Compressor.create(Collections.<String, String>emptyMap()),
             DeflateCompressor.create(Collections.<String, String>emptyMap()),
             SnappyCompressor.create(Collections.<String, String>emptyMap()),
-            ZstdCompressor.create(Collections.emptyMap())
+            ZstdCompressor.create(Collections.emptyMap()),
+            NoopCompressor.create(Collections.emptyMap())
     };
 
     @Test
@@ -185,6 +186,13 @@ public class CompressorTest
         testByteBuffers();
     }
 
+    @Test
+    public void testNoopByteBuffers() throws IOException
+    {
+        compressor = NoopCompressor.create(Collections.emptyMap());
+        testByteBuffers();
+    }
+
     private void testByteBuffers() throws IOException
     {
         assert compressor.supports(BufferType.OFF_HEAP);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org