You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/10/10 00:41:19 UTC

[doris] branch master updated: [feature-wip](multi-catalog) Optimize the performance of boolean & dictionary decoding (#13212)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new dd089259be [feature-wip](multi-catalog) Optimize the performance of boolean & dictionary decoding (#13212)
dd089259be is described below

commit dd089259bebd73f92a16d38e50cde93d3fd51f59
Author: Ashin Gau <As...@users.noreply.github.com>
AuthorDate: Mon Oct 10 08:41:11 2022 +0800

    [feature-wip](multi-catalog) Optimize the performance of boolean & dictionary decoding (#13212)
    
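    Precompute a vector of dictionary items in set_dict() so that decoding a dictionary-encoded value is a direct lookup.
    Decode boolean values in batches instead of one value at a time.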
---
 be/src/vec/exec/format/parquet/parquet_common.cpp | 44 +++++++++++++++--------
 be/src/vec/exec/format/parquet/parquet_common.h   | 19 +++++-----
 2 files changed, 39 insertions(+), 24 deletions(-)

diff --git a/be/src/vec/exec/format/parquet/parquet_common.cpp b/be/src/vec/exec/format/parquet/parquet_common.cpp
index 83c702f800..91d91355e3 100644
--- a/be/src/vec/exec/format/parquet/parquet_common.cpp
+++ b/be/src/vec/exec/format/parquet/parquet_common.cpp
@@ -134,6 +134,12 @@ Status FixLengthDecoder::set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t leng
     }
     _has_dict = true;
     _dict = std::move(dict);
+    char* dict_item_address = reinterpret_cast<char*>(_dict.get());
+    _dict_items.resize(num_values);
+    for (size_t i = 0; i < num_values; ++i) {
+        _dict_items[i] = dict_item_address;
+        dict_item_address += _type_length;
+    }
     return Status::OK();
 }
 
@@ -262,12 +268,13 @@ Status ByteArrayDecoder::set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t leng
                                   size_t num_values) {
     _has_dict = true;
     _dict = std::move(dict);
-    _dict_offsets.resize(num_values + 1);
+    _dict_items.reserve(num_values);
     uint32_t offset_cursor = 0;
+    char* dict_item_address = reinterpret_cast<char*>(_dict.get());
     for (int i = 0; i < num_values; ++i) {
         uint32_t l = decode_fixed32_le(_dict.get() + offset_cursor);
         offset_cursor += 4;
-        _dict_offsets[i] = offset_cursor;
+        _dict_items.emplace_back(dict_item_address + offset_cursor, l);
         offset_cursor += l;
         if (offset_cursor > length) {
             return Status::Corruption("Wrong data length in dictionary");
@@ -276,7 +283,6 @@ Status ByteArrayDecoder::set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t leng
     if (offset_cursor != length) {
         return Status::Corruption("Wrong dictionary data for byte array type");
     }
-    _dict_offsets[num_values] = offset_cursor + 4;
     return Status::OK();
 }
 
@@ -326,10 +332,7 @@ Status ByteArrayDecoder::decode_values(MutableColumnPtr& doris_column, DataTypeP
         string_values.reserve(num_values);
         for (int i = 0; i < num_values; ++i) {
             if (_has_dict) {
-                uint32_t idx = _indexes[i];
-                uint32_t idx_cursor = _dict_offsets[idx];
-                char* buff_start = reinterpret_cast<char*>(_dict.get() + idx_cursor);
-                string_values.emplace_back(buff_start, _dict_offsets[idx + 1] - idx_cursor - 4);
+                string_values.emplace_back(_dict_items[_indexes[i]]);
             } else {
                 if (UNLIKELY(_offset + 4 > _data->size)) {
                     return Status::IOError("Can't read byte array length from plain decoder");
@@ -388,14 +391,27 @@ Status BoolPlainDecoder::skip_values(size_t num_values) {
 Status BoolPlainDecoder::decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
                                        size_t num_values) {
     auto& column_data = static_cast<ColumnVector<UInt8>&>(*doris_column).get_data();
-    auto origin_size = column_data.size();
-    column_data.resize(origin_size + num_values);
-    bool value;
-    for (int i = 0; i < num_values; ++i) {
-        if (UNLIKELY(!_decode_value(&value))) {
-            return Status::IOError("Can't read enough booleans in plain decoder");
+    int has_read = 0;
+    while (has_read < num_values) {
+        int loop_read =
+                std::min((int)num_values - has_read, num_unpacked_values_ - unpacked_value_idx_);
+        if (loop_read > 0) {
+            column_data.insert(unpacked_values_ + unpacked_value_idx_,
+                               unpacked_values_ + unpacked_value_idx_ + loop_read);
+            unpacked_value_idx_ += loop_read;
+            has_read += loop_read;
         }
-        column_data[origin_size + i] = (UInt8)value;
+        if (unpacked_value_idx_ == num_unpacked_values_) {
+            num_unpacked_values_ =
+                    bool_values_.UnpackBatch(1, UNPACKED_BUFFER_LEN, &unpacked_values_[0]);
+            unpacked_value_idx_ = 0;
+            if (num_unpacked_values_ == 0) {
+                break;
+            }
+        }
+    }
+    if (UNLIKELY(has_read < num_values)) {
+        return Status::IOError("Can't read enough booleans in plain decoder");
     }
     return Status::OK();
 }
diff --git a/be/src/vec/exec/format/parquet/parquet_common.h b/be/src/vec/exec/format/parquet/parquet_common.h
index a5e67fad2e..d17f03ac0e 100644
--- a/be/src/vec/exec/format/parquet/parquet_common.h
+++ b/be/src/vec/exec/format/parquet/parquet_common.h
@@ -205,9 +205,8 @@ protected:
     Status _decode_primitive_decimal(MutableColumnPtr& doris_column, DataTypePtr& data_type,
                                      size_t num_values);
 
-#define _FIXED_GET_DATA_OFFSET(index)                                                 \
-    _has_dict ? reinterpret_cast<char*>(_dict.get() + _indexes[index] * _type_length) \
-              : _data->data + _offset
+#define _FIXED_GET_DATA_OFFSET(index) \
+    _has_dict ? _dict_items[_indexes[index]] : _data->data + _offset
 
 #define _FIXED_SHIFT_DATA_OFFSET() \
     if (!_has_dict) _offset += _type_length
@@ -216,6 +215,7 @@ protected:
     // For dictionary encoding
     bool _has_dict = false;
     std::unique_ptr<uint8_t[]> _dict = nullptr;
+    std::vector<char*> _dict_items;
     std::unique_ptr<RleBatchDecoder<uint32_t>> _index_batch_decoder = nullptr;
     std::vector<uint32_t> _indexes;
 };
@@ -280,6 +280,7 @@ Status FixLengthDecoder::_decode_datetime64(MutableColumnPtr& doris_column, Type
     auto& column_data = static_cast<ColumnVector<ColumnType>&>(*doris_column).get_data();
     auto origin_size = column_data.size();
     column_data.resize(origin_size + num_values);
+    int64_t scale_to_micro = _decode_params->scale_to_nano_factor / 1000;
     for (int i = 0; i < num_values; i++) {
         char* buf_start = _FIXED_GET_DATA_OFFSET(i);
         int64_t& date_value = *reinterpret_cast<int64_t*>(buf_start);
@@ -287,8 +288,7 @@ Status FixLengthDecoder::_decode_datetime64(MutableColumnPtr& doris_column, Type
         v.from_unixtime(date_value / _decode_params->second_mask, *_decode_params->ctz);
         if constexpr (std::is_same_v<CppType, DateV2Value<DateTimeV2ValueType>>) {
             // nanoseconds will be ignored.
-            v.set_microsecond((date_value % _decode_params->second_mask) *
-                              _decode_params->scale_to_nano_factor / 1000);
+            v.set_microsecond((date_value % _decode_params->second_mask) * scale_to_micro);
         }
         _FIXED_SHIFT_DATA_OFFSET();
     }
@@ -393,7 +393,7 @@ protected:
     // For dictionary encoding
     bool _has_dict = false;
     std::unique_ptr<uint8_t[]> _dict = nullptr;
-    std::vector<uint32_t> _dict_offsets;
+    std::vector<StringRef> _dict_items;
     std::unique_ptr<RleBatchDecoder<uint32_t>> _index_batch_decoder = nullptr;
     std::vector<uint32_t> _indexes;
 };
@@ -411,10 +411,9 @@ Status ByteArrayDecoder::_decode_binary_decimal(MutableColumnPtr& doris_column,
         char* buf_start;
         uint32_t length;
         if (_has_dict) {
-            uint32_t idx = _indexes[i];
-            uint32_t idx_cursor = _dict_offsets[idx];
-            buf_start = reinterpret_cast<char*>(_dict.get() + idx_cursor);
-            length = _dict_offsets[idx + 1] - idx_cursor - 4;
+            StringRef& slice = _dict_items[_indexes[i]];
+            buf_start = const_cast<char*>(slice.data);
+            length = (uint32_t)slice.size;
         } else {
             if (UNLIKELY(_offset + 4 > _data->size)) {
                 return Status::IOError("Can't read byte array length from plain decoder");


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org