You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink itself was not preserved in this plain-text version).
Posted to commits@doris.apache.org by mo...@apache.org on 2022/05/16 16:39:26 UTC

[incubator-doris] 16/17: [enhancement][betarowset]optimize lz4 compress and decompress speed by reusing context (#9566)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 53fb90f04f2dd997f4b6235cc57011eaf102b506
Author: Kang <kx...@gmail.com>
AuthorDate: Sun May 15 21:18:32 2022 +0800

    [enhancement][betarowset]optimize lz4 compress and decompress speed by reusing context (#9566)
---
 be/src/olap/rowset/segment_v2/column_reader.cpp    |  4 +-
 be/src/olap/rowset/segment_v2/column_reader.h      |  2 +-
 be/src/olap/rowset/segment_v2/column_writer.cpp    |  8 +-
 be/src/olap/rowset/segment_v2/column_writer.h      |  2 +-
 .../rowset/segment_v2/indexed_column_reader.cpp    |  4 +-
 .../olap/rowset/segment_v2/indexed_column_reader.h |  2 +-
 .../rowset/segment_v2/indexed_column_writer.cpp    |  6 +-
 .../olap/rowset/segment_v2/indexed_column_writer.h |  2 +-
 be/src/runtime/mem_pool.cpp                        | 11 +++
 be/src/runtime/mem_pool.h                          |  5 ++
 be/src/util/block_compression.cpp                  | 93 +++++++++++++---------
 be/src/util/block_compression.h                    |  4 +-
 be/test/util/block_compression_test.cpp            | 12 +--
 13 files changed, 95 insertions(+), 60 deletions(-)

diff --git a/be/src/olap/rowset/segment_v2/column_reader.cpp b/be/src/olap/rowset/segment_v2/column_reader.cpp
index a67793463f..982de671d7 100644
--- a/be/src/olap/rowset/segment_v2/column_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/column_reader.cpp
@@ -105,7 +105,7 @@ Status ColumnReader::init() {
                 strings::Substitute("unsupported typeinfo, type=$0", _meta.type()));
     }
     RETURN_IF_ERROR(EncodingInfo::get(_type_info, _meta.encoding(), &_encoding_info));
-    RETURN_IF_ERROR(get_block_compression_codec(_meta.compression(), &_compress_codec));
+    RETURN_IF_ERROR(get_block_compression_codec(_meta.compression(), _compress_codec));
 
     for (int i = 0; i < _meta.indexes_size(); i++) {
         auto& index_meta = _meta.indexes(i);
@@ -146,7 +146,7 @@ Status ColumnReader::read_page(const ColumnIteratorOptions& iter_opts, const Pag
     PageReadOptions opts;
     opts.rblock = iter_opts.rblock;
     opts.page_pointer = pp;
-    opts.codec = _compress_codec;
+    opts.codec = _compress_codec.get();
     opts.stats = iter_opts.stats;
     opts.verify_checksum = _opts.verify_checksum;
     opts.use_page_cache = iter_opts.use_page_cache;
diff --git a/be/src/olap/rowset/segment_v2/column_reader.h b/be/src/olap/rowset/segment_v2/column_reader.h
index a8b0263aac..99206bd53a 100644
--- a/be/src/olap/rowset/segment_v2/column_reader.h
+++ b/be/src/olap/rowset/segment_v2/column_reader.h
@@ -174,7 +174,7 @@ private:
     const TypeInfo* _type_info = nullptr; // initialized in init(), may changed by subclasses.
     const EncodingInfo* _encoding_info =
             nullptr; // initialized in init(), used for create PageDecoder
-    const BlockCompressionCodec* _compress_codec = nullptr; // initialized in init()
+    std::unique_ptr<BlockCompressionCodec> _compress_codec; // initialized in init()
 
     // meta for various column indexes (null if the index is absent)
     const ZoneMapIndexPB* _zone_map_index_meta = nullptr;
diff --git a/be/src/olap/rowset/segment_v2/column_writer.cpp b/be/src/olap/rowset/segment_v2/column_writer.cpp
index 4c5623250f..bcf2b2a338 100644
--- a/be/src/olap/rowset/segment_v2/column_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/column_writer.cpp
@@ -221,7 +221,7 @@ ScalarColumnWriter::~ScalarColumnWriter() {
 }
 
 Status ScalarColumnWriter::init() {
-    RETURN_IF_ERROR(get_block_compression_codec(_opts.meta->compression(), &_compress_codec));
+    RETURN_IF_ERROR(get_block_compression_codec(_opts.meta->compression(), _compress_codec));
 
     PageBuilder* page_builder = nullptr;
 
@@ -377,7 +377,7 @@ Status ScalarColumnWriter::write_data() {
         footer.mutable_dict_page_footer()->set_encoding(PLAIN_ENCODING);
 
         PagePointer dict_pp;
-        RETURN_IF_ERROR(PageIO::compress_and_write_page(_compress_codec,
+        RETURN_IF_ERROR(PageIO::compress_and_write_page(_compress_codec.get(),
                                                         _opts.compression_min_space_saving, _wblock,
                                                         {dict_body.slice()}, footer, &dict_pp));
         dict_pp.to_proto(_opts.meta->mutable_dict_page());
@@ -465,8 +465,8 @@ Status ScalarColumnWriter::finish_current_page() {
     }
     // trying to compress page body
     OwnedSlice compressed_body;
-    RETURN_IF_ERROR(PageIO::compress_page_body(_compress_codec, _opts.compression_min_space_saving,
-                                               body, &compressed_body));
+    RETURN_IF_ERROR(PageIO::compress_page_body(
+            _compress_codec.get(), _opts.compression_min_space_saving, body, &compressed_body));
     if (compressed_body.slice().empty()) {
         // page body is uncompressed
         page->data.emplace_back(std::move(encoded_values));
diff --git a/be/src/olap/rowset/segment_v2/column_writer.h b/be/src/olap/rowset/segment_v2/column_writer.h
index b98f4883ca..6235b313b2 100644
--- a/be/src/olap/rowset/segment_v2/column_writer.h
+++ b/be/src/olap/rowset/segment_v2/column_writer.h
@@ -244,7 +244,7 @@ private:
     PageHead _pages;
     ordinal_t _first_rowid = 0;
 
-    const BlockCompressionCodec* _compress_codec = nullptr;
+    std::unique_ptr<BlockCompressionCodec> _compress_codec;
 
     std::unique_ptr<OrdinalIndexWriter> _ordinal_index_builder;
     std::unique_ptr<ZoneMapIndexWriter> _zone_map_index_builder;
diff --git a/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp b/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp
index 67a873fae1..3ee72c8a11 100644
--- a/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp
@@ -37,7 +37,7 @@ Status IndexedColumnReader::load(bool use_page_cache, bool kept_in_memory) {
                 strings::Substitute("unsupported typeinfo, type=$0", _meta.data_type()));
     }
     RETURN_IF_ERROR(EncodingInfo::get(_type_info, _meta.encoding(), &_encoding_info));
-    RETURN_IF_ERROR(get_block_compression_codec(_meta.compression(), &_compress_codec));
+    RETURN_IF_ERROR(get_block_compression_codec(_meta.compression(), _compress_codec));
     _value_key_coder = get_key_coder(_type_info->type());
 
     std::unique_ptr<fs::ReadableBlock> rblock;
@@ -82,7 +82,7 @@ Status IndexedColumnReader::read_page(fs::ReadableBlock* rblock, const PagePoint
     PageReadOptions opts;
     opts.rblock = rblock;
     opts.page_pointer = pp;
-    opts.codec = _compress_codec;
+    opts.codec = _compress_codec.get();
     OlapReaderStatistics tmp_stats;
     opts.stats = &tmp_stats;
     opts.use_page_cache = _use_page_cache;
diff --git a/be/src/olap/rowset/segment_v2/indexed_column_reader.h b/be/src/olap/rowset/segment_v2/indexed_column_reader.h
index a1030586ab..3c464f32f0 100644
--- a/be/src/olap/rowset/segment_v2/indexed_column_reader.h
+++ b/be/src/olap/rowset/segment_v2/indexed_column_reader.h
@@ -84,7 +84,7 @@ private:
 
     const TypeInfo* _type_info = nullptr;
     const EncodingInfo* _encoding_info = nullptr;
-    const BlockCompressionCodec* _compress_codec = nullptr;
+    std::unique_ptr<BlockCompressionCodec> _compress_codec;
     const KeyCoder* _value_key_coder = nullptr;
 };
 
diff --git a/be/src/olap/rowset/segment_v2/indexed_column_writer.cpp b/be/src/olap/rowset/segment_v2/indexed_column_writer.cpp
index 088de6940e..a69b641d32 100644
--- a/be/src/olap/rowset/segment_v2/indexed_column_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/indexed_column_writer.cpp
@@ -73,7 +73,7 @@ Status IndexedColumnWriter::init() {
     }
 
     if (_options.compression != NO_COMPRESSION) {
-        RETURN_IF_ERROR(get_block_compression_codec(_options.compression, &_compress_codec));
+        RETURN_IF_ERROR(get_block_compression_codec(_options.compression, _compress_codec));
     }
     return Status::OK();
 }
@@ -111,7 +111,7 @@ Status IndexedColumnWriter::_finish_current_data_page() {
     footer.mutable_data_page_footer()->set_num_values(num_values_in_page);
     footer.mutable_data_page_footer()->set_nullmap_size(0);
 
-    RETURN_IF_ERROR(PageIO::compress_and_write_page(_compress_codec,
+    RETURN_IF_ERROR(PageIO::compress_and_write_page(_compress_codec.get(),
                                                     _options.compression_min_space_saving, _wblock,
                                                     {page_body.slice()}, footer, &_last_data_page));
     _num_data_pages++;
@@ -160,7 +160,7 @@ Status IndexedColumnWriter::_flush_index(IndexPageBuilder* index_builder, BTreeM
 
         PagePointer pp;
         RETURN_IF_ERROR(PageIO::compress_and_write_page(
-                _compress_codec, _options.compression_min_space_saving, _wblock,
+                _compress_codec.get(), _options.compression_min_space_saving, _wblock,
                 {page_body.slice()}, page_footer, &pp));
 
         meta->set_is_root_data_page(false);
diff --git a/be/src/olap/rowset/segment_v2/indexed_column_writer.h b/be/src/olap/rowset/segment_v2/indexed_column_writer.h
index bcb27f4343..b33c5d4a82 100644
--- a/be/src/olap/rowset/segment_v2/indexed_column_writer.h
+++ b/be/src/olap/rowset/segment_v2/indexed_column_writer.h
@@ -110,7 +110,7 @@ private:
     std::unique_ptr<IndexPageBuilder> _value_index_builder;
     // encoder for value index's key
     const KeyCoder* _value_key_coder;
-    const BlockCompressionCodec* _compress_codec;
+    std::unique_ptr<BlockCompressionCodec> _compress_codec;
 
     DISALLOW_COPY_AND_ASSIGN(IndexedColumnWriter);
 };
diff --git a/be/src/runtime/mem_pool.cpp b/be/src/runtime/mem_pool.cpp
index 212b88e7de..64c6458351 100644
--- a/be/src/runtime/mem_pool.cpp
+++ b/be/src/runtime/mem_pool.cpp
@@ -41,6 +41,17 @@ MemPool::ChunkInfo::ChunkInfo(const Chunk& chunk_) : chunk(chunk_), allocated_by
     DorisMetrics::instance()->memory_pool_bytes_total->increment(chunk.size);
 }
 
+
+MemPool::MemPool(const std::string& label)
+        : current_chunk_idx_(-1),
+          next_chunk_size_(INITIAL_CHUNK_SIZE),
+          total_allocated_bytes_(0),
+          total_reserved_bytes_(0),
+          peak_allocated_bytes_(0) {
+    mem_tracker_own_ = MemTracker::CreateTracker(-1, label + ":MemPool"); // self-owned tracker named "<label>:MemPool"; -1 presumably means no limit (confirm)
+    mem_tracker_ = mem_tracker_own_.get(); // set in the body, not the init list: mem_tracker_ is declared before mem_tracker_own_
+}
+
 MemPool::~MemPool() {
     int64_t total_bytes_released = 0;
     for (auto& chunk : chunks_) {
diff --git a/be/src/runtime/mem_pool.h b/be/src/runtime/mem_pool.h
index 397d2cde41..63a73db764 100644
--- a/be/src/runtime/mem_pool.h
+++ b/be/src/runtime/mem_pool.h
@@ -99,6 +99,8 @@ public:
         DCHECK(mem_tracker != nullptr);
     }
 
+    explicit MemPool(const std::string& label); // creates and owns a MemTracker named label + ":MemPool"
+
     /// Frees all chunks of memory and subtracts the total allocated bytes
     /// from the registered limits.
     ~MemPool();
@@ -302,6 +304,9 @@ private:
     /// The current and peak memory footprint of this pool. This is different from
     /// total allocated_bytes_ since it includes bytes in chunks that are not used.
     MemTracker* mem_tracker_;
+
+    // TODO(zxy) temp variable, In the future, mem trackers should all use raw pointers.
+    std::shared_ptr<MemTracker> mem_tracker_own_;
 };
 
 // Stamp out templated implementations here so they're included in IR module
diff --git a/be/src/util/block_compression.cpp b/be/src/util/block_compression.cpp
index 1b0f8143e2..a1ee74f047 100644
--- a/be/src/util/block_compression.cpp
+++ b/be/src/util/block_compression.cpp
@@ -88,46 +88,46 @@ public:
 // Used for LZ4 frame format, decompress speed is two times faster than LZ4.
 class Lz4fBlockCompression : public BlockCompressionCodec {
 public:
-    static const Lz4fBlockCompression* instance() {
-        static Lz4fBlockCompression s_instance;
-        return &s_instance;
-    }
-
-    ~Lz4fBlockCompression() override {}
+    Status init() override {
+        auto ret1 = LZ4F_createCompressionContext(&ctx_c, LZ4F_VERSION);
+        if (LZ4F_isError(ret1)) {
+            return Status::InvalidArgument(strings::Substitute(
+                    "Fail to LZ4F_createCompressionContext, msg=$0", LZ4F_getErrorName(ret1)));
+        }
+        ctx_c_inited = true;
 
-    Status compress(const Slice& input, Slice* output) const override {
-        auto compressed_len = LZ4F_compressFrame(output->data, output->size, input.data, input.size,
-                                                 &_s_preferences);
-        if (LZ4F_isError(compressed_len)) {
+        auto ret2 = LZ4F_createDecompressionContext(&ctx_d, LZ4F_VERSION);
+        if (LZ4F_isError(ret2)) {
             return Status::InvalidArgument(strings::Substitute(
-                    "Fail to do LZ4F compress frame, msg=$0", LZ4F_getErrorName(compressed_len)));
+                    "Fail to LZ4F_createDecompressionContext, msg=$0", LZ4F_getErrorName(ret2)));
         }
-        output->size = compressed_len;
+        ctx_d_inited = true;
+
         return Status::OK();
     }
 
+    ~Lz4fBlockCompression() override {
+        if (ctx_c_inited) LZ4F_freeCompressionContext(ctx_c);
+        if (ctx_d_inited) LZ4F_freeDecompressionContext(ctx_d);
+    }
+
+    Status compress(const Slice& input, Slice* output) const override {
+        std::vector<Slice> inputs {input};
+        return compress(inputs, output);
+    }
+
     Status compress(const std::vector<Slice>& inputs, Slice* output) const override {
-        LZ4F_compressionContext_t ctx = nullptr;
-        auto lres = LZ4F_createCompressionContext(&ctx, LZ4F_VERSION);
-        if (lres != 0) {
-            return Status::InvalidArgument(strings::Substitute("Fail to do LZ4F compress, res=$0",
-                                                               LZ4F_getErrorName(lres)));
-        }
-        auto st = _compress(ctx, inputs, output);
-        LZ4F_freeCompressionContext(ctx);
-        return st;
+        if (!ctx_c_inited)
+            return Status::InvalidArgument("LZ4F compression context is not initialized");
+        // NOTE(review): ctx_c is reused; confirm no two threads call this codec concurrently.
+        return _compress(ctx_c, inputs, output);
     }
 
     Status decompress(const Slice& input, Slice* output) const override {
-        LZ4F_decompressionContext_t ctx;
-        auto lres = LZ4F_createDecompressionContext(&ctx, LZ4F_VERSION);
-        if (LZ4F_isError(lres)) {
-            return Status::InvalidArgument(strings::Substitute("Fail to do LZ4F decompress, res=$0",
-                                                               LZ4F_getErrorName(lres)));
-        }
-        auto st = _decompress(ctx, input, output);
-        LZ4F_freeDecompressionContext(ctx);
-        return st;
+        if (!ctx_d_inited)
+            return Status::InvalidArgument("LZ4F decompression context is not initialized");
+        // NOTE(review): ctx_d is reused; confirm no two threads call this codec concurrently.
+        return _decompress(ctx_d, input, output);
     }
 
     size_t max_compressed_len(size_t len) const override {
@@ -167,6 +167,8 @@ private:
     }
 
     Status _decompress(LZ4F_decompressionContext_t ctx, const Slice& input, Slice* output) const {
+        // reset decompression context to avoid ERROR_maxBlockSize_invalid
+        LZ4F_resetDecompressionContext(ctx);
         size_t input_size = input.size;
         auto lres =
                 LZ4F_decompress(ctx, output->data, &output->size, input.data, &input_size, nullptr);
@@ -187,6 +189,10 @@ private:
 
 private:
     static LZ4F_preferences_t _s_preferences;
+    LZ4F_compressionContext_t ctx_c = nullptr;   // usable only when ctx_c_inited is true
+    bool ctx_c_inited = false;                    // set by init() once ctx_c is created
+    LZ4F_decompressionContext_t ctx_d = nullptr; // usable only when ctx_d_inited is true
+    bool ctx_d_inited = false;                    // set by init() once ctx_d is created
 };
 
 LZ4F_preferences_t Lz4fBlockCompression::_s_preferences = {
@@ -370,27 +376,38 @@ public:
 };
 
 Status get_block_compression_codec(segment_v2::CompressionTypePB type,
-                                   const BlockCompressionCodec** codec) {
+                                   std::unique_ptr<BlockCompressionCodec>& codec) {
+    std::unique_ptr<BlockCompressionCodec> ptr;
     switch (type) {
     case segment_v2::CompressionTypePB::NO_COMPRESSION:
-        *codec = nullptr;
-        break;
+        codec.reset(nullptr);
+        return Status::OK();
     case segment_v2::CompressionTypePB::SNAPPY:
-        *codec = SnappyBlockCompression::instance();
+        ptr = std::make_unique<SnappyBlockCompression>();
         break;
     case segment_v2::CompressionTypePB::LZ4:
-        *codec = Lz4BlockCompression::instance();
+        ptr = std::make_unique<Lz4BlockCompression>();
         break;
     case segment_v2::CompressionTypePB::LZ4F:
-        *codec = Lz4fBlockCompression::instance();
+        ptr = std::make_unique<Lz4fBlockCompression>();
         break;
     case segment_v2::CompressionTypePB::ZLIB:
-        *codec = ZlibBlockCompression::instance();
+        ptr = std::make_unique<ZlibBlockCompression>();
         break;
     default:
         return Status::NotFound(strings::Substitute("unknown compression type($0)", type));
     }
-    return Status::OK();
+
+    // Hand the codec to the caller only after init() succeeds. If init() fails,
+    // `ptr` destroys the codec when it goes out of scope and `codec` is left
+    // unchanged.
+    Status st = ptr->init();
+    if (!st.ok()) {
+        return st;
+    }
+
+    codec = std::move(ptr);
+    return Status::OK();
 }
 
 } // namespace doris
diff --git a/be/src/util/block_compression.h b/be/src/util/block_compression.h
index ff25113793..7ad3f9ecb7 100644
--- a/be/src/util/block_compression.h
+++ b/be/src/util/block_compression.h
@@ -34,6 +34,8 @@ class BlockCompressionCodec {
 public:
     virtual ~BlockCompressionCodec() {}
 
+    virtual Status init() { return Status::OK(); }
+
     // This function will compress input data into output.
     // output should be preallocated, and its capacity must be large enough
     // for compressed input, which can be get through max_compressed_len function.
@@ -61,6 +63,6 @@ public:
 //
 // Return not OK, if error happens.
 Status get_block_compression_codec(segment_v2::CompressionTypePB type,
-                                   const BlockCompressionCodec** codec);
+                                   std::unique_ptr<BlockCompressionCodec>& codec);
 
 } // namespace doris
diff --git a/be/test/util/block_compression_test.cpp b/be/test/util/block_compression_test.cpp
index 3eabd735b8..a339d54409 100644
--- a/be/test/util/block_compression_test.cpp
+++ b/be/test/util/block_compression_test.cpp
@@ -42,9 +42,9 @@ static std::string generate_str(size_t len) {
 }
 
 void test_single_slice(segment_v2::CompressionTypePB type) {
-    const BlockCompressionCodec* codec = nullptr;
-    auto st = get_block_compression_codec(type, &codec);
-    ASSERT_TRUE(st.ok());
+    std::unique_ptr<BlockCompressionCodec> codec;
+    auto st = get_block_compression_codec(type, codec);
+    ASSERT_TRUE(st.ok()); // a failed lookup leaves codec empty; stop instead of using it
 
     size_t test_sizes[] = {0, 1, 10, 1000, 1000000};
     for (auto size : test_sizes) {
@@ -104,9 +104,9 @@ TEST_F(BlockCompressionTest, single) {
 }
 
 void test_multi_slices(segment_v2::CompressionTypePB type) {
-    const BlockCompressionCodec* codec = nullptr;
-    auto st = get_block_compression_codec(type, &codec);
-    ASSERT_TRUE(st.ok());
+    std::unique_ptr<BlockCompressionCodec> codec;
+    auto st = get_block_compression_codec(type, codec);
+    ASSERT_TRUE(st.ok()); // a failed lookup leaves codec empty; stop instead of using it
 
     size_t test_sizes[] = {0, 1, 10, 1000, 1000000};
     std::vector<std::string> orig_strs;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org