You are viewing a plain-text version of this content; the canonical link to it (a hyperlink in the original page) was not preserved in this copy.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/05/15 13:18:39 UTC

[incubator-doris] branch master updated: [enhancement][betarowset]optimize lz4 compress and decompress speed by reusing context (#9566)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e0c790094c [enhancement][betarowset]optimize lz4 compress and decompress speed by reusing context (#9566)
e0c790094c is described below

commit e0c790094c87325bd64649d8705e127730e9a728
Author: Kang <kx...@gmail.com>
AuthorDate: Sun May 15 21:18:32 2022 +0800

    [enhancement][betarowset]optimize lz4 compress and decompress speed by reusing context (#9566)
---
 be/src/olap/rowset/segment_v2/column_reader.cpp    |  4 +-
 be/src/olap/rowset/segment_v2/column_reader.h      |  2 +-
 be/src/olap/rowset/segment_v2/column_writer.cpp    |  8 +-
 be/src/olap/rowset/segment_v2/column_writer.h      |  2 +-
 .../rowset/segment_v2/indexed_column_reader.cpp    |  4 +-
 .../olap/rowset/segment_v2/indexed_column_reader.h |  2 +-
 .../rowset/segment_v2/indexed_column_writer.cpp    |  6 +-
 .../olap/rowset/segment_v2/indexed_column_writer.h |  2 +-
 be/src/util/block_compression.cpp                  | 93 +++++++++++++---------
 be/src/util/block_compression.h                    |  4 +-
 be/test/util/block_compression_test.cpp            |  8 +-
 11 files changed, 77 insertions(+), 58 deletions(-)

diff --git a/be/src/olap/rowset/segment_v2/column_reader.cpp b/be/src/olap/rowset/segment_v2/column_reader.cpp
index cc88552f9c..1b57d35269 100644
--- a/be/src/olap/rowset/segment_v2/column_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/column_reader.cpp
@@ -104,7 +104,7 @@ Status ColumnReader::init() {
                 strings::Substitute("unsupported typeinfo, type=$0", _meta.type()));
     }
     RETURN_IF_ERROR(EncodingInfo::get(_type_info.get(), _meta.encoding(), &_encoding_info));
-    RETURN_IF_ERROR(get_block_compression_codec(_meta.compression(), &_compress_codec));
+    RETURN_IF_ERROR(get_block_compression_codec(_meta.compression(), _compress_codec));
 
     for (int i = 0; i < _meta.indexes_size(); i++) {
         auto& index_meta = _meta.indexes(i);
@@ -149,7 +149,7 @@ Status ColumnReader::read_page(const ColumnIteratorOptions& iter_opts, const Pag
     PageReadOptions opts;
     opts.rblock = iter_opts.rblock;
     opts.page_pointer = pp;
-    opts.codec = _compress_codec;
+    opts.codec = _compress_codec.get();
     opts.stats = iter_opts.stats;
     opts.verify_checksum = _opts.verify_checksum;
     opts.use_page_cache = iter_opts.use_page_cache;
diff --git a/be/src/olap/rowset/segment_v2/column_reader.h b/be/src/olap/rowset/segment_v2/column_reader.h
index 305ba5f8dc..0e3f11c859 100644
--- a/be/src/olap/rowset/segment_v2/column_reader.h
+++ b/be/src/olap/rowset/segment_v2/column_reader.h
@@ -175,7 +175,7 @@ private:
             TypeInfoPtr(nullptr, nullptr); // initialized in init(), may changed by subclasses.
     const EncodingInfo* _encoding_info =
             nullptr; // initialized in init(), used for create PageDecoder
-    const BlockCompressionCodec* _compress_codec = nullptr; // initialized in init()
+    std::unique_ptr<BlockCompressionCodec> _compress_codec; // initialized in init()
 
     // meta for various column indexes (null if the index is absent)
     const ZoneMapIndexPB* _zone_map_index_meta = nullptr;
diff --git a/be/src/olap/rowset/segment_v2/column_writer.cpp b/be/src/olap/rowset/segment_v2/column_writer.cpp
index 5cb5140102..9a54b210c8 100644
--- a/be/src/olap/rowset/segment_v2/column_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/column_writer.cpp
@@ -264,7 +264,7 @@ ScalarColumnWriter::~ScalarColumnWriter() {
 }
 
 Status ScalarColumnWriter::init() {
-    RETURN_IF_ERROR(get_block_compression_codec(_opts.meta->compression(), &_compress_codec));
+    RETURN_IF_ERROR(get_block_compression_codec(_opts.meta->compression(), _compress_codec));
 
     PageBuilder* page_builder = nullptr;
 
@@ -420,7 +420,7 @@ Status ScalarColumnWriter::write_data() {
         footer.mutable_dict_page_footer()->set_encoding(PLAIN_ENCODING);
 
         PagePointer dict_pp;
-        RETURN_IF_ERROR(PageIO::compress_and_write_page(_compress_codec,
+        RETURN_IF_ERROR(PageIO::compress_and_write_page(_compress_codec.get(),
                                                         _opts.compression_min_space_saving, _wblock,
                                                         {dict_body.slice()}, footer, &dict_pp));
         dict_pp.to_proto(_opts.meta->mutable_dict_page());
@@ -508,8 +508,8 @@ Status ScalarColumnWriter::finish_current_page() {
     }
     // trying to compress page body
     OwnedSlice compressed_body;
-    RETURN_IF_ERROR(PageIO::compress_page_body(_compress_codec, _opts.compression_min_space_saving,
-                                               body, &compressed_body));
+    RETURN_IF_ERROR(PageIO::compress_page_body(
+            _compress_codec.get(), _opts.compression_min_space_saving, body, &compressed_body));
     if (compressed_body.slice().empty()) {
         // page body is uncompressed
         page->data.emplace_back(std::move(encoded_values));
diff --git a/be/src/olap/rowset/segment_v2/column_writer.h b/be/src/olap/rowset/segment_v2/column_writer.h
index 0cacbf8547..2f50ebf075 100644
--- a/be/src/olap/rowset/segment_v2/column_writer.h
+++ b/be/src/olap/rowset/segment_v2/column_writer.h
@@ -249,7 +249,7 @@ private:
     PageHead _pages;
     ordinal_t _first_rowid = 0;
 
-    const BlockCompressionCodec* _compress_codec = nullptr;
+    std::unique_ptr<BlockCompressionCodec> _compress_codec;
 
     std::unique_ptr<OrdinalIndexWriter> _ordinal_index_builder;
     std::unique_ptr<ZoneMapIndexWriter> _zone_map_index_builder;
diff --git a/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp b/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp
index 4ebe0d3af3..955c3b96e3 100644
--- a/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp
@@ -37,7 +37,7 @@ Status IndexedColumnReader::load(bool use_page_cache, bool kept_in_memory) {
                 strings::Substitute("unsupported typeinfo, type=$0", _meta.data_type()));
     }
     RETURN_IF_ERROR(EncodingInfo::get(_type_info, _meta.encoding(), &_encoding_info));
-    RETURN_IF_ERROR(get_block_compression_codec(_meta.compression(), &_compress_codec));
+    RETURN_IF_ERROR(get_block_compression_codec(_meta.compression(), _compress_codec));
     _value_key_coder = get_key_coder(_type_info->type());
 
     std::unique_ptr<fs::ReadableBlock> rblock;
@@ -83,7 +83,7 @@ Status IndexedColumnReader::read_page(fs::ReadableBlock* rblock, const PagePoint
     PageReadOptions opts;
     opts.rblock = rblock;
     opts.page_pointer = pp;
-    opts.codec = _compress_codec;
+    opts.codec = _compress_codec.get();
     OlapReaderStatistics tmp_stats;
     opts.stats = &tmp_stats;
     opts.use_page_cache = _use_page_cache;
diff --git a/be/src/olap/rowset/segment_v2/indexed_column_reader.h b/be/src/olap/rowset/segment_v2/indexed_column_reader.h
index 439c790e60..6663ac1077 100644
--- a/be/src/olap/rowset/segment_v2/indexed_column_reader.h
+++ b/be/src/olap/rowset/segment_v2/indexed_column_reader.h
@@ -84,7 +84,7 @@ private:
 
     const TypeInfo* _type_info = nullptr;
     const EncodingInfo* _encoding_info = nullptr;
-    const BlockCompressionCodec* _compress_codec = nullptr;
+    std::unique_ptr<BlockCompressionCodec> _compress_codec;
     const KeyCoder* _value_key_coder = nullptr;
 };
 
diff --git a/be/src/olap/rowset/segment_v2/indexed_column_writer.cpp b/be/src/olap/rowset/segment_v2/indexed_column_writer.cpp
index d195b9f2c4..4c7259c90c 100644
--- a/be/src/olap/rowset/segment_v2/indexed_column_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/indexed_column_writer.cpp
@@ -72,7 +72,7 @@ Status IndexedColumnWriter::init() {
     }
 
     if (_options.compression != NO_COMPRESSION) {
-        RETURN_IF_ERROR(get_block_compression_codec(_options.compression, &_compress_codec));
+        RETURN_IF_ERROR(get_block_compression_codec(_options.compression, _compress_codec));
     }
     return Status::OK();
 }
@@ -110,7 +110,7 @@ Status IndexedColumnWriter::_finish_current_data_page() {
     footer.mutable_data_page_footer()->set_num_values(num_values_in_page);
     footer.mutable_data_page_footer()->set_nullmap_size(0);
 
-    RETURN_IF_ERROR(PageIO::compress_and_write_page(_compress_codec,
+    RETURN_IF_ERROR(PageIO::compress_and_write_page(_compress_codec.get(),
                                                     _options.compression_min_space_saving, _wblock,
                                                     {page_body.slice()}, footer, &_last_data_page));
     _num_data_pages++;
@@ -159,7 +159,7 @@ Status IndexedColumnWriter::_flush_index(IndexPageBuilder* index_builder, BTreeM
 
         PagePointer pp;
         RETURN_IF_ERROR(PageIO::compress_and_write_page(
-                _compress_codec, _options.compression_min_space_saving, _wblock,
+                _compress_codec.get(), _options.compression_min_space_saving, _wblock,
                 {page_body.slice()}, page_footer, &pp));
 
         meta->set_is_root_data_page(false);
diff --git a/be/src/olap/rowset/segment_v2/indexed_column_writer.h b/be/src/olap/rowset/segment_v2/indexed_column_writer.h
index fc222388c6..285ba890b2 100644
--- a/be/src/olap/rowset/segment_v2/indexed_column_writer.h
+++ b/be/src/olap/rowset/segment_v2/indexed_column_writer.h
@@ -108,7 +108,7 @@ private:
     std::unique_ptr<IndexPageBuilder> _value_index_builder;
     // encoder for value index's key
     const KeyCoder* _value_key_coder;
-    const BlockCompressionCodec* _compress_codec;
+    std::unique_ptr<BlockCompressionCodec> _compress_codec;
 
     DISALLOW_COPY_AND_ASSIGN(IndexedColumnWriter);
 };
diff --git a/be/src/util/block_compression.cpp b/be/src/util/block_compression.cpp
index 1b0f8143e2..a1ee74f047 100644
--- a/be/src/util/block_compression.cpp
+++ b/be/src/util/block_compression.cpp
@@ -88,46 +88,46 @@ public:
 // Used for LZ4 frame format, decompress speed is two times faster than LZ4.
 class Lz4fBlockCompression : public BlockCompressionCodec {
 public:
-    static const Lz4fBlockCompression* instance() {
-        static Lz4fBlockCompression s_instance;
-        return &s_instance;
-    }
-
-    ~Lz4fBlockCompression() override {}
+    Status init() override {
+        auto ret1 = LZ4F_createCompressionContext(&ctx_c, LZ4F_VERSION);
+        if (LZ4F_isError(ret1)) {
+            return Status::InvalidArgument(strings::Substitute(
+                    "Fail to LZ4F_createCompressionContext, msg=$0", LZ4F_getErrorName(ret1)));
+        }
+        ctx_c_inited = true;
 
-    Status compress(const Slice& input, Slice* output) const override {
-        auto compressed_len = LZ4F_compressFrame(output->data, output->size, input.data, input.size,
-                                                 &_s_preferences);
-        if (LZ4F_isError(compressed_len)) {
+        auto ret2 = LZ4F_createDecompressionContext(&ctx_d, LZ4F_VERSION);
+        if (LZ4F_isError(ret2)) {
             return Status::InvalidArgument(strings::Substitute(
-                    "Fail to do LZ4F compress frame, msg=$0", LZ4F_getErrorName(compressed_len)));
+                    "Fail to LZ4F_createDecompressionContext, msg=$0", LZ4F_getErrorName(ret2)));
         }
-        output->size = compressed_len;
+        ctx_d_inited = true;
+
         return Status::OK();
     }
 
+    ~Lz4fBlockCompression() override {
+        if (ctx_c_inited) LZ4F_freeCompressionContext(ctx_c);
+        if (ctx_d_inited) LZ4F_freeDecompressionContext(ctx_d);
+    }
+
+    Status compress(const Slice& input, Slice* output) const override {
+        std::vector<Slice> inputs {input};
+        return compress(inputs, output);
+    }
+
     Status compress(const std::vector<Slice>& inputs, Slice* output) const override {
-        LZ4F_compressionContext_t ctx = nullptr;
-        auto lres = LZ4F_createCompressionContext(&ctx, LZ4F_VERSION);
-        if (lres != 0) {
-            return Status::InvalidArgument(strings::Substitute("Fail to do LZ4F compress, res=$0",
-                                                               LZ4F_getErrorName(lres)));
-        }
-        auto st = _compress(ctx, inputs, output);
-        LZ4F_freeCompressionContext(ctx);
-        return st;
+        if (!ctx_c_inited)
+            return Status::InvalidArgument("LZ4F_createCompressionContext not sucess");
+
+        return _compress(ctx_c, inputs, output);
     }
 
     Status decompress(const Slice& input, Slice* output) const override {
-        LZ4F_decompressionContext_t ctx;
-        auto lres = LZ4F_createDecompressionContext(&ctx, LZ4F_VERSION);
-        if (LZ4F_isError(lres)) {
-            return Status::InvalidArgument(strings::Substitute("Fail to do LZ4F decompress, res=$0",
-                                                               LZ4F_getErrorName(lres)));
-        }
-        auto st = _decompress(ctx, input, output);
-        LZ4F_freeDecompressionContext(ctx);
-        return st;
+        if (!ctx_d_inited)
+            return Status::InvalidArgument("LZ4F_createDecompressionContext not sucess");
+
+        return _decompress(ctx_d, input, output);
     }
 
     size_t max_compressed_len(size_t len) const override {
@@ -167,6 +167,8 @@ private:
     }
 
     Status _decompress(LZ4F_decompressionContext_t ctx, const Slice& input, Slice* output) const {
+        // reset decompression context to avoid ERROR_maxBlockSize_invalid
+        LZ4F_resetDecompressionContext(ctx);
         size_t input_size = input.size;
         auto lres =
                 LZ4F_decompress(ctx, output->data, &output->size, input.data, &input_size, nullptr);
@@ -187,6 +189,10 @@ private:
 
 private:
     static LZ4F_preferences_t _s_preferences;
+    LZ4F_compressionContext_t ctx_c;
+    bool ctx_c_inited = false;
+    LZ4F_decompressionContext_t ctx_d;
+    bool ctx_d_inited = false;
 };
 
 LZ4F_preferences_t Lz4fBlockCompression::_s_preferences = {
@@ -370,27 +376,38 @@ public:
 };
 
 Status get_block_compression_codec(segment_v2::CompressionTypePB type,
-                                   const BlockCompressionCodec** codec) {
+                                   std::unique_ptr<BlockCompressionCodec>& codec) {
+    BlockCompressionCodec* ptr = nullptr;
     switch (type) {
     case segment_v2::CompressionTypePB::NO_COMPRESSION:
-        *codec = nullptr;
-        break;
+        codec.reset(nullptr);
+        return Status::OK();
     case segment_v2::CompressionTypePB::SNAPPY:
-        *codec = SnappyBlockCompression::instance();
+        ptr = new SnappyBlockCompression();
         break;
     case segment_v2::CompressionTypePB::LZ4:
-        *codec = Lz4BlockCompression::instance();
+        ptr = new Lz4BlockCompression();
         break;
     case segment_v2::CompressionTypePB::LZ4F:
-        *codec = Lz4fBlockCompression::instance();
+        ptr = new Lz4fBlockCompression();
         break;
     case segment_v2::CompressionTypePB::ZLIB:
-        *codec = ZlibBlockCompression::instance();
+        ptr = new ZlibBlockCompression();
         break;
     default:
         return Status::NotFound(strings::Substitute("unknown compression type($0)", type));
     }
-    return Status::OK();
+
+    if (!ptr) return Status::NotFound("Failed to create compression codec");
+
+    Status st = ptr->init();
+    if (st.ok()) {
+        codec.reset(ptr);
+    } else {
+        delete ptr;
+    }
+
+    return st;
 }
 
 } // namespace doris
diff --git a/be/src/util/block_compression.h b/be/src/util/block_compression.h
index ff25113793..7ad3f9ecb7 100644
--- a/be/src/util/block_compression.h
+++ b/be/src/util/block_compression.h
@@ -34,6 +34,8 @@ class BlockCompressionCodec {
 public:
     virtual ~BlockCompressionCodec() {}
 
+    virtual Status init() { return Status::OK(); }
+
     // This function will compress input data into output.
     // output should be preallocated, and its capacity must be large enough
     // for compressed input, which can be get through max_compressed_len function.
@@ -61,6 +63,6 @@ public:
 //
 // Return not OK, if error happens.
 Status get_block_compression_codec(segment_v2::CompressionTypePB type,
-                                   const BlockCompressionCodec** codec);
+                                   std::unique_ptr<BlockCompressionCodec>& codec);
 
 } // namespace doris
diff --git a/be/test/util/block_compression_test.cpp b/be/test/util/block_compression_test.cpp
index cec2ac48da..0bf62e6d9d 100644
--- a/be/test/util/block_compression_test.cpp
+++ b/be/test/util/block_compression_test.cpp
@@ -42,8 +42,8 @@ static std::string generate_str(size_t len) {
 }
 
 void test_single_slice(segment_v2::CompressionTypePB type) {
-    const BlockCompressionCodec* codec = nullptr;
-    auto st = get_block_compression_codec(type, &codec);
+    std::unique_ptr<BlockCompressionCodec> codec;
+    auto st = get_block_compression_codec(type, codec);
     EXPECT_TRUE(st.ok());
 
     size_t test_sizes[] = {0, 1, 10, 1000, 1000000};
@@ -104,8 +104,8 @@ TEST_F(BlockCompressionTest, single) {
 }
 
 void test_multi_slices(segment_v2::CompressionTypePB type) {
-    const BlockCompressionCodec* codec = nullptr;
-    auto st = get_block_compression_codec(type, &codec);
+    std::unique_ptr<BlockCompressionCodec> codec;
+    auto st = get_block_compression_codec(type, codec);
     EXPECT_TRUE(st.ok());
 
     size_t test_sizes[] = {0, 1, 10, 1000, 1000000};


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org