You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/05/24 03:28:17 UTC

[incubator-doris] 04/04: [bugfix]fix column reader compress codec unsafe problem (#9741)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 3d5829e3f0058913a265bf710a8f58f8da81ecdd
Author: Kang <kx...@gmail.com>
AuthorDate: Mon May 23 20:25:49 2022 +0800

    [bugfix]fix column reader compress codec unsafe problem (#9741)
    
    by moving codec from shared reader to unshared iterator
---
 be/src/olap/rowset/segment_v2/column_reader.cpp        | 18 +++++++++++++-----
 be/src/olap/rowset/segment_v2/column_reader.h          | 13 +++++++++++--
 .../olap/rowset/segment_v2/indexed_column_reader.cpp   | 18 +++++++++++++-----
 be/src/olap/rowset/segment_v2/indexed_column_reader.h  |  8 ++++++--
 be/src/util/block_compression.h                        |  7 ++++++-
 5 files changed, 49 insertions(+), 15 deletions(-)

diff --git a/be/src/olap/rowset/segment_v2/column_reader.cpp b/be/src/olap/rowset/segment_v2/column_reader.cpp
index 63c4f0d36f..929b04cf5e 100644
--- a/be/src/olap/rowset/segment_v2/column_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/column_reader.cpp
@@ -105,7 +105,6 @@ Status ColumnReader::init() {
                 strings::Substitute("unsupported typeinfo, type=$0", _meta.type()));
     }
     RETURN_IF_ERROR(EncodingInfo::get(_type_info, _meta.encoding(), &_encoding_info));
-    RETURN_IF_ERROR(get_block_compression_codec(_meta.compression(), _compress_codec));
 
     for (int i = 0; i < _meta.indexes_size(); i++) {
         auto& index_meta = _meta.indexes(i);
@@ -141,12 +140,13 @@ Status ColumnReader::new_bitmap_index_iterator(BitmapIndexIterator** iterator) {
 }
 
 Status ColumnReader::read_page(const ColumnIteratorOptions& iter_opts, const PagePointer& pp,
-                               PageHandle* handle, Slice* page_body, PageFooterPB* footer) {
+                               PageHandle* handle, Slice* page_body, PageFooterPB* footer,
+                               BlockCompressionCodec* codec) {
     iter_opts.sanity_check();
     PageReadOptions opts;
     opts.rblock = iter_opts.rblock;
     opts.page_pointer = pp;
-    opts.codec = _compress_codec.get();
+    opts.codec = codec;
     opts.stats = iter_opts.stats;
     opts.verify_checksum = _opts.verify_checksum;
     opts.use_page_cache = iter_opts.use_page_cache;
@@ -457,6 +457,12 @@ Status ArrayFileColumnIterator::next_batch(size_t* n, ColumnBlockView* dst, bool
 
 FileColumnIterator::FileColumnIterator(ColumnReader* reader) : _reader(reader) {}
 
+Status FileColumnIterator::init(const ColumnIteratorOptions& opts) {
+    _opts = opts;
+    RETURN_IF_ERROR(get_block_compression_codec(_reader->get_compression(), _compress_codec));
+    return Status::OK();
+}
+
 FileColumnIterator::~FileColumnIterator() {
     _opts.mem_tracker->Release(_opts.mem_tracker->consumption());
 }
@@ -646,7 +652,8 @@ Status FileColumnIterator::_read_data_page(const OrdinalPageIndexIterator& iter)
     Slice page_body;
     PageFooterPB footer;
     _opts.type = DATA_PAGE;
-    RETURN_IF_ERROR(_reader->read_page(_opts, iter.page(), &handle, &page_body, &footer));
+    RETURN_IF_ERROR(_reader->read_page(_opts, iter.page(), &handle, &page_body, &footer,
+                                       _compress_codec.get()));
     // parse data page
     RETURN_IF_ERROR(ParsedPage::create(std::move(handle), page_body, footer.data_page_footer(),
                                        _reader->encoding_info(), iter.page(), iter.page_index(),
@@ -666,7 +673,8 @@ Status FileColumnIterator::_read_data_page(const OrdinalPageIndexIterator& iter)
                 PageFooterPB dict_footer;
                 _opts.type = INDEX_PAGE;
                 RETURN_IF_ERROR(_reader->read_page(_opts, _reader->get_dict_page_pointer(),
-                                                   &_dict_page_handle, &dict_data, &dict_footer));
+                                                   &_dict_page_handle, &dict_data, &dict_footer,
+                                                   _compress_codec.get()));
                 // ignore dict_footer.dict_page_footer().encoding() due to only
                 // PLAIN_ENCODING is supported for dict page right now
                 _dict_decoder = std::make_unique<BinaryPlainPageDecoder>(dict_data);
diff --git a/be/src/olap/rowset/segment_v2/column_reader.h b/be/src/olap/rowset/segment_v2/column_reader.h
index 76c41ae991..b7d33cb4b6 100644
--- a/be/src/olap/rowset/segment_v2/column_reader.h
+++ b/be/src/olap/rowset/segment_v2/column_reader.h
@@ -105,7 +105,8 @@ public:
 
     // read a page from file into a page handle
     Status read_page(const ColumnIteratorOptions& iter_opts, const PagePointer& pp,
-                     PageHandle* handle, Slice* page_body, PageFooterPB* footer);
+                     PageHandle* handle, Slice* page_body, PageFooterPB* footer,
+                     BlockCompressionCodec* codec);
 
     bool is_nullable() const { return _meta.is_nullable(); }
 
@@ -131,6 +132,10 @@ public:
 
     PagePointer get_dict_page_pointer() const { return _meta.dict_page(); }
 
+    bool is_empty() const { return _num_rows == 0; }
+
+    CompressionTypePB get_compression() const { return _meta.compression(); }
+
 private:
     ColumnReader(const ColumnReaderOptions& opts, const ColumnMetaPB& meta, uint64_t num_rows,
                  FilePathDesc path_desc);
@@ -174,7 +179,6 @@ private:
     const TypeInfo* _type_info = nullptr; // initialized in init(), may changed by subclasses.
     const EncodingInfo* _encoding_info =
             nullptr; // initialized in init(), used for create PageDecoder
-    std::unique_ptr<BlockCompressionCodec> _compress_codec; // initialized in init()
 
     // meta for various column indexes (null if the index is absent)
     const ZoneMapIndexPB* _zone_map_index_meta = nullptr;
@@ -253,6 +257,8 @@ public:
     explicit FileColumnIterator(ColumnReader* reader);
     ~FileColumnIterator() override;
 
+    Status init(const ColumnIteratorOptions& opts) override;
+
     Status seek_to_first() override;
 
     Status seek_to_ordinal(ordinal_t ord) override;
@@ -285,6 +291,9 @@ private:
 private:
     ColumnReader* _reader;
 
+    // Compression codec owned by this iterator; it must NOT be shared across threads. Initialized in init().
+    std::unique_ptr<BlockCompressionCodec> _compress_codec;
+
     // 1. The _page represents current page.
     // 2. We define an operation is one seek and following read,
     //    If new seek is issued, the _page will be reset.
diff --git a/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp b/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp
index 3ee72c8a11..34230775d3 100644
--- a/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp
@@ -37,7 +37,6 @@ Status IndexedColumnReader::load(bool use_page_cache, bool kept_in_memory) {
                 strings::Substitute("unsupported typeinfo, type=$0", _meta.data_type()));
     }
     RETURN_IF_ERROR(EncodingInfo::get(_type_info, _meta.encoding(), &_encoding_info));
-    RETURN_IF_ERROR(get_block_compression_codec(_meta.compression(), _compress_codec));
     _value_key_coder = get_key_coder(_type_info->type());
 
     std::unique_ptr<fs::ReadableBlock> rblock;
@@ -72,17 +71,21 @@ Status IndexedColumnReader::load_index_page(fs::ReadableBlock* rblock, const Pag
                                             PageHandle* handle, IndexPageReader* reader) {
     Slice body;
     PageFooterPB footer;
-    RETURN_IF_ERROR(read_page(rblock, PagePointer(pp), handle, &body, &footer, INDEX_PAGE));
+    std::unique_ptr<BlockCompressionCodec> local_compress_codec;
+    RETURN_IF_ERROR(get_block_compression_codec(_meta.compression(), local_compress_codec));
+    RETURN_IF_ERROR(read_page(rblock, PagePointer(pp), handle, &body, &footer, INDEX_PAGE,
+                              local_compress_codec.get()));
     RETURN_IF_ERROR(reader->parse(body, footer.index_page_footer()));
     return Status::OK();
 }
 
 Status IndexedColumnReader::read_page(fs::ReadableBlock* rblock, const PagePointer& pp,
-                                      PageHandle* handle, Slice* body, PageFooterPB* footer, PageTypePB type) const {
+                                      PageHandle* handle, Slice* body, PageFooterPB* footer,
+                                      PageTypePB type, BlockCompressionCodec* codec) const {
     PageReadOptions opts;
     opts.rblock = rblock;
     opts.page_pointer = pp;
-    opts.codec = _compress_codec.get();
+    opts.codec = codec;
     OlapReaderStatistics tmp_stats;
     opts.stats = &tmp_stats;
     opts.use_page_cache = _use_page_cache;
@@ -95,10 +98,15 @@ Status IndexedColumnReader::read_page(fs::ReadableBlock* rblock, const PagePoint
 ///////////////////////////////////////////////////////////////////////////////
 
 Status IndexedColumnIterator::_read_data_page(const PagePointer& pp) {
+    // IndexedColumnIterator has no init(), so fetch the codec here if it has not been set yet
+    if (!_compress_codec.get())
+        RETURN_IF_ERROR(get_block_compression_codec(_reader->get_compression(), _compress_codec));
+
     PageHandle handle;
     Slice body;
     PageFooterPB footer;
-    RETURN_IF_ERROR(_reader->read_page(_rblock.get(), pp, &handle, &body, &footer, DATA_PAGE));
+    RETURN_IF_ERROR(_reader->read_page(_rblock.get(), pp, &handle, &body, &footer, DATA_PAGE,
+                                       _compress_codec.get()));
     // parse data page
     // note that page_index is not used in IndexedColumnIterator, so we pass 0
     return ParsedPage::create(std::move(handle), body, footer.data_page_footer(),
diff --git a/be/src/olap/rowset/segment_v2/indexed_column_reader.h b/be/src/olap/rowset/segment_v2/indexed_column_reader.h
index 3c464f32f0..1317d71e8c 100644
--- a/be/src/olap/rowset/segment_v2/indexed_column_reader.h
+++ b/be/src/olap/rowset/segment_v2/indexed_column_reader.h
@@ -52,7 +52,8 @@ public:
 
     // read a page specified by `pp' from `file' into `handle'
     Status read_page(fs::ReadableBlock* rblock, const PagePointer& pp, PageHandle* handle,
-                     Slice* body, PageFooterPB* footer, PageTypePB type) const;
+                     Slice* body, PageFooterPB* footer, PageTypePB type,
+                     BlockCompressionCodec* codec) const;
 
     int64_t num_values() const { return _num_values; }
     const EncodingInfo* encoding_info() const { return _encoding_info; }
@@ -60,6 +61,8 @@ public:
     bool support_ordinal_seek() const { return _meta.has_ordinal_index_meta(); }
     bool support_value_seek() const { return _meta.has_value_index_meta(); }
 
+    CompressionTypePB get_compression() const { return _meta.compression(); }
+
 private:
     Status load_index_page(fs::ReadableBlock* rblock, const PagePointerPB& pp, PageHandle* handle,
                            IndexPageReader* reader);
@@ -84,7 +87,6 @@ private:
 
     const TypeInfo* _type_info = nullptr;
     const EncodingInfo* _encoding_info = nullptr;
-    std::unique_ptr<BlockCompressionCodec> _compress_codec;
     const KeyCoder* _value_key_coder = nullptr;
 };
 
@@ -145,6 +147,8 @@ private:
     ordinal_t _current_ordinal = 0;
     // open file handle
     std::unique_ptr<fs::ReadableBlock> _rblock;
+    // Compression codec owned by this iterator; it must NOT be shared across threads. Initialized before first use.
+    std::unique_ptr<BlockCompressionCodec> _compress_codec;
 };
 
 } // namespace segment_v2
diff --git a/be/src/util/block_compression.h b/be/src/util/block_compression.h
index 7ad3f9ecb7..ddda23a3ba 100644
--- a/be/src/util/block_compression.h
+++ b/be/src/util/block_compression.h
@@ -30,6 +30,9 @@ namespace doris {
 // This class only used to compress a block data, which means all data
 // should given when call compress or decompress. This class don't handle
 // stream compression.
+//
+// NOTICE!! BlockCompressionCodec is NOT thread-safe; it must NOT be shared between threads.
+//
 class BlockCompressionCodec {
 public:
     virtual ~BlockCompressionCodec() {}
@@ -59,7 +62,9 @@ public:
 // Get a BlockCompressionCodec through type.
 // Return Status::OK if a valid codec is found. If codec is null, it means it is
 // NO_COMPRESSION. If codec is not null, user can use it to compress/decompress
-// data. And client doesn't have to release the codec.
+// data.
+//
+// NOTICE!! BlockCompressionCodec is NOT thread-safe; it must NOT be shared between threads.
 //
 // Return not OK, if error happens.
 Status get_block_compression_codec(segment_v2::CompressionTypePB type,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org