You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/05/27 13:56:23 UTC

[incubator-doris] branch master updated: [feature] add zstd compression codec (#9747)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new efdb3b79a5 [feature] add zstd compression codec (#9747)
efdb3b79a5 is described below

commit efdb3b79a55c6cc27bfc5bb02e5c5e46894abde8
Author: Kang <kx...@gmail.com>
AuthorDate: Fri May 27 21:56:18 2022 +0800

    [feature] add zstd compression codec (#9747)
    
    ZSTD compression is fast with high compression ratio. It can be used to archive higher compression ratio
    than default Lz4f codec for storing cost sensitive data such as logs.
    
    Compared to Lz4f codec, we see zstd codec get 35% compressed size off, 30% faster at first time read without OS page
    cache, 40% slower at second time read with OS page cache in the following comparison test.
    
    test data: 25GB text log, 110 million rows
    test table: test_table(ts varchar(30), log string)
    test SQL: set enable_vectorized_engine=1; select sum(length(log)) from test_table
    be.conf: disable_storage_page_cache = true
    set this config to disable doris page cache to avoid all data cached in memory for test real decompression speed.
    test result
    
    master branch with lz4f codec result:
    - compressed size 4.3G
    - SQL first exec time(read data from disk + decompress + little computation) : 18.3s
    - SQL second exec time(read data from OS pagecache + decompress + little computation) : 2.4s
    
    this branch with zstd codec (hardcode enable it) result:
    - compressed size: 2.8G
    - SQL first exec time: 12.8s
    - SQL second exec time: 3.4s
---
 be/src/util/block_compression.cpp       | 149 ++++++++++++++++++++++++++++++++
 be/test/util/block_compression_test.cpp |   2 +
 2 files changed, 151 insertions(+)

diff --git a/be/src/util/block_compression.cpp b/be/src/util/block_compression.cpp
index a1ee74f047..51f485f348 100644
--- a/be/src/util/block_compression.cpp
+++ b/be/src/util/block_compression.cpp
@@ -22,6 +22,8 @@
 #include <snappy/snappy-sinksource.h>
 #include <snappy/snappy.h>
 #include <zlib.h>
+#include <zstd.h>
+#include <zstd_errors.h>
 
 #include <limits>
 
@@ -375,6 +377,150 @@ public:
     }
 };
 
+// for ZSTD compression and decompression, with BOTH fast and high compression ratio
+class ZstdBlockCompression : public BlockCompressionCodec {
+public:
+    // reenterable initialization for compress/decompress context
+    inline Status init() override {
+        if (!ctx_c) {
+            ctx_c = ZSTD_createCCtx();
+            if (!ctx_c) {
+                return Status::InvalidArgument("Fail to ZSTD_createCCtx");
+            }
+        }
+
+        if (!ctx_d) {
+            ctx_d = ZSTD_createDCtx();
+            if (!ctx_d) {
+                return Status::InvalidArgument("Fail to ZSTD_createDCtx");
+            }
+        }
+
+        return Status::OK();
+    }
+
+    ~ZstdBlockCompression() override {
+        if (ctx_c) ZSTD_freeCCtx(ctx_c);
+        if (ctx_d) ZSTD_freeDCtx(ctx_d);
+    }
+
+    size_t max_compressed_len(size_t len) const override {
+        if (len > std::numeric_limits<int32_t>::max()) {
+            return 0;
+        }
+        return ZSTD_compressBound(len);
+    }
+
+    Status compress(const Slice& input, Slice* output) const override {
+        std::vector<Slice> inputs {input};
+        return compress(inputs, output);
+    }
+
+    // follow ZSTD official example
+    //  https://github.com/facebook/zstd/blob/dev/examples/streaming_compression.c
+    Status compress(const std::vector<Slice>& inputs, Slice* output) const {
+        if (!ctx_c) return Status::InvalidArgument("compression context NOT initialized");
+
+        // reset ctx to start new compress session
+        auto ret = ZSTD_CCtx_reset(ctx_c, ZSTD_reset_session_only);
+        if (ZSTD_isError(ret)) {
+            return Status::InvalidArgument(strings::Substitute(
+                    "ZSTD_CCtx_reset error: $0", ZSTD_getErrorString(ZSTD_getErrorCode(ret))));
+        }
+        // set compression level to default 3
+        ret = ZSTD_CCtx_setParameter(ctx_c, ZSTD_c_compressionLevel, ZSTD_CLEVEL_DEFAULT);
+        if (ZSTD_isError(ret)) {
+            return Status::InvalidArgument(
+                    strings::Substitute("ZSTD_CCtx_setParameter compression level error: $0",
+                                        ZSTD_getErrorString(ZSTD_getErrorCode(ret))));
+        }
+        // set checksum flag to 1
+        ret = ZSTD_CCtx_setParameter(ctx_c, ZSTD_c_checksumFlag, 1);
+        if (ZSTD_isError(ret)) {
+            return Status::InvalidArgument(
+                    strings::Substitute("ZSTD_CCtx_setParameter checksumFlag error: $0",
+                                        ZSTD_getErrorString(ZSTD_getErrorCode(ret))));
+        }
+
+        ZSTD_outBuffer out_buf = {output->data, output->size, 0};
+
+        for (size_t i = 0; i < inputs.size(); i++) {
+            ZSTD_inBuffer in_buf = {inputs[i].data, inputs[i].size, 0};
+
+            bool last_input = (i == inputs.size() - 1);
+            auto mode = last_input ? ZSTD_e_end : ZSTD_e_continue;
+
+            bool finished = false;
+            do {
+                // do compress
+                auto ret = ZSTD_compressStream2(ctx_c, &out_buf, &in_buf, mode);
+
+                if (ZSTD_isError(ret)) {
+                    return Status::InvalidArgument(
+                            strings::Substitute("ZSTD_compressStream2 error: $0",
+                                                ZSTD_getErrorString(ZSTD_getErrorCode(ret))));
+                }
+
+                // ret is ZSTD hint for needed output buffer size
+                if (ret > 0 && out_buf.pos == out_buf.size) {
+                    return Status::InvalidArgument(
+                            strings::Substitute("ZSTD_compressStream2 output buffer full"));
+                }
+
+                finished = last_input ? (ret == 0) : (in_buf.pos == inputs[i].size);
+            } while (!finished);
+        }
+
+        // set compressed size for caller
+        output->size = out_buf.pos;
+
+        return Status::OK();
+    }
+
+    // follow ZSTD official example
+    //  https://github.com/facebook/zstd/blob/dev/examples/streaming_decompression.c
+    Status decompress(const Slice& input, Slice* output) const {
+        if (!ctx_d) return Status::InvalidArgument("decompression context NOT initialized");
+
+        // reset ctx to start a new decompress session
+        auto ret = ZSTD_DCtx_reset(ctx_d, ZSTD_reset_session_only);
+        if (ZSTD_isError(ret)) {
+            return Status::InvalidArgument(strings::Substitute(
+                    "ZSTD_DCtx_reset error: $0", ZSTD_getErrorString(ZSTD_getErrorCode(ret))));
+        }
+
+        ZSTD_inBuffer in_buf = {input.data, input.size, 0};
+        ZSTD_outBuffer out_buf = {output->data, output->size, 0};
+
+        while (in_buf.pos < in_buf.size) {
+            // do decompress
+            auto ret = ZSTD_decompressStream(ctx_d, &out_buf, &in_buf);
+
+            if (ZSTD_isError(ret)) {
+                return Status::InvalidArgument(
+                        strings::Substitute("ZSTD_decompressStream error: $0",
+                                            ZSTD_getErrorString(ZSTD_getErrorCode(ret))));
+            }
+
+            // ret is ZSTD hint for needed output buffer size
+            if (ret > 0 && out_buf.pos == out_buf.size) {
+                return Status::InvalidArgument(
+                        strings::Substitute("ZSTD_decompressStream output buffer full"));
+            }
+        }
+
+        // set decompressed size for caller
+        output->size = out_buf.pos;
+
+        return Status::OK();
+    }
+
+private:
+    // will be reused by compress/decompress
+    ZSTD_CCtx* ctx_c = nullptr;
+    ZSTD_DCtx* ctx_d = nullptr;
+};
+
 Status get_block_compression_codec(segment_v2::CompressionTypePB type,
                                    std::unique_ptr<BlockCompressionCodec>& codec) {
     BlockCompressionCodec* ptr = nullptr;
@@ -394,6 +540,9 @@ Status get_block_compression_codec(segment_v2::CompressionTypePB type,
     case segment_v2::CompressionTypePB::ZLIB:
         ptr = new ZlibBlockCompression();
         break;
+    case segment_v2::CompressionTypePB::ZSTD:
+        ptr = new ZstdBlockCompression();
+        break;
     default:
         return Status::NotFound(strings::Substitute("unknown compression type($0)", type));
     }
diff --git a/be/test/util/block_compression_test.cpp b/be/test/util/block_compression_test.cpp
index 0bf62e6d9d..b1ca291f78 100644
--- a/be/test/util/block_compression_test.cpp
+++ b/be/test/util/block_compression_test.cpp
@@ -101,6 +101,7 @@ TEST_F(BlockCompressionTest, single) {
     test_single_slice(segment_v2::CompressionTypePB::ZLIB);
     test_single_slice(segment_v2::CompressionTypePB::LZ4);
     test_single_slice(segment_v2::CompressionTypePB::LZ4F);
+    test_single_slice(segment_v2::CompressionTypePB::ZSTD);
 }
 
 void test_multi_slices(segment_v2::CompressionTypePB type) {
@@ -156,6 +157,7 @@ TEST_F(BlockCompressionTest, multi) {
     test_multi_slices(segment_v2::CompressionTypePB::ZLIB);
     test_multi_slices(segment_v2::CompressionTypePB::LZ4);
     test_multi_slices(segment_v2::CompressionTypePB::LZ4F);
+    test_multi_slices(segment_v2::CompressionTypePB::ZSTD);
 }
 
 } // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org