You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/06/09 04:11:52 UTC
[incubator-doris] branch master updated: [feature-wip](array-type) Add array type support for vectorized parquet-orc scanner (#9856)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 19bc14cf8d [feature-wip](array-type) Add array type support for vectorized parquet-orc scanner (#9856)
19bc14cf8d is described below
commit 19bc14cf8d96bd7990c0ad52d9356cf7e686a441
Author: yinzhijian <37...@qq.com>
AuthorDate: Thu Jun 9 12:11:47 2022 +0800
[feature-wip](array-type) Add array type support for vectorized parquet-orc scanner (#9856)
Only one level of array nesting is supported for now.
For example:
- nullable(array(nullable(tinyint))) is **supported**.
- nullable(array(nullable(array(xx)))) is **not supported**.
---
be/src/exec/arrow/orc_reader.h | 4 +-
be/src/exec/base_scanner.cpp | 1 +
be/src/vec/data_types/data_type.cpp | 4 +
be/src/vec/data_types/data_type.h | 2 +
be/src/vec/data_types/data_type_array.cpp | 91 ++++++++++++++-
be/src/vec/data_types/data_type_array.h | 4 +
be/src/vec/data_types/data_type_date.cpp | 12 ++
be/src/vec/data_types/data_type_date.h | 1 +
be/src/vec/data_types/data_type_date_time.cpp | 12 ++
be/src/vec/data_types/data_type_date_time.h | 2 +
be/src/vec/data_types/data_type_decimal.cpp | 12 ++
be/src/vec/data_types/data_type_decimal.h | 1 +
be/src/vec/data_types/data_type_factory.cpp | 5 +
be/src/vec/data_types/data_type_factory.hpp | 43 ++++---
be/src/vec/data_types/data_type_nullable.cpp | 30 +++++
be/src/vec/data_types/data_type_nullable.h | 2 +
be/src/vec/data_types/data_type_number_base.cpp | 28 +++++
be/src/vec/data_types/data_type_number_base.h | 1 +
be/src/vec/data_types/data_type_string.cpp | 6 +
be/src/vec/data_types/data_type_string.h | 1 +
be/src/vec/exec/varrow_scanner.cpp | 2 +-
be/src/vec/functions/function_cast.h | 110 +++++++++++++++++-
be/src/vec/utils/arrow_column_to_doris_column.cpp | 47 +++++++-
be/src/vec/utils/arrow_column_to_doris_column.h | 2 +-
.../utils/arrow_column_to_doris_column_test.cpp | 125 ++++++++++++++++++++-
25 files changed, 511 insertions(+), 37 deletions(-)
diff --git a/be/src/exec/arrow/orc_reader.h b/be/src/exec/arrow/orc_reader.h
index 5213a18dcf..dd7853efe7 100644
--- a/be/src/exec/arrow/orc_reader.h
+++ b/be/src/exec/arrow/orc_reader.h
@@ -29,7 +29,7 @@
#include "exec/arrow/arrow_reader.h"
namespace doris {
-// Reader of orc file
+// Reader of ORC file
class ORCReaderWrap final : public ArrowReaderWrap {
public:
ORCReaderWrap(FileReader* file_reader, int64_t batch_size, int32_t num_of_columns_from_file);
@@ -48,4 +48,4 @@ private:
bool _cur_file_eof; // is read over?
};
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/exec/base_scanner.cpp b/be/src/exec/base_scanner.cpp
index 005e64c703..11247a860c 100644
--- a/be/src/exec/base_scanner.cpp
+++ b/be/src/exec/base_scanner.cpp
@@ -329,6 +329,7 @@ Status BaseScanner::_materialize_dest_block(vectorized::Block* dest_block) {
// PT1 => dest primitive type
RETURN_IF_ERROR(ctx->execute(&_src_block, &result_column_id));
auto column_ptr = _src_block.get_by_position(result_column_id).column;
+ DCHECK(column_ptr != nullptr);
// because of src_slot_desc is always be nullable, so the column_ptr after do dest_expr
// is likely to be nullable
diff --git a/be/src/vec/data_types/data_type.cpp b/be/src/vec/data_types/data_type.cpp
index 0cffc102ff..414959a517 100644
--- a/be/src/vec/data_types/data_type.cpp
+++ b/be/src/vec/data_types/data_type.cpp
@@ -82,6 +82,10 @@ std::string IDataType::to_string(const IColumn& column, size_t row_num) const {
LOG(FATAL) << fmt::format("Data type {} to_string not implement.", get_name());
return "";
}
+// Default implementation for data types that do not override from_string():
+// parsing text into such a type is not implemented, so this logs at FATAL level
+// (which is expected to abort the process — assumption, confirm logging setup)
+// and never appends anything to `column`.
+Status IDataType::from_string(ReadBuffer& rb, IColumn* column) const {
+    const auto message = fmt::format("Data type {} from_string not implement.", get_name());
+    LOG(FATAL) << message;
+    return Status::OK();
+}
void IDataType::insert_default_into(IColumn& column) const {
column.insert_default();
diff --git a/be/src/vec/data_types/data_type.h b/be/src/vec/data_types/data_type.h
index 5e49fa90c6..6a4dc16899 100644
--- a/be/src/vec/data_types/data_type.h
+++ b/be/src/vec/data_types/data_type.h
@@ -28,6 +28,7 @@
#include "vec/common/cow.h"
#include "vec/common/string_buffer.hpp"
#include "vec/core/types.h"
+#include "vec/io/reader_buffer.h"
namespace doris {
class PBlock;
@@ -70,6 +71,7 @@ public:
virtual void to_string(const IColumn& column, size_t row_num, BufferWritable& ostr) const;
virtual std::string to_string(const IColumn& column, size_t row_num) const;
+ virtual Status from_string(ReadBuffer& rb, IColumn* column) const;
protected:
virtual String do_get_name() const;
diff --git a/be/src/vec/data_types/data_type_array.cpp b/be/src/vec/data_types/data_type_array.cpp
index 1f30b48aae..cc67eb7973 100644
--- a/be/src/vec/data_types/data_type_array.cpp
+++ b/be/src/vec/data_types/data_type_array.cpp
@@ -21,6 +21,7 @@
#include "vec/data_types/data_type_array.h"
#include "gen_cpp/data.pb.h"
+#include "vec/common/string_utils/string_utils.h"
#include "vec/io/io_helper.h"
namespace doris::vectorized {
@@ -94,4 +95,92 @@ void DataTypeArray::to_pb_column_meta(PColumnMeta* col_meta) const {
get_nested_type()->to_pb_column_meta(children);
}
-} // namespace doris::vectorized
+// Writes row `row_num` of an array column as "[e1,e2,...]", delegating each
+// element to the nested type's to_string(). Const columns are expanded first.
+void DataTypeArray::to_string(const IColumn& column, size_t row_num, BufferWritable& ostr) const {
+    // Keep the returned ColumnPtr in a named local for the whole call. For a const
+    // column it may be a freshly materialized column owned only by this pointer, so
+    // taking a reference through the temporary (as before) could leave it dangling.
+    const auto full_column = column.convert_to_full_column_if_const();
+    const auto& data_column = assert_cast<const ColumnArray&>(*full_column);
+    const auto& offsets = data_column.get_offsets();
+
+    // For row 0 this reads offsets[-1]; presumably the offsets array is padded so
+    // that index reads 0 (the same convention offsets.back() relies on elsewhere).
+    const size_t offset = offsets[row_num - 1];
+    const size_t next_offset = offsets[row_num];
+
+    const IColumn& nested_column = data_column.get_data();
+    ostr.write("[", 1);
+    for (size_t i = offset; i < next_offset; ++i) {
+        if (i != offset) {
+            ostr.write(",", 1);
+        }
+        nested->to_string(nested_column, i, ostr);
+    }
+    ostr.write("]", 1);
+}
+
+// Returns row `row_num` of an array column as "[e1,e2,...]", using the nested
+// type's to_string() for each element. Const columns are expanded first.
+std::string DataTypeArray::to_string(const IColumn& column, size_t row_num) const {
+    // Keep the returned ColumnPtr alive for the whole call: for a const column it may
+    // be a freshly materialized column owned only by this pointer, so a reference
+    // taken through the temporary (as before) could dangle.
+    const auto full_column = column.convert_to_full_column_if_const();
+    const auto& data_column = assert_cast<const ColumnArray&>(*full_column);
+    const auto& offsets = data_column.get_offsets();
+
+    // For row 0 this reads offsets[-1]; presumably the offsets array is padded so
+    // that index reads 0.
+    const size_t offset = offsets[row_num - 1];
+    const size_t next_offset = offsets[row_num];
+    const IColumn& nested_column = data_column.get_data();
+
+    // Plain appends produce the same text as a stringstream without its overhead.
+    std::string res = "[";
+    for (size_t i = offset; i < next_offset; ++i) {
+        if (i != offset) {
+            res += ',';
+        }
+        res += nested->to_string(nested_column, i);
+    }
+    res += ']';
+    return res;
+}
+
+// Parses one array row from text of the form "[e1,e2,...]" and appends it to `column`.
+// Only one level of nesting is supported. Elements are split on ',' and ']' with no
+// quoting or escaping, so element text cannot contain either character, and no
+// whitespace is skipped. Each element is parsed by the nested type's from_string().
+// On success the read position is left just past the closing ']'. On failure an
+// error is returned and the elements already appended for this row are removed,
+// so `column` keeps its previous contents.
+Status DataTypeArray::from_string(ReadBuffer& rb, IColumn* column) const {
+    auto* array_column = assert_cast<ColumnArray*>(column);
+    auto& offsets = array_column->get_offsets();
+    IColumn& nested_column = array_column->get_data();
+
+    if (rb.eof() || *rb.position() != '[') {
+        // The message contains a "{}" placeholder, so format it explicitly rather
+        // than passing the character to Status as an extra argument.
+        return Status::InvalidArgument(
+                fmt::format("Array does not start with '[' character, found '{}'",
+                            rb.eof() ? std::string() : std::string(1, *rb.position())));
+    }
+    ++rb.position();
+
+    size_t size = 0;
+    // Undo this row on error. Its elements were appended to the nested column, but
+    // `offsets` is only extended at the very end. So pop from the nested column:
+    // popping from the array column itself would drop earlier, completed rows.
+    auto rollback = [&](Status st) {
+        nested_column.pop_back(size);
+        return st;
+    };
+
+    bool first = true;
+    while (!rb.eof() && *rb.position() != ']') {
+        if (!first) {
+            if (*rb.position() != ',') {
+                return rollback(Status::InvalidArgument(fmt::format(
+                        "Cannot read array from text, expected comma or end of array, found '{}'",
+                        *rb.position())));
+            }
+            ++rb.position();
+            // A trailing comma as in "[1,]" ends the array. Input such as "[1," stops
+            // here without reading past the end; the missing ']' is reported below.
+            if (rb.eof() || *rb.position() == ']') {
+                break;
+            }
+        }
+        first = false;
+
+        // The element runs up to (not including) the next ',' or ']' or the end of
+        // the buffer. The bound is checked before every dereference.
+        char* element_end = rb.position();
+        while (element_end != rb.end() && *element_end != ',' && *element_end != ']') {
+            ++element_end;
+        }
+        const size_t element_len = element_end - rb.position();
+
+        ReadBuffer element_buffer(rb.position(), element_len);
+        Status st = nested->from_string(element_buffer, &nested_column);
+        if (!st.ok()) {
+            return rollback(st);
+        }
+        rb.position() += element_len;
+        ++size;
+    }
+
+    if (rb.eof()) {
+        return rollback(Status::InvalidArgument("Array does not end with ']' character"));
+    }
+    ++rb.position(); // consume the closing ']'
+    offsets.push_back(offsets.back() + size);
+    return Status::OK();
+}
+
+} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/data_types/data_type_array.h b/be/src/vec/data_types/data_type_array.h
index 2cd9c45667..c87298b1f0 100644
--- a/be/src/vec/data_types/data_type_array.h
+++ b/be/src/vec/data_types/data_type_array.h
@@ -78,6 +78,10 @@ public:
const char* deserialize(const char* buf, IColumn* column) const override;
void to_pb_column_meta(PColumnMeta* col_meta) const override;
+
+ std::string to_string(const IColumn& column, size_t row_num) const override;
+ void to_string(const IColumn& column, size_t row_num, BufferWritable& ostr) const override;
+ Status from_string(ReadBuffer& rb, IColumn* column) const override;
};
} // namespace doris::vectorized
diff --git a/be/src/vec/data_types/data_type_date.cpp b/be/src/vec/data_types/data_type_date.cpp
index 2508570a53..3e261d0f99 100644
--- a/be/src/vec/data_types/data_type_date.cpp
+++ b/be/src/vec/data_types/data_type_date.cpp
@@ -23,6 +23,7 @@
#include "runtime/datetime_value.h"
#include "util/binary_cast.hpp"
#include "vec/columns/columns_number.h"
+#include "vec/io/io_helper.h"
#include "vec/runtime/vdatetime_value.h"
namespace doris::vectorized {
bool DataTypeDate::equals(const IDataType& rhs) const {
@@ -59,6 +60,17 @@ void DataTypeDate::to_string(const IColumn& column, size_t row_num, BufferWritab
ostr.write(buf, pos - buf - 1);
}
+// Parses a date literal from `rb` with read_date_text_impl and, on success, appends
+// the resulting Int64 value to `column` (treated as a ColumnInt64).
+Status DataTypeDate::from_string(ReadBuffer& rb, IColumn* column) const {
+    Int64 parsed = 0;
+    const bool ok = read_date_text_impl<Int64>(parsed, rb);
+    if (ok) {
+        static_cast<ColumnInt64*>(column)->insert_value(parsed);
+        return Status::OK();
+    }
+    return Status::InvalidArgument(fmt::format("parse date fail, string: '{}'",
+                                               std::string(rb.position(), rb.count()).c_str()));
+}
+
void DataTypeDate::cast_to_date(Int64& x) {
auto value = binary_cast<Int64, VecDateTimeValue>(x);
value.cast_to_date();
diff --git a/be/src/vec/data_types/data_type_date.h b/be/src/vec/data_types/data_type_date.h
index 06f8ff0d0e..98899db38b 100644
--- a/be/src/vec/data_types/data_type_date.h
+++ b/be/src/vec/data_types/data_type_date.h
@@ -36,6 +36,7 @@ public:
bool equals(const IDataType& rhs) const override;
std::string to_string(const IColumn& column, size_t row_num) const override;
void to_string(const IColumn& column, size_t row_num, BufferWritable& ostr) const override;
+ Status from_string(ReadBuffer& rb, IColumn* column) const override;
static void cast_to_date(Int64& x);
diff --git a/be/src/vec/data_types/data_type_date_time.cpp b/be/src/vec/data_types/data_type_date_time.cpp
index 5271091f87..b758390576 100644
--- a/be/src/vec/data_types/data_type_date_time.cpp
+++ b/be/src/vec/data_types/data_type_date_time.cpp
@@ -23,6 +23,7 @@
#include "runtime/datetime_value.h"
#include "util/binary_cast.hpp"
#include "vec/columns/columns_number.h"
+#include "vec/io/io_helper.h"
#include "vec/runtime/vdatetime_value.h"
namespace doris::vectorized {
@@ -82,6 +83,17 @@ void DataTypeDateTime::to_string(const IColumn& column, size_t row_num,
ostr.write(buf, pos - buf - 1);
}
+// Parses a datetime literal from `rb` with read_datetime_text_impl and, on success,
+// appends the resulting Int64 value to `column` (treated as a ColumnInt64).
+Status DataTypeDateTime::from_string(ReadBuffer& rb, IColumn* column) const {
+    Int64 parsed = 0;
+    const bool ok = read_datetime_text_impl<Int64>(parsed, rb);
+    if (ok) {
+        static_cast<ColumnInt64*>(column)->insert_value(parsed);
+        return Status::OK();
+    }
+    return Status::InvalidArgument(fmt::format("parse datetime fail, string: '{}'",
+                                               std::string(rb.position(), rb.count()).c_str()));
+}
+
void DataTypeDateTime::cast_to_date_time(Int64& x) {
auto value = binary_cast<Int64, doris::vectorized::VecDateTimeValue>(x);
value.to_datetime();
diff --git a/be/src/vec/data_types/data_type_date_time.h b/be/src/vec/data_types/data_type_date_time.h
index adf41bfc8b..8085407253 100644
--- a/be/src/vec/data_types/data_type_date_time.h
+++ b/be/src/vec/data_types/data_type_date_time.h
@@ -64,6 +64,8 @@ public:
void to_string(const IColumn& column, size_t row_num, BufferWritable& ostr) const override;
+ Status from_string(ReadBuffer& rb, IColumn* column) const override;
+
static void cast_to_date_time(Int64& x);
MutableColumnPtr create_column() const override;
diff --git a/be/src/vec/data_types/data_type_decimal.cpp b/be/src/vec/data_types/data_type_decimal.cpp
index 8cd97a38de..04b8713da9 100644
--- a/be/src/vec/data_types/data_type_decimal.cpp
+++ b/be/src/vec/data_types/data_type_decimal.cpp
@@ -62,6 +62,18 @@ void DataTypeDecimal<T>::to_string(const IColumn& column, size_t row_num,
ostr.write(str.data(), str.size());
}
+// Parses a decimal literal from `rb` and, on success, appends it to the decimal column.
+// NOTE(review): this type's precision/scale is not passed to read_decimal_text_impl;
+// confirm the parsed value is at the scale this column expects.
+template <typename T>
+Status DataTypeDecimal<T>::from_string(ReadBuffer& rb, IColumn* column) const {
+    T parsed = 0;
+    if (read_decimal_text_impl<T>(parsed, rb)) {
+        static_cast<ColumnType&>(*column).get_data().emplace_back(parsed);
+        return Status::OK();
+    }
+    return Status::InvalidArgument(fmt::format("parse decimal fail, string: '{}'",
+                                               std::string(rb.position(), rb.count()).c_str()));
+}
+
// binary: row_num | value1 | value2 | ...
template <typename T>
int64_t DataTypeDecimal<T>::get_uncompressed_serialized_bytes(const IColumn& column) const {
diff --git a/be/src/vec/data_types/data_type_decimal.h b/be/src/vec/data_types/data_type_decimal.h
index 0613653324..7a2d9541e6 100644
--- a/be/src/vec/data_types/data_type_decimal.h
+++ b/be/src/vec/data_types/data_type_decimal.h
@@ -145,6 +145,7 @@ public:
bool can_be_inside_nullable() const override { return true; }
std::string to_string(const IColumn& column, size_t row_num) const override;
void to_string(const IColumn& column, size_t row_num, BufferWritable& ostr) const override;
+ Status from_string(ReadBuffer& rb, IColumn* column) const override;
/// Decimal specific
diff --git a/be/src/vec/data_types/data_type_factory.cpp b/be/src/vec/data_types/data_type_factory.cpp
index 2071b8cedb..3658270ec7 100644
--- a/be/src/vec/data_types/data_type_factory.cpp
+++ b/be/src/vec/data_types/data_type_factory.cpp
@@ -312,6 +312,11 @@ DataTypePtr DataTypeFactory::create_data_type(const arrow::DataType* type, bool
case ::arrow::Type::DECIMAL:
nested = std::make_shared<vectorized::DataTypeDecimal<vectorized::Decimal128>>();
break;
+ case ::arrow::Type::LIST:
+ DCHECK(type->num_fields() == 1);
+ nested = std::make_shared<vectorized::DataTypeArray>(
+ create_data_type(type->field(0)->type().get(), true));
+ break;
default:
DCHECK(false) << "invalid arrow type:" << (int)(type->id());
break;
diff --git a/be/src/vec/data_types/data_type_factory.hpp b/be/src/vec/data_types/data_type_factory.hpp
index 968617414c..e64bdcf3ac 100644
--- a/be/src/vec/data_types/data_type_factory.hpp
+++ b/be/src/vec/data_types/data_type_factory.hpp
@@ -49,22 +49,33 @@ public:
static std::once_flag oc;
static DataTypeFactory instance;
std::call_once(oc, []() {
- instance.register_data_type("UInt8", std::make_shared<DataTypeUInt8>());
- instance.register_data_type("UInt16", std::make_shared<DataTypeUInt16>());
- instance.register_data_type("UInt32", std::make_shared<DataTypeUInt32>());
- instance.register_data_type("UInt64", std::make_shared<DataTypeUInt64>());
- instance.register_data_type("Int8", std::make_shared<DataTypeInt8>());
- instance.register_data_type("Int16", std::make_shared<DataTypeInt16>());
- instance.register_data_type("Int32", std::make_shared<DataTypeInt32>());
- instance.register_data_type("Int64", std::make_shared<DataTypeInt64>());
- instance.register_data_type("Int128", std::make_shared<DataTypeInt128>());
- instance.register_data_type("Float32", std::make_shared<DataTypeFloat32>());
- instance.register_data_type("Float64", std::make_shared<DataTypeFloat64>());
- instance.register_data_type("Date", std::make_shared<DataTypeDate>());
- instance.register_data_type("DateTime", std::make_shared<DataTypeDateTime>());
- instance.register_data_type("String", std::make_shared<DataTypeString>());
- instance.register_data_type("Decimal",
- std::make_shared<DataTypeDecimal<Decimal128>>(27, 9));
+ std::unordered_map<std::string, DataTypePtr> base_type_map {
+ {"UInt8", std::make_shared<DataTypeUInt8>()},
+ {"UInt16", std::make_shared<DataTypeUInt16>()},
+ {"UInt32", std::make_shared<DataTypeUInt32>()},
+ {"UInt64", std::make_shared<DataTypeUInt64>()},
+ {"Int8", std::make_shared<DataTypeInt8>()},
+ {"Int16", std::make_shared<DataTypeInt16>()},
+ {"Int32", std::make_shared<DataTypeInt32>()},
+ {"Int64", std::make_shared<DataTypeInt64>()},
+ {"Int128", std::make_shared<DataTypeInt128>()},
+ {"Float32", std::make_shared<DataTypeFloat32>()},
+ {"Float64", std::make_shared<DataTypeFloat64>()},
+ {"Date", std::make_shared<DataTypeDate>()},
+ {"DateTime", std::make_shared<DataTypeDateTime>()},
+ {"String", std::make_shared<DataTypeString>()},
+ {"Decimal", std::make_shared<DataTypeDecimal<Decimal128>>(27, 9)},
+
+ };
+ for (auto const& [key, val] : base_type_map) {
+ instance.register_data_type(key, val);
+ instance.register_data_type("Array(" + key + ")",
+ std::make_shared<vectorized::DataTypeArray>(val));
+ instance.register_data_type(
+ "Array(Nullable(" + key + "))",
+ std::make_shared<vectorized::DataTypeArray>(
+ std::make_shared<vectorized::DataTypeNullable>(val)));
+ }
});
return instance;
}
diff --git a/be/src/vec/data_types/data_type_nullable.cpp b/be/src/vec/data_types/data_type_nullable.cpp
index 9e63fa5e24..d8a3525bc3 100644
--- a/be/src/vec/data_types/data_type_nullable.cpp
+++ b/be/src/vec/data_types/data_type_nullable.cpp
@@ -53,6 +53,36 @@ std::string DataTypeNullable::to_string(const IColumn& column, size_t row_num) c
}
}
+// Writes row `row_num` of a nullable column: "NULL" for a null row, otherwise the
+// nested type's text form of the value. Const columns are expanded first.
+void DataTypeNullable::to_string(const IColumn& column, size_t row_num,
+                                 BufferWritable& ostr) const {
+    // Hold the returned ColumnPtr in a named local. For a const column it may be a
+    // freshly materialized column owned only by this pointer, so binding a reference
+    // through the temporary (as before) could leave it dangling.
+    const auto full_column = column.convert_to_full_column_if_const();
+    const auto& col = assert_cast<const ColumnNullable&>(*full_column);
+
+    if (col.is_null_at(row_num)) {
+        ostr.write("NULL", 4);
+    } else {
+        nested_data_type->to_string(col.get_nested_column(), row_num, ostr);
+    }
+}
+
+// Parses one nullable value from `rb` and appends it to `column`.
+// Both the exact text "NULL" and any text the nested type fails to parse become a
+// NULL row. This always returns OK: parse failures are turned into NULLs on purpose.
+Status DataTypeNullable::from_string(ReadBuffer& rb, IColumn* column) const {
+    auto* null_column = assert_cast<ColumnNullable*>(column);
+    const char* text = rb.position();
+    const bool is_null_literal =
+            rb.count() == 4 && text[0] == 'N' && text[1] == 'U' && text[2] == 'L' && text[3] == 'L';
+    // Short-circuit: the nested parser only runs when the text is not the NULL literal.
+    if (is_null_literal ||
+        !nested_data_type->from_string(rb, &(null_column->get_nested_column())).ok()) {
+        null_column->insert_data(nullptr, 0); // 0 is meaningless here
+        return Status::OK();
+    }
+    // The nested value was appended; mark the row as not null.
+    null_column->get_null_map_data().push_back(0);
+    return Status::OK();
+}
+
// binary: row num | <null array> | <values array>
// <null array>: is_null1 | is_null2 | ...
// <values array>: value1 | value2 | ...>
diff --git a/be/src/vec/data_types/data_type_nullable.h b/be/src/vec/data_types/data_type_nullable.h
index 537d0bf449..1f51fd1789 100644
--- a/be/src/vec/data_types/data_type_nullable.h
+++ b/be/src/vec/data_types/data_type_nullable.h
@@ -81,6 +81,8 @@ public:
return nested_data_type->can_be_inside_low_cardinality();
}
std::string to_string(const IColumn& column, size_t row_num) const override;
+ void to_string(const IColumn& column, size_t row_num, BufferWritable& ostr) const override;
+ Status from_string(ReadBuffer& rb, IColumn* column) const override;
const DataTypePtr& get_nested_type() const { return nested_data_type; }
diff --git a/be/src/vec/data_types/data_type_number_base.cpp b/be/src/vec/data_types/data_type_number_base.cpp
index 2acd319ee6..a6111412df 100644
--- a/be/src/vec/data_types/data_type_number_base.cpp
+++ b/be/src/vec/data_types/data_type_number_base.cpp
@@ -55,6 +55,34 @@ void DataTypeNumberBase<T>::to_string(const IColumn& column, size_t row_num,
}
}
+// Parses a numeric literal from `rb` and appends it to the ColumnVector<T> `column`.
+// float/double use read_float_text_fast_impl, integral types use read_int_text_impl,
+// and UInt128 is rejected as unsupported.
+template <typename T>
+Status DataTypeNumberBase<T>::from_string(ReadBuffer& rb, IColumn* column) const {
+    if constexpr (std::is_same<T, UInt128>::value) {
+        // TODO support for Uint128
+        return Status::InvalidArgument("uint128 is not support");
+    } else {
+        constexpr bool is_float = std::is_same_v<T, float> || std::is_same_v<T, double>;
+        constexpr bool is_int = std::is_integral<T>::value;
+        if constexpr (is_float || is_int) {
+            T parsed = 0;
+            bool ok = false;
+            if constexpr (is_float) {
+                ok = read_float_text_fast_impl(parsed, rb);
+            } else {
+                ok = read_int_text_impl(parsed, rb);
+            }
+            if (!ok) {
+                return Status::InvalidArgument(
+                        fmt::format("parse number fail, string: '{}'",
+                                    std::string(rb.position(), rb.count()).c_str()));
+            }
+            static_cast<ColumnVector<T>*>(column)->insert_value(parsed);
+        } else {
+            DCHECK(false);
+        }
+        return Status::OK();
+    }
+}
+
template <typename T>
Field DataTypeNumberBase<T>::get_default() const {
return NearestFieldType<FieldType>();
diff --git a/be/src/vec/data_types/data_type_number_base.h b/be/src/vec/data_types/data_type_number_base.h
index 7b6266e4c7..14fbfb3d07 100644
--- a/be/src/vec/data_types/data_type_number_base.h
+++ b/be/src/vec/data_types/data_type_number_base.h
@@ -67,6 +67,7 @@ public:
void to_string(const IColumn& column, size_t row_num, BufferWritable& ostr) const override;
std::string to_string(const IColumn& column, size_t row_num) const override;
+ Status from_string(ReadBuffer& rb, IColumn* column) const override;
};
} // namespace doris::vectorized
diff --git a/be/src/vec/data_types/data_type_string.cpp b/be/src/vec/data_types/data_type_string.cpp
index fe0518ad38..220b418e4b 100644
--- a/be/src/vec/data_types/data_type_string.cpp
+++ b/be/src/vec/data_types/data_type_string.cpp
@@ -66,6 +66,12 @@ void DataTypeString::to_string(const class doris::vectorized::IColumn& column, s
ostr.write(s.data, s.size);
}
+// Strings need no parsing: the rb.count() bytes starting at rb.position() are
+// appended to `column` verbatim as one row.
+Status DataTypeString::from_string(ReadBuffer& rb, IColumn* column) const {
+    auto& string_column = *static_cast<ColumnString*>(column);
+    string_column.insert_data(rb.position(), rb.count());
+    return Status::OK();
+}
+
Field DataTypeString::get_default() const {
return String();
}
diff --git a/be/src/vec/data_types/data_type_string.h b/be/src/vec/data_types/data_type_string.h
index 8cf10fd9d4..1ecf86f2fe 100644
--- a/be/src/vec/data_types/data_type_string.h
+++ b/be/src/vec/data_types/data_type_string.h
@@ -58,6 +58,7 @@ public:
bool can_be_inside_low_cardinality() const override { return true; }
std::string to_string(const IColumn& column, size_t row_num) const override;
void to_string(const IColumn& column, size_t row_num, BufferWritable& ostr) const override;
+ Status from_string(ReadBuffer& rb, IColumn* column) const override;
};
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/varrow_scanner.cpp b/be/src/vec/exec/varrow_scanner.cpp
index 971f1b45bc..14ae373988 100644
--- a/be/src/vec/exec/varrow_scanner.cpp
+++ b/be/src/vec/exec/varrow_scanner.cpp
@@ -255,7 +255,7 @@ Status VArrowScanner::_append_batch_to_src_block(Block* block) {
auto* array = _batch->column(column_pos++).get();
auto& column_with_type_and_name = block->get_by_name(slot_desc->col_name());
RETURN_IF_ERROR(arrow_column_to_doris_column(array, _arrow_batch_cur_idx,
- column_with_type_and_name, num_elements,
+ column_with_type_and_name.column, num_elements,
_state->timezone()));
}
diff --git a/be/src/vec/functions/function_cast.h b/be/src/vec/functions/function_cast.h
index 84039b1cb3..5799e60d93 100644
--- a/be/src/vec/functions/function_cast.h
+++ b/be/src/vec/functions/function_cast.h
@@ -245,6 +245,40 @@ struct ConvertImplGenericToString {
}
};
+/// CAST implementation that parses every row of a string column with the result
+/// type's from_string(). The first row that fails to parse aborts the cast with
+/// that row's error status.
+template <typename StringColumnType>
+struct ConvertImplGenericFromString {
+    static Status execute(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
+                          const size_t result, size_t input_rows_count) {
+        static_assert(std::is_same_v<StringColumnType, ColumnString>,
+                      "Can be used only to parse from ColumnString");
+        const IColumn& col_from = *block.get_by_position(arguments[0]).column;
+        // result column must set type
+        DCHECK(block.get_by_position(result).type != nullptr);
+        auto data_type_to = block.get_by_position(result).type;
+
+        const auto* col_from_string = check_and_get_column<StringColumnType>(&col_from);
+        if (col_from_string == nullptr) {
+            return Status::RuntimeError(fmt::format(
+                    "Illegal column {} of first argument of conversion function from string",
+                    col_from.get_name()));
+        }
+
+        const size_t rows = col_from.size();
+        auto col_to = data_type_to->create_column();
+        col_to->reserve(rows);
+        for (size_t row = 0; row < rows; ++row) {
+            const auto& val = col_from_string->get_data_at(row);
+            ReadBuffer read_buffer((char*)(val.data), val.size);
+            RETURN_IF_ERROR(data_type_to->from_string(read_buffer, col_to));
+        }
+        block.replace_by_position(result, std::move(col_to));
+        return Status::OK();
+    }
+};
+
template <typename ToDataType, typename Name>
struct ConvertImpl<DataTypeString, ToDataType, Name> {
template <typename Additions = void*>
@@ -693,7 +727,6 @@ protected:
if (arguments.size() > 2)
new_arguments.insert(std::end(new_arguments), std::next(std::begin(arguments), 2),
std::end(arguments));
-
return wrapper_function(context, block, new_arguments, result, input_rows_count);
}
@@ -1040,6 +1073,70 @@ private:
};
}
+    /// Builds the CAST wrapper for an Array target type.
+    /// A String source is parsed row by row through the target type's from_string().
+    /// An Array source must have as many dimensions as the target, unless its nested
+    /// type is Nothing (e.g. an empty array literal). Its nested column is cast with
+    /// the nested wrapper and re-wrapped with the source's offsets.
+    WrapperType create_array_wrapper(const DataTypePtr& from_type_untyped,
+                                     const DataTypeArray& to_type) const {
+        /// Conversion from String through parsing.
+        if (check_and_get_data_type<DataTypeString>(from_type_untyped.get())) {
+            return &ConvertImplGenericFromString<ColumnString>::execute;
+        }
+
+        const auto* from_type = check_and_get_data_type<DataTypeArray>(from_type_untyped.get());
+        if (from_type == nullptr) {
+            LOG(FATAL) << "CAST AS Array can only be performed between same-dimensional Array, "
+                          "String types";
+        }
+
+        DataTypePtr from_nested_type = from_type->get_nested_type();
+        const DataTypePtr& to_nested_type = to_type.get_nested_type();
+
+        /// In query SELECT CAST([] AS Array(Array(String))) from type is Array(Nothing)
+        const bool same_dimensions =
+                from_type->get_number_of_dimensions() == to_type.get_number_of_dimensions();
+        if (!same_dimensions && !is_nothing(from_nested_type)) {
+            LOG(FATAL)
+                    << "CAST AS Array can only be performed between same-dimensional array types";
+        }
+
+        /// Prepare nested type conversion
+        const auto nested_function = prepare_unpack_dictionaries(from_nested_type, to_nested_type);
+
+        return [nested_function, from_nested_type, to_nested_type](
+                       FunctionContext* context, Block& block, const ColumnNumbers& arguments,
+                       const size_t result, size_t /*input_rows_count*/) -> Status {
+            auto& from_column = block.get_by_position(arguments.front()).column;
+            const auto* from_col_array = check_and_get_column<ColumnArray>(from_column.get());
+            if (from_col_array == nullptr) {
+                return Status::RuntimeError(fmt::format(
+                        "Illegal column {} for function CAST AS Array", from_column->get_name()));
+            }
+
+            /// Append the source nested column, plus an empty slot for the converted
+            /// nested column, to the block; then run the nested conversion on them.
+            ColumnWithTypeAndName nested_source {from_col_array->get_data_ptr(),
+                                                 from_nested_type, ""};
+            ColumnNumbers nested_arguments {block.columns()};
+            block.insert(nested_source);
+
+            const size_t nested_result = block.columns();
+            block.insert({to_nested_type, ""});
+            RETURN_IF_ERROR(nested_function(context, block, nested_arguments, nested_result,
+                                            from_col_array->get_data_ptr()->size()));
+            auto converted_nested = block.get_by_position(nested_result).column;
+
+            /// set converted nested column to result
+            block.get_by_position(result).column =
+                    ColumnArray::create(converted_nested, from_col_array->get_offsets_ptr());
+            return Status::OK();
+        };
+    }
+
WrapperType prepare_unpack_dictionaries(const DataTypePtr& from_type,
const DataTypePtr& to_type) const {
const auto& from_nested = from_type;
@@ -1069,7 +1166,6 @@ private:
WrapperType prepare_remove_nullable(const DataTypePtr& from_type, const DataTypePtr& to_type,
bool skip_not_null_check) const {
/// Determine whether pre-processing and/or post-processing must take place during conversion.
-
bool source_is_nullable = from_type->is_nullable();
bool result_is_nullable = to_type->is_nullable();
@@ -1096,7 +1192,8 @@ private:
tmp_block.insert({nullptr, nested_type, ""});
/// Perform the requested conversion.
- wrapper(context, tmp_block, arguments, tmp_res_index, input_rows_count);
+ RETURN_IF_ERROR(
+ wrapper(context, tmp_block, arguments, tmp_res_index, input_rows_count));
const auto& tmp_res = tmp_block.get_by_position(tmp_res_index);
@@ -1135,7 +1232,7 @@ private:
}
}
- wrapper(context, tmp_block, arguments, result, input_rows_count);
+ RETURN_IF_ERROR(wrapper(context, tmp_block, arguments, result, input_rows_count));
block.get_by_position(result).column = tmp_block.get_by_position(result).column;
return Status::OK();
};
@@ -1193,7 +1290,8 @@ private:
switch (to_type->get_type_id()) {
case TypeIndex::String:
return create_string_wrapper(from_type);
-
+ case TypeIndex::Array:
+ return create_array_wrapper(from_type, static_cast<const DataTypeArray&>(*to_type));
default:
break;
}
@@ -1238,8 +1336,8 @@ protected:
LOG(FATAL) << fmt::format(
"Second argument to {} must be a constant string describing type", get_name());
}
-
auto type = DataTypeFactory::instance().get(type_col->get_value<String>());
+ DCHECK(type != nullptr);
bool need_to_be_nullable = false;
// 1. from_type is nullable
diff --git a/be/src/vec/utils/arrow_column_to_doris_column.cpp b/be/src/vec/utils/arrow_column_to_doris_column.cpp
index a02fc92b48..206c279e4c 100644
--- a/be/src/vec/utils/arrow_column_to_doris_column.cpp
+++ b/be/src/vec/utils/arrow_column_to_doris_column.cpp
@@ -28,6 +28,7 @@
#include "arrow/type_fwd.h"
#include "arrow/type_traits.h"
#include "gutil/casts.h"
+#include "vec/columns/column_array.h"
#include "vec/columns/column_nullable.h"
#include "vec/data_types/data_type_decimal.h"
#include "vec/runtime/vdatetime_value.h"
@@ -236,19 +237,52 @@ static Status convert_column_with_decimal_data(const arrow::Array* array, size_t
return Status::OK();
}
+// Appends one Doris offset per arrow list row in [array_idx, array_idx + num_elements)
+// to the ColumnArray held by `data_column`. It also reports which slice of the arrow
+// child (values) array those rows cover:
+//   *start_idx_for_data - child index of the first value of row `array_idx`;
+//   *num_for_data       - number of child values covered by the converted rows.
+static Status convert_offset_from_list_column(const arrow::Array* array, size_t array_idx,
+                                              MutableColumnPtr& data_column, size_t num_elements,
+                                              size_t* start_idx_for_data, size_t* num_for_data) {
+    auto& offsets_data = static_cast<ColumnArray&>(*data_column).get_offsets();
+    auto concrete_array = down_cast<const arrow::ListArray*>(array);
+    auto arrow_offsets_array = concrete_array->offsets();
+    auto arrow_offsets = down_cast<arrow::Int32Array*>(arrow_offsets_array.get());
+
+    // Doris offsets are cumulative over the whole column. New rows must continue from
+    // the column's current end offset instead of restarting at 0; otherwise a column
+    // that already holds rows (e.g. from an earlier batch) gets corrupt offsets.
+    // back() on an empty offsets array is relied upon to read 0 (padded array),
+    // presumably the same convention DataTypeArray::from_string uses.
+    const auto base_offset = offsets_data.back();
+    const int64_t arrow_start = arrow_offsets->Value(array_idx);
+    for (size_t i = array_idx + 1; i < array_idx + num_elements + 1; ++i) {
+        offsets_data.emplace_back(base_offset + (arrow_offsets->Value(i) - arrow_start));
+    }
+    *start_idx_for_data = arrow_start;
+    *num_for_data = offsets_data.back() - base_offset;
+
+    return Status::OK();
+}
+
+// Converts arrow list rows [array_idx, array_idx + num_elements) into the Doris
+// ColumnArray held by `data_column`. The offsets are written first; then the slice of
+// the arrow child values those rows cover is converted into the array's nested
+// column by a recursive arrow_column_to_doris_column call.
+static Status convert_column_with_list_data(const arrow::Array* array, size_t array_idx,
+                                            MutableColumnPtr& data_column, size_t num_elements,
+                                            const std::string& timezone) {
+    size_t values_start = 0;
+    size_t values_count = 0;
+    RETURN_IF_ERROR(convert_offset_from_list_column(array, array_idx, data_column, num_elements,
+                                                    &values_start, &values_count));
+
+    const auto* list_array = down_cast<const arrow::ListArray*>(array);
+    std::shared_ptr<arrow::Array> child_values = list_array->values();
+    auto& nested_ptr = static_cast<ColumnArray&>(*data_column).get_data_ptr();
+    return arrow_column_to_doris_column(child_values.get(), values_start, nested_ptr,
+                                        values_count, timezone);
+}
+
Status arrow_column_to_doris_column(const arrow::Array* arrow_column, size_t arrow_batch_cur_idx,
- ColumnWithTypeAndName& doirs_column, size_t num_elements,
+ ColumnPtr& doirs_column, size_t num_elements,
const std::string& timezone) {
// src column always be nullable for simpify converting
- assert(doirs_column.column->is_nullable());
+ assert(doirs_column->is_nullable());
MutableColumnPtr data_column = nullptr;
- if (doirs_column.column->is_nullable()) {
+ if (doirs_column->is_nullable()) {
auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
- (*std::move(doirs_column.column)).mutate().get());
+ (*std::move(doirs_column)).mutate().get());
fill_nullable_column(arrow_column, arrow_batch_cur_idx, nullable_column, num_elements);
data_column = nullable_column->get_nested_column_ptr();
} else {
- data_column = (*std::move(doirs_column.column)).mutate();
+ data_column = (*std::move(doirs_column)).mutate();
}
// process data
switch (arrow_column->type()->id()) {
@@ -280,6 +314,9 @@ Status arrow_column_to_doris_column(const arrow::Array* arrow_column, size_t arr
case arrow::Type::DECIMAL:
return convert_column_with_decimal_data(arrow_column, arrow_batch_cur_idx, data_column,
num_elements);
+ case arrow::Type::LIST:
+ return convert_column_with_list_data(arrow_column, arrow_batch_cur_idx, data_column,
+ num_elements, timezone);
default:
break;
}
diff --git a/be/src/vec/utils/arrow_column_to_doris_column.h b/be/src/vec/utils/arrow_column_to_doris_column.h
index 2e70fee11a..0c75f27cda 100644
--- a/be/src/vec/utils/arrow_column_to_doris_column.h
+++ b/be/src/vec/utils/arrow_column_to_doris_column.h
@@ -34,7 +34,7 @@ namespace doris::vectorized {
const PrimitiveType arrow_type_to_primitive_type(::arrow::Type::type type);
Status arrow_column_to_doris_column(const arrow::Array* arrow_column, size_t arrow_batch_cur_idx,
- ColumnWithTypeAndName& doirs_column, size_t num_elements,
+ ColumnPtr& doirs_column, size_t num_elements,
const std::string& timezone);
} // namespace doris::vectorized
diff --git a/be/test/vec/utils/arrow_column_to_doris_column_test.cpp b/be/test/vec/utils/arrow_column_to_doris_column_test.cpp
index 0252cf4883..73181a57de 100644
--- a/be/test/vec/utils/arrow_column_to_doris_column_test.cpp
+++ b/be/test/vec/utils/arrow_column_to_doris_column_test.cpp
@@ -123,7 +123,7 @@ void test_arrow_to_datetime_column(std::shared_ptr<ArrowType> type, ColumnWithTy
if constexpr (std::is_same_v<ArrowType, arrow::TimestampType>) {
time_zone = type->timezone();
}
- auto ret = arrow_column_to_doris_column(array.get(), 0, column, num_elements, time_zone);
+ auto ret = arrow_column_to_doris_column(array.get(), 0, column.column, num_elements, time_zone);
ASSERT_EQ(ret.ok(), true);
ASSERT_EQ(column.column->size(), counter);
MutableColumnPtr data_column = nullptr;
@@ -216,7 +216,7 @@ void test_arrow_to_numeric_column(std::shared_ptr<ArrowType> type, ColumnWithTyp
ASSERT_EQ(column.column->size(), counter);
auto array = create_constant_numeric_array<ArrowType, is_nullable>(num_elements, arrow_numeric,
type, counter);
- auto ret = arrow_column_to_doris_column(array.get(), 0, column, num_elements, "UTC");
+ auto ret = arrow_column_to_doris_column(array.get(), 0, column.column, num_elements, "UTC");
ASSERT_EQ(ret.ok(), true);
ASSERT_EQ(column.column->size(), counter);
MutableColumnPtr data_column = nullptr;
@@ -351,7 +351,7 @@ void test_arrow_to_decimal_column(std::shared_ptr<arrow::Decimal128Type> type,
int128_t arrow_value, int128_t expect_value, size_t& counter) {
ASSERT_EQ(column.column->size(), counter);
auto array = create_decimal_array<is_nullable>(num_elements, arrow_value, type, counter);
- auto ret = arrow_column_to_doris_column(array.get(), 0, column, num_elements, "UTC");
+ auto ret = arrow_column_to_doris_column(array.get(), 0, column.column, num_elements, "UTC");
ASSERT_EQ(ret.ok(), true);
ASSERT_EQ(column.column->size(), counter);
MutableColumnPtr data_column = nullptr;
@@ -452,7 +452,7 @@ void test_arrow_to_fixed_binary_column(ColumnWithTypeAndName& column, size_t num
ASSERT_EQ(column.column->size(), counter);
auto array =
create_fixed_size_binary_array<bytes_width, is_nullable>(num_elements, value, counter);
- auto ret = arrow_column_to_doris_column(array.get(), 0, column, num_elements, "UTC");
+ auto ret = arrow_column_to_doris_column(array.get(), 0, column.column, num_elements, "UTC");
ASSERT_EQ(ret.ok(), true);
ASSERT_EQ(column.column->size(), counter);
MutableColumnPtr data_column = nullptr;
@@ -554,7 +554,7 @@ void test_arrow_to_binary_column(ColumnWithTypeAndName& column, size_t num_eleme
ArrowCppType value, size_t& counter) {
ASSERT_EQ(column.column->size(), counter);
auto array = create_binary_array<ArrowType, is_nullable>(num_elements, value, counter);
- auto ret = arrow_column_to_doris_column(array.get(), 0, column, num_elements, "UTC");
+ auto ret = arrow_column_to_doris_column(array.get(), 0, column.column, num_elements, "UTC");
ASSERT_EQ(ret.ok(), true);
ASSERT_EQ(column.column->size(), counter);
MutableColumnPtr data_column = nullptr;
@@ -606,4 +606,119 @@ TEST(ArrowColumnToDorisColumnTest, test_binary) {
test_binary<arrow::BinaryType, false>(test_cases, 64);
test_binary<arrow::BinaryType, true>(test_cases, 64);
}
+
+// Test helper: builds an arrow ListArray with `vec_offsets.size() - 1` rows.
+//
+// `vec_offsets` holds cumulative offsets into `values` with a leading 0, so row i
+// covers values [vec_offsets[i], vec_offsets[i + 1]). When `is_nullable` is true, rows
+// whose `null_map` entry is set are cleared in the validity bitmap. Otherwise `null_map`
+// is ignored and every row is valid. `value_type` is the list type handed to the
+// ListArray constructor (e.g. arrow::list(arrow::binary())). `counter` is advanced by
+// the number of rows built. ArrowValueType is currently unused.
+// Both vectors are only read, so they are taken by const reference.
+template <typename ArrowValueType, bool is_nullable = false>
+static inline std::shared_ptr<arrow::Array> create_array_array(
+        const std::vector<IColumn::Offset>& vec_offsets, const std::vector<bool>& null_map,
+        std::shared_ptr<arrow::DataType> value_type, std::shared_ptr<arrow::Array> values,
+        size_t& counter) {
+    using offset_type = typename arrow::ListType::offset_type;
+    DCHECK(!vec_offsets.empty());
+    const size_t num_rows = vec_offsets.size() - 1;
+    DCHECK(null_map.size() == num_rows);
+
+    // arrow list offsets: num_rows + 1 entries of offset_type
+    const size_t offsets_bytes = vec_offsets.size() * sizeof(offset_type);
+    auto offsets_buf_tmp = arrow::AllocateBuffer(offsets_bytes);
+    std::shared_ptr<arrow::Buffer> offsets_buf = std::move(offsets_buf_tmp.ValueOrDie());
+    auto* offsets = reinterpret_cast<offset_type*>(offsets_buf->mutable_data());
+    offsets[0] = 0;
+
+    // validity bitmap: one bit per row, set = valid, cleared = null
+    const size_t null_bitmap_bytes = (num_rows + 7) / 8;
+    auto null_bitmap_tmp = arrow::AllocateBuffer(null_bitmap_bytes);
+    std::shared_ptr<arrow::Buffer> null_bitmap = std::move(null_bitmap_tmp.ValueOrDie());
+    auto* nulls = null_bitmap->mutable_data();
+    for (size_t i = 0; i < num_rows; ++i) {
+        if (is_nullable && null_map[i]) {
+            arrow::bit_util::ClearBit(nulls, i);
+        } else {
+            arrow::bit_util::SetBit(nulls, i);
+        }
+        // explicit narrowing from doris' 64-bit offsets to the list's offset_type
+        offsets[i + 1] = static_cast<offset_type>(vec_offsets[i + 1]);
+    }
+
+    auto array = std::make_shared<arrow::ListArray>(value_type, num_rows, offsets_buf, values,
+                                                    null_bitmap);
+    counter += num_rows;
+    return std::static_pointer_cast<arrow::Array>(array);
+}
+
+// Converts the arrow list array built from `vec_offsets`, `null_map` and `values` into
+// `column`, then verifies the resulting doris ColumnArray. It checks the row count, the
+// per-row element count, the null map (nullable variant only) and the string payload
+// of the elements that are expected to be non-null.
+template <typename ArrowType, bool is_nullable>
+void test_arrow_to_array_column(ColumnWithTypeAndName& column,
+                                std::vector<IColumn::Offset>& vec_offsets,
+                                std::vector<bool>& null_map,
+                                std::shared_ptr<arrow::DataType> value_type,
+                                std::shared_ptr<arrow::Array> values, const std::string& value,
+                                size_t& counter) {
+    ASSERT_EQ(column.column->size(), counter);
+    const size_t num_rows = vec_offsets.size() - 1;
+    auto list_array = create_array_array<ArrowType, is_nullable>(vec_offsets, null_map,
+                                                                 value_type, values, counter);
+    auto status =
+            arrow_column_to_doris_column(list_array.get(), 0, column.column, num_rows, "UTC");
+    ASSERT_EQ(status.ok(), true);
+    ASSERT_EQ(column.column->size(), counter);
+
+    // unwrap the (optional) nullable wrapper to reach the ColumnArray itself
+    vectorized::ColumnNullable* nullable_column = nullptr;
+    MutableColumnPtr nested_column = nullptr;
+    if (!column.column->is_nullable()) {
+        nested_column = (*std::move(column.column)).mutate();
+    } else {
+        nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
+                (*std::move(column.column)).mutate().get());
+        nested_column = nullable_column->get_nested_column_ptr();
+    }
+    auto& array_column = static_cast<ColumnArray&>(*nested_column);
+    EXPECT_EQ(array_column.size(), num_rows);
+
+    for (size_t row = 0; row < array_column.size(); ++row) {
+        auto elements = get<Array>(array_column[row]);
+        EXPECT_EQ(elements.size(), vec_offsets[row + 1] - vec_offsets[row]);
+        if constexpr (is_nullable) {
+            ASSERT_NE(nullable_column, nullptr);
+            NullMap& row_nulls = nullable_column->get_null_map_data();
+            ASSERT_EQ(row_nulls[row], null_map[row]);
+            if (null_map[row]) {
+                continue;
+            }
+            // nested values look like [null, xx, null, xx, ...] by absolute position,
+            // so only odd positions are compared against `value`
+            for (size_t j = 0; j < elements.size(); ++j) {
+                const bool has_value = (vec_offsets[row] + j) % 2 != 0;
+                if (has_value) {
+                    EXPECT_EQ(value, get<std::string>(elements[j]));
+                }
+            }
+        } else {
+            for (size_t j = 0; j < elements.size(); ++j) {
+                EXPECT_EQ(value, get<std::string>(elements[j]));
+            }
+        }
+    }
+}
+
+// Runs one conversion check per string in `test_cases`. For each string it creates a
+// fresh doris column for ARRAY<VARCHAR> (create_data_type(..., true)). It then builds
+// an arrow values array of `num_elements` entries from the string via
+// create_binary_array<ArrowType, is_nullable>, and hands both to
+// test_arrow_to_array_column together with the given offsets and null map.
+template <typename ArrowType, bool is_nullable>
+void test_array(const std::vector<std::string>& test_cases, size_t num_elements,
+                std::vector<IColumn::Offset>& vec_offsets, std::vector<bool>& null_map,
+                std::shared_ptr<arrow::DataType> value_type) {
+    TypeDescriptor array_type(TYPE_ARRAY);
+    array_type.children.push_back(TYPE_VARCHAR);
+    DataTypePtr doris_type = DataTypeFactory::instance().create_data_type(array_type, true);
+    for (const auto& expected : test_cases) {
+        // a new column per case, so each run starts with zero rows
+        MutableColumnPtr empty_column = doris_type->create_column();
+        ColumnWithTypeAndName column(std::move(empty_column), doris_type, "test_array_column");
+        size_t values_built = 0;
+        auto nested_values =
+                create_binary_array<ArrowType, is_nullable>(num_elements, expected, values_built);
+        ASSERT_EQ(values_built, num_elements);
+        size_t rows_built = 0;
+        test_arrow_to_array_column<ArrowType, is_nullable>(column, vec_offsets, null_map,
+                                                           value_type, nested_values, expected,
+                                                           rows_built);
+    }
+}
+
+// Converts arrow list<binary> arrays, built with and without row nulls, into doris
+// ARRAY<VARCHAR> columns. The six rows cover non-empty, empty and null lists.
+TEST(ArrowColumnToDorisColumnTest, test_array) {
+    std::vector<std::string> test_cases = {"1.2345678", "-12.34567890", "99999999999.99999999",
+                                           "-99999999999.99999999"};
+    // cumulative offsets: row sizes are 3, 0, 1, 2, 0, 58
+    std::vector<IColumn::Offset> vec_offsets = {0, 3, 3, 4, 6, 6, 64};
+    // row 1 is null (only applied by the nullable variant)
+    std::vector<bool> null_map = {false, true, false, false, false, false};
+    // the nested values array must hold every element referenced by the offsets
+    const size_t num_nested_values = vec_offsets.back();
+    auto list_type = arrow::list(arrow::binary());
+    test_array<arrow::BinaryType, false>(test_cases, num_nested_values, vec_offsets, null_map,
+                                         list_type);
+    test_array<arrow::BinaryType, true>(test_cases, num_nested_values, vec_offsets, null_map,
+                                        list_type);
+}
} // namespace doris::vectorized
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org