You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/05/16 16:39:26 UTC
[incubator-doris] 16/17: [enhancement][betarowset]optimize lz4 compress and decompress speed by reusing context (#9566)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit 53fb90f04f2dd997f4b6235cc57011eaf102b506
Author: Kang <kx...@gmail.com>
AuthorDate: Sun May 15 21:18:32 2022 +0800
[enhancement][betarowset]optimize lz4 compress and decompress speed by reusing context (#9566)
---
be/src/olap/rowset/segment_v2/column_reader.cpp | 4 +-
be/src/olap/rowset/segment_v2/column_reader.h | 2 +-
be/src/olap/rowset/segment_v2/column_writer.cpp | 8 +-
be/src/olap/rowset/segment_v2/column_writer.h | 2 +-
.../rowset/segment_v2/indexed_column_reader.cpp | 4 +-
.../olap/rowset/segment_v2/indexed_column_reader.h | 2 +-
.../rowset/segment_v2/indexed_column_writer.cpp | 6 +-
.../olap/rowset/segment_v2/indexed_column_writer.h | 2 +-
be/src/runtime/mem_pool.cpp | 11 +++
be/src/runtime/mem_pool.h | 5 ++
be/src/util/block_compression.cpp | 93 +++++++++++++---------
be/src/util/block_compression.h | 4 +-
be/test/util/block_compression_test.cpp | 12 +--
13 files changed, 95 insertions(+), 60 deletions(-)
diff --git a/be/src/olap/rowset/segment_v2/column_reader.cpp b/be/src/olap/rowset/segment_v2/column_reader.cpp
index a67793463f..982de671d7 100644
--- a/be/src/olap/rowset/segment_v2/column_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/column_reader.cpp
@@ -105,7 +105,7 @@ Status ColumnReader::init() {
strings::Substitute("unsupported typeinfo, type=$0", _meta.type()));
}
RETURN_IF_ERROR(EncodingInfo::get(_type_info, _meta.encoding(), &_encoding_info));
- RETURN_IF_ERROR(get_block_compression_codec(_meta.compression(), &_compress_codec));
+ RETURN_IF_ERROR(get_block_compression_codec(_meta.compression(), _compress_codec));
for (int i = 0; i < _meta.indexes_size(); i++) {
auto& index_meta = _meta.indexes(i);
@@ -146,7 +146,7 @@ Status ColumnReader::read_page(const ColumnIteratorOptions& iter_opts, const Pag
PageReadOptions opts;
opts.rblock = iter_opts.rblock;
opts.page_pointer = pp;
- opts.codec = _compress_codec;
+ opts.codec = _compress_codec.get();
opts.stats = iter_opts.stats;
opts.verify_checksum = _opts.verify_checksum;
opts.use_page_cache = iter_opts.use_page_cache;
diff --git a/be/src/olap/rowset/segment_v2/column_reader.h b/be/src/olap/rowset/segment_v2/column_reader.h
index a8b0263aac..99206bd53a 100644
--- a/be/src/olap/rowset/segment_v2/column_reader.h
+++ b/be/src/olap/rowset/segment_v2/column_reader.h
@@ -174,7 +174,7 @@ private:
const TypeInfo* _type_info = nullptr; // initialized in init(), may changed by subclasses.
const EncodingInfo* _encoding_info =
nullptr; // initialized in init(), used for create PageDecoder
- const BlockCompressionCodec* _compress_codec = nullptr; // initialized in init()
+ std::unique_ptr<BlockCompressionCodec> _compress_codec; // initialized in init()
// meta for various column indexes (null if the index is absent)
const ZoneMapIndexPB* _zone_map_index_meta = nullptr;
diff --git a/be/src/olap/rowset/segment_v2/column_writer.cpp b/be/src/olap/rowset/segment_v2/column_writer.cpp
index 4c5623250f..bcf2b2a338 100644
--- a/be/src/olap/rowset/segment_v2/column_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/column_writer.cpp
@@ -221,7 +221,7 @@ ScalarColumnWriter::~ScalarColumnWriter() {
}
Status ScalarColumnWriter::init() {
- RETURN_IF_ERROR(get_block_compression_codec(_opts.meta->compression(), &_compress_codec));
+ RETURN_IF_ERROR(get_block_compression_codec(_opts.meta->compression(), _compress_codec));
PageBuilder* page_builder = nullptr;
@@ -377,7 +377,7 @@ Status ScalarColumnWriter::write_data() {
footer.mutable_dict_page_footer()->set_encoding(PLAIN_ENCODING);
PagePointer dict_pp;
- RETURN_IF_ERROR(PageIO::compress_and_write_page(_compress_codec,
+ RETURN_IF_ERROR(PageIO::compress_and_write_page(_compress_codec.get(),
_opts.compression_min_space_saving, _wblock,
{dict_body.slice()}, footer, &dict_pp));
dict_pp.to_proto(_opts.meta->mutable_dict_page());
@@ -465,8 +465,8 @@ Status ScalarColumnWriter::finish_current_page() {
}
// trying to compress page body
OwnedSlice compressed_body;
- RETURN_IF_ERROR(PageIO::compress_page_body(_compress_codec, _opts.compression_min_space_saving,
- body, &compressed_body));
+ RETURN_IF_ERROR(PageIO::compress_page_body(
+ _compress_codec.get(), _opts.compression_min_space_saving, body, &compressed_body));
if (compressed_body.slice().empty()) {
// page body is uncompressed
page->data.emplace_back(std::move(encoded_values));
diff --git a/be/src/olap/rowset/segment_v2/column_writer.h b/be/src/olap/rowset/segment_v2/column_writer.h
index b98f4883ca..6235b313b2 100644
--- a/be/src/olap/rowset/segment_v2/column_writer.h
+++ b/be/src/olap/rowset/segment_v2/column_writer.h
@@ -244,7 +244,7 @@ private:
PageHead _pages;
ordinal_t _first_rowid = 0;
- const BlockCompressionCodec* _compress_codec = nullptr;
+ std::unique_ptr<BlockCompressionCodec> _compress_codec;
std::unique_ptr<OrdinalIndexWriter> _ordinal_index_builder;
std::unique_ptr<ZoneMapIndexWriter> _zone_map_index_builder;
diff --git a/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp b/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp
index 67a873fae1..3ee72c8a11 100644
--- a/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp
@@ -37,7 +37,7 @@ Status IndexedColumnReader::load(bool use_page_cache, bool kept_in_memory) {
strings::Substitute("unsupported typeinfo, type=$0", _meta.data_type()));
}
RETURN_IF_ERROR(EncodingInfo::get(_type_info, _meta.encoding(), &_encoding_info));
- RETURN_IF_ERROR(get_block_compression_codec(_meta.compression(), &_compress_codec));
+ RETURN_IF_ERROR(get_block_compression_codec(_meta.compression(), _compress_codec));
_value_key_coder = get_key_coder(_type_info->type());
std::unique_ptr<fs::ReadableBlock> rblock;
@@ -82,7 +82,7 @@ Status IndexedColumnReader::read_page(fs::ReadableBlock* rblock, const PagePoint
PageReadOptions opts;
opts.rblock = rblock;
opts.page_pointer = pp;
- opts.codec = _compress_codec;
+ opts.codec = _compress_codec.get();
OlapReaderStatistics tmp_stats;
opts.stats = &tmp_stats;
opts.use_page_cache = _use_page_cache;
diff --git a/be/src/olap/rowset/segment_v2/indexed_column_reader.h b/be/src/olap/rowset/segment_v2/indexed_column_reader.h
index a1030586ab..3c464f32f0 100644
--- a/be/src/olap/rowset/segment_v2/indexed_column_reader.h
+++ b/be/src/olap/rowset/segment_v2/indexed_column_reader.h
@@ -84,7 +84,7 @@ private:
const TypeInfo* _type_info = nullptr;
const EncodingInfo* _encoding_info = nullptr;
- const BlockCompressionCodec* _compress_codec = nullptr;
+ std::unique_ptr<BlockCompressionCodec> _compress_codec;
const KeyCoder* _value_key_coder = nullptr;
};
diff --git a/be/src/olap/rowset/segment_v2/indexed_column_writer.cpp b/be/src/olap/rowset/segment_v2/indexed_column_writer.cpp
index 088de6940e..a69b641d32 100644
--- a/be/src/olap/rowset/segment_v2/indexed_column_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/indexed_column_writer.cpp
@@ -73,7 +73,7 @@ Status IndexedColumnWriter::init() {
}
if (_options.compression != NO_COMPRESSION) {
- RETURN_IF_ERROR(get_block_compression_codec(_options.compression, &_compress_codec));
+ RETURN_IF_ERROR(get_block_compression_codec(_options.compression, _compress_codec));
}
return Status::OK();
}
@@ -111,7 +111,7 @@ Status IndexedColumnWriter::_finish_current_data_page() {
footer.mutable_data_page_footer()->set_num_values(num_values_in_page);
footer.mutable_data_page_footer()->set_nullmap_size(0);
- RETURN_IF_ERROR(PageIO::compress_and_write_page(_compress_codec,
+ RETURN_IF_ERROR(PageIO::compress_and_write_page(_compress_codec.get(),
_options.compression_min_space_saving, _wblock,
{page_body.slice()}, footer, &_last_data_page));
_num_data_pages++;
@@ -160,7 +160,7 @@ Status IndexedColumnWriter::_flush_index(IndexPageBuilder* index_builder, BTreeM
PagePointer pp;
RETURN_IF_ERROR(PageIO::compress_and_write_page(
- _compress_codec, _options.compression_min_space_saving, _wblock,
+ _compress_codec.get(), _options.compression_min_space_saving, _wblock,
{page_body.slice()}, page_footer, &pp));
meta->set_is_root_data_page(false);
diff --git a/be/src/olap/rowset/segment_v2/indexed_column_writer.h b/be/src/olap/rowset/segment_v2/indexed_column_writer.h
index bcb27f4343..b33c5d4a82 100644
--- a/be/src/olap/rowset/segment_v2/indexed_column_writer.h
+++ b/be/src/olap/rowset/segment_v2/indexed_column_writer.h
@@ -110,7 +110,7 @@ private:
std::unique_ptr<IndexPageBuilder> _value_index_builder;
// encoder for value index's key
const KeyCoder* _value_key_coder;
- const BlockCompressionCodec* _compress_codec;
+ std::unique_ptr<BlockCompressionCodec> _compress_codec;
DISALLOW_COPY_AND_ASSIGN(IndexedColumnWriter);
};
diff --git a/be/src/runtime/mem_pool.cpp b/be/src/runtime/mem_pool.cpp
index 212b88e7de..64c6458351 100644
--- a/be/src/runtime/mem_pool.cpp
+++ b/be/src/runtime/mem_pool.cpp
@@ -41,6 +41,17 @@ MemPool::ChunkInfo::ChunkInfo(const Chunk& chunk_) : chunk(chunk_), allocated_by
DorisMetrics::instance()->memory_pool_bytes_total->increment(chunk.size);
}
+
+MemPool::MemPool(const std::string& label)
+ : current_chunk_idx_(-1),
+ next_chunk_size_(INITIAL_CHUNK_SIZE),
+ total_allocated_bytes_(0),
+ total_reserved_bytes_(0),
+ peak_allocated_bytes_(0) {
+ mem_tracker_own_ = MemTracker::CreateTracker(-1, label + ":MemPool");
+ mem_tracker_ = mem_tracker_own_.get();
+}
+
MemPool::~MemPool() {
int64_t total_bytes_released = 0;
for (auto& chunk : chunks_) {
diff --git a/be/src/runtime/mem_pool.h b/be/src/runtime/mem_pool.h
index 397d2cde41..63a73db764 100644
--- a/be/src/runtime/mem_pool.h
+++ b/be/src/runtime/mem_pool.h
@@ -99,6 +99,8 @@ public:
DCHECK(mem_tracker != nullptr);
}
+ MemPool(const std::string& label);
+
/// Frees all chunks of memory and subtracts the total allocated bytes
/// from the registered limits.
~MemPool();
@@ -302,6 +304,9 @@ private:
/// The current and peak memory footprint of this pool. This is different from
/// total allocated_bytes_ since it includes bytes in chunks that are not used.
MemTracker* mem_tracker_;
+
+ // TODO(zxy): temporary variable; in the future, mem trackers should all use raw pointers.
+ std::shared_ptr<MemTracker> mem_tracker_own_;
};
// Stamp out templated implementations here so they're included in IR module
diff --git a/be/src/util/block_compression.cpp b/be/src/util/block_compression.cpp
index 1b0f8143e2..a1ee74f047 100644
--- a/be/src/util/block_compression.cpp
+++ b/be/src/util/block_compression.cpp
@@ -88,46 +88,46 @@ public:
// Used for LZ4 frame format, decompress speed is two times faster than LZ4.
class Lz4fBlockCompression : public BlockCompressionCodec {
public:
- static const Lz4fBlockCompression* instance() {
- static Lz4fBlockCompression s_instance;
- return &s_instance;
- }
-
- ~Lz4fBlockCompression() override {}
+ Status init() override {
+ auto ret1 = LZ4F_createCompressionContext(&ctx_c, LZ4F_VERSION);
+ if (LZ4F_isError(ret1)) {
+ return Status::InvalidArgument(strings::Substitute(
+ "Fail to LZ4F_createCompressionContext, msg=$0", LZ4F_getErrorName(ret1)));
+ }
+ ctx_c_inited = true;
- Status compress(const Slice& input, Slice* output) const override {
- auto compressed_len = LZ4F_compressFrame(output->data, output->size, input.data, input.size,
- &_s_preferences);
- if (LZ4F_isError(compressed_len)) {
+ auto ret2 = LZ4F_createDecompressionContext(&ctx_d, LZ4F_VERSION);
+ if (LZ4F_isError(ret2)) {
return Status::InvalidArgument(strings::Substitute(
- "Fail to do LZ4F compress frame, msg=$0", LZ4F_getErrorName(compressed_len)));
+ "Fail to LZ4F_createDecompressionContext, msg=$0", LZ4F_getErrorName(ret2)));
}
- output->size = compressed_len;
+ ctx_d_inited = true;
+
return Status::OK();
}
+ ~Lz4fBlockCompression() override {
+ if (ctx_c_inited) LZ4F_freeCompressionContext(ctx_c);
+ if (ctx_d_inited) LZ4F_freeDecompressionContext(ctx_d);
+ }
+
+ Status compress(const Slice& input, Slice* output) const override {
+ std::vector<Slice> inputs {input};
+ return compress(inputs, output);
+ }
+
Status compress(const std::vector<Slice>& inputs, Slice* output) const override {
- LZ4F_compressionContext_t ctx = nullptr;
- auto lres = LZ4F_createCompressionContext(&ctx, LZ4F_VERSION);
- if (lres != 0) {
- return Status::InvalidArgument(strings::Substitute("Fail to do LZ4F compress, res=$0",
- LZ4F_getErrorName(lres)));
- }
- auto st = _compress(ctx, inputs, output);
- LZ4F_freeCompressionContext(ctx);
- return st;
+ if (!ctx_c_inited)
+ return Status::InvalidArgument("LZ4F_createCompressionContext not sucess");
+
+ return _compress(ctx_c, inputs, output);
}
Status decompress(const Slice& input, Slice* output) const override {
- LZ4F_decompressionContext_t ctx;
- auto lres = LZ4F_createDecompressionContext(&ctx, LZ4F_VERSION);
- if (LZ4F_isError(lres)) {
- return Status::InvalidArgument(strings::Substitute("Fail to do LZ4F decompress, res=$0",
- LZ4F_getErrorName(lres)));
- }
- auto st = _decompress(ctx, input, output);
- LZ4F_freeDecompressionContext(ctx);
- return st;
+ if (!ctx_d_inited)
+ return Status::InvalidArgument("LZ4F_createDecompressionContext not sucess");
+
+ return _decompress(ctx_d, input, output);
}
size_t max_compressed_len(size_t len) const override {
@@ -167,6 +167,8 @@ private:
}
Status _decompress(LZ4F_decompressionContext_t ctx, const Slice& input, Slice* output) const {
+ // reset decompression context to avoid ERROR_maxBlockSize_invalid
+ LZ4F_resetDecompressionContext(ctx);
size_t input_size = input.size;
auto lres =
LZ4F_decompress(ctx, output->data, &output->size, input.data, &input_size, nullptr);
@@ -187,6 +189,10 @@ private:
private:
static LZ4F_preferences_t _s_preferences;
+ LZ4F_compressionContext_t ctx_c;
+ bool ctx_c_inited = false;
+ LZ4F_decompressionContext_t ctx_d;
+ bool ctx_d_inited = false;
};
LZ4F_preferences_t Lz4fBlockCompression::_s_preferences = {
@@ -370,27 +376,38 @@ public:
};
Status get_block_compression_codec(segment_v2::CompressionTypePB type,
- const BlockCompressionCodec** codec) {
+ std::unique_ptr<BlockCompressionCodec>& codec) {
+ BlockCompressionCodec* ptr = nullptr;
switch (type) {
case segment_v2::CompressionTypePB::NO_COMPRESSION:
- *codec = nullptr;
- break;
+ codec.reset(nullptr);
+ return Status::OK();
case segment_v2::CompressionTypePB::SNAPPY:
- *codec = SnappyBlockCompression::instance();
+ ptr = new SnappyBlockCompression();
break;
case segment_v2::CompressionTypePB::LZ4:
- *codec = Lz4BlockCompression::instance();
+ ptr = new Lz4BlockCompression();
break;
case segment_v2::CompressionTypePB::LZ4F:
- *codec = Lz4fBlockCompression::instance();
+ ptr = new Lz4fBlockCompression();
break;
case segment_v2::CompressionTypePB::ZLIB:
- *codec = ZlibBlockCompression::instance();
+ ptr = new ZlibBlockCompression();
break;
default:
return Status::NotFound(strings::Substitute("unknown compression type($0)", type));
}
- return Status::OK();
+
+ if (!ptr) return Status::NotFound("Failed to create compression codec");
+
+ Status st = ptr->init();
+ if (st.ok()) {
+ codec.reset(ptr);
+ } else {
+ delete ptr;
+ }
+
+ return st;
}
} // namespace doris
diff --git a/be/src/util/block_compression.h b/be/src/util/block_compression.h
index ff25113793..7ad3f9ecb7 100644
--- a/be/src/util/block_compression.h
+++ b/be/src/util/block_compression.h
@@ -34,6 +34,8 @@ class BlockCompressionCodec {
public:
virtual ~BlockCompressionCodec() {}
+ virtual Status init() { return Status::OK(); }
+
// This function will compress input data into output.
// output should be preallocated, and its capacity must be large enough
// for compressed input, which can be get through max_compressed_len function.
@@ -61,6 +63,6 @@ public:
//
// Return not OK, if error happens.
Status get_block_compression_codec(segment_v2::CompressionTypePB type,
- const BlockCompressionCodec** codec);
+ std::unique_ptr<BlockCompressionCodec>& codec);
} // namespace doris
diff --git a/be/test/util/block_compression_test.cpp b/be/test/util/block_compression_test.cpp
index 3eabd735b8..a339d54409 100644
--- a/be/test/util/block_compression_test.cpp
+++ b/be/test/util/block_compression_test.cpp
@@ -42,9 +42,9 @@ static std::string generate_str(size_t len) {
}
void test_single_slice(segment_v2::CompressionTypePB type) {
- const BlockCompressionCodec* codec = nullptr;
- auto st = get_block_compression_codec(type, &codec);
- ASSERT_TRUE(st.ok());
+ std::unique_ptr<BlockCompressionCodec> codec;
+ auto st = get_block_compression_codec(type, codec);
+ EXPECT_TRUE(st.ok());
size_t test_sizes[] = {0, 1, 10, 1000, 1000000};
for (auto size : test_sizes) {
@@ -104,9 +104,9 @@ TEST_F(BlockCompressionTest, single) {
}
void test_multi_slices(segment_v2::CompressionTypePB type) {
- const BlockCompressionCodec* codec = nullptr;
- auto st = get_block_compression_codec(type, &codec);
- ASSERT_TRUE(st.ok());
+ std::unique_ptr<BlockCompressionCodec> codec;
+ auto st = get_block_compression_codec(type, codec);
+ EXPECT_TRUE(st.ok());
size_t test_sizes[] = {0, 1, 10, 1000, 1000000};
std::vector<std::string> orig_strs;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org