You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/10/10 00:41:19 UTC
[doris] branch master updated: [feature-wip](multi-catalog) Optimize the performance of boolean & dictionary decoding (#13212)
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new dd089259be [feature-wip](multi-catalog) Optimize the performance of boolean & dictionary decoding (#13212)
dd089259be is described below
commit dd089259bebd73f92a16d38e50cde93d3fd51f59
Author: Ashin Gau <As...@users.noreply.github.com>
AuthorDate: Mon Oct 10 08:41:11 2022 +0800
[feature-wip](multi-catalog) Optimize the performance of boolean & dictionary decoding (#13212)
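Precompute a vector of dictionary items when the dictionary is set (a char* per entry for fixed-length values, a StringRef per entry for byte arrays), so decoding looks items up by index instead of recomputing offsets for every value.
Decode plain-encoded boolean values in batches: unpack them into a buffer with UnpackBatch and append whole ranges to the column, instead of decoding one value at a time.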
---
be/src/vec/exec/format/parquet/parquet_common.cpp | 44 +++++++++++++++--------
be/src/vec/exec/format/parquet/parquet_common.h | 19 +++++-----
2 files changed, 39 insertions(+), 24 deletions(-)
diff --git a/be/src/vec/exec/format/parquet/parquet_common.cpp b/be/src/vec/exec/format/parquet/parquet_common.cpp
index 83c702f800..91d91355e3 100644
--- a/be/src/vec/exec/format/parquet/parquet_common.cpp
+++ b/be/src/vec/exec/format/parquet/parquet_common.cpp
@@ -134,6 +134,12 @@ Status FixLengthDecoder::set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t leng
}
_has_dict = true;
_dict = std::move(dict);
+ char* dict_item_address = reinterpret_cast<char*>(_dict.get());
+ _dict_items.resize(num_values);
+ for (size_t i = 0; i < num_values; ++i) {
+ _dict_items[i] = dict_item_address;
+ dict_item_address += _type_length;
+ }
return Status::OK();
}
@@ -262,12 +268,13 @@ Status ByteArrayDecoder::set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t leng
size_t num_values) {
_has_dict = true;
_dict = std::move(dict);
- _dict_offsets.resize(num_values + 1);
+ _dict_items.reserve(num_values);
uint32_t offset_cursor = 0;
+ char* dict_item_address = reinterpret_cast<char*>(_dict.get());
for (int i = 0; i < num_values; ++i) {
uint32_t l = decode_fixed32_le(_dict.get() + offset_cursor);
offset_cursor += 4;
- _dict_offsets[i] = offset_cursor;
+ _dict_items.emplace_back(dict_item_address + offset_cursor, l);
offset_cursor += l;
if (offset_cursor > length) {
return Status::Corruption("Wrong data length in dictionary");
@@ -276,7 +283,6 @@ Status ByteArrayDecoder::set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t leng
if (offset_cursor != length) {
return Status::Corruption("Wrong dictionary data for byte array type");
}
- _dict_offsets[num_values] = offset_cursor + 4;
return Status::OK();
}
@@ -326,10 +332,7 @@ Status ByteArrayDecoder::decode_values(MutableColumnPtr& doris_column, DataTypeP
string_values.reserve(num_values);
for (int i = 0; i < num_values; ++i) {
if (_has_dict) {
- uint32_t idx = _indexes[i];
- uint32_t idx_cursor = _dict_offsets[idx];
- char* buff_start = reinterpret_cast<char*>(_dict.get() + idx_cursor);
- string_values.emplace_back(buff_start, _dict_offsets[idx + 1] - idx_cursor - 4);
+ string_values.emplace_back(_dict_items[_indexes[i]]);
} else {
if (UNLIKELY(_offset + 4 > _data->size)) {
return Status::IOError("Can't read byte array length from plain decoder");
@@ -388,14 +391,27 @@ Status BoolPlainDecoder::skip_values(size_t num_values) {
Status BoolPlainDecoder::decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
size_t num_values) {
auto& column_data = static_cast<ColumnVector<UInt8>&>(*doris_column).get_data();
- auto origin_size = column_data.size();
- column_data.resize(origin_size + num_values);
- bool value;
- for (int i = 0; i < num_values; ++i) {
- if (UNLIKELY(!_decode_value(&value))) {
- return Status::IOError("Can't read enough booleans in plain decoder");
+ int has_read = 0;
+ while (has_read < num_values) {
+ int loop_read =
+ std::min((int)num_values - has_read, num_unpacked_values_ - unpacked_value_idx_);
+ if (loop_read > 0) {
+ column_data.insert(unpacked_values_ + unpacked_value_idx_,
+ unpacked_values_ + unpacked_value_idx_ + loop_read);
+ unpacked_value_idx_ += loop_read;
+ has_read += loop_read;
}
- column_data[origin_size + i] = (UInt8)value;
+ if (unpacked_value_idx_ == num_unpacked_values_) {
+ num_unpacked_values_ =
+ bool_values_.UnpackBatch(1, UNPACKED_BUFFER_LEN, &unpacked_values_[0]);
+ unpacked_value_idx_ = 0;
+ if (num_unpacked_values_ == 0) {
+ break;
+ }
+ }
+ }
+ if (UNLIKELY(has_read < num_values)) {
+ return Status::IOError("Can't read enough booleans in plain decoder");
}
return Status::OK();
}
diff --git a/be/src/vec/exec/format/parquet/parquet_common.h b/be/src/vec/exec/format/parquet/parquet_common.h
index a5e67fad2e..d17f03ac0e 100644
--- a/be/src/vec/exec/format/parquet/parquet_common.h
+++ b/be/src/vec/exec/format/parquet/parquet_common.h
@@ -205,9 +205,8 @@ protected:
Status _decode_primitive_decimal(MutableColumnPtr& doris_column, DataTypePtr& data_type,
size_t num_values);
-#define _FIXED_GET_DATA_OFFSET(index) \
- _has_dict ? reinterpret_cast<char*>(_dict.get() + _indexes[index] * _type_length) \
- : _data->data + _offset
+#define _FIXED_GET_DATA_OFFSET(index) \
+ _has_dict ? _dict_items[_indexes[index]] : _data->data + _offset
#define _FIXED_SHIFT_DATA_OFFSET() \
if (!_has_dict) _offset += _type_length
@@ -216,6 +215,7 @@ protected:
// For dictionary encoding
bool _has_dict = false;
std::unique_ptr<uint8_t[]> _dict = nullptr;
+ std::vector<char*> _dict_items;
std::unique_ptr<RleBatchDecoder<uint32_t>> _index_batch_decoder = nullptr;
std::vector<uint32_t> _indexes;
};
@@ -280,6 +280,7 @@ Status FixLengthDecoder::_decode_datetime64(MutableColumnPtr& doris_column, Type
auto& column_data = static_cast<ColumnVector<ColumnType>&>(*doris_column).get_data();
auto origin_size = column_data.size();
column_data.resize(origin_size + num_values);
+ int64_t scale_to_micro = _decode_params->scale_to_nano_factor / 1000;
for (int i = 0; i < num_values; i++) {
char* buf_start = _FIXED_GET_DATA_OFFSET(i);
int64_t& date_value = *reinterpret_cast<int64_t*>(buf_start);
@@ -287,8 +288,7 @@ Status FixLengthDecoder::_decode_datetime64(MutableColumnPtr& doris_column, Type
v.from_unixtime(date_value / _decode_params->second_mask, *_decode_params->ctz);
if constexpr (std::is_same_v<CppType, DateV2Value<DateTimeV2ValueType>>) {
// nanoseconds will be ignored.
- v.set_microsecond((date_value % _decode_params->second_mask) *
- _decode_params->scale_to_nano_factor / 1000);
+ v.set_microsecond((date_value % _decode_params->second_mask) * scale_to_micro);
}
_FIXED_SHIFT_DATA_OFFSET();
}
@@ -393,7 +393,7 @@ protected:
// For dictionary encoding
bool _has_dict = false;
std::unique_ptr<uint8_t[]> _dict = nullptr;
- std::vector<uint32_t> _dict_offsets;
+ std::vector<StringRef> _dict_items;
std::unique_ptr<RleBatchDecoder<uint32_t>> _index_batch_decoder = nullptr;
std::vector<uint32_t> _indexes;
};
@@ -411,10 +411,9 @@ Status ByteArrayDecoder::_decode_binary_decimal(MutableColumnPtr& doris_column,
char* buf_start;
uint32_t length;
if (_has_dict) {
- uint32_t idx = _indexes[i];
- uint32_t idx_cursor = _dict_offsets[idx];
- buf_start = reinterpret_cast<char*>(_dict.get() + idx_cursor);
- length = _dict_offsets[idx + 1] - idx_cursor - 4;
+ StringRef& slice = _dict_items[_indexes[i]];
+ buf_start = const_cast<char*>(slice.data);
+ length = (uint32_t)slice.size;
} else {
if (UNLIKELY(_offset + 4 > _data->size)) {
return Status::IOError("Can't read byte array length from plain decoder");
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org