You are viewing a plain text version of this content. The canonical link for it was a hyperlink on the original archive page and is not preserved in this text version.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/05/15 13:18:39 UTC
[incubator-doris] branch master updated: [enhancement][betarowset]optimize lz4 compress and decompress speed by reusing context (#9566)
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new e0c790094c [enhancement][betarowset]optimize lz4 compress and decompress speed by reusing context (#9566)
e0c790094c is described below
commit e0c790094c87325bd64649d8705e127730e9a728
Author: Kang <kx...@gmail.com>
AuthorDate: Sun May 15 21:18:32 2022 +0800
[enhancement][betarowset]optimize lz4 compress and decompress speed by reusing context (#9566)
---
be/src/olap/rowset/segment_v2/column_reader.cpp | 4 +-
be/src/olap/rowset/segment_v2/column_reader.h | 2 +-
be/src/olap/rowset/segment_v2/column_writer.cpp | 8 +-
be/src/olap/rowset/segment_v2/column_writer.h | 2 +-
.../rowset/segment_v2/indexed_column_reader.cpp | 4 +-
.../olap/rowset/segment_v2/indexed_column_reader.h | 2 +-
.../rowset/segment_v2/indexed_column_writer.cpp | 6 +-
.../olap/rowset/segment_v2/indexed_column_writer.h | 2 +-
be/src/util/block_compression.cpp | 93 +++++++++++++---------
be/src/util/block_compression.h | 4 +-
be/test/util/block_compression_test.cpp | 8 +-
11 files changed, 77 insertions(+), 58 deletions(-)
diff --git a/be/src/olap/rowset/segment_v2/column_reader.cpp b/be/src/olap/rowset/segment_v2/column_reader.cpp
index cc88552f9c..1b57d35269 100644
--- a/be/src/olap/rowset/segment_v2/column_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/column_reader.cpp
@@ -104,7 +104,7 @@ Status ColumnReader::init() {
strings::Substitute("unsupported typeinfo, type=$0", _meta.type()));
}
RETURN_IF_ERROR(EncodingInfo::get(_type_info.get(), _meta.encoding(), &_encoding_info));
- RETURN_IF_ERROR(get_block_compression_codec(_meta.compression(), &_compress_codec));
+ RETURN_IF_ERROR(get_block_compression_codec(_meta.compression(), _compress_codec));
for (int i = 0; i < _meta.indexes_size(); i++) {
auto& index_meta = _meta.indexes(i);
@@ -149,7 +149,7 @@ Status ColumnReader::read_page(const ColumnIteratorOptions& iter_opts, const Pag
PageReadOptions opts;
opts.rblock = iter_opts.rblock;
opts.page_pointer = pp;
- opts.codec = _compress_codec;
+ opts.codec = _compress_codec.get();
opts.stats = iter_opts.stats;
opts.verify_checksum = _opts.verify_checksum;
opts.use_page_cache = iter_opts.use_page_cache;
diff --git a/be/src/olap/rowset/segment_v2/column_reader.h b/be/src/olap/rowset/segment_v2/column_reader.h
index 305ba5f8dc..0e3f11c859 100644
--- a/be/src/olap/rowset/segment_v2/column_reader.h
+++ b/be/src/olap/rowset/segment_v2/column_reader.h
@@ -175,7 +175,7 @@ private:
TypeInfoPtr(nullptr, nullptr); // initialized in init(), may changed by subclasses.
const EncodingInfo* _encoding_info =
nullptr; // initialized in init(), used for create PageDecoder
- const BlockCompressionCodec* _compress_codec = nullptr; // initialized in init()
+ std::unique_ptr<BlockCompressionCodec> _compress_codec; // initialized in init()
// meta for various column indexes (null if the index is absent)
const ZoneMapIndexPB* _zone_map_index_meta = nullptr;
diff --git a/be/src/olap/rowset/segment_v2/column_writer.cpp b/be/src/olap/rowset/segment_v2/column_writer.cpp
index 5cb5140102..9a54b210c8 100644
--- a/be/src/olap/rowset/segment_v2/column_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/column_writer.cpp
@@ -264,7 +264,7 @@ ScalarColumnWriter::~ScalarColumnWriter() {
}
Status ScalarColumnWriter::init() {
- RETURN_IF_ERROR(get_block_compression_codec(_opts.meta->compression(), &_compress_codec));
+ RETURN_IF_ERROR(get_block_compression_codec(_opts.meta->compression(), _compress_codec));
PageBuilder* page_builder = nullptr;
@@ -420,7 +420,7 @@ Status ScalarColumnWriter::write_data() {
footer.mutable_dict_page_footer()->set_encoding(PLAIN_ENCODING);
PagePointer dict_pp;
- RETURN_IF_ERROR(PageIO::compress_and_write_page(_compress_codec,
+ RETURN_IF_ERROR(PageIO::compress_and_write_page(_compress_codec.get(),
_opts.compression_min_space_saving, _wblock,
{dict_body.slice()}, footer, &dict_pp));
dict_pp.to_proto(_opts.meta->mutable_dict_page());
@@ -508,8 +508,8 @@ Status ScalarColumnWriter::finish_current_page() {
}
// trying to compress page body
OwnedSlice compressed_body;
- RETURN_IF_ERROR(PageIO::compress_page_body(_compress_codec, _opts.compression_min_space_saving,
- body, &compressed_body));
+ RETURN_IF_ERROR(PageIO::compress_page_body(
+ _compress_codec.get(), _opts.compression_min_space_saving, body, &compressed_body));
if (compressed_body.slice().empty()) {
// page body is uncompressed
page->data.emplace_back(std::move(encoded_values));
diff --git a/be/src/olap/rowset/segment_v2/column_writer.h b/be/src/olap/rowset/segment_v2/column_writer.h
index 0cacbf8547..2f50ebf075 100644
--- a/be/src/olap/rowset/segment_v2/column_writer.h
+++ b/be/src/olap/rowset/segment_v2/column_writer.h
@@ -249,7 +249,7 @@ private:
PageHead _pages;
ordinal_t _first_rowid = 0;
- const BlockCompressionCodec* _compress_codec = nullptr;
+ std::unique_ptr<BlockCompressionCodec> _compress_codec;
std::unique_ptr<OrdinalIndexWriter> _ordinal_index_builder;
std::unique_ptr<ZoneMapIndexWriter> _zone_map_index_builder;
diff --git a/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp b/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp
index 4ebe0d3af3..955c3b96e3 100644
--- a/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp
@@ -37,7 +37,7 @@ Status IndexedColumnReader::load(bool use_page_cache, bool kept_in_memory) {
strings::Substitute("unsupported typeinfo, type=$0", _meta.data_type()));
}
RETURN_IF_ERROR(EncodingInfo::get(_type_info, _meta.encoding(), &_encoding_info));
- RETURN_IF_ERROR(get_block_compression_codec(_meta.compression(), &_compress_codec));
+ RETURN_IF_ERROR(get_block_compression_codec(_meta.compression(), _compress_codec));
_value_key_coder = get_key_coder(_type_info->type());
std::unique_ptr<fs::ReadableBlock> rblock;
@@ -83,7 +83,7 @@ Status IndexedColumnReader::read_page(fs::ReadableBlock* rblock, const PagePoint
PageReadOptions opts;
opts.rblock = rblock;
opts.page_pointer = pp;
- opts.codec = _compress_codec;
+ opts.codec = _compress_codec.get();
OlapReaderStatistics tmp_stats;
opts.stats = &tmp_stats;
opts.use_page_cache = _use_page_cache;
diff --git a/be/src/olap/rowset/segment_v2/indexed_column_reader.h b/be/src/olap/rowset/segment_v2/indexed_column_reader.h
index 439c790e60..6663ac1077 100644
--- a/be/src/olap/rowset/segment_v2/indexed_column_reader.h
+++ b/be/src/olap/rowset/segment_v2/indexed_column_reader.h
@@ -84,7 +84,7 @@ private:
const TypeInfo* _type_info = nullptr;
const EncodingInfo* _encoding_info = nullptr;
- const BlockCompressionCodec* _compress_codec = nullptr;
+ std::unique_ptr<BlockCompressionCodec> _compress_codec;
const KeyCoder* _value_key_coder = nullptr;
};
diff --git a/be/src/olap/rowset/segment_v2/indexed_column_writer.cpp b/be/src/olap/rowset/segment_v2/indexed_column_writer.cpp
index d195b9f2c4..4c7259c90c 100644
--- a/be/src/olap/rowset/segment_v2/indexed_column_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/indexed_column_writer.cpp
@@ -72,7 +72,7 @@ Status IndexedColumnWriter::init() {
}
if (_options.compression != NO_COMPRESSION) {
- RETURN_IF_ERROR(get_block_compression_codec(_options.compression, &_compress_codec));
+ RETURN_IF_ERROR(get_block_compression_codec(_options.compression, _compress_codec));
}
return Status::OK();
}
@@ -110,7 +110,7 @@ Status IndexedColumnWriter::_finish_current_data_page() {
footer.mutable_data_page_footer()->set_num_values(num_values_in_page);
footer.mutable_data_page_footer()->set_nullmap_size(0);
- RETURN_IF_ERROR(PageIO::compress_and_write_page(_compress_codec,
+ RETURN_IF_ERROR(PageIO::compress_and_write_page(_compress_codec.get(),
_options.compression_min_space_saving, _wblock,
{page_body.slice()}, footer, &_last_data_page));
_num_data_pages++;
@@ -159,7 +159,7 @@ Status IndexedColumnWriter::_flush_index(IndexPageBuilder* index_builder, BTreeM
PagePointer pp;
RETURN_IF_ERROR(PageIO::compress_and_write_page(
- _compress_codec, _options.compression_min_space_saving, _wblock,
+ _compress_codec.get(), _options.compression_min_space_saving, _wblock,
{page_body.slice()}, page_footer, &pp));
meta->set_is_root_data_page(false);
diff --git a/be/src/olap/rowset/segment_v2/indexed_column_writer.h b/be/src/olap/rowset/segment_v2/indexed_column_writer.h
index fc222388c6..285ba890b2 100644
--- a/be/src/olap/rowset/segment_v2/indexed_column_writer.h
+++ b/be/src/olap/rowset/segment_v2/indexed_column_writer.h
@@ -108,7 +108,7 @@ private:
std::unique_ptr<IndexPageBuilder> _value_index_builder;
// encoder for value index's key
const KeyCoder* _value_key_coder;
- const BlockCompressionCodec* _compress_codec;
+ std::unique_ptr<BlockCompressionCodec> _compress_codec;
DISALLOW_COPY_AND_ASSIGN(IndexedColumnWriter);
};
diff --git a/be/src/util/block_compression.cpp b/be/src/util/block_compression.cpp
index 1b0f8143e2..a1ee74f047 100644
--- a/be/src/util/block_compression.cpp
+++ b/be/src/util/block_compression.cpp
@@ -88,46 +88,46 @@ public:
// Used for LZ4 frame format, decompress speed is two times faster than LZ4.
class Lz4fBlockCompression : public BlockCompressionCodec {
public:
- static const Lz4fBlockCompression* instance() {
- static Lz4fBlockCompression s_instance;
- return &s_instance;
- }
-
- ~Lz4fBlockCompression() override {}
+ Status init() override {
+ auto ret1 = LZ4F_createCompressionContext(&ctx_c, LZ4F_VERSION);
+ if (LZ4F_isError(ret1)) {
+ return Status::InvalidArgument(strings::Substitute(
+ "Fail to LZ4F_createCompressionContext, msg=$0", LZ4F_getErrorName(ret1)));
+ }
+ ctx_c_inited = true;
- Status compress(const Slice& input, Slice* output) const override {
- auto compressed_len = LZ4F_compressFrame(output->data, output->size, input.data, input.size,
- &_s_preferences);
- if (LZ4F_isError(compressed_len)) {
+ auto ret2 = LZ4F_createDecompressionContext(&ctx_d, LZ4F_VERSION);
+ if (LZ4F_isError(ret2)) {
return Status::InvalidArgument(strings::Substitute(
- "Fail to do LZ4F compress frame, msg=$0", LZ4F_getErrorName(compressed_len)));
+ "Fail to LZ4F_createDecompressionContext, msg=$0", LZ4F_getErrorName(ret2)));
}
- output->size = compressed_len;
+ ctx_d_inited = true;
+
return Status::OK();
}
+ ~Lz4fBlockCompression() override {
+ if (ctx_c_inited) LZ4F_freeCompressionContext(ctx_c);
+ if (ctx_d_inited) LZ4F_freeDecompressionContext(ctx_d);
+ }
+
+ Status compress(const Slice& input, Slice* output) const override {
+ std::vector<Slice> inputs {input};
+ return compress(inputs, output);
+ }
+
Status compress(const std::vector<Slice>& inputs, Slice* output) const override {
- LZ4F_compressionContext_t ctx = nullptr;
- auto lres = LZ4F_createCompressionContext(&ctx, LZ4F_VERSION);
- if (lres != 0) {
- return Status::InvalidArgument(strings::Substitute("Fail to do LZ4F compress, res=$0",
- LZ4F_getErrorName(lres)));
- }
- auto st = _compress(ctx, inputs, output);
- LZ4F_freeCompressionContext(ctx);
- return st;
+ if (!ctx_c_inited)
+ return Status::InvalidArgument("LZ4F_createCompressionContext not sucess");
+
+ return _compress(ctx_c, inputs, output);
}
Status decompress(const Slice& input, Slice* output) const override {
- LZ4F_decompressionContext_t ctx;
- auto lres = LZ4F_createDecompressionContext(&ctx, LZ4F_VERSION);
- if (LZ4F_isError(lres)) {
- return Status::InvalidArgument(strings::Substitute("Fail to do LZ4F decompress, res=$0",
- LZ4F_getErrorName(lres)));
- }
- auto st = _decompress(ctx, input, output);
- LZ4F_freeDecompressionContext(ctx);
- return st;
+ if (!ctx_d_inited)
+ return Status::InvalidArgument("LZ4F_createDecompressionContext not sucess");
+
+ return _decompress(ctx_d, input, output);
}
size_t max_compressed_len(size_t len) const override {
@@ -167,6 +167,8 @@ private:
}
Status _decompress(LZ4F_decompressionContext_t ctx, const Slice& input, Slice* output) const {
+ // reset decompression context to avoid ERROR_maxBlockSize_invalid
+ LZ4F_resetDecompressionContext(ctx);
size_t input_size = input.size;
auto lres =
LZ4F_decompress(ctx, output->data, &output->size, input.data, &input_size, nullptr);
@@ -187,6 +189,10 @@ private:
private:
static LZ4F_preferences_t _s_preferences;
+ LZ4F_compressionContext_t ctx_c;
+ bool ctx_c_inited = false;
+ LZ4F_decompressionContext_t ctx_d;
+ bool ctx_d_inited = false;
};
LZ4F_preferences_t Lz4fBlockCompression::_s_preferences = {
@@ -370,27 +376,38 @@ public:
};
Status get_block_compression_codec(segment_v2::CompressionTypePB type,
- const BlockCompressionCodec** codec) {
+ std::unique_ptr<BlockCompressionCodec>& codec) {
+ BlockCompressionCodec* ptr = nullptr;
switch (type) {
case segment_v2::CompressionTypePB::NO_COMPRESSION:
- *codec = nullptr;
- break;
+ codec.reset(nullptr);
+ return Status::OK();
case segment_v2::CompressionTypePB::SNAPPY:
- *codec = SnappyBlockCompression::instance();
+ ptr = new SnappyBlockCompression();
break;
case segment_v2::CompressionTypePB::LZ4:
- *codec = Lz4BlockCompression::instance();
+ ptr = new Lz4BlockCompression();
break;
case segment_v2::CompressionTypePB::LZ4F:
- *codec = Lz4fBlockCompression::instance();
+ ptr = new Lz4fBlockCompression();
break;
case segment_v2::CompressionTypePB::ZLIB:
- *codec = ZlibBlockCompression::instance();
+ ptr = new ZlibBlockCompression();
break;
default:
return Status::NotFound(strings::Substitute("unknown compression type($0)", type));
}
- return Status::OK();
+
+ if (!ptr) return Status::NotFound("Failed to create compression codec");
+
+ Status st = ptr->init();
+ if (st.ok()) {
+ codec.reset(ptr);
+ } else {
+ delete ptr;
+ }
+
+ return st;
}
} // namespace doris
diff --git a/be/src/util/block_compression.h b/be/src/util/block_compression.h
index ff25113793..7ad3f9ecb7 100644
--- a/be/src/util/block_compression.h
+++ b/be/src/util/block_compression.h
@@ -34,6 +34,8 @@ class BlockCompressionCodec {
public:
virtual ~BlockCompressionCodec() {}
+ virtual Status init() { return Status::OK(); }
+
// This function will compress input data into output.
// output should be preallocated, and its capacity must be large enough
// for compressed input, which can be get through max_compressed_len function.
@@ -61,6 +63,6 @@ public:
//
// Return not OK, if error happens.
Status get_block_compression_codec(segment_v2::CompressionTypePB type,
- const BlockCompressionCodec** codec);
+ std::unique_ptr<BlockCompressionCodec>& codec);
} // namespace doris
diff --git a/be/test/util/block_compression_test.cpp b/be/test/util/block_compression_test.cpp
index cec2ac48da..0bf62e6d9d 100644
--- a/be/test/util/block_compression_test.cpp
+++ b/be/test/util/block_compression_test.cpp
@@ -42,8 +42,8 @@ static std::string generate_str(size_t len) {
}
void test_single_slice(segment_v2::CompressionTypePB type) {
- const BlockCompressionCodec* codec = nullptr;
- auto st = get_block_compression_codec(type, &codec);
+ std::unique_ptr<BlockCompressionCodec> codec;
+ auto st = get_block_compression_codec(type, codec);
EXPECT_TRUE(st.ok());
size_t test_sizes[] = {0, 1, 10, 1000, 1000000};
@@ -104,8 +104,8 @@ TEST_F(BlockCompressionTest, single) {
}
void test_multi_slices(segment_v2::CompressionTypePB type) {
- const BlockCompressionCodec* codec = nullptr;
- auto st = get_block_compression_codec(type, &codec);
+ std::unique_ptr<BlockCompressionCodec> codec;
+ auto st = get_block_compression_codec(type, codec);
EXPECT_TRUE(st.ok());
size_t test_sizes[] = {0, 1, 10, 1000, 1000000};
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org