You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/10/12 08:52:37 UTC
[doris] branch master updated: [feature-wip](parquet-reader) fix string test and support decimal64 (#13184)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 4fc7a048d2 [feature-wip](parquet-reader) fix string test and support decimal64 (#13184)
4fc7a048d2 is described below
commit 4fc7a048d210ac1396b92ea70f962e989cb95c97
Author: slothever <18...@users.noreply.github.com>
AuthorDate: Wed Oct 12 16:52:28 2022 +0800
[feature-wip](parquet-reader) fix string test and support decimal64 (#13184)
1. Refactor the argument list of the Parquet min/max filter, and pass the Parquet type so that min/max values can be parsed correctly
2. Fix the min/max filter for string columns
Co-authored-by: jinzhe <ji...@selectdb.com>
---
be/src/vec/exec/format/parquet/parquet_pred_cmp.h | 391 +++++++++++++--------
.../exec/format/parquet/vparquet_page_index.cpp | 3 +-
.../vec/exec/format/parquet/vparquet_page_index.h | 1 +
be/src/vec/exec/format/parquet/vparquet_reader.cpp | 13 +-
4 files changed, 259 insertions(+), 149 deletions(-)
diff --git a/be/src/vec/exec/format/parquet/parquet_pred_cmp.h b/be/src/vec/exec/format/parquet/parquet_pred_cmp.h
index 89792eaec1..5f33880c2a 100644
--- a/be/src/vec/exec/format/parquet/parquet_pred_cmp.h
+++ b/be/src/vec/exec/format/parquet/parquet_pred_cmp.h
@@ -38,26 +38,6 @@ namespace doris::vectorized {
return true; \
}
-#define _FILTER_GROUP_BY_GT_PRED(conjunct_value, max) \
- if (max <= conjunct_value) { \
- return true; \
- }
-
-#define _FILTER_GROUP_BY_GE_PRED(conjunct_value, max) \
- if (max < conjunct_value) { \
- return true; \
- }
-
-#define _FILTER_GROUP_BY_LT_PRED(conjunct_value, min) \
- if (min >= conjunct_value) { \
- return true; \
- }
-
-#define _FILTER_GROUP_BY_LE_PRED(conjunct_value, min) \
- if (min > conjunct_value) { \
- return true; \
- }
-
#define _FILTER_GROUP_BY_IN(T, in_pred_values, min_bytes, max_bytes) \
std::vector<T> in_values; \
for (auto val : in_pred_values) { \
@@ -76,32 +56,80 @@ namespace doris::vectorized {
return true; \
}
-static bool _eval_in_val(PrimitiveType conjunct_type, std::vector<void*> in_pred_values,
- const char* min_bytes, const char* max_bytes) {
- switch (conjunct_type) {
+struct ColumnMinMaxParams {
+ PrimitiveType conjunct_type;
+ tparquet::Type::type parquet_type;
+ void* value;
+ // Use for decimal type
+ int32_t parquet_precision;
+ int32_t parquet_scale;
+ int32_t parquet_type_length;
+ // Use for in predicate
+ std::vector<void*> in_pred_values;
+ const char* min_bytes;
+ const char* max_bytes;
+};
+
+template <typename T>
+static void _align_decimal_v2_scale(T* conjunct_value, int32_t value_scale, T* parquet_value,
+ int32_t parquet_scale) {
+ if (value_scale > parquet_scale) {
+ *parquet_value = *parquet_value * common::exp10_i32(value_scale - parquet_scale);
+ } else if (value_scale < parquet_scale) {
+ *conjunct_value = *conjunct_value * common::exp10_i32(parquet_scale - value_scale);
+ }
+}
+
+template <typename T>
+static void _decode_decimal_v2_to_primary(const ColumnMinMaxParams& params,
+ const char* raw_parquet_val, T* out_value,
+ T* parquet_val) {
+ *parquet_val = reinterpret_cast<const T*>(raw_parquet_val)[0];
+ DecimalV2Value conjunct_value = *((DecimalV2Value*)params.value);
+ *out_value = conjunct_value.value();
+ _align_decimal_v2_scale(out_value, conjunct_value.scale(), parquet_val, params.parquet_scale);
+}
+
+// todo: support decimal128 after the test passes
+//static Int128 _decode_value_to_int128(const ColumnMinMaxParams& params,
+// const char* raw_parquet_val) {
+// const uint8_t* buf = reinterpret_cast<const uint8_t*>(raw_parquet_val);
+// int32_t length = params.parquet_type_length;
+// Int128 value = buf[0] & 0x80 ? -1 : 0;
+// memcpy(reinterpret_cast<uint8_t*>(&value) + sizeof(value) - length, buf, length);
+// return BigEndian::ToHost128(value);
+//}
+
+static bool _eval_in_val(const ColumnMinMaxParams& params) {
+ switch (params.conjunct_type) {
case TYPE_TINYINT: {
- _FILTER_GROUP_BY_IN(int8_t, in_pred_values, min_bytes, max_bytes)
+ _FILTER_GROUP_BY_IN(int8_t, params.in_pred_values, params.min_bytes, params.max_bytes)
break;
}
case TYPE_SMALLINT: {
- _FILTER_GROUP_BY_IN(int16_t, in_pred_values, min_bytes, max_bytes)
+ _FILTER_GROUP_BY_IN(int16_t, params.in_pred_values, params.min_bytes, params.max_bytes)
break;
}
+ case TYPE_DECIMAL32:
case TYPE_INT: {
- _FILTER_GROUP_BY_IN(int32_t, in_pred_values, min_bytes, max_bytes)
+ _FILTER_GROUP_BY_IN(int32_t, params.in_pred_values, params.min_bytes, params.max_bytes)
break;
}
+ case TYPE_DECIMAL64:
case TYPE_BIGINT: {
- _FILTER_GROUP_BY_IN(int64_t, in_pred_values, min_bytes, max_bytes)
+ _FILTER_GROUP_BY_IN(int64_t, params.in_pred_values, params.min_bytes, params.max_bytes)
+ break;
+ }
+ case TYPE_DECIMALV2: {
break;
}
case TYPE_STRING:
case TYPE_VARCHAR:
case TYPE_CHAR: {
std::vector<const char*> in_values;
- for (auto val : in_pred_values) {
- const char* value = ((std::string*)val)->data();
- in_values.emplace_back(value);
+ for (auto val : params.in_pred_values) {
+ std::string value = ((StringValue*)val)->to_string();
+ in_values.emplace_back(value.data());
}
if (in_values.empty()) {
return false;
@@ -109,7 +137,7 @@ static bool _eval_in_val(PrimitiveType conjunct_type, std::vector<void*> in_pred
auto result = std::minmax_element(in_values.begin(), in_values.end());
const char* in_min = *result.first;
const char* in_max = *result.second;
- if (strcmp(in_max, min_bytes) < 0 || strcmp(in_min, max_bytes) > 0) {
+ if (strcmp(in_max, params.min_bytes) < 0 || strcmp(in_min, params.max_bytes) > 0) {
return true;
}
break;
@@ -120,34 +148,77 @@ static bool _eval_in_val(PrimitiveType conjunct_type, std::vector<void*> in_pred
return false;
}
-static bool _eval_eq(PrimitiveType conjunct_type, void* value, const char* min_bytes,
- const char* max_bytes) {
- switch (conjunct_type) {
+static bool _eval_eq(const ColumnMinMaxParams& params) {
+ switch (params.conjunct_type) {
case TYPE_TINYINT: {
- _PLAIN_DECODE(int16_t, value, min_bytes, max_bytes, conjunct_value, min, max)
+ _PLAIN_DECODE(int16_t, params.value, params.min_bytes, params.max_bytes, conjunct_value,
+ min, max)
_FILTER_GROUP_BY_EQ_PRED(conjunct_value, min, max)
break;
}
case TYPE_SMALLINT: {
- _PLAIN_DECODE(int16_t, value, min_bytes, max_bytes, conjunct_value, min, max)
+ _PLAIN_DECODE(int16_t, params.value, params.min_bytes, params.max_bytes, conjunct_value,
+ min, max)
_FILTER_GROUP_BY_EQ_PRED(conjunct_value, min, max)
break;
}
+ case TYPE_DECIMAL32:
case TYPE_INT: {
- _PLAIN_DECODE(int32_t, value, min_bytes, max_bytes, conjunct_value, min, max)
+ _PLAIN_DECODE(int32_t, params.value, params.min_bytes, params.max_bytes, conjunct_value,
+ min, max)
_FILTER_GROUP_BY_EQ_PRED(conjunct_value, min, max)
break;
}
+ case TYPE_DECIMAL64:
case TYPE_BIGINT: {
- _PLAIN_DECODE(int64_t, value, min_bytes, max_bytes, conjunct_value, min, max)
+ _PLAIN_DECODE(int64_t, params.value, params.min_bytes, params.max_bytes, conjunct_value,
+ min, max)
_FILTER_GROUP_BY_EQ_PRED(conjunct_value, min, max)
break;
}
+ case TYPE_DECIMALV2: {
+ if (params.parquet_type == tparquet::Type::INT32) {
+ int32_t min_value = reinterpret_cast<const int32_t*>(params.min_bytes)[0];
+ int32_t max_value = reinterpret_cast<const int32_t*>(params.max_bytes)[0];
+ DecimalV2Value conjunct_value = *((DecimalV2Value*)params.value);
+ int32_t conjunct_int_value = conjunct_value.value();
+ _align_decimal_v2_scale(&conjunct_int_value, conjunct_value.scale(), &min_value,
+ params.parquet_scale);
+ _align_decimal_v2_scale(&conjunct_int_value, conjunct_value.scale(), &max_value,
+ params.parquet_scale);
+ _FILTER_GROUP_BY_EQ_PRED(conjunct_int_value, min_value, max_value)
+ } else if (params.parquet_type == tparquet::Type::INT64) {
+ int64_t min_value = reinterpret_cast<const int64_t*>(params.min_bytes)[0];
+ int64_t max_value = reinterpret_cast<const int64_t*>(params.max_bytes)[0];
+ DecimalV2Value conjunct_value = *((DecimalV2Value*)params.value);
+ int64_t conjunct_int_value = conjunct_value.value();
+ _align_decimal_v2_scale(&conjunct_int_value, conjunct_value.scale(), &min_value,
+ params.parquet_scale);
+ _align_decimal_v2_scale(&conjunct_int_value, conjunct_value.scale(), &max_value,
+ params.parquet_scale);
+ _FILTER_GROUP_BY_EQ_PRED(conjunct_int_value, min_value, max_value)
+ }
+ break;
+ // When precision exceeds 18, decimal will use tparquet::Type::FIXED_LEN_BYTE_ARRAY to encode
+ // todo: support decimal128 after the test passes
+ // else if (params.parquet_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) {
+ // DecimalV2Value conjunct_value = *((DecimalV2Value*)params.value);
+ // Int128 conjunct_int_value = conjunct_value.value();
+ // Int128 max = _decode_value_to_int128(params, params.max_bytes);
+ // _align_decimal_v2_scale(&conjunct_int_value, conjunct_value.scale(), &max,
+ // params.parquet_scale);
+ // Int128 min = _decode_value_to_int128(params, params.min_bytes);
+ // _align_decimal_v2_scale(&conjunct_int_value, conjunct_value.scale(), &min,
+ // params.parquet_scale);
+ // _FILTER_GROUP_BY_EQ_PRED(conjunct_int_value, min, max)
+ // }
+ }
case TYPE_STRING:
case TYPE_VARCHAR:
case TYPE_CHAR: {
- const char* conjunct_value = ((std::string*)value)->data();
- if (strcmp(conjunct_value, min_bytes) < 0 || strcmp(conjunct_value, max_bytes) > 0) {
+ std::string conjunct_value = ((StringValue*)params.value)->to_string();
+ if (strcmp(conjunct_value.data(), params.min_bytes) < 0 ||
+ strcmp(conjunct_value.data(), params.max_bytes) > 0) {
return true;
}
break;
@@ -158,70 +229,73 @@ static bool _eval_eq(PrimitiveType conjunct_type, void* value, const char* min_b
return false;
}
-static bool _eval_gt(PrimitiveType conjunct_type, void* value, const char* max_bytes) {
- switch (conjunct_type) {
- case TYPE_TINYINT: {
- _PLAIN_DECODE_SINGLE(int8_t, value, max_bytes, conjunct_value, max)
- _FILTER_GROUP_BY_GT_PRED(conjunct_value, max)
- break;
- }
- case TYPE_SMALLINT: {
- _PLAIN_DECODE_SINGLE(int16_t, value, max_bytes, conjunct_value, max)
- _FILTER_GROUP_BY_GT_PRED(conjunct_value, max)
- break;
- }
- case TYPE_INT: {
- _PLAIN_DECODE_SINGLE(int32_t, value, max_bytes, conjunct_value, max)
- _FILTER_GROUP_BY_GT_PRED(conjunct_value, max)
- break;
- }
- case TYPE_BIGINT: {
- _PLAIN_DECODE_SINGLE(int64_t, value, max_bytes, conjunct_value, max)
- _FILTER_GROUP_BY_GT_PRED(conjunct_value, max)
- break;
- }
- case TYPE_STRING:
- case TYPE_VARCHAR:
- case TYPE_CHAR: {
- const char* conjunct_value = ((std::string*)value)->data();
- if (strcmp(max_bytes, conjunct_value) <= 0) {
+template <typename T>
+static bool _filter_group_by_gt_or_ge(T conjunct_value, T max, bool is_ge) {
+ if (!is_ge) {
+ if (max <= conjunct_value) {
+ return true;
+ }
+ } else {
+ if (max < conjunct_value) {
return true;
}
- break;
- }
- default:
- return false;
}
return false;
}
-static bool _eval_ge(PrimitiveType conjunct_type, void* value, const char* max_bytes) {
- switch (conjunct_type) {
+static bool _eval_gt(const ColumnMinMaxParams& params, bool is_eq) {
+ switch (params.conjunct_type) {
case TYPE_TINYINT: {
- _PLAIN_DECODE_SINGLE(int8_t, value, max_bytes, conjunct_value, max)
- _FILTER_GROUP_BY_GE_PRED(conjunct_value, max)
- break;
+ _PLAIN_DECODE_SINGLE(int8_t, params.value, params.max_bytes, conjunct_value, max)
+ return _filter_group_by_gt_or_ge(conjunct_value, max, is_eq);
}
case TYPE_SMALLINT: {
- _PLAIN_DECODE_SINGLE(int16_t, value, max_bytes, conjunct_value, max)
- _FILTER_GROUP_BY_GE_PRED(conjunct_value, max)
- break;
+ _PLAIN_DECODE_SINGLE(int16_t, params.value, params.max_bytes, conjunct_value, max)
+ return _filter_group_by_gt_or_ge(conjunct_value, max, is_eq);
}
+ case TYPE_DECIMAL32:
case TYPE_INT: {
- _PLAIN_DECODE_SINGLE(int32_t, value, max_bytes, conjunct_value, max)
- _FILTER_GROUP_BY_GE_PRED(conjunct_value, max)
- break;
+ _PLAIN_DECODE_SINGLE(int32_t, params.value, params.max_bytes, conjunct_value, max)
+ return _filter_group_by_gt_or_ge(conjunct_value, max, is_eq);
}
+ case TYPE_DECIMAL64:
case TYPE_BIGINT: {
- _PLAIN_DECODE_SINGLE(int64_t, value, max_bytes, conjunct_value, max)
- _FILTER_GROUP_BY_GE_PRED(conjunct_value, max)
+ _PLAIN_DECODE_SINGLE(int64_t, params.value, params.max_bytes, conjunct_value, max)
+ return _filter_group_by_gt_or_ge(conjunct_value, max, is_eq);
+ }
+ case TYPE_DECIMALV2: {
+ if (params.parquet_type == tparquet::Type::INT32) {
+ int32_t conjunct_int_value = 0;
+ int32_t parquet_value = 0;
+ _decode_decimal_v2_to_primary(params, params.max_bytes, &conjunct_int_value,
+ &parquet_value);
+ return _filter_group_by_gt_or_ge(conjunct_int_value, parquet_value, is_eq);
+ } else if (params.parquet_type == tparquet::Type::INT64) {
+ int64_t conjunct_int_value = 0;
+ int64_t parquet_value = 0;
+ _decode_decimal_v2_to_primary(params, params.max_bytes, &conjunct_int_value,
+ &parquet_value);
+ return _filter_group_by_gt_or_ge(conjunct_int_value, parquet_value, is_eq);
+ }
break;
+ // When precision exceeds 18, decimal will use tparquet::Type::FIXED_LEN_BYTE_ARRAY to encode
+ // todo: support decimal128 after the test passes
+ // else if (params.parquet_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) {
+ // DecimalV2Value conjunct_value = *((DecimalV2Value*)params.value);
+ // Int128 conjunct_int_value = conjunct_value.value();
+ // Int128 max = _decode_value_to_int128(params, params.max_bytes);
+ // _align_decimal_v2_scale(&conjunct_int_value, conjunct_value.scale(), &max,
+ // params.parquet_scale);
+ // return _filter_group_by_gt_or_ge(conjunct_int_value, max, is_eq);
+ // }
}
case TYPE_STRING:
case TYPE_VARCHAR:
case TYPE_CHAR: {
- const char* conjunct_value = ((std::string*)value)->data();
- if (strcmp(max_bytes, conjunct_value) < 0) {
+ std::string conjunct_value = ((StringValue*)params.value)->to_string();
+ if (!is_eq && strcmp(params.max_bytes, conjunct_value.data()) <= 0) {
+ return true;
+ } else if (strcmp(params.max_bytes, conjunct_value.data()) < 0) {
return true;
}
break;
@@ -232,74 +306,82 @@ static bool _eval_ge(PrimitiveType conjunct_type, void* value, const char* max_b
return false;
}
-static bool _eval_lt(PrimitiveType conjunct_type, void* value, const char* min_bytes) {
- switch (conjunct_type) {
- case TYPE_TINYINT: {
- _PLAIN_DECODE_SINGLE(int8_t, value, min_bytes, conjunct_value, min)
- _FILTER_GROUP_BY_LT_PRED(conjunct_value, min)
- break;
- }
- case TYPE_SMALLINT: {
- _PLAIN_DECODE_SINGLE(int16_t, value, min_bytes, conjunct_value, min)
- _FILTER_GROUP_BY_LT_PRED(conjunct_value, min)
- break;
- }
- case TYPE_INT: {
- _PLAIN_DECODE_SINGLE(int32_t, value, min_bytes, conjunct_value, min)
- _FILTER_GROUP_BY_LT_PRED(conjunct_value, min)
- break;
- }
- case TYPE_BIGINT: {
- _PLAIN_DECODE_SINGLE(int64_t, value, min_bytes, conjunct_value, min)
- _FILTER_GROUP_BY_LT_PRED(conjunct_value, min)
- break;
- }
- case TYPE_STRING:
- case TYPE_VARCHAR:
- case TYPE_CHAR: {
- const char* conjunct_value = ((std::string*)value)->data();
- if (strcmp(min_bytes, conjunct_value) >= 0) {
+template <typename T>
+static bool _filter_group_by_lt_or_le(T conjunct_value, T min, bool is_le) {
+ if (!is_le) {
+ if (min >= conjunct_value) {
+ return true;
+ }
+ } else {
+ if (min > conjunct_value) {
return true;
}
- break;
- }
- default:
- return false;
}
return false;
}
-static bool _eval_le(PrimitiveType conjunct_type, void* value, const char* min_bytes) {
- switch (conjunct_type) {
+static bool _eval_lt(const ColumnMinMaxParams& params, bool is_eq) {
+ switch (params.conjunct_type) {
case TYPE_TINYINT: {
- _PLAIN_DECODE_SINGLE(int8_t, value, min_bytes, conjunct_value, min)
- _FILTER_GROUP_BY_LE_PRED(conjunct_value, min)
- break;
+ _PLAIN_DECODE_SINGLE(int8_t, params.value, params.min_bytes, conjunct_value, min)
+ return _filter_group_by_lt_or_le(conjunct_value, min, is_eq);
}
case TYPE_SMALLINT: {
- _PLAIN_DECODE_SINGLE(int16_t, value, min_bytes, conjunct_value, min)
- _FILTER_GROUP_BY_LE_PRED(conjunct_value, min)
- break;
+ _PLAIN_DECODE_SINGLE(int16_t, params.value, params.min_bytes, conjunct_value, min)
+ return _filter_group_by_lt_or_le(conjunct_value, min, is_eq);
}
+ case TYPE_DECIMAL32:
case TYPE_INT: {
- _PLAIN_DECODE_SINGLE(int32_t, value, min_bytes, conjunct_value, min)
- _FILTER_GROUP_BY_LE_PRED(conjunct_value, min)
- break;
+ _PLAIN_DECODE_SINGLE(int32_t, params.value, params.min_bytes, conjunct_value, min)
+ return _filter_group_by_lt_or_le(conjunct_value, min, is_eq);
}
+ case TYPE_DECIMAL64:
case TYPE_BIGINT: {
- _PLAIN_DECODE_SINGLE(int64_t, value, min_bytes, conjunct_value, min)
- _FILTER_GROUP_BY_LE_PRED(conjunct_value, min)
- break;
+ _PLAIN_DECODE_SINGLE(int64_t, params.value, params.min_bytes, conjunct_value, min)
+ return _filter_group_by_lt_or_le(conjunct_value, min, is_eq);
}
case TYPE_STRING:
case TYPE_VARCHAR:
case TYPE_CHAR: {
- const char* conjunct_value = ((std::string*)value)->data();
- if (strcmp(min_bytes, conjunct_value) > 0) {
+ std::string conjunct_value = ((StringValue*)params.value)->to_string();
+ if (!is_eq && strcmp(params.min_bytes, conjunct_value.data()) >= 0) {
+ return true;
+ } else if (strcmp(params.min_bytes, conjunct_value.data()) > 0) {
return true;
}
break;
}
+ case TYPE_DECIMALV2: {
+ if (params.parquet_type == tparquet::Type::INT32) {
+ int32_t conjunct_int_value = 0;
+ int32_t parquet_value = 0;
+ _decode_decimal_v2_to_primary(params, params.min_bytes, &conjunct_int_value,
+ &parquet_value);
+ return _filter_group_by_lt_or_le(conjunct_int_value, parquet_value, is_eq);
+ } else if (params.parquet_type == tparquet::Type::INT64) {
+ int64_t conjunct_int_value = 0;
+ int64_t parquet_value = 0;
+ _decode_decimal_v2_to_primary(params, params.min_bytes, &conjunct_int_value,
+ &parquet_value);
+ return _filter_group_by_lt_or_le(conjunct_int_value, parquet_value, is_eq);
+ }
+ break;
+ // When precision exceeds 18, decimal will use tparquet::Type::FIXED_LEN_BYTE_ARRAY to encode
+ // todo: support decimal128 after the test passes
+ // else if (params.parquet_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) {
+ // DecimalV2Value conjunct_value = *((DecimalV2Value*)params.value);
+ // Int128 conjunct_int_value = conjunct_value.value();
+ // Int128 min = _decode_value_to_int128(params, params.min_bytes);
+ // _align_decimal_v2_scale(&conjunct_int_value, conjunct_value.scale(), &min,
+ // params.parquet_scale);
+ // return _filter_group_by_lt_or_le(conjunct_int_value, min, is_eq);
+ // }
+ }
+ case TYPE_DATE: {
+ // doris::DateTimeValue* min_date = (doris::DateTimeValue*)params.value;
+ // LOG(INFO) << min_date->debug_string();
+ return false;
+ }
default:
return false;
}
@@ -386,42 +468,48 @@ static void to_filter(const ColumnValueRange<primitive_type>& col_val_range,
}
}
-static void _eval_predicate(ScanPredicate filter, PrimitiveType col_type, const char* min_bytes,
- const char* max_bytes, bool& need_filter) {
+static void _eval_predicate(const ScanPredicate& filter, ColumnMinMaxParams* params,
+ bool* need_filter) {
if (filter._values.empty()) {
return;
}
if (filter._op == TExprOpcode::FILTER_NEW_IN) {
- need_filter = _eval_in_val(col_type, filter._values, min_bytes, max_bytes);
+ if (filter._values.size() == 1) {
+ params->value = filter._values[0];
+ *need_filter = _eval_eq(*params);
+ return;
+ }
+ params->in_pred_values = filter._values;
+ *need_filter = _eval_in_val(*params);
return;
}
// preserve TExprOpcode::FILTER_NEW_NOT_IN
- auto& value = filter._values[0];
+ params->value = filter._values[0];
switch (filter._op) {
case TExprOpcode::EQ:
- need_filter = _eval_eq(col_type, value, min_bytes, max_bytes);
+ *need_filter = _eval_eq(*params);
break;
case TExprOpcode::NE:
break;
case TExprOpcode::GT:
- need_filter = _eval_gt(col_type, value, max_bytes);
+ *need_filter = _eval_gt(*params, false);
break;
case TExprOpcode::GE:
- need_filter = _eval_ge(col_type, value, max_bytes);
+ *need_filter = _eval_gt(*params, true);
break;
case TExprOpcode::LT:
- need_filter = _eval_lt(col_type, value, min_bytes);
+ *need_filter = _eval_lt(*params, false);
break;
case TExprOpcode::LE:
- need_filter = _eval_le(col_type, value, min_bytes);
+ *need_filter = _eval_lt(*params, true);
break;
default:
break;
}
}
-static bool determine_filter_min_max(ColumnValueRangeType& col_val_range,
- const std::string& encoded_min,
+static bool determine_filter_min_max(const ColumnValueRangeType& col_val_range,
+ const FieldSchema* col_schema, const std::string& encoded_min,
const std::string& encoded_max) {
const char* min_bytes = encoded_min.data();
const char* max_bytes = encoded_max.data();
@@ -434,10 +522,21 @@ static bool determine_filter_min_max(ColumnValueRangeType& col_val_range,
to_filter(range, filters);
},
col_val_range);
+ if (filters.empty()) {
+ return false;
+ }
+
+ ColumnMinMaxParams params;
+ params.conjunct_type = col_type;
+ params.parquet_type = col_schema->physical_type;
+ params.parquet_precision = col_schema->parquet_schema.precision;
+ params.parquet_scale = col_schema->parquet_schema.scale;
+ params.parquet_type_length = col_schema->parquet_schema.type_length;
+ params.min_bytes = min_bytes;
+ params.max_bytes = max_bytes;
for (int i = 0; i < filters.size(); i++) {
- ScanPredicate filter = filters[i];
- _eval_predicate(filter, col_type, min_bytes, max_bytes, need_filter);
+ _eval_predicate(filters[i], &params, &need_filter);
if (need_filter) {
break;
}
diff --git a/be/src/vec/exec/format/parquet/vparquet_page_index.cpp b/be/src/vec/exec/format/parquet/vparquet_page_index.cpp
index acc076ff7c..a5673833a8 100644
--- a/be/src/vec/exec/format/parquet/vparquet_page_index.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_page_index.cpp
@@ -39,6 +39,7 @@ Status PageIndex::create_skipped_row_range(tparquet::OffsetIndex& offset_index,
Status PageIndex::collect_skipped_page_range(tparquet::ColumnIndex* column_index,
ColumnValueRangeType& col_val_range,
+ const FieldSchema* col_schema,
std::vector<int>& skipped_ranges) {
const std::vector<std::string>& encoded_min_vals = column_index->min_values;
const std::vector<std::string>& encoded_max_vals = column_index->max_values;
@@ -46,7 +47,7 @@ Status PageIndex::collect_skipped_page_range(tparquet::ColumnIndex* column_index
const int num_of_pages = column_index->null_pages.size();
for (int page_id = 0; page_id < num_of_pages; page_id++) {
- if (determine_filter_min_max(col_val_range, encoded_min_vals[page_id],
+ if (determine_filter_min_max(col_val_range, col_schema, encoded_min_vals[page_id],
encoded_max_vals[page_id])) {
skipped_ranges.emplace_back(page_id);
}
diff --git a/be/src/vec/exec/format/parquet/vparquet_page_index.h b/be/src/vec/exec/format/parquet/vparquet_page_index.h
index cfbe97ded4..c5f8183e35 100644
--- a/be/src/vec/exec/format/parquet/vparquet_page_index.h
+++ b/be/src/vec/exec/format/parquet/vparquet_page_index.h
@@ -32,6 +32,7 @@ public:
int page_idx, RowRange* row_range);
Status collect_skipped_page_range(tparquet::ColumnIndex* column_index,
ColumnValueRangeType& col_val_range,
+ const FieldSchema* col_schema,
std::vector<int>& skipped_ranges);
bool check_and_get_page_index_ranges(const std::vector<tparquet::ColumnChunk>& columns);
Status parse_column_index(const tparquet::ColumnChunk& chunk, const uint8_t* buff,
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index c8ea5da343..8d5042af09 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -312,6 +312,7 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group,
RETURN_IF_ERROR(
_file_reader->readat(page_index._column_index_start, buffer_size, &bytes_read, buff));
+ auto& schema_desc = _file_metadata->schema();
std::vector<RowRange> skipped_row_ranges;
for (auto& read_col : _read_columns) {
auto conjunct_iter = _colname_to_value_range->find(read_col._file_slot_name);
@@ -320,6 +321,9 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group,
}
auto& chunk = row_group.columns[read_col._parquet_col_id];
tparquet::ColumnIndex column_index;
+ if (chunk.column_index_offset == 0 && chunk.column_index_length == 0) {
+ return Status::OK();
+ }
RETURN_IF_ERROR(page_index.parse_column_index(chunk, buff, &column_index));
const int num_of_pages = column_index.null_pages.size();
if (num_of_pages <= 0) {
@@ -327,7 +331,9 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group,
}
auto& conjuncts = conjunct_iter->second;
std::vector<int> skipped_page_range;
- page_index.collect_skipped_page_range(&column_index, conjuncts, skipped_page_range);
+ const FieldSchema* col_schema = schema_desc.get_column(read_col._file_slot_name);
+ page_index.collect_skipped_page_range(&column_index, conjuncts, col_schema,
+ skipped_page_range);
if (skipped_page_range.empty()) {
return Status::OK();
}
@@ -387,6 +393,7 @@ Status ParquetReader::_process_row_group_filter(const tparquet::RowGroup& row_gr
Status ParquetReader::_process_column_stat_filter(const std::vector<tparquet::ColumnChunk>& columns,
bool* filter_group) {
+ auto& schema_desc = _file_metadata->schema();
for (auto& col_name : _column_names) {
auto col_iter = _map_column.find(col_name);
if (col_iter == _map_column.end()) {
@@ -401,8 +408,10 @@ Status ParquetReader::_process_column_stat_filter(const std::vector<tparquet::Co
if (!statistic.__isset.max || !statistic.__isset.min) {
continue;
}
+ const FieldSchema* col_schema = schema_desc.get_column(col_name);
// Min-max of statistic is plain-encoded value
- *filter_group = determine_filter_min_max(slot_iter->second, statistic.min, statistic.max);
+ *filter_group = determine_filter_min_max(slot_iter->second, col_schema, statistic.min,
+ statistic.max);
if (*filter_group) {
break;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org