You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by li...@apache.org on 2022/05/26 13:39:17 UTC
[incubator-doris] branch master updated: [feature-wip](parquet-orc) Support orc scanner in vectorized engine (#9541)
This is an automated email from the ASF dual-hosted git repository.
lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new cbbda7857b [feature-wip](parquet-orc) Support orc scanner in vectorized engine (#9541)
cbbda7857b is described below
commit cbbda7857bbdb308d5d24f9af16aaba8dc8d9b47
Author: yinzhijian <37...@qq.com>
AuthorDate: Thu May 26 21:39:12 2022 +0800
[feature-wip](parquet-orc) Support orc scanner in vectorized engine (#9541)
---
be/src/exec/CMakeLists.txt | 4 +-
be/src/exec/arrow/arrow_reader.cpp | 156 ++++
be/src/exec/arrow/arrow_reader.h | 101 +++
be/src/exec/arrow/orc_reader.cpp | 115 +++
be/src/exec/arrow/orc_reader.h | 51 ++
be/src/exec/{ => arrow}/parquet_reader.cpp | 147 +---
be/src/exec/{ => arrow}/parquet_reader.h | 53 +-
be/src/exec/broker_scan_node.cpp | 13 +-
be/src/exec/parquet_scanner.cpp | 12 +-
be/src/http/action/stream_load.cpp | 2 +
be/src/vec/CMakeLists.txt | 2 +
be/src/vec/data_types/data_type_factory.cpp | 8 +-
be/src/vec/data_types/data_type_factory.hpp | 2 +-
.../{vparquet_scanner.cpp => varrow_scanner.cpp} | 143 +++-
.../exec/{vparquet_scanner.h => varrow_scanner.h} | 42 +-
be/src/vec/exec/vorc_scanner.cpp | 37 +
.../exec/{vparquet_scanner.h => vorc_scanner.h} | 37 +-
be/src/vec/exec/vparquet_scanner.cpp | 191 +----
be/src/vec/exec/vparquet_scanner.h | 24 +-
be/test/CMakeLists.txt | 2 +
be/test/olap/hll_test.cpp | 2 +-
be/test/vec/exec/vorc_scanner_test.cpp | 892 +++++++++++++++++++++
be/test/vec/exec/vparquet_scanner_test.cpp | 499 ++++++++++++
thirdparty/build-thirdparty.sh | 19 +-
24 files changed, 2098 insertions(+), 456 deletions(-)
diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index 202ae767b0..d93c394047 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -22,6 +22,9 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_DIR}/src/exec")
set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/src/exec")
set(EXEC_FILES
+ arrow/arrow_reader.cpp
+ arrow/orc_reader.cpp
+ arrow/parquet_reader.cpp
analytic_eval_node.cpp
blocking_join_node.cpp
broker_scan_node.cpp
@@ -94,7 +97,6 @@ set(EXEC_FILES
local_file_writer.cpp
broker_writer.cpp
parquet_scanner.cpp
- parquet_reader.cpp
parquet_writer.cpp
orc_scanner.cpp
odbc_connector.cpp
diff --git a/be/src/exec/arrow/arrow_reader.cpp b/be/src/exec/arrow/arrow_reader.cpp
new file mode 100644
index 0000000000..789c66be2c
--- /dev/null
+++ b/be/src/exec/arrow/arrow_reader.cpp
@@ -0,0 +1,156 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "exec/arrow/arrow_reader.h"
+
+#include <arrow/array.h>
+#include <arrow/status.h>
+#include <time.h>
+
+#include "common/logging.h"
+#include "exec/file_reader.h"
+#include "gen_cpp/PaloBrokerService_types.h"
+#include "gen_cpp/TPaloBrokerService.h"
+#include "runtime/broker_mgr.h"
+#include "runtime/client_cache.h"
+#include "runtime/descriptors.h"
+#include "runtime/exec_env.h"
+#include "runtime/mem_pool.h"
+#include "runtime/tuple.h"
+#include "util/thrift_util.h"
+
+namespace doris {
+
+// Broker
+
+// Base-class constructor shared by the arrow-based readers (ORC, Parquet).
+// file_reader: Doris FileReader; ownership passes to the ArrowFile adapter
+//              created here, which closes and deletes it in ArrowFile::Close().
+// batch_size: row limit per record batch requested from the format reader
+//             (ORC passes it to NextStripeReader; Parquet may ignore it).
+// num_of_columns_from_file: number of leading tuple slots whose values come
+//                           from the file (see column_indices()).
+ArrowReaderWrap::ArrowReaderWrap(FileReader* file_reader, int64_t batch_size,
+                                 int32_t num_of_columns_from_file)
+        : _batch_size(batch_size),
+          _num_of_columns_from_file(num_of_columns_from_file),
+          _arrow_file(std::make_shared<ArrowFile>(file_reader)),
+          _rb_reader(nullptr),
+          _total_groups(0),
+          _current_group(0) {}
+
+// Releases the underlying file by calling close().
+// NOTE(review): close() is virtual, but while the base-class destructor runs
+// the derived part is already destroyed, so only ArrowReaderWrap::close()
+// executes here. A subclass whose close() does extra work (ParquetReaderWrap
+// sets _closed and notifies _queue_writer_cond) must call close() from its
+// own destructor; ParquetReaderWrap's destructor is currently `= default`.
+ArrowReaderWrap::~ArrowReaderWrap() {
+ close();
+}
+
+// Closes the ArrowFile adapter, which in turn closes and frees the wrapped
+// FileReader. A failure is only logged, because callers (the destructor
+// included) cannot act on it.
+void ArrowReaderWrap::close() {
+    const arrow::Status close_status = _arrow_file->Close();
+    if (close_status.ok()) {
+        return;
+    }
+    LOG(WARNING) << "close file error: " << close_status.ToString();
+}
+
+// Maps the first _num_of_columns_from_file slots to file column indices using
+// _map_column (filled by the concrete reader, e.g. ORCReaderWrap::init_reader).
+// The result is stored in _include_column_ids in slot order.
+// Returns InvalidArgument if more columns are requested than there are slots,
+// or if a slot's column name does not exist in the file.
+Status ArrowReaderWrap::column_indices(const std::vector<SlotDescriptor*>& tuple_slot_descs) {
+    _include_column_ids.clear();
+    // Check explicitly instead of relying only on a DCHECK. In release builds a
+    // DCHECK is compiled out, and tuple_slot_descs.at(i) would then throw
+    // std::out_of_range instead of returning a Status.
+    if (_num_of_columns_from_file < 0 ||
+        static_cast<size_t>(_num_of_columns_from_file) > tuple_slot_descs.size()) {
+        std::stringstream str_error;
+        str_error << "Invalid number of columns from file: " << _num_of_columns_from_file
+                  << ", slot count: " << tuple_slot_descs.size();
+        LOG(WARNING) << str_error.str();
+        return Status::InvalidArgument(str_error.str());
+    }
+    _include_column_ids.reserve(_num_of_columns_from_file);
+    for (int i = 0; i < _num_of_columns_from_file; i++) {
+        SlotDescriptor* slot_desc = tuple_slot_descs[i];
+        // Look up the file column index by the slot's column name.
+        auto iter = _map_column.find(slot_desc->col_name());
+        if (iter == _map_column.end()) {
+            std::stringstream str_error;
+            str_error << "Invalid Column Name:" << slot_desc->col_name();
+            LOG(WARNING) << str_error.str();
+            return Status::InvalidArgument(str_error.str());
+        }
+        _include_column_ids.emplace_back(iter->second);
+    }
+    return Status::OK();
+}
+
+ArrowFile::ArrowFile(FileReader* file) : _file(file) {}
+
+// Releases the wrapped FileReader even if Close() was never called explicitly.
+// Close() is a no-op once _file is null, so an earlier call is harmless.
+ArrowFile::~ArrowFile() {
+    const arrow::Status close_status = Close();
+    if (close_status.ok()) {
+        return;
+    }
+    LOG(WARNING) << "close file error: " << close_status.ToString();
+}
+
+// Closes and deletes the wrapped FileReader, then clears the pointer so that
+// repeated calls do nothing. Always reports success.
+arrow::Status ArrowFile::Close() {
+    if (_file == nullptr) {
+        return arrow::Status::OK();
+    }
+    _file->close();
+    delete _file;
+    _file = nullptr;
+    return arrow::Status::OK();
+}
+
+// With no wrapped reader (already closed) the file counts as closed;
+// otherwise the FileReader's own state decides.
+bool ArrowFile::closed() const {
+    return _file == nullptr || _file->closed();
+}
+
+// Sequential read into a caller buffer. Reads at the current position and
+// advances it by the number of bytes actually read.
+arrow::Result<int64_t> ArrowFile::Read(int64_t nbytes, void* buffer) {
+    ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, ReadAt(_pos, nbytes, buffer));
+    _pos += bytes_read;
+    return bytes_read;
+}
+
+// Positional read of up to `nbytes` bytes starting at `position` into `out`.
+// Loops because FileReader::readat may return fewer bytes than requested.
+// Stops early (short read) when readat reports 0 bytes.
+// _pos is deliberately left untouched: arrow leaves it unspecified whether
+// ReadAt moves the file position, and writing a shared member here would race
+// if arrow issues ReadAt calls concurrently. Whether the wrapped
+// FileReader::readat is safe for concurrent use depends on the concrete reader.
+arrow::Result<int64_t> ArrowFile::ReadAt(int64_t position, int64_t nbytes, void* out) {
+    if (_file == nullptr) {
+        return arrow::Status::IOError("Readat on closed file.");
+    }
+    int64_t bytes_read = 0;
+    int64_t offset = position;
+    char* dst = static_cast<char*>(out);
+    while (nbytes > 0) {
+        int64_t reads = 0;
+        Status result = _file->readat(offset, nbytes, &reads, dst);
+        if (!result.ok()) {
+            return arrow::Status::IOError("Readat failed.");
+        }
+        if (reads == 0) {
+            break;
+        }
+        bytes_read += reads; // total read bytes
+        nbytes -= reads;     // remaining bytes
+        offset += reads;
+        dst += reads;
+    }
+    return bytes_read;
+}
+
+arrow::Result<int64_t> ArrowFile::GetSize() {
+    if (_file == nullptr) {
+        return arrow::Status::IOError("GetSize on closed file.");
+    }
+    return _file->size();
+}
+
+arrow::Status ArrowFile::Seek(int64_t position) {
+    _pos = position;
+    // NOTE: Only readat operation is used, so _file seek is not called here.
+    return arrow::Status::OK();
+}
+
+arrow::Result<int64_t> ArrowFile::Tell() const {
+    return _pos;
+}
+
+// Sequential read into a freshly allocated buffer; advances the position.
+// On a short read the buffer is sliced down to the bytes actually read.
+arrow::Result<std::shared_ptr<arrow::Buffer>> ArrowFile::Read(int64_t nbytes) {
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Buffer> read_buf,
+                          arrow::AllocateBuffer(nbytes, arrow::default_memory_pool()));
+    ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, ReadAt(_pos, nbytes, read_buf->mutable_data()));
+    _pos += bytes_read;
+    // A full read returns the buffer as-is; a short read returns a slice of it.
+    if (bytes_read == nbytes) {
+        return std::move(read_buf);
+    }
+    return arrow::SliceBuffer(read_buf, 0, bytes_read);
+}
+
+} // namespace doris
diff --git a/be/src/exec/arrow/arrow_reader.h b/be/src/exec/arrow/arrow_reader.h
new file mode 100644
index 0000000000..4149f02493
--- /dev/null
+++ b/be/src/exec/arrow/arrow_reader.h
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <arrow/api.h>
+#include <arrow/buffer.h>
+#include <arrow/io/api.h>
+#include <arrow/io/file.h>
+#include <arrow/io/interfaces.h>
+#include <parquet/api/reader.h>
+#include <parquet/api/writer.h>
+#include <parquet/arrow/reader.h>
+#include <parquet/arrow/writer.h>
+#include <parquet/exception.h>
+#include <stdint.h>
+
+#include <map>
+#include <string>
+
+#include "common/status.h"
+#include "gen_cpp/PaloBrokerService_types.h"
+#include "gen_cpp/PlanNodes_types.h"
+#include "gen_cpp/Types_types.h"
+
+namespace doris {
+
+class ExecEnv;
+class TBrokerRangeDesc;
+class TNetworkAddress;
+class RuntimeState;
+class Tuple;
+class SlotDescriptor;
+class MemPool;
+class FileReader;
+
+// Adapts a Doris FileReader to arrow's RandomAccessFile interface so that the
+// arrow Parquet/ORC readers can read through it.
+// Takes ownership of the FileReader: it is closed and deleted in Close(),
+// which the destructor also calls. All data is read via FileReader::readat;
+// _pos only tracks the logical position used by Read/Seek/Tell.
+class ArrowFile : public arrow::io::RandomAccessFile {
+public:
+    explicit ArrowFile(FileReader* file);
+    ~ArrowFile() override;
+
+    // Owns a raw FileReader*, so copying would cause a double delete.
+    ArrowFile(const ArrowFile&) = delete;
+    ArrowFile& operator=(const ArrowFile&) = delete;
+
+    arrow::Result<int64_t> Read(int64_t nbytes, void* buffer) override;
+    arrow::Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) override;
+    arrow::Result<int64_t> GetSize() override;
+    arrow::Status Seek(int64_t position) override;
+    arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) override;
+    arrow::Result<int64_t> Tell() const override;
+    arrow::Status Close() override;
+    bool closed() const override;
+
+private:
+    FileReader* _file; // owned; nullptr after Close()
+    int64_t _pos = 0;  // logical position for Read/Seek/Tell
+};
+
+// base of arrow reader
+// Common base of the arrow-based file readers (Parquet, ORC).
+// Owns the ArrowFile adapter around the Doris FileReader and holds the state
+// both formats share: the column-name -> file-index map, the list of file
+// columns to read, and the group (row group / stripe) cursor.
+class ArrowReaderWrap {
+public:
+    ArrowReaderWrap(FileReader* file_reader, int64_t batch_size, int32_t num_of_columns_from_file);
+    // Calls close(). Virtual dispatch does not reach subclass overrides from a
+    // base destructor, so a subclass overriding close() should call it from
+    // its own destructor.
+    virtual ~ArrowReaderWrap();
+
+    // Owns the file adapter and reader state; not meant to be copied.
+    ArrowReaderWrap(const ArrowReaderWrap&) = delete;
+    ArrowReaderWrap& operator=(const ArrowReaderWrap&) = delete;
+
+    // Opens the format-specific reader and resolves the columns to read.
+    virtual Status init_reader(const std::vector<SlotDescriptor*>& tuple_slot_descs,
+                               const std::string& timezone) = 0;
+    // Row-at-a-time interface (non-vectorized path). Not supported by default.
+    virtual Status read(Tuple* tuple, const std::vector<SlotDescriptor*>& tuple_slot_descs,
+                        MemPool* mem_pool, bool* eof) {
+        return Status::NotSupported("Not Implemented read");
+    }
+    // Batch interface (vectorized path): yields the next record batch or sets *eof.
+    virtual Status next_batch(std::shared_ptr<arrow::RecordBatch>* batch, bool* eof) = 0;
+    virtual void close();
+    // Reports the file size through *size. Not supported by default.
+    virtual Status size(int64_t* size) { return Status::NotSupported("Not Implemented size"); }
+
+protected:
+    // Fills _include_column_ids from the leading _num_of_columns_from_file slots.
+    virtual Status column_indices(const std::vector<SlotDescriptor*>& tuple_slot_descs);
+
+protected:
+    const int64_t _batch_size;
+    const int32_t _num_of_columns_from_file;
+    std::shared_ptr<ArrowFile> _arrow_file;
+    std::shared_ptr<::arrow::RecordBatchReader> _rb_reader = nullptr;
+    int _total_groups = 0;   // number of row groups (Parquet) or stripes (ORC) in the file
+    int _current_group = 0;  // group/stripe cursor; exact meaning is defined by each reader
+    std::map<std::string, int> _map_column; // column name -> column index in the file
+    std::vector<int> _include_column_ids;   // file column indices to read, in slot order
+};
+
+} // namespace doris
diff --git a/be/src/exec/arrow/orc_reader.cpp b/be/src/exec/arrow/orc_reader.cpp
new file mode 100644
index 0000000000..5815f008df
--- /dev/null
+++ b/be/src/exec/arrow/orc_reader.cpp
@@ -0,0 +1,115 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "exec/arrow/orc_reader.h"
+
+#include <arrow/array.h>
+#include <arrow/status.h>
+#include <time.h>
+
+#include "common/logging.h"
+#include "exec/file_reader.h"
+#include "runtime/mem_pool.h"
+#include "runtime/tuple.h"
+
+namespace doris {
+
+// Forwards to the base only. The ORC file reader itself is opened later, in
+// init_reader().
+ORCReaderWrap::ORCReaderWrap(FileReader* file_reader, int64_t batch_size,
+                             int32_t num_of_columns_from_file)
+        : ArrowReaderWrap(file_reader, batch_size, num_of_columns_from_file),
+          _reader(nullptr),
+          _cur_file_eof(false) {}
+
+// Opens the ORC file through _arrow_file and records its stripe count.
+// Builds the column name -> file index map, resolves the requested columns,
+// and opens a record-batch reader over the first stripe.
+// Returns:
+//   - EndOfFile if the file has no stripes;
+//   - InvalidArgument if a requested column is missing;
+//   - InternalError if the file or its schema cannot be read.
+// `timezone` is currently unused by the ORC reader.
+Status ORCReaderWrap::init_reader(const std::vector<SlotDescriptor*>& tuple_slot_descs,
+                                  const std::string& timezone) {
+    // Open ORC file reader
+    auto maybe_reader =
+            arrow::adapters::orc::ORCFileReader::Open(_arrow_file, arrow::default_memory_pool());
+    if (!maybe_reader.ok()) {
+        LOG(WARNING) << "failed to create orc file reader, errmsg=" << maybe_reader.status();
+        return Status::InternalError("Failed to create orc file reader");
+    }
+    _reader = std::move(maybe_reader.ValueOrDie());
+    _total_groups = _reader->NumberOfStripes();
+    if (_total_groups == 0) {
+        return Status::EndOfFile("Empty Orc File");
+    }
+
+    // Build the column name -> file column index map from the file schema.
+    arrow::Result<std::shared_ptr<arrow::Schema>> maybe_schema = _reader->ReadSchema();
+    if (!maybe_schema.ok()) {
+        LOG(WARNING) << "failed to read schema, errmsg=" << maybe_schema.status();
+        return Status::InternalError("Failed to read orc file schema");
+    }
+    std::shared_ptr<arrow::Schema> schema = maybe_schema.ValueOrDie();
+    for (int i = 0; i < schema->num_fields(); ++i) {
+        _map_column.emplace(schema->field(i)->name(), i);
+    }
+
+    // Resolve the requested columns BEFORE opening the first stripe reader.
+    // _next_stripe_reader() passes _include_column_ids to NextStripeReader,
+    // and arrow's ORC adapter treats an empty include list as "read every
+    // column". Opening the first stripe first would therefore give its batches
+    // a different column layout from all later stripes.
+    RETURN_IF_ERROR(column_indices(tuple_slot_descs));
+
+    bool eof = false;
+    RETURN_IF_ERROR(_next_stripe_reader(&eof));
+    if (eof) {
+        return Status::EndOfFile("end of file");
+    }
+    return Status::OK();
+}
+
+// Opens a record-batch reader over the next stripe.
+// Only the columns in _include_column_ids are read, in batches of at most
+// _batch_size rows. NextStripeReader is used instead of ReadStripe so that a
+// whole stripe is never loaded into memory at once.
+// Sets *eof, leaving _rb_reader unchanged, when no stripe remains; it never
+// resets *eof to false (callers initialize it).
+Status ORCReaderWrap::_next_stripe_reader(bool* eof) {
+    if (_current_group >= _total_groups) {
+        *eof = true;
+        return Status::OK();
+    }
+    arrow::Result<std::shared_ptr<arrow::RecordBatchReader>> maybe_rb_reader =
+            _reader->NextStripeReader(_batch_size, _include_column_ids);
+    if (!maybe_rb_reader.ok()) {
+        LOG(WARNING) << "Get RecordBatch Failed. " << maybe_rb_reader.status();
+        return Status::InternalError(maybe_rb_reader.status().ToString());
+    }
+    std::shared_ptr<arrow::RecordBatchReader> stripe_reader = maybe_rb_reader.ValueOrDie();
+    // arrow's NextStripeReader can yield a null reader when no rows remain.
+    // Treat that as end of file instead of storing it, because next_batch()
+    // would dereference it.
+    if (stripe_reader == nullptr) {
+        *eof = true;
+        return Status::OK();
+    }
+    _rb_reader = std::move(stripe_reader);
+    _current_group++;
+    return Status::OK();
+}
+
+// Returns the next non-empty record batch. When the current stripe is
+// exhausted it moves on to the next stripe; once all stripes are consumed it
+// sets *eof and leaves *batch null.
+Status ORCReaderWrap::next_batch(std::shared_ptr<arrow::RecordBatch>* batch, bool* eof) {
+    *eof = false;
+    for (;;) {
+        arrow::Status read_status = _rb_reader->ReadNext(batch);
+        if (!read_status.ok()) {
+            LOG(WARNING) << "failed to get next batch, errmsg=" << read_status;
+            return Status::InternalError(read_status.ToString());
+        }
+        if (*batch != nullptr) {
+            return Status::OK();
+        }
+        // Current stripe drained: advance to the next one.
+        RETURN_IF_ERROR(_next_stripe_reader(eof));
+        if (*eof) {
+            return Status::OK();
+        }
+    }
+}
+
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/exec/arrow/orc_reader.h b/be/src/exec/arrow/orc_reader.h
new file mode 100644
index 0000000000..5213a18dcf
--- /dev/null
+++ b/be/src/exec/arrow/orc_reader.h
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <arrow/adapters/orc/adapter.h>
+#include <arrow/api.h>
+#include <arrow/buffer.h>
+#include <stdint.h>
+
+#include <map>
+#include <string>
+
+#include "common/status.h"
+#include "exec/arrow/arrow_reader.h"
+namespace doris {
+
+// Reader of orc file
+// Vectorized reader for ORC files, built on arrow's ORC adapter.
+// Rows are consumed stripe by stripe through ArrowReaderWrap::_rb_reader.
+class ORCReaderWrap final : public ArrowReaderWrap {
+public:
+    ORCReaderWrap(FileReader* file_reader, int64_t batch_size, int32_t num_of_columns_from_file);
+    ~ORCReaderWrap() override = default;
+
+    Status init_reader(const std::vector<SlotDescriptor*>& tuple_slot_descs,
+                       const std::string& timezone) override;
+    Status next_batch(std::shared_ptr<arrow::RecordBatch>* batch, bool* eof) override;
+
+private:
+    // Points _rb_reader at the next stripe; sets *eof when none remain.
+    Status _next_stripe_reader(bool* eof);
+
+    // arrow ORC file reader, opened in init_reader().
+    std::unique_ptr<arrow::adapters::orc::ORCFileReader> _reader;
+    // NOTE(review): assigned in the constructor but never read in orc_reader.cpp;
+    // candidate for removal.
+    bool _cur_file_eof;
+};
+
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/exec/parquet_reader.cpp b/be/src/exec/arrow/parquet_reader.cpp
similarity index 84%
rename from be/src/exec/parquet_reader.cpp
rename to be/src/exec/arrow/parquet_reader.cpp
index 53880cdbb3..5c57efc4be 100644
--- a/be/src/exec/parquet_reader.cpp
+++ b/be/src/exec/arrow/parquet_reader.cpp
@@ -14,7 +14,7 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-#include "exec/parquet_reader.h"
+#include "exec/arrow/parquet_reader.h"
#include <arrow/array.h>
#include <arrow/status.h>
@@ -42,21 +42,15 @@ namespace doris {
// Broker
-ParquetReaderWrap::ParquetReaderWrap(FileReader* file_reader, int32_t num_of_columns_from_file)
- : _num_of_columns_from_file(num_of_columns_from_file),
- _total_groups(0),
- _current_group(0),
+ParquetReaderWrap::ParquetReaderWrap(FileReader* file_reader, int64_t batch_size,
+ int32_t num_of_columns_from_file)
+ : ArrowReaderWrap(file_reader, batch_size, num_of_columns_from_file),
_rows_of_group(0),
_current_line_of_group(0),
- _current_line_of_batch(0) {
- _parquet = std::shared_ptr<ParquetFile>(new ParquetFile(file_reader));
-}
+ _current_line_of_batch(0) {}
-ParquetReaderWrap::~ParquetReaderWrap() {
- close();
-}
-Status ParquetReaderWrap::init_parquet_reader(const std::vector<SlotDescriptor*>& tuple_slot_descs,
- const std::string& timezone) {
+Status ParquetReaderWrap::init_reader(const std::vector<SlotDescriptor*>& tuple_slot_descs,
+ const std::string& timezone) {
try {
parquet::ArrowReaderProperties arrow_reader_properties =
parquet::default_arrow_reader_properties();
@@ -66,7 +60,7 @@ Status ParquetReaderWrap::init_parquet_reader(const std::vector<SlotDescriptor*>
auto reader_builder = parquet::arrow::FileReaderBuilder();
reader_builder.properties(arrow_reader_properties);
- auto st = reader_builder.Open(_parquet);
+ auto st = reader_builder.Open(_arrow_file);
if (!st.ok()) {
LOG(WARNING) << "failed to create parquet file reader, errmsg=" << st.ToString();
@@ -111,7 +105,7 @@ Status ParquetReaderWrap::init_parquet_reader(const std::vector<SlotDescriptor*>
_current_line_of_batch = 0;
//save column type
std::shared_ptr<arrow::Schema> field_schema = _batch->schema();
- for (int i = 0; i < _parquet_column_ids.size(); i++) {
+ for (int i = 0; i < _include_column_ids.size(); i++) {
std::shared_ptr<arrow::Field> field = field_schema->field(i);
if (!field) {
LOG(WARNING) << "Get field schema failed. Column order:" << i;
@@ -131,14 +125,11 @@ Status ParquetReaderWrap::init_parquet_reader(const std::vector<SlotDescriptor*>
void ParquetReaderWrap::close() {
_closed = true;
_queue_writer_cond.notify_one();
- arrow::Status st = _parquet->Close();
- if (!st.ok()) {
- LOG(WARNING) << "close parquet file error: " << st.ToString();
- }
+ ArrowReaderWrap::close();
}
Status ParquetReaderWrap::size(int64_t* size) {
- arrow::Result<int64_t> result = _parquet->GetSize();
+ arrow::Result<int64_t> result = _arrow_file->GetSize();
if (result.ok()) {
*size = result.ValueOrDie();
return Status::OK();
@@ -158,24 +149,6 @@ inline void ParquetReaderWrap::fill_slot(Tuple* tuple, SlotDescriptor* slot_desc
return;
}
-Status ParquetReaderWrap::column_indices(const std::vector<SlotDescriptor*>& tuple_slot_descs) {
- _parquet_column_ids.clear();
- for (int i = 0; i < _num_of_columns_from_file; i++) {
- auto slot_desc = tuple_slot_descs.at(i);
- // Get the Column Reader for the boolean column
- auto iter = _map_column.find(slot_desc->col_name());
- if (iter != _map_column.end()) {
- _parquet_column_ids.emplace_back(iter->second);
- } else {
- std::stringstream str_error;
- str_error << "Invalid Column Name:" << slot_desc->col_name();
- LOG(WARNING) << str_error.str();
- return Status::InvalidArgument(str_error.str());
- }
- }
- return Status::OK();
-}
-
inline Status ParquetReaderWrap::set_field_null(Tuple* tuple, const SlotDescriptor* slot_desc) {
if (!slot_desc->is_nullable()) {
std::stringstream str_error;
@@ -188,8 +161,7 @@ inline Status ParquetReaderWrap::set_field_null(Tuple* tuple, const SlotDescript
return Status::OK();
}
-Status ParquetReaderWrap::read_record_batch(const std::vector<SlotDescriptor*>& tuple_slot_descs,
- bool* eof) {
+Status ParquetReaderWrap::read_record_batch(bool* eof) {
if (_current_line_of_group >= _rows_of_group) { // read next row group
VLOG_DEBUG << "read_record_batch, current group id:" << _current_group
<< " current line of group:" << _current_line_of_group
@@ -197,7 +169,7 @@ Status ParquetReaderWrap::read_record_batch(const std::vector<SlotDescriptor*>&
<< ". start to read next row group";
_current_group++;
if (_current_group >= _total_groups) { // read completed.
- _parquet_column_ids.clear();
+ _include_column_ids.clear();
*eof = true;
return Status::OK();
}
@@ -219,11 +191,9 @@ Status ParquetReaderWrap::read_record_batch(const std::vector<SlotDescriptor*>&
return Status::OK();
}
-Status ParquetReaderWrap::next_batch(std::shared_ptr<arrow::RecordBatch>* batch,
- const std::vector<SlotDescriptor*>& tuple_slot_descs,
- bool* eof) {
+Status ParquetReaderWrap::next_batch(std::shared_ptr<arrow::RecordBatch>* batch, bool* eof) {
if (_batch->num_rows() == 0 || _current_line_of_batch != 0 || _current_line_of_group != 0) {
- RETURN_IF_ERROR(read_record_batch(tuple_slot_descs, eof));
+ RETURN_IF_ERROR(read_record_batch(eof));
}
*batch = get_batch();
return Status::OK();
@@ -281,7 +251,7 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector<SlotDescriptor*>&
const uint8_t* value = nullptr;
int column_index = 0;
try {
- size_t slots = _parquet_column_ids.size();
+ size_t slots = _include_column_ids.size();
for (size_t i = 0; i < slots; ++i) {
auto slot_desc = tuple_slot_descs[i];
column_index = i; // column index in batch record
@@ -550,7 +520,7 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector<SlotDescriptor*>&
// update data value
++_current_line_of_group;
++_current_line_of_batch;
- return read_record_batch(tuple_slot_descs, eof);
+ return read_record_batch(eof);
}
void ParquetReaderWrap::prefetch_batch() {
@@ -570,13 +540,13 @@ void ParquetReaderWrap::prefetch_batch() {
if (_closed || current_group >= _total_groups) {
return;
}
- _status = _reader->GetRecordBatchReader({current_group}, _parquet_column_ids, &_rb_batch);
+ _status = _reader->GetRecordBatchReader({current_group}, _include_column_ids, &_rb_reader);
if (!_status.ok()) {
_closed = true;
return;
}
arrow::RecordBatchVector batches;
- _status = _rb_batch->ReadAll(&batches);
+ _status = _rb_reader->ReadAll(&batches);
if (!_status.ok()) {
_closed = true;
return;
@@ -602,83 +572,4 @@ Status ParquetReaderWrap::read_next_batch() {
return Status::OK();
}
-ParquetFile::ParquetFile(FileReader* file) : _file(file) {}
-
-ParquetFile::~ParquetFile() {
- arrow::Status st = Close();
- if (!st.ok()) {
- LOG(WARNING) << "close parquet file error: " << st.ToString();
- }
-}
-
-arrow::Status ParquetFile::Close() {
- if (_file != nullptr) {
- _file->close();
- delete _file;
- _file = nullptr;
- }
- return arrow::Status::OK();
-}
-
-bool ParquetFile::closed() const {
- if (_file != nullptr) {
- return _file->closed();
- } else {
- return true;
- }
-}
-
-arrow::Result<int64_t> ParquetFile::Read(int64_t nbytes, void* buffer) {
- return ReadAt(_pos, nbytes, buffer);
-}
-
-arrow::Result<int64_t> ParquetFile::ReadAt(int64_t position, int64_t nbytes, void* out) {
- int64_t reads = 0;
- int64_t bytes_read = 0;
- _pos = position;
- while (nbytes > 0) {
- Status result = _file->readat(_pos, nbytes, &reads, out);
- if (!result.ok()) {
- bytes_read = 0;
- return arrow::Status::IOError("Readat failed.");
- }
- if (reads == 0) {
- break;
- }
- bytes_read += reads; // total read bytes
- nbytes -= reads; // remained bytes
- _pos += reads;
- out = (char*)out + reads;
- }
- return bytes_read;
-}
-
-arrow::Result<int64_t> ParquetFile::GetSize() {
- return _file->size();
-}
-
-arrow::Status ParquetFile::Seek(int64_t position) {
- _pos = position;
- // NOTE: Only readat operation is used, so _file seek is not called here.
- return arrow::Status::OK();
-}
-
-arrow::Result<int64_t> ParquetFile::Tell() const {
- return _pos;
-}
-
-arrow::Result<std::shared_ptr<arrow::Buffer>> ParquetFile::Read(int64_t nbytes) {
- auto buffer = arrow::AllocateBuffer(nbytes, arrow::default_memory_pool());
- ARROW_RETURN_NOT_OK(buffer);
- std::shared_ptr<arrow::Buffer> read_buf = std::move(buffer.ValueOrDie());
- auto bytes_read = ReadAt(_pos, nbytes, read_buf->mutable_data());
- ARROW_RETURN_NOT_OK(bytes_read);
- // If bytes_read is equal with read_buf's capacity, we just assign
- if (bytes_read.ValueOrDie() == nbytes) {
- return std::move(read_buf);
- } else {
- return arrow::SliceBuffer(read_buf, 0, bytes_read.ValueOrDie());
- }
-}
-
} // namespace doris
diff --git a/be/src/exec/parquet_reader.h b/be/src/exec/arrow/parquet_reader.h
similarity index 64%
rename from be/src/exec/parquet_reader.h
rename to be/src/exec/arrow/parquet_reader.h
index 77b946b691..597dea2feb 100644
--- a/be/src/exec/parquet_reader.h
+++ b/be/src/exec/arrow/parquet_reader.h
@@ -40,6 +40,7 @@
#include "common/config.h"
#include "common/status.h"
+#include "exec/arrow/arrow_reader.h"
#include "gen_cpp/PaloBrokerService_types.h"
#include "gen_cpp/PlanNodes_types.h"
#include "gen_cpp/Types_types.h"
@@ -55,46 +56,28 @@ class SlotDescriptor;
class MemPool;
class FileReader;
-class ParquetFile : public arrow::io::RandomAccessFile {
+// Reader of parquet file
+class ParquetReaderWrap final : public ArrowReaderWrap {
public:
- ParquetFile(FileReader* file);
- ~ParquetFile() override;
- arrow::Result<int64_t> Read(int64_t nbytes, void* buffer) override;
- arrow::Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) override;
- arrow::Result<int64_t> GetSize() override;
- arrow::Status Seek(int64_t position) override;
- arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) override;
- arrow::Result<int64_t> Tell() const override;
- arrow::Status Close() override;
- bool closed() const override;
-
-private:
- FileReader* _file;
- int64_t _pos = 0;
-};
-
-// Reader of broker parquet file
-class ParquetReaderWrap {
-public:
- ParquetReaderWrap(FileReader* file_reader, int32_t num_of_columns_from_file);
- virtual ~ParquetReaderWrap();
+ // batch_size is not used here
+ ParquetReaderWrap(FileReader* file_reader, int64_t batch_size,
+ int32_t num_of_columns_from_file);
+ ~ParquetReaderWrap() override = default;
// Read
Status read(Tuple* tuple, const std::vector<SlotDescriptor*>& tuple_slot_descs,
- MemPool* mem_pool, bool* eof);
- void close();
- Status size(int64_t* size);
- Status init_parquet_reader(const std::vector<SlotDescriptor*>& tuple_slot_descs,
- const std::string& timezone);
- Status next_batch(std::shared_ptr<arrow::RecordBatch>* batch,
- const std::vector<SlotDescriptor*>& tuple_slot_descs, bool* eof);
+ MemPool* mem_pool, bool* eof) override;
+ Status size(int64_t* size) override;
+ Status init_reader(const std::vector<SlotDescriptor*>& tuple_slot_descs,
+ const std::string& timezone) override;
+ Status next_batch(std::shared_ptr<arrow::RecordBatch>* batch, bool* eof) override;
+ void close() override;
private:
void fill_slot(Tuple* tuple, SlotDescriptor* slot_desc, MemPool* mem_pool, const uint8_t* value,
int32_t len);
- Status column_indices(const std::vector<SlotDescriptor*>& tuple_slot_descs);
Status set_field_null(Tuple* tuple, const SlotDescriptor* slot_desc);
- Status read_record_batch(const std::vector<SlotDescriptor*>& tuple_slot_descs, bool* eof);
+ Status read_record_batch(bool* eof);
const std::shared_ptr<arrow::RecordBatch>& get_batch();
Status handle_timestamp(const std::shared_ptr<arrow::TimestampArray>& ts_array, uint8_t* buf,
int32_t* wbtyes);
@@ -104,19 +87,11 @@ private:
Status read_next_batch();
private:
- const int32_t _num_of_columns_from_file;
- std::shared_ptr<ParquetFile> _parquet;
-
// parquet file reader object
- std::unique_ptr<::arrow::RecordBatchReader> _rb_batch;
std::shared_ptr<arrow::RecordBatch> _batch;
std::unique_ptr<parquet::arrow::FileReader> _reader;
std::shared_ptr<parquet::FileMetaData> _file_metadata;
- std::map<std::string, int> _map_column; // column-name <---> column-index
- std::vector<int> _parquet_column_ids;
std::vector<arrow::Type::type> _parquet_column_type;
- int _total_groups; // groups in a parquet file
- int _current_group;
int _rows_of_group; // rows in a group.
int _current_line_of_group;
diff --git a/be/src/exec/broker_scan_node.cpp b/be/src/exec/broker_scan_node.cpp
index f2f3742cf8..a378c41203 100644
--- a/be/src/exec/broker_scan_node.cpp
+++ b/be/src/exec/broker_scan_node.cpp
@@ -33,6 +33,7 @@
#include "util/thread.h"
#include "vec/exec/vbroker_scanner.h"
#include "vec/exec/vjson_scanner.h"
+#include "vec/exec/vorc_scanner.h"
#include "vec/exec/vparquet_scanner.h"
namespace doris {
@@ -237,9 +238,15 @@ std::unique_ptr<BaseScanner> BrokerScanNode::create_scanner(const TBrokerScanRan
}
break;
case TFileFormatType::FORMAT_ORC:
- scan = new ORCScanner(_runtime_state, runtime_profile(), scan_range.params,
- scan_range.ranges, scan_range.broker_addresses, _pre_filter_texprs,
- counter);
+ if (_vectorized) {
+ scan = new vectorized::VORCScanner(_runtime_state, runtime_profile(), scan_range.params,
+ scan_range.ranges, scan_range.broker_addresses,
+ _pre_filter_texprs, counter);
+ } else {
+ scan = new ORCScanner(_runtime_state, runtime_profile(), scan_range.params,
+ scan_range.ranges, scan_range.broker_addresses,
+ _pre_filter_texprs, counter);
+ }
break;
case TFileFormatType::FORMAT_JSON:
if (_vectorized) {
diff --git a/be/src/exec/parquet_scanner.cpp b/be/src/exec/parquet_scanner.cpp
index c6cb02e8c2..880313b6f0 100644
--- a/be/src/exec/parquet_scanner.cpp
+++ b/be/src/exec/parquet_scanner.cpp
@@ -17,12 +17,12 @@
#include "exec/parquet_scanner.h"
+#include "exec/arrow/parquet_reader.h"
#include "exec/broker_reader.h"
#include "exec/buffered_reader.h"
#include "exec/decompressor.h"
#include "exec/hdfs_reader_writer.h"
#include "exec/local_file_reader.h"
-#include "exec/parquet_reader.h"
#include "exec/s3_reader.h"
#include "exec/text_converter.h"
#include "runtime/exec_env.h"
@@ -141,14 +141,14 @@ Status ParquetScanner::open_next_reader() {
file_reader->close();
continue;
}
+ int32_t num_of_columns_from_file = _src_slot_descs.size();
if (range.__isset.num_of_columns_from_file) {
- _cur_file_reader =
- new ParquetReaderWrap(file_reader.release(), range.num_of_columns_from_file);
- } else {
- _cur_file_reader = new ParquetReaderWrap(file_reader.release(), _src_slot_descs.size());
+ num_of_columns_from_file = range.num_of_columns_from_file;
}
+ _cur_file_reader = new ParquetReaderWrap(file_reader.release(), _state->batch_size(),
+ num_of_columns_from_file);
- Status status = _cur_file_reader->init_parquet_reader(_src_slot_descs, _state->timezone());
+ Status status = _cur_file_reader->init_reader(_src_slot_descs, _state->timezone());
if (status.is_end_of_file()) {
continue;
diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp
index b9087990ca..28bfafedf6 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -101,6 +101,8 @@ static TFileFormatType::type parse_format(const std::string& format_str,
}
} else if (iequal(format_str, "PARQUET")) {
format_type = TFileFormatType::FORMAT_PARQUET;
+ } else if (iequal(format_str, "ORC")) {
+ format_type = TFileFormatType::FORMAT_ORC;
}
return format_type;
}
diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index 265d6fd884..49f92ddf79 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -77,6 +77,7 @@ set(VEC_FILES
data_types/data_type_date.cpp
data_types/data_type_date_time.cpp
exec/vaggregation_node.cpp
+ exec/varrow_scanner.cpp
exec/ves_http_scan_node.cpp
exec/ves_http_scanner.cpp
exec/volap_scan_node.cpp
@@ -103,6 +104,7 @@ set(VEC_FILES
exec/vbroker_scanner.cpp
exec/vjson_scanner.cpp
exec/vparquet_scanner.cpp
+ exec/vorc_scanner.cpp
exec/join/vhash_join_node.cpp
exprs/vectorized_agg_fn.cpp
exprs/vectorized_fn_call.cpp
diff --git a/be/src/vec/data_types/data_type_factory.cpp b/be/src/vec/data_types/data_type_factory.cpp
index 557b978c8e..2071b8cedb 100644
--- a/be/src/vec/data_types/data_type_factory.cpp
+++ b/be/src/vec/data_types/data_type_factory.cpp
@@ -260,9 +260,9 @@ DataTypePtr DataTypeFactory::create_data_type(const PColumnMeta& pcolumn) {
return nested;
}
-DataTypePtr DataTypeFactory::create_data_type(const arrow::Type::type& type, bool is_nullable) {
+DataTypePtr DataTypeFactory::create_data_type(const arrow::DataType* type, bool is_nullable) {
DataTypePtr nested = nullptr;
- switch (type) {
+ switch (type->id()) {
case ::arrow::Type::BOOL:
nested = std::make_shared<vectorized::DataTypeUInt8>();
break;
@@ -310,10 +310,10 @@ DataTypePtr DataTypeFactory::create_data_type(const arrow::Type::type& type, boo
nested = std::make_shared<vectorized::DataTypeString>();
break;
case ::arrow::Type::DECIMAL:
- nested = std::make_shared<vectorized::DataTypeDecimal<vectorized::Decimal128>>(27, 9);
+ nested = std::make_shared<vectorized::DataTypeDecimal<vectorized::Decimal128>>();
break;
default:
- DCHECK(false) << "invalid arrow type:" << (int)type;
+ DCHECK(false) << "invalid arrow type:" << (int)(type->id());
break;
}
diff --git a/be/src/vec/data_types/data_type_factory.hpp b/be/src/vec/data_types/data_type_factory.hpp
index 3b667c6f72..968617414c 100644
--- a/be/src/vec/data_types/data_type_factory.hpp
+++ b/be/src/vec/data_types/data_type_factory.hpp
@@ -88,7 +88,7 @@ public:
DataTypePtr create_data_type(const PColumnMeta& pcolumn);
- DataTypePtr create_data_type(const arrow::Type::type& type, bool is_nullable);
+ DataTypePtr create_data_type(const arrow::DataType* type, bool is_nullable);
private:
DataTypePtr _create_primitive_data_type(const FieldType& type) const;
diff --git a/be/src/vec/exec/vparquet_scanner.cpp b/be/src/vec/exec/varrow_scanner.cpp
similarity index 61%
copy from be/src/vec/exec/vparquet_scanner.cpp
copy to be/src/vec/exec/varrow_scanner.cpp
index 037bc15028..b44475ce19 100644
--- a/be/src/vec/exec/vparquet_scanner.cpp
+++ b/be/src/vec/exec/varrow_scanner.cpp
@@ -15,33 +15,118 @@
// specific language governing permissions and limitations
// under the License.
-#include "vec/exec/vparquet_scanner.h"
-
-#include "exec/parquet_reader.h"
+#include "exec/arrow/parquet_reader.h"
+#include "exec/broker_reader.h"
+#include "exec/buffered_reader.h"
+#include "exec/hdfs_reader_writer.h"
+#include "exec/local_file_reader.h"
+#include "exec/s3_reader.h"
#include "exprs/expr.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
#include "vec/data_types/data_type_factory.hpp"
+#include "vec/exec/vorc_scanner.h"
#include "vec/functions/simple_function_factory.h"
#include "vec/utils/arrow_column_to_doris_column.h"
namespace doris::vectorized {
-VParquetScanner::VParquetScanner(RuntimeState* state, RuntimeProfile* profile,
- const TBrokerScanRangeParams& params,
- const std::vector<TBrokerRangeDesc>& ranges,
- const std::vector<TNetworkAddress>& broker_addresses,
- const std::vector<TExpr>& pre_filter_texprs,
- ScannerCounter* counter)
- : ParquetScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs,
- counter),
+VArrowScanner::VArrowScanner(RuntimeState* state, RuntimeProfile* profile,
+ const TBrokerScanRangeParams& params,
+ const std::vector<TBrokerRangeDesc>& ranges,
+ const std::vector<TNetworkAddress>& broker_addresses,
+ const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter)
+ : BaseScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs, counter),
+ // _splittable(params.splittable),
+ _cur_file_reader(nullptr),
+ _cur_file_eof(false),
_batch(nullptr),
_arrow_batch_cur_idx(0) {}
-VParquetScanner::~VParquetScanner() = default;
+VArrowScanner::~VArrowScanner() {
+ close();
+}
+
+Status VArrowScanner::_open_next_reader() {
+ // open_file_reader
+ if (_cur_file_reader != nullptr) {
+ delete _cur_file_reader;
+ _cur_file_reader = nullptr;
+ }
+
+ while (true) {
+ if (_next_range >= _ranges.size()) {
+ _scanner_eof = true;
+ return Status::OK();
+ }
+ const TBrokerRangeDesc& range = _ranges[_next_range++];
+ std::unique_ptr<FileReader> file_reader;
+ switch (range.file_type) {
+ case TFileType::FILE_LOCAL: {
+ file_reader.reset(new LocalFileReader(range.path, range.start_offset));
+ break;
+ }
+ case TFileType::FILE_HDFS: {
+ FileReader* reader;
+ RETURN_IF_ERROR(HdfsReaderWriter::create_reader(range.hdfs_params, range.path,
+ range.start_offset, &reader));
+ file_reader.reset(reader);
+ break;
+ }
+ case TFileType::FILE_BROKER: {
+ int64_t file_size = 0;
+ // for compatibility
+ if (range.__isset.file_size) {
+ file_size = range.file_size;
+ }
+ file_reader.reset(new BufferedReader(
+ _profile,
+ new BrokerReader(_state->exec_env(), _broker_addresses, _params.properties,
+ range.path, range.start_offset, file_size)));
+ break;
+ }
+ case TFileType::FILE_S3: {
+ file_reader.reset(new BufferedReader(
+ _profile, new S3Reader(_params.properties, range.path, range.start_offset)));
+ break;
+ }
+ default: {
+ std::stringstream ss;
+ ss << "Unknown file type, type=" << range.file_type;
+ return Status::InternalError(ss.str());
+ }
+ }
+ RETURN_IF_ERROR(file_reader->open());
+ if (file_reader->size() == 0) {
+ file_reader->close();
+ continue;
+ }
+
+ int32_t num_of_columns_from_file = _src_slot_descs.size();
+ if (range.__isset.num_of_columns_from_file) {
+ num_of_columns_from_file = range.num_of_columns_from_file;
+ }
+ _cur_file_reader = _new_arrow_reader(file_reader.release(), _state->batch_size(),
+ num_of_columns_from_file);
+
+ Status status = _cur_file_reader->init_reader(_src_slot_descs, _state->timezone());
+
+ if (status.is_end_of_file()) {
+ continue;
+ } else {
+ if (!status.ok()) {
+ std::stringstream ss;
+ ss << " file: " << range.path << " error:" << status.get_error_msg();
+ return Status::InternalError(ss.str());
+ } else {
+ return status;
+ }
+ }
+ }
+}
-Status VParquetScanner::open() {
- RETURN_IF_ERROR(ParquetScanner::open());
+Status VArrowScanner::open() {
+ RETURN_IF_ERROR(BaseScanner::open());
if (_ranges.empty()) {
return Status::OK();
}
@@ -49,18 +134,18 @@ Status VParquetScanner::open() {
}
// get next available arrow batch
-Status VParquetScanner::_next_arrow_batch() {
+Status VArrowScanner::_next_arrow_batch() {
_arrow_batch_cur_idx = 0;
// first, init file reader
if (_cur_file_reader == nullptr || _cur_file_eof) {
- RETURN_IF_ERROR(open_next_reader());
+ RETURN_IF_ERROR(_open_next_reader());
_cur_file_eof = false;
}
// second, loop until find available arrow batch or EOF
while (!_scanner_eof) {
- RETURN_IF_ERROR(_cur_file_reader->next_batch(&_batch, _src_slot_descs, &_cur_file_eof));
+ RETURN_IF_ERROR(_cur_file_reader->next_batch(&_batch, &_cur_file_eof));
if (_cur_file_eof) {
- RETURN_IF_ERROR(open_next_reader());
+ RETURN_IF_ERROR(_open_next_reader());
_cur_file_eof = false;
continue;
}
@@ -72,7 +157,7 @@ Status VParquetScanner::_next_arrow_batch() {
return Status::EndOfFile("EOF");
}
-Status VParquetScanner::_init_arrow_batch_if_necessary() {
+Status VArrowScanner::_init_arrow_batch_if_necessary() {
// 1. init batch if first time
// 2. reset reader if end of file
Status status;
@@ -85,7 +170,7 @@ Status VParquetScanner::_init_arrow_batch_if_necessary() {
return status;
}
-Status VParquetScanner::_init_src_block() {
+Status VArrowScanner::_init_src_block() {
size_t batch_pos = 0;
_src_block.clear();
for (auto i = 0; i < _num_of_columns_from_file; ++i) {
@@ -98,7 +183,7 @@ Status VParquetScanner::_init_src_block() {
// TODO, support not nullable for exec efficiently
auto is_nullable = true;
DataTypePtr data_type =
- DataTypeFactory::instance().create_data_type(array->type()->id(), is_nullable);
+ DataTypeFactory::instance().create_data_type(array->type().get(), is_nullable);
if (data_type == nullptr) {
return Status::NotSupported(
fmt::format("Not support arrow type:{}", array->type()->name()));
@@ -110,7 +195,7 @@ Status VParquetScanner::_init_src_block() {
return Status::OK();
}
-Status VParquetScanner::get_next(vectorized::Block* block, bool* eof) {
+Status VArrowScanner::get_next(vectorized::Block* block, bool* eof) {
// overall of type converting:
// arrow type ==arrow_column_to_doris_column==> primitive type(PT0) ==cast_src_block==>
// primitive type(PT1) ==materialize_block==> dest primitive type
@@ -171,7 +256,7 @@ Status VParquetScanner::get_next(vectorized::Block* block, bool* eof) {
// arrow type ==arrow_column_to_doris_column==> primitive type(PT0) ==cast_src_block==>
// primitive type(PT1) ==materialize_block==> dest primitive type
-Status VParquetScanner::_cast_src_block(Block* block) {
+Status VArrowScanner::_cast_src_block(Block* block) {
// cast primitive type(PT0) to primitive type(PT1)
for (size_t i = 0; i < _num_of_columns_from_file; ++i) {
SlotDescriptor* slot_desc = _src_slot_descs[i];
@@ -194,7 +279,7 @@ Status VParquetScanner::_cast_src_block(Block* block) {
return Status::OK();
}
-Status VParquetScanner::_append_batch_to_src_block(Block* block) {
+Status VArrowScanner::_append_batch_to_src_block(Block* block) {
size_t num_elements = std::min<size_t>((_state->batch_size() - block->rows()),
(_batch->num_rows() - _arrow_batch_cur_idx));
size_t column_pos = 0;
@@ -214,4 +299,12 @@ Status VParquetScanner::_append_batch_to_src_block(Block* block) {
return Status::OK();
}
-} // namespace doris::vectorized
+void VArrowScanner::close() {
+ BaseScanner::close();
+ if (_cur_file_reader != nullptr) {
+ delete _cur_file_reader;
+ _cur_file_reader = nullptr;
+ }
+}
+
+} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/exec/vparquet_scanner.h b/be/src/vec/exec/varrow_scanner.h
similarity index 53%
copy from be/src/vec/exec/vparquet_scanner.h
copy to be/src/vec/exec/varrow_scanner.h
index 72ac280989..c3740e0d3e 100644
--- a/be/src/vec/exec/vparquet_scanner.h
+++ b/be/src/vec/exec/varrow_scanner.h
@@ -18,7 +18,8 @@
#pragma once
#include <arrow/array.h>
-#include <exec/parquet_scanner.h>
+#include <exec/arrow/arrow_reader.h>
+#include <exec/arrow/orc_reader.h>
#include <map>
#include <memory>
@@ -28,37 +29,52 @@
#include <vector>
#include "common/status.h"
-#include "gen_cpp/PlanNodes_types.h"
+#include "exec/base_scanner.h"
#include "gen_cpp/Types_types.h"
#include "runtime/mem_pool.h"
#include "util/runtime_profile.h"
namespace doris::vectorized {
-// VParquet scanner convert the data read from Parquet to doris's columns.
-class VParquetScanner : public ParquetScanner {
+// VArrow scanner convert the data read from orc|parquet to doris's columns.
+class VArrowScanner : public BaseScanner {
public:
- VParquetScanner(RuntimeState* state, RuntimeProfile* profile,
- const TBrokerScanRangeParams& params,
- const std::vector<TBrokerRangeDesc>& ranges,
- const std::vector<TNetworkAddress>& broker_addresses,
- const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter);
+ VArrowScanner(RuntimeState* state, RuntimeProfile* profile,
+ const TBrokerScanRangeParams& params, const std::vector<TBrokerRangeDesc>& ranges,
+ const std::vector<TNetworkAddress>& broker_addresses,
+ const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter);
- ~VParquetScanner() override;
+ virtual ~VArrowScanner();
// Open this scanner, will initialize information need to
- Status open() override;
+ virtual Status open() override;
- Status get_next(Block* block, bool* eof) override;
+ virtual Status get_next(doris::Tuple* tuple, MemPool* tuple_pool, bool* eof,
+ bool* fill_tuple) override {
+ return Status::NotSupported("Not Implemented get next");
+ }
+
+ virtual Status get_next(Block* block, bool* eof) override;
+
+ virtual void close() override;
+
+protected:
+ virtual ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, int64_t batch_size,
+ int32_t num_of_columns_from_file) = 0;
private:
+ // Read next buffer from reader
+ Status _open_next_reader();
Status _next_arrow_batch();
Status _init_arrow_batch_if_necessary();
- Status _init_src_block() override;
+ Status _init_src_block();
Status _append_batch_to_src_block(Block* block);
Status _cast_src_block(Block* block);
private:
+ // Reader
+ ArrowReaderWrap* _cur_file_reader;
+ bool _cur_file_eof; // is read over?
std::shared_ptr<arrow::RecordBatch> _batch;
size_t _arrow_batch_cur_idx;
};
diff --git a/be/src/vec/exec/vorc_scanner.cpp b/be/src/vec/exec/vorc_scanner.cpp
new file mode 100644
index 0000000000..7521634183
--- /dev/null
+++ b/be/src/vec/exec/vorc_scanner.cpp
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/vorc_scanner.h"
+
+#include <exec/arrow/orc_reader.h>
+
+namespace doris::vectorized {
+
+VORCScanner::VORCScanner(RuntimeState* state, RuntimeProfile* profile,
+ const TBrokerScanRangeParams& params,
+ const std::vector<TBrokerRangeDesc>& ranges,
+ const std::vector<TNetworkAddress>& broker_addresses,
+ const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter)
+ : VArrowScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs,
+ counter) {}
+
+ArrowReaderWrap* VORCScanner::_new_arrow_reader(FileReader* file_reader, int64_t batch_size,
+ int32_t num_of_columns_from_file) {
+ return new ORCReaderWrap(file_reader, batch_size, num_of_columns_from_file);
+}
+
+} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/exec/vparquet_scanner.h b/be/src/vec/exec/vorc_scanner.h
similarity index 52%
copy from be/src/vec/exec/vparquet_scanner.h
copy to be/src/vec/exec/vorc_scanner.h
index 72ac280989..12510e9731 100644
--- a/be/src/vec/exec/vparquet_scanner.h
+++ b/be/src/vec/exec/vorc_scanner.h
@@ -18,7 +18,7 @@
#pragma once
#include <arrow/array.h>
-#include <exec/parquet_scanner.h>
+#include <vec/exec/varrow_scanner.h>
#include <map>
#include <memory>
@@ -28,39 +28,26 @@
#include <vector>
#include "common/status.h"
-#include "gen_cpp/PlanNodes_types.h"
+#include "exec/base_scanner.h"
#include "gen_cpp/Types_types.h"
#include "runtime/mem_pool.h"
#include "util/runtime_profile.h"
namespace doris::vectorized {
-// VParquet scanner convert the data read from Parquet to doris's columns.
-class VParquetScanner : public ParquetScanner {
+// VOrc scanner convert the data read from Orc to doris's columns.
+class VORCScanner final : public VArrowScanner {
public:
- VParquetScanner(RuntimeState* state, RuntimeProfile* profile,
- const TBrokerScanRangeParams& params,
- const std::vector<TBrokerRangeDesc>& ranges,
- const std::vector<TNetworkAddress>& broker_addresses,
- const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter);
+ VORCScanner(RuntimeState* state, RuntimeProfile* profile, const TBrokerScanRangeParams& params,
+ const std::vector<TBrokerRangeDesc>& ranges,
+ const std::vector<TNetworkAddress>& broker_addresses,
+ const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter);
- ~VParquetScanner() override;
+ ~VORCScanner() override = default;
- // Open this scanner, will initialize information need to
- Status open() override;
-
- Status get_next(Block* block, bool* eof) override;
-
-private:
- Status _next_arrow_batch();
- Status _init_arrow_batch_if_necessary();
- Status _init_src_block() override;
- Status _append_batch_to_src_block(Block* block);
- Status _cast_src_block(Block* block);
-
-private:
- std::shared_ptr<arrow::RecordBatch> _batch;
- size_t _arrow_batch_cur_idx;
+protected:
+ ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, int64_t batch_size,
+ int32_t num_of_columns_from_file) override;
};
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/vparquet_scanner.cpp b/be/src/vec/exec/vparquet_scanner.cpp
index 037bc15028..cb59ae60bc 100644
--- a/be/src/vec/exec/vparquet_scanner.cpp
+++ b/be/src/vec/exec/vparquet_scanner.cpp
@@ -17,13 +17,7 @@
#include "vec/exec/vparquet_scanner.h"
-#include "exec/parquet_reader.h"
-#include "exprs/expr.h"
-#include "runtime/descriptors.h"
-#include "runtime/exec_env.h"
-#include "vec/data_types/data_type_factory.hpp"
-#include "vec/functions/simple_function_factory.h"
-#include "vec/utils/arrow_column_to_doris_column.h"
+#include "exec/arrow/parquet_reader.h"
namespace doris::vectorized {
@@ -33,185 +27,12 @@ VParquetScanner::VParquetScanner(RuntimeState* state, RuntimeProfile* profile,
const std::vector<TNetworkAddress>& broker_addresses,
const std::vector<TExpr>& pre_filter_texprs,
ScannerCounter* counter)
- : ParquetScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs,
- counter),
- _batch(nullptr),
- _arrow_batch_cur_idx(0) {}
+ : VArrowScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs,
+ counter) {}
-VParquetScanner::~VParquetScanner() = default;
-
-Status VParquetScanner::open() {
- RETURN_IF_ERROR(ParquetScanner::open());
- if (_ranges.empty()) {
- return Status::OK();
- }
- return Status::OK();
-}
-
-// get next available arrow batch
-Status VParquetScanner::_next_arrow_batch() {
- _arrow_batch_cur_idx = 0;
- // first, init file reader
- if (_cur_file_reader == nullptr || _cur_file_eof) {
- RETURN_IF_ERROR(open_next_reader());
- _cur_file_eof = false;
- }
- // second, loop until find available arrow batch or EOF
- while (!_scanner_eof) {
- RETURN_IF_ERROR(_cur_file_reader->next_batch(&_batch, _src_slot_descs, &_cur_file_eof));
- if (_cur_file_eof) {
- RETURN_IF_ERROR(open_next_reader());
- _cur_file_eof = false;
- continue;
- }
- if (_batch->num_rows() == 0) {
- continue;
- }
- return Status::OK();
- }
- return Status::EndOfFile("EOF");
-}
-
-Status VParquetScanner::_init_arrow_batch_if_necessary() {
- // 1. init batch if first time
- // 2. reset reader if end of file
- Status status;
- if (_scanner_eof) {
- return Status::EndOfFile("EOF");
- }
- if (_batch == nullptr || _arrow_batch_cur_idx >= _batch->num_rows()) {
- return _next_arrow_batch();
- }
- return status;
-}
-
-Status VParquetScanner::_init_src_block() {
- size_t batch_pos = 0;
- _src_block.clear();
- for (auto i = 0; i < _num_of_columns_from_file; ++i) {
- SlotDescriptor* slot_desc = _src_slot_descs[i];
- if (slot_desc == nullptr) {
- continue;
- }
- auto* array = _batch->column(batch_pos++).get();
- // let src column be nullable for simplify converting
- // TODO, support not nullable for exec efficiently
- auto is_nullable = true;
- DataTypePtr data_type =
- DataTypeFactory::instance().create_data_type(array->type()->id(), is_nullable);
- if (data_type == nullptr) {
- return Status::NotSupported(
- fmt::format("Not support arrow type:{}", array->type()->name()));
- }
- MutableColumnPtr data_column = data_type->create_column();
- _src_block.insert(
- ColumnWithTypeAndName(std::move(data_column), data_type, slot_desc->col_name()));
- }
- return Status::OK();
-}
-
-Status VParquetScanner::get_next(vectorized::Block* block, bool* eof) {
- // overall of type converting:
- // arrow type ==arrow_column_to_doris_column==> primitive type(PT0) ==cast_src_block==>
- // primitive type(PT1) ==materialize_block==> dest primitive type
-
- // first, we need to convert the arrow type to the corresponding internal type,
- // such as arrow::INT16 to TYPE_SMALLINT(PT0).
- // why need first step? we cannot convert the arrow type to type in src desc directly,
- // it's too hard to achieve.
-
- // second, convert PT0 to the type in src desc, such as TYPE_SMALLINT to TYPE_VARCHAR.(PT1)
- // why need second step? the materialize step only accepts types specified in src desc.
-
- // finally, through the materialized, convert to the type in dest desc, such as TYPE_DATETIME.
- SCOPED_TIMER(_read_timer);
- // init arrow batch
- {
- Status st = _init_arrow_batch_if_necessary();
- if (!st.ok()) {
- if (!st.is_end_of_file()) {
- return st;
- }
- *eof = true;
- return Status::OK();
- }
- }
-
- RETURN_IF_ERROR(_init_src_block());
- // convert arrow batch to block until reach the batch_size
- while (!_scanner_eof) {
- // cast arrow type to PT0 and append it to src block
- // for example: arrow::Type::INT16 => TYPE_SMALLINT
- RETURN_IF_ERROR(_append_batch_to_src_block(&_src_block));
- // finalize the src block if full
- if (_src_block.rows() >= _state->batch_size()) {
- break;
- }
- auto status = _next_arrow_batch();
- // if ok, append the batch to the src columns
- if (status.ok()) {
- continue;
- }
- // return error if not EOF
- if (!status.is_end_of_file()) {
- return status;
- }
- _cur_file_eof = true;
- break;
- }
- COUNTER_UPDATE(_rows_read_counter, _src_block.rows());
- SCOPED_TIMER(_materialize_timer);
- // cast PT0 => PT1
- // for example: TYPE_SMALLINT => TYPE_VARCHAR
- RETURN_IF_ERROR(_cast_src_block(&_src_block));
-
- // materialize, src block => dest columns
- return _fill_dest_block(block, eof);
-}
-
-// arrow type ==arrow_column_to_doris_column==> primitive type(PT0) ==cast_src_block==>
-// primitive type(PT1) ==materialize_block==> dest primitive type
-Status VParquetScanner::_cast_src_block(Block* block) {
- // cast primitive type(PT0) to primitive type(PT1)
- for (size_t i = 0; i < _num_of_columns_from_file; ++i) {
- SlotDescriptor* slot_desc = _src_slot_descs[i];
- if (slot_desc == nullptr) {
- continue;
- }
- auto& arg = block->get_by_name(slot_desc->col_name());
- // remove nullable here, let the get_function decide whether nullable
- auto return_type = slot_desc->get_data_type_ptr();
- ColumnsWithTypeAndName arguments {
- arg,
- {DataTypeString().create_column_const(
- arg.column->size(), remove_nullable(return_type)->get_family_name()),
- std::make_shared<DataTypeString>(), ""}};
- auto func_cast =
- SimpleFunctionFactory::instance().get_function("CAST", arguments, return_type);
- RETURN_IF_ERROR(func_cast->execute(nullptr, *block, {i}, i, arg.column->size()));
- block->get_by_position(i).type = std::move(return_type);
- }
- return Status::OK();
-}
-
-Status VParquetScanner::_append_batch_to_src_block(Block* block) {
- size_t num_elements = std::min<size_t>((_state->batch_size() - block->rows()),
- (_batch->num_rows() - _arrow_batch_cur_idx));
- size_t column_pos = 0;
- for (auto i = 0; i < _num_of_columns_from_file; ++i) {
- SlotDescriptor* slot_desc = _src_slot_descs[i];
- if (slot_desc == nullptr) {
- continue;
- }
- auto* array = _batch->column(column_pos++).get();
- auto& column_with_type_and_name = block->get_by_name(slot_desc->col_name());
- RETURN_IF_ERROR(arrow_column_to_doris_column(array, _arrow_batch_cur_idx,
- column_with_type_and_name, num_elements,
- _state->timezone()));
- }
-
- _arrow_batch_cur_idx += num_elements;
- return Status::OK();
+ArrowReaderWrap* VParquetScanner::_new_arrow_reader(FileReader* file_reader, int64_t batch_size,
+ int32_t num_of_columns_from_file) {
+ return new ParquetReaderWrap(file_reader, batch_size, num_of_columns_from_file);
}
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/vparquet_scanner.h b/be/src/vec/exec/vparquet_scanner.h
index 72ac280989..367e2e7472 100644
--- a/be/src/vec/exec/vparquet_scanner.h
+++ b/be/src/vec/exec/vparquet_scanner.h
@@ -18,7 +18,7 @@
#pragma once
#include <arrow/array.h>
-#include <exec/parquet_scanner.h>
+#include <vec/exec/varrow_scanner.h>
#include <map>
#include <memory>
@@ -36,7 +36,7 @@
namespace doris::vectorized {
// VParquet scanner convert the data read from Parquet to doris's columns.
-class VParquetScanner : public ParquetScanner {
+class VParquetScanner final : public VArrowScanner {
public:
VParquetScanner(RuntimeState* state, RuntimeProfile* profile,
const TBrokerScanRangeParams& params,
@@ -44,23 +44,11 @@ public:
const std::vector<TNetworkAddress>& broker_addresses,
const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter);
- ~VParquetScanner() override;
+ ~VParquetScanner() override = default;
- // Open this scanner, will initialize information need to
- Status open() override;
-
- Status get_next(Block* block, bool* eof) override;
-
-private:
- Status _next_arrow_batch();
- Status _init_arrow_batch_if_necessary();
- Status _init_src_block() override;
- Status _append_batch_to_src_block(Block* block);
- Status _cast_src_block(Block* block);
-
-private:
- std::shared_ptr<arrow::RecordBatch> _batch;
- size_t _arrow_batch_cur_idx;
+protected:
+ ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, int64_t batch_size,
+ int32_t num_of_columns_from_file) override;
};
} // namespace doris::vectorized
diff --git a/be/test/CMakeLists.txt b/be/test/CMakeLists.txt
index 20052e823e..48d37e56a1 100644
--- a/be/test/CMakeLists.txt
+++ b/be/test/CMakeLists.txt
@@ -338,6 +338,8 @@ set(VEC_TEST_FILES
vec/exec/vbroker_scanner_test.cpp
vec/exec/vjson_scanner_test.cpp
vec/exec/vtablet_sink_test.cpp
+ vec/exec/vorc_scanner_test.cpp
+ vec/exec/vparquet_scanner_test.cpp
vec/exprs/vexpr_test.cpp
vec/function/function_array_element_test.cpp
vec/function/function_array_index_test.cpp
diff --git a/be/test/olap/hll_test.cpp b/be/test/olap/hll_test.cpp
index 3131efdeb2..5843a076cb 100644
--- a/be/test/olap/hll_test.cpp
+++ b/be/test/olap/hll_test.cpp
@@ -34,7 +34,7 @@ static uint64_t hash(uint64_t value) {
}
// keep logic same with java version in fe when you change hll_test.cpp,see HllTest.java
TEST_F(TestHll, Normal) {
- uint8_t buf[HLL_REGISTERS_COUNT + 1];
+ uint8_t buf[HLL_REGISTERS_COUNT + 1] = {0};
// empty
{
diff --git a/be/test/vec/exec/vorc_scanner_test.cpp b/be/test/vec/exec/vorc_scanner_test.cpp
new file mode 100644
index 0000000000..f5f8bf522c
--- /dev/null
+++ b/be/test/vec/exec/vorc_scanner_test.cpp
@@ -0,0 +1,892 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/vorc_scanner.h"
+
+#include <gtest/gtest.h>
+#include <runtime/descriptor_helper.h>
+#include <time.h>
+
+#include <map>
+#include <string>
+#include <vector>
+
+#include "common/object_pool.h"
+#include "exec/local_file_reader.h"
+#include "exec/orc_scanner.h"
+#include "exprs/cast_functions.h"
+#include "exprs/decimalv2_operators.h"
+#include "gen_cpp/Descriptors_types.h"
+#include "gen_cpp/PlanNodes_types.h"
+#include "runtime/descriptors.h"
+#include "runtime/row_batch.h"
+#include "runtime/runtime_state.h"
+#include "runtime/tuple.h"
+#include "runtime/user_function_cache.h"
+#include "vec/exec/vbroker_scan_node.h"
+
+namespace doris {
+namespace vectorized {
+
+class VOrcScannerTest : public testing::Test {
+public:
+    VOrcScannerTest() : _runtime_state(TQueryGlobals()) {
+        _profile = _runtime_state.runtime_profile();
+        _runtime_state._instance_mem_tracker.reset(new MemTracker());
+        // VORCScanner is a vectorized-engine scanner.
+        _runtime_state._query_options.enable_vectorized_engine = true;
+    }
+    ~VOrcScannerTest() override = default;
+
+    // One-time suite setup: points the UDF cache at the test data directory and
+    // initializes the CastFunctions / DecimalV2Operators builtins whose symbols the
+    // CAST_EXPR nodes built by the tests reference.
+    static void SetUpTestCase() {
+        UserFunctionCache::instance()->init(
+                "./be/test/runtime/test_data/user_function_cache/normal");
+        CastFunctions::init();
+        DecimalV2Operators::init();
+    }
+
+protected:
+    void SetUp() override {}
+
+    void TearDown() override {}
+
+private:
+    RuntimeState _runtime_state;
+    // Obtained from _runtime_state.runtime_profile() in the constructor.
+    RuntimeProfile* _profile = nullptr;
+    ObjectPool _obj_pool;
+    // Filled in by DescriptorTbl::create(&_obj_pool, ...) inside each test.
+    DescriptorTbl* _desc_tbl = nullptr;
+    std::vector<TNetworkAddress> _addresses;
+    ScannerCounter _counter;
+    std::vector<TExpr> _pre_filter;
+};
+
+// Scans my-file.orc, casting varchar source columns to bigint/double/float/int
+// destination columns or copying them as varchar.
+TEST_F(VOrcScannerTest, normal) {
+    TBrokerScanRangeParams params;
+    TTypeDesc varchar_type;
+    {
+        TTypeNode node;
+        node.__set_type(TTypeNodeType::SCALAR);
+        TScalarType scalar_type;
+        scalar_type.__set_type(TPrimitiveType::VARCHAR);
+        scalar_type.__set_len(65535);
+        node.__set_scalar_type(scalar_type);
+        varchar_type.types.push_back(node);
+    }
+
+    // Builds a scalar type descriptor that needs no length/precision/scale.
+    auto make_scalar_type = [](TPrimitiveType::type primitive_type) {
+        TTypeNode node;
+        node.__set_type(TTypeNodeType::SCALAR);
+        TScalarType scalar_type;
+        scalar_type.__set_type(primitive_type);
+        node.__set_scalar_type(scalar_type);
+        TTypeDesc type_desc;
+        type_desc.types.push_back(node);
+        return type_desc;
+    };
+    const TTypeDesc int_type = make_scalar_type(TPrimitiveType::INT);
+    const TTypeDesc big_int_type = make_scalar_type(TPrimitiveType::BIGINT);
+    const TTypeDesc float_type = make_scalar_type(TPrimitiveType::FLOAT);
+    const TTypeDesc double_type = make_scalar_type(TPrimitiveType::DOUBLE);
+
+    // SLOT_REF node reading varchar slot `slot_id` of the source tuple (tuple id 0).
+    auto make_slot_ref = [&varchar_type](int slot_id) {
+        TExprNode slot_ref;
+        slot_ref.node_type = TExprNodeType::SLOT_REF;
+        slot_ref.type = varchar_type;
+        slot_ref.num_children = 0;
+        slot_ref.__isset.slot_ref = true;
+        slot_ref.slot_ref.slot_id = slot_id;
+        slot_ref.slot_ref.tuple_id = 0;
+        return slot_ref;
+    };
+    // Expression that copies source slot `slot_id` unchanged.
+    auto make_copy_expr = [&make_slot_ref](int slot_id) {
+        TExpr expr;
+        expr.nodes.push_back(make_slot_ref(slot_id));
+        return expr;
+    };
+    // Expression CAST(<varchar source slot `slot_id`> AS ret_type) using the builtin
+    // function `fn_name` implemented by `symbol`. The signature is derived from
+    // fn_name so it always matches the function actually being called.
+    auto make_cast_expr = [&varchar_type, &make_slot_ref](const TTypeDesc& ret_type,
+                                                           const std::string& fn_name,
+                                                           const std::string& symbol,
+                                                           int slot_id) {
+        TExprNode cast_expr;
+        cast_expr.node_type = TExprNodeType::CAST_EXPR;
+        cast_expr.type = ret_type;
+        cast_expr.__set_opcode(TExprOpcode::CAST);
+        cast_expr.__set_num_children(1);
+        cast_expr.__set_output_scale(-1);
+        cast_expr.__isset.fn = true;
+        cast_expr.fn.name.function_name = fn_name;
+        cast_expr.fn.binary_type = TFunctionBinaryType::BUILTIN;
+        cast_expr.fn.arg_types.push_back(varchar_type);
+        cast_expr.fn.ret_type = ret_type;
+        cast_expr.fn.has_var_args = false;
+        cast_expr.fn.__set_signature(fn_name + "(VARCHAR(*))");
+        cast_expr.fn.__isset.scalar_fn = true;
+        cast_expr.fn.scalar_fn.symbol = symbol;
+
+        TExpr expr;
+        expr.nodes.push_back(cast_expr);
+        expr.nodes.push_back(make_slot_ref(slot_id));
+        return expr;
+    };
+
+    // Source slots use ids 0-7 and destination slots ids 8-15, matching the order
+    // the slots are added to the descriptor table below (presumably assigned
+    // sequentially by TDescriptorTableBuilder).
+
+    // col1: varchar -> bigint
+    params.expr_of_dest_slot.emplace(
+            8, make_cast_expr(big_int_type, "casttobigint",
+                              "doris::CastFunctions::cast_to_big_int_val", 0));
+    params.src_slot_ids.push_back(0);
+
+    // col2, col3: copied as varchar
+    for (int i = 1; i <= 2; ++i) {
+        params.expr_of_dest_slot.emplace(8 + i, make_copy_expr(i));
+        params.src_slot_ids.push_back(i);
+    }
+
+    // col5: varchar -> double
+    params.expr_of_dest_slot.emplace(
+            11, make_cast_expr(double_type, "casttodouble",
+                               "doris::CastFunctions::cast_to_double_val", 3));
+    params.src_slot_ids.push_back(3);
+
+    // col6: varchar -> float
+    params.expr_of_dest_slot.emplace(
+            12, make_cast_expr(float_type, "casttofloat",
+                               "doris::CastFunctions::cast_to_float_val", 4));
+    params.src_slot_ids.push_back(4);
+
+    // col7, col8: varchar -> int
+    for (int i = 5; i <= 6; ++i) {
+        params.expr_of_dest_slot.emplace(
+                8 + i, make_cast_expr(int_type, "casttoint",
+                                      "doris::CastFunctions::cast_to_int_val", i));
+        params.src_slot_ids.push_back(i);
+    }
+
+    // col9: copied as varchar
+    params.expr_of_dest_slot.emplace(15, make_copy_expr(7));
+    params.src_slot_ids.push_back(7);
+
+    params.__set_src_tuple_id(0);
+    params.__set_dest_tuple_id(1);
+
+    // init_desc_table
+    TDescriptorTableBuilder dtb;
+    TTupleDescriptorBuilder src_tuple_builder;
+    // Every source slot is a nullable VARCHAR(65535).
+    const std::vector<std::string> src_columns = {"col1", "col2", "col3", "col5",
+                                                  "col6", "col7", "col8", "col9"};
+    for (size_t i = 0; i < src_columns.size(); ++i) {
+        src_tuple_builder.add_slot(TSlotDescriptorBuilder()
+                                           .string_type(65535)
+                                           .nullable(true)
+                                           .column_name(src_columns[i])
+                                           .column_pos(static_cast<int>(i) + 1)
+                                           .build());
+    }
+    src_tuple_builder.build(&dtb);
+
+    TTupleDescriptorBuilder dest_tuple_builder;
+    dest_tuple_builder.add_slot(
+            TSlotDescriptorBuilder().type(TYPE_BIGINT).column_name("col1").column_pos(1).build());
+    dest_tuple_builder.add_slot(TSlotDescriptorBuilder()
+                                        .string_type(65535)
+                                        .nullable(true)
+                                        .column_name("col2")
+                                        .column_pos(2)
+                                        .build());
+    dest_tuple_builder.add_slot(
+            TSlotDescriptorBuilder().string_type(65535).column_name("col3").column_pos(3).build());
+    dest_tuple_builder.add_slot(
+            TSlotDescriptorBuilder().type(TYPE_DOUBLE).column_name("col5").column_pos(4).build());
+    dest_tuple_builder.add_slot(
+            TSlotDescriptorBuilder().type(TYPE_FLOAT).column_name("col6").column_pos(5).build());
+    dest_tuple_builder.add_slot(
+            TSlotDescriptorBuilder().type(TYPE_INT).column_name("col7").column_pos(6).build());
+    dest_tuple_builder.add_slot(
+            TSlotDescriptorBuilder().type(TYPE_INT).column_name("col8").column_pos(7).build());
+    dest_tuple_builder.add_slot(
+            TSlotDescriptorBuilder().string_type(65535).column_name("col9").column_pos(8).build());
+    dest_tuple_builder.build(&dtb);
+    const TDescriptorTable t_desc_table = dtb.desc_tbl();
+
+    // A failed descriptor table would make every later step meaningless, so stop here.
+    ASSERT_TRUE(DescriptorTbl::create(&_obj_pool, t_desc_table, &_desc_tbl).ok());
+    _runtime_state.set_desc_tbl(_desc_tbl);
+
+    std::vector<TBrokerRangeDesc> ranges;
+    TBrokerRangeDesc rangeDesc;
+    rangeDesc.start_offset = 0;
+    rangeDesc.size = -1;
+    rangeDesc.format_type = TFileFormatType::FORMAT_ORC;
+    rangeDesc.splittable = false;
+
+    rangeDesc.path = "./be/test/exec/test_data/orc_scanner/my-file.orc";
+    rangeDesc.file_type = TFileType::FILE_LOCAL;
+    ranges.push_back(rangeDesc);
+
+    VORCScanner scanner(&_runtime_state, _profile, params, ranges, _addresses, _pre_filter,
+                        &_counter);
+    ASSERT_TRUE(scanner.open().ok());
+
+    vectorized::Block block;
+    bool eof = false;
+
+    // The scanner reports eof on the first call; row contents are not checked here.
+    EXPECT_TRUE(scanner.get_next(&block, &eof).ok());
+    EXPECT_TRUE(eof);
+    // Calling again after eof must still succeed and keep reporting eof.
+    EXPECT_TRUE(scanner.get_next(&block, &eof).ok());
+    EXPECT_TRUE(eof);
+    scanner.close();
+}
+
+// Scans my-file.orc into a single destination column copied from source col2 and
+// expects all 10 rows in one batch.
+TEST_F(VOrcScannerTest, normal2) {
+    TBrokerScanRangeParams params;
+    TTypeDesc varchar_type;
+    {
+        TTypeNode node;
+        node.__set_type(TTypeNodeType::SCALAR);
+        TScalarType scalar_type;
+        scalar_type.__set_type(TPrimitiveType::VARCHAR);
+        scalar_type.__set_len(65535);
+        node.__set_scalar_type(scalar_type);
+        varchar_type.types.push_back(node);
+    }
+
+    // Destination slot 3 is a plain copy of source slot 1 (col2). All three source
+    // columns are still registered as source slots.
+    {
+        TExprNode slot_ref;
+        slot_ref.node_type = TExprNodeType::SLOT_REF;
+        slot_ref.type = varchar_type;
+        slot_ref.num_children = 0;
+        slot_ref.__isset.slot_ref = true;
+        slot_ref.slot_ref.slot_id = 1;
+        slot_ref.slot_ref.tuple_id = 0;
+
+        TExpr expr;
+        expr.nodes.push_back(slot_ref);
+
+        params.expr_of_dest_slot.emplace(3, expr);
+        params.src_slot_ids.push_back(0);
+        params.src_slot_ids.push_back(1);
+        params.src_slot_ids.push_back(2);
+    }
+    params.__set_src_tuple_id(0);
+    params.__set_dest_tuple_id(1);
+
+    // init_desc_table
+    TDescriptorTableBuilder dtb;
+    TTupleDescriptorBuilder src_tuple_builder;
+    // Every source slot is a nullable VARCHAR(65535).
+    const std::vector<std::string> src_columns = {"col1", "col2", "col3"};
+    for (size_t i = 0; i < src_columns.size(); ++i) {
+        src_tuple_builder.add_slot(TSlotDescriptorBuilder()
+                                           .string_type(65535)
+                                           .nullable(true)
+                                           .column_name(src_columns[i])
+                                           .column_pos(static_cast<int>(i) + 1)
+                                           .build());
+    }
+    src_tuple_builder.build(&dtb);
+
+    TTupleDescriptorBuilder dest_tuple_builder;
+    dest_tuple_builder.add_slot(TSlotDescriptorBuilder()
+                                        .string_type(65535)
+                                        .column_name("value_from_col2")
+                                        .column_pos(1)
+                                        .build());
+    dest_tuple_builder.build(&dtb);
+    const TDescriptorTable t_desc_table = dtb.desc_tbl();
+
+    // A failed descriptor table would make every later step meaningless, so stop here.
+    ASSERT_TRUE(DescriptorTbl::create(&_obj_pool, t_desc_table, &_desc_tbl).ok());
+    _runtime_state.set_desc_tbl(_desc_tbl);
+
+    std::vector<TBrokerRangeDesc> ranges;
+    TBrokerRangeDesc rangeDesc;
+    rangeDesc.start_offset = 0;
+    rangeDesc.size = -1;
+    rangeDesc.format_type = TFileFormatType::FORMAT_ORC;
+    rangeDesc.splittable = false;
+
+    rangeDesc.path = "./be/test/exec/test_data/orc_scanner/my-file.orc";
+    rangeDesc.file_type = TFileType::FILE_LOCAL;
+    ranges.push_back(rangeDesc);
+
+    VORCScanner scanner(&_runtime_state, _profile, params, ranges, _addresses, _pre_filter,
+                        &_counter);
+    ASSERT_TRUE(scanner.open().ok());
+
+    bool eof = false;
+    vectorized::Block block;
+    EXPECT_TRUE(scanner.get_next(&block, &eof).ok());
+    EXPECT_EQ(10, block.rows());
+    EXPECT_TRUE(eof);
+    scanner.close();
+}
+
+// Scans decimal_and_timestamp.orc, casting varchar source columns to decimalv2,
+// tinyint, datetime and date destination columns, and expects exactly one row.
+TEST_F(VOrcScannerTest, normal3) {
+    TBrokerScanRangeParams params;
+    TTypeDesc varchar_type;
+    {
+        TTypeNode node;
+        node.__set_type(TTypeNodeType::SCALAR);
+        TScalarType scalar_type;
+        scalar_type.__set_type(TPrimitiveType::VARCHAR);
+        scalar_type.__set_len(65535);
+        node.__set_scalar_type(scalar_type);
+        varchar_type.types.push_back(node);
+    }
+
+    // NOTE(review): precision/scale 64/64 is set only on the CAST_EXPR type; the
+    // destination slot descriptors below declare each column's decimal(p, s)
+    // separately. Confirm the cast target is not derived from these values.
+    TTypeDesc decimal_type;
+    {
+        TTypeNode node;
+        node.__set_type(TTypeNodeType::SCALAR);
+        TScalarType scalar_type;
+        scalar_type.__set_type(TPrimitiveType::DECIMALV2);
+        scalar_type.__set_precision(64);
+        scalar_type.__set_scale(64);
+        node.__set_scalar_type(scalar_type);
+        decimal_type.types.push_back(node);
+    }
+
+    // Builds a scalar type descriptor that needs no length/precision/scale.
+    auto make_scalar_type = [](TPrimitiveType::type primitive_type) {
+        TTypeNode node;
+        node.__set_type(TTypeNodeType::SCALAR);
+        TScalarType scalar_type;
+        scalar_type.__set_type(primitive_type);
+        node.__set_scalar_type(scalar_type);
+        TTypeDesc type_desc;
+        type_desc.types.push_back(node);
+        return type_desc;
+    };
+    const TTypeDesc tinyint_type = make_scalar_type(TPrimitiveType::TINYINT);
+    const TTypeDesc datetime_type = make_scalar_type(TPrimitiveType::DATETIME);
+    const TTypeDesc date_type = make_scalar_type(TPrimitiveType::DATE);
+
+    // Expression CAST(<varchar source slot `slot_id`> AS ret_type) using the builtin
+    // function `fn_name` implemented by `symbol`. The signature is derived from
+    // fn_name so it always matches the function actually being called.
+    auto make_cast_expr = [&varchar_type](const TTypeDesc& ret_type, const std::string& fn_name,
+                                          const std::string& symbol, int slot_id) {
+        TExprNode cast_expr;
+        cast_expr.node_type = TExprNodeType::CAST_EXPR;
+        cast_expr.type = ret_type;
+        cast_expr.__set_opcode(TExprOpcode::CAST);
+        cast_expr.__set_num_children(1);
+        cast_expr.__set_output_scale(-1);
+        cast_expr.__isset.fn = true;
+        cast_expr.fn.name.function_name = fn_name;
+        cast_expr.fn.binary_type = TFunctionBinaryType::BUILTIN;
+        cast_expr.fn.arg_types.push_back(varchar_type);
+        cast_expr.fn.ret_type = ret_type;
+        cast_expr.fn.has_var_args = false;
+        cast_expr.fn.__set_signature(fn_name + "(VARCHAR(*))");
+        cast_expr.fn.__isset.scalar_fn = true;
+        cast_expr.fn.scalar_fn.symbol = symbol;
+
+        TExprNode slot_ref;
+        slot_ref.node_type = TExprNodeType::SLOT_REF;
+        slot_ref.type = varchar_type;
+        slot_ref.num_children = 0;
+        slot_ref.__isset.slot_ref = true;
+        slot_ref.slot_ref.slot_id = slot_id;
+        slot_ref.slot_ref.tuple_id = 0;
+
+        TExpr expr;
+        expr.nodes.push_back(cast_expr);
+        expr.nodes.push_back(slot_ref);
+        return expr;
+    };
+
+    // Source slots use ids 0-8 and destination slots ids 9-17, matching the order
+    // the slots are added to the descriptor table below (presumably assigned
+    // sequentially by TDescriptorTableBuilder).
+
+    // col1..col5: varchar -> decimalv2
+    for (int i = 0; i < 5; ++i) {
+        params.expr_of_dest_slot.emplace(
+                9 + i, make_cast_expr(decimal_type, "casttodecimalv2",
+                                      "doris::DecimalV2Operators::cast_to_decimalv2_val", i));
+        params.src_slot_ids.push_back(i);
+    }
+
+    // col6: varchar -> tinyint
+    params.expr_of_dest_slot.emplace(
+            14, make_cast_expr(tinyint_type, "casttotinyint",
+                               "doris::CastFunctions::cast_to_tiny_int_val", 5));
+    params.src_slot_ids.push_back(5);
+
+    // col7: varchar -> datetime
+    params.expr_of_dest_slot.emplace(
+            15, make_cast_expr(datetime_type, "casttodatetime",
+                               "doris::CastFunctions::cast_to_datetime_val", 6));
+    params.src_slot_ids.push_back(6);
+
+    // col8: varchar -> date
+    params.expr_of_dest_slot.emplace(
+            16, make_cast_expr(date_type, "casttodate",
+                               "doris::CastFunctions::cast_to_date_val", 7));
+    params.src_slot_ids.push_back(7);
+
+    // col9: varchar -> decimalv2
+    params.expr_of_dest_slot.emplace(
+            17, make_cast_expr(decimal_type, "casttodecimalv2",
+                               "doris::DecimalV2Operators::cast_to_decimalv2_val", 8));
+    params.src_slot_ids.push_back(8);
+
+    params.__set_src_tuple_id(0);
+    params.__set_dest_tuple_id(1);
+
+    // init_desc_table
+    TDescriptorTableBuilder dtb;
+    TTupleDescriptorBuilder src_tuple_builder;
+    // Every source slot is a nullable VARCHAR(65535).
+    const std::vector<std::string> src_columns = {"col1", "col2", "col3", "col4", "col5",
+                                                  "col6", "col7", "col8", "col9"};
+    for (size_t i = 0; i < src_columns.size(); ++i) {
+        src_tuple_builder.add_slot(TSlotDescriptorBuilder()
+                                           .string_type(65535)
+                                           .nullable(true)
+                                           .column_name(src_columns[i])
+                                           .column_pos(static_cast<int>(i) + 1)
+                                           .build());
+    }
+    src_tuple_builder.build(&dtb);
+
+    TTupleDescriptorBuilder dest_tuple_builder;
+    dest_tuple_builder.add_slot(
+            TSlotDescriptorBuilder().decimal_type(10, 9).column_name("col1").column_pos(1).build());
+    dest_tuple_builder.add_slot(
+            TSlotDescriptorBuilder().decimal_type(7, 5).column_name("col2").column_pos(2).build());
+    dest_tuple_builder.add_slot(
+            TSlotDescriptorBuilder().decimal_type(10, 9).column_name("col3").column_pos(3).build());
+    dest_tuple_builder.add_slot(
+            TSlotDescriptorBuilder().decimal_type(10, 5).column_name("col4").column_pos(4).build());
+    dest_tuple_builder.add_slot(
+            TSlotDescriptorBuilder().decimal_type(10, 5).column_name("col5").column_pos(5).build());
+    dest_tuple_builder.add_slot(
+            TSlotDescriptorBuilder().type(TYPE_TINYINT).column_name("col6").column_pos(6).build());
+    dest_tuple_builder.add_slot(
+            TSlotDescriptorBuilder().type(TYPE_DATETIME).column_name("col7").column_pos(7).build());
+    dest_tuple_builder.add_slot(TSlotDescriptorBuilder()
+                                        .type(TYPE_DATE)
+                                        .nullable(true)
+                                        .column_name("col8")
+                                        .column_pos(8)
+                                        .build());
+    dest_tuple_builder.add_slot(
+            TSlotDescriptorBuilder().decimal_type(27, 9).column_name("col9").column_pos(9).build());
+    dest_tuple_builder.build(&dtb);
+    const TDescriptorTable t_desc_table = dtb.desc_tbl();
+
+    // A failed descriptor table would make every later step meaningless, so stop here.
+    ASSERT_TRUE(DescriptorTbl::create(&_obj_pool, t_desc_table, &_desc_tbl).ok());
+    _runtime_state.set_desc_tbl(_desc_tbl);
+
+    std::vector<TBrokerRangeDesc> ranges;
+    TBrokerRangeDesc rangeDesc;
+    rangeDesc.start_offset = 0;
+    rangeDesc.size = -1;
+    rangeDesc.format_type = TFileFormatType::FORMAT_ORC;
+    rangeDesc.splittable = false;
+
+    rangeDesc.path = "./be/test/exec/test_data/orc_scanner/decimal_and_timestamp.orc";
+    rangeDesc.file_type = TFileType::FILE_LOCAL;
+    ranges.push_back(rangeDesc);
+
+    VORCScanner scanner(&_runtime_state, _profile, params, ranges, _addresses, _pre_filter,
+                        &_counter);
+    ASSERT_TRUE(scanner.open().ok());
+
+    bool eof = false;
+    vectorized::Block block;
+    EXPECT_TRUE(scanner.get_next(&block, &eof).ok());
+    EXPECT_EQ(1, block.rows());
+    EXPECT_TRUE(eof);
+    scanner.close();
+}
+
+} // namespace vectorized
+} // namespace doris
diff --git a/be/test/vec/exec/vparquet_scanner_test.cpp b/be/test/vec/exec/vparquet_scanner_test.cpp
new file mode 100644
index 0000000000..ba8f70ce70
--- /dev/null
+++ b/be/test/vec/exec/vparquet_scanner_test.cpp
@@ -0,0 +1,499 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+#include <time.h>
+
+#include <map>
+#include <string>
+#include <vector>
+
+#include "common/object_pool.h"
+#include "exec/local_file_reader.h"
+#include "exprs/cast_functions.h"
+#include "gen_cpp/Descriptors_types.h"
+#include "gen_cpp/PlanNodes_types.h"
+#include "runtime/descriptors.h"
+#include "runtime/row_batch.h"
+#include "runtime/runtime_state.h"
+#include "runtime/tuple.h"
+#include "runtime/user_function_cache.h"
+#include "vec/exec/vbroker_scan_node.h"
+
+namespace doris {
+namespace vectorized {
+
+// Test fixture for the vectorized parquet broker scan.
+// The constructor builds, via init():
+//   - a 20-column descriptor table: an all-VARCHAR source tuple, and a destination tuple
+//     mixing VARCHAR and BIGINT;
+//   - the broker scan-range params (_params);
+//   - the plan node (_tnode).
+// It then enables the vectorized engine on _runtime_state.
+class VParquetScannerTest : public testing::Test {
+public:
+    VParquetScannerTest() : _runtime_state(TQueryGlobals()) {
+        init();
+        _runtime_state._instance_mem_tracker.reset(new MemTracker());
+        _runtime_state._query_options.enable_vectorized_engine = true;
+    }
+    ~VParquetScannerTest() override = default;
+    void init();
+    static void SetUpTestCase() {
+        UserFunctionCache::instance()->init(
+                "./be/test/runtime/test_data/user_function_cache/normal");
+        CastFunctions::init();
+    }
+
+protected:
+    void SetUp() override {}
+    void TearDown() override {}
+
+private:
+    // Append the source / destination slot and tuple descriptors to t_desc_table.
+    // Both return the next unused slot id.
+    int create_src_tuple(TDescriptorTable& t_desc_table, int next_slot_id);
+    int create_dst_tuple(TDescriptorTable& t_desc_table, int next_slot_id);
+    // Fills _params with one expression per destination slot plus the tuple ids.
+    void create_expr_info();
+    // Builds _desc_tbl from both tuples and installs it into _runtime_state.
+    void init_desc_table();
+    RuntimeState _runtime_state;
+    ObjectPool _obj_pool;
+    std::map<std::string, SlotDescriptor*> _slots_map;
+    TBrokerScanRangeParams _params;
+    // Set by init_desc_table() through DescriptorTbl::create (presumably allocated from
+    // _obj_pool -- confirm). Initialized to nullptr so a failed create is not an
+    // indeterminate pointer.
+    DescriptorTbl* _desc_tbl = nullptr;
+    TPlanNode _tnode;
+};
+
+// Tuple ids of the destination (table) tuple and the source (file) tuple.
+constexpr int TUPLE_ID_DST = 0;
+constexpr int TUPLE_ID_SRC = 1;
+// Number of columns in both the source and the destination tuple.
+constexpr int COLUMN_NUMBERS = 20;
+// init_desc_table() hands out slot ids sequentially starting at 1:
+// destination slots first, then source slots.
+constexpr int DST_TUPLE_SLOT_ID_START = 1;
+constexpr int SRC_TUPLE_SLOT_ID_START = DST_TUPLE_SLOT_ID_START + COLUMN_NUMBERS;
+
+// Appends COLUMN_NUMBERS VARCHAR(65535) slot descriptors (ids next_slot_id onward) and the
+// source tuple descriptor (TUPLE_ID_SRC) to t_desc_table.
+// Returns the first slot id that was not used.
+int VParquetScannerTest::create_src_tuple(TDescriptorTable& t_desc_table, int next_slot_id) {
+    const char* columnNames[] = {
+            "log_version",       "log_time",       "log_time_stamp", "js_version",
+            "vst_cookie",        "vst_ip",         "vst_user_id",    "vst_user_agent",
+            "device_resolution", "page_url",       "page_refer_url", "page_yyid",
+            "page_type",         "pos_type",       "content_id",     "media_id",
+            "spm_cnt",           "spm_pre",        "scm_cnt",        "partition_column"};
+    static_assert(sizeof(columnNames) / sizeof(columnNames[0]) ==
+                          static_cast<size_t>(COLUMN_NUMBERS),
+                  "one column name per source slot");
+    for (int i = 0; i < COLUMN_NUMBERS; i++) {
+        TSlotDescriptor slot_desc;
+
+        slot_desc.id = next_slot_id++;
+        slot_desc.parent = TUPLE_ID_SRC;
+        TTypeDesc type;
+        {
+            TTypeNode node;
+            node.__set_type(TTypeNodeType::SCALAR);
+            TScalarType scalar_type;
+            scalar_type.__set_type(TPrimitiveType::VARCHAR);
+            scalar_type.__set_len(65535);
+            node.__set_scalar_type(scalar_type);
+            type.types.push_back(node);
+        }
+        slot_desc.slotType = type;
+        slot_desc.columnPos = i;
+        // Skip the first 8 bytes, which are used to indicate whether a field is null;
+        // each VARCHAR slot then takes 16 bytes.
+        slot_desc.byteOffset = i * 16 + 8;
+        slot_desc.nullIndicatorByte = i / 8;
+        slot_desc.nullIndicatorBit = i % 8;
+        slot_desc.colName = columnNames[i];
+        slot_desc.slotIdx = i + 1;
+        slot_desc.isMaterialized = true;
+
+        t_desc_table.slotDescriptors.push_back(slot_desc);
+    }
+
+    {
+        // TTupleDescriptor source
+        TTupleDescriptor t_tuple_desc;
+        t_tuple_desc.id = TUPLE_ID_SRC;
+        // 16 bytes per slot plus the 8 leading bytes reserved for null handling.
+        t_tuple_desc.byteSize = COLUMN_NUMBERS * 16 + 8;
+        // NOTE(review): numNullBytes is 0 even though 8 leading bytes are reserved and
+        // nullIndicatorByte reaches 2. Kept unchanged -- confirm the vectorized path
+        // ignores it.
+        t_tuple_desc.numNullBytes = 0;
+        t_tuple_desc.tableId = 0;
+        t_tuple_desc.__isset.tableId = true;
+        t_desc_table.tupleDescriptors.push_back(t_tuple_desc);
+    }
+    return next_slot_id;
+}
+
+// Appends the destination tuple (TUPLE_ID_DST) to t_desc_table, with one slot descriptor
+// per column (ids next_slot_id onward):
+//   - log_time and log_time_stamp are BIGINT slots of 8 bytes;
+//   - every other column is a VARCHAR(65535) slot of 16 bytes.
+// Slots are packed after an 8-byte null-indicator area.
+// Returns the first slot id that was not used.
+int VParquetScannerTest::create_dst_tuple(TDescriptorTable& t_desc_table, int next_slot_id) {
+    const char* columnNames[] = {
+            "log_version",       "log_time",       "log_time_stamp", "js_version",
+            "vst_cookie",        "vst_ip",         "vst_user_id",    "vst_user_agent",
+            "device_resolution", "page_url",       "page_refer_url", "page_yyid",
+            "page_type",         "pos_type",       "content_id",     "media_id",
+            "spm_cnt",           "spm_pre",        "scm_cnt",        "partition_column"};
+    // Skip the first 8 bytes, which are used to indicate whether a field is null.
+    int32_t byteOffset = 8;
+    for (int i = 0; i < COLUMN_NUMBERS; i++) {
+        // Columns 1 (log_time) and 2 (log_time_stamp) become BIGINT; the rest stay VARCHAR.
+        const bool is_bigint = (i == 1 || i == 2);
+
+        TSlotDescriptor slot_desc;
+        slot_desc.id = next_slot_id++;
+        slot_desc.parent = TUPLE_ID_DST;
+        TTypeDesc type;
+        {
+            TTypeNode node;
+            node.__set_type(TTypeNodeType::SCALAR);
+            TScalarType scalar_type;
+            if (is_bigint) {
+                scalar_type.__set_type(TPrimitiveType::BIGINT);
+            } else {
+                scalar_type.__set_type(TPrimitiveType::VARCHAR);
+                scalar_type.__set_len(65535);
+            }
+            node.__set_scalar_type(scalar_type);
+            type.types.push_back(node);
+        }
+        slot_desc.slotType = type;
+        slot_desc.columnPos = i;
+        slot_desc.byteOffset = byteOffset;
+        slot_desc.nullIndicatorByte = i / 8;
+        slot_desc.nullIndicatorBit = i % 8;
+        slot_desc.colName = columnNames[i];
+        slot_desc.slotIdx = i + 1;
+        slot_desc.isMaterialized = true;
+
+        t_desc_table.slotDescriptors.push_back(slot_desc);
+        byteOffset += is_bigint ? 8 : 16;
+    }
+
+    t_desc_table.__isset.slotDescriptors = true;
+    {
+        // TTupleDescriptor dest
+        TTupleDescriptor t_tuple_desc;
+        t_tuple_desc.id = TUPLE_ID_DST;
+        t_tuple_desc.byteSize = byteOffset + 8; // Here 8 bytes in order to handle null values
+        t_tuple_desc.numNullBytes = 0;
+        t_tuple_desc.tableId = 0;
+        t_tuple_desc.__isset.tableId = true;
+        t_desc_table.tupleDescriptors.push_back(t_tuple_desc);
+    }
+    return next_slot_id;
+}
+
+// Builds the thrift descriptor table (a single broker table, the destination tuple and the
+// source tuple), materializes it into _desc_tbl, and installs it on _runtime_state.
+void VParquetScannerTest::init_desc_table() {
+    TTableDescriptor broker_table;
+    broker_table.id = 0;
+    broker_table.tableType = TTableType::BROKER_TABLE;
+    broker_table.numCols = 0;
+    broker_table.numClusteringCols = 0;
+
+    TDescriptorTable thrift_tbl;
+    thrift_tbl.tableDescriptors.push_back(broker_table);
+    thrift_tbl.__isset.tableDescriptors = true;
+
+    // Slot ids are handed out sequentially from 1: destination slots first, then the
+    // source slots continue from wherever the destination tuple stopped.
+    const int first_src_slot_id = create_dst_tuple(thrift_tbl, 1);
+    create_src_tuple(thrift_tbl, first_src_slot_id);
+
+    DescriptorTbl::create(&_obj_pool, thrift_tbl, &_desc_tbl);
+    _runtime_state.set_desc_tbl(_desc_tbl);
+}
+
+// Fills _params with one expression per destination slot, each reading the matching
+// VARCHAR source slot:
+//   - log_time and log_time_stamp are wrapped in CAST(... AS BIGINT);
+//   - every other column is a plain slot reference.
+// Also records the source slot ids and the source/destination tuple ids.
+void VParquetScannerTest::create_expr_info() {
+    TTypeDesc varchar_type;
+    {
+        TTypeNode node;
+        node.__set_type(TTypeNodeType::SCALAR);
+        TScalarType scalar_type;
+        scalar_type.__set_type(TPrimitiveType::VARCHAR);
+        scalar_type.__set_len(5000);
+        node.__set_scalar_type(scalar_type);
+        varchar_type.types.push_back(node);
+    }
+    TTypeDesc int_type;
+    {
+        TTypeNode node;
+        node.__set_type(TTypeNodeType::SCALAR);
+        TScalarType scalar_type;
+        scalar_type.__set_type(TPrimitiveType::BIGINT);
+        node.__set_scalar_type(scalar_type);
+        int_type.types.push_back(node);
+    }
+    // VARCHAR --> BIGINT cast node; it is placed before the slot ref it converts.
+    TExprNode cast_expr;
+    cast_expr.node_type = TExprNodeType::CAST_EXPR;
+    cast_expr.type = int_type;
+    cast_expr.__set_opcode(TExprOpcode::CAST);
+    cast_expr.__set_num_children(1);
+    cast_expr.__set_output_scale(-1);
+    cast_expr.__isset.fn = true;
+    cast_expr.fn.name.function_name = "casttoint";
+    cast_expr.fn.binary_type = TFunctionBinaryType::BUILTIN;
+    cast_expr.fn.arg_types.push_back(varchar_type);
+    cast_expr.fn.ret_type = int_type;
+    cast_expr.fn.has_var_args = false;
+    cast_expr.fn.__set_signature("casttoint(VARCHAR(*))");
+    cast_expr.fn.__isset.scalar_fn = true;
+    cast_expr.fn.scalar_fn.symbol = "doris::CastFunctions::cast_to_big_int_val";
+
+    for (int i = 0; i < COLUMN_NUMBERS; i++) {
+        // Reference to column i of the source tuple.
+        TExprNode slot_ref;
+        slot_ref.node_type = TExprNodeType::SLOT_REF;
+        slot_ref.type = varchar_type;
+        slot_ref.num_children = 0;
+        slot_ref.__isset.slot_ref = true;
+        slot_ref.slot_ref.slot_id = SRC_TUPLE_SLOT_ID_START + i;
+        slot_ref.slot_ref.tuple_id = TUPLE_ID_SRC;
+
+        TExpr expr;
+        // log_time (1) and log_time_stamp (2): VARCHAR --> BIGINT.
+        // Every other column is copied without conversion.
+        if (i == 1 || i == 2) {
+            expr.nodes.push_back(cast_expr);
+        }
+        expr.nodes.push_back(slot_ref);
+
+        _params.expr_of_dest_slot.emplace(DST_TUPLE_SLOT_ID_START + i, expr);
+        _params.src_slot_ids.push_back(SRC_TUPLE_SLOT_ID_START + i);
+    }
+
+    // _params.__isset.expr_of_dest_slot = true;
+    _params.__set_dest_tuple_id(TUPLE_ID_DST);
+    _params.__set_src_tuple_id(TUPLE_ID_SRC);
+}
+
+// Prepares everything the test needs before a scan node can be built: the scan-range
+// expressions, the descriptor table, and a childless plan node whose only row tuple is the
+// destination tuple.
+void VParquetScannerTest::init() {
+    create_expr_info();
+    init_desc_table();
+
+    TPlanNode& plan = _tnode;
+    plan.node_id = 0;
+    // NOTE(review): a broker scan is exercised, yet the node type is SCHEMA_SCAN_NODE.
+    // Kept unchanged -- confirm whether BROKER_SCAN_NODE was intended.
+    plan.node_type = TPlanNodeType::SCHEMA_SCAN_NODE;
+    plan.num_children = 0;
+    plan.limit = -1;
+    plan.row_tuples.push_back(TUPLE_ID_DST);
+    plan.nullable_tuples.push_back(false);
+    plan.broker_scan_node.tuple_id = TUPLE_ID_DST;
+    plan.__isset.broker_scan_node = true;
+}
+
+// Scans the local parquet test file through VBrokerScanNode.
+// Expects 14 full blocks of 2048 rows, then a final block of 1328 rows with eof set, then
+// an empty block that still reports eof.
+TEST_F(VParquetScannerTest, normal) {
+    VBrokerScanNode scan_node(&_obj_pool, _tnode, *_desc_tbl);
+    // Stop on setup failures: driving a node that failed to init/prepare/open would only
+    // cause follow-on failures (or crashes) in get_next.
+    ASSERT_TRUE(scan_node.init(_tnode).ok());
+    auto status = scan_node.prepare(&_runtime_state);
+    ASSERT_TRUE(status.ok());
+
+    // A single scan range covering the whole local parquet file. All but the last column
+    // come from the file; the last one is filled from columns_from_path.
+    std::vector<TScanRangeParams> scan_ranges;
+    {
+        TScanRangeParams scan_range_params;
+
+        TBrokerScanRange broker_scan_range;
+        broker_scan_range.params = _params;
+        TBrokerRangeDesc range;
+        range.start_offset = 0;
+        range.size = -1;
+        range.format_type = TFileFormatType::FORMAT_PARQUET;
+        range.splittable = true;
+
+        std::vector<std::string> columns_from_path {"value"};
+        range.__set_columns_from_path(columns_from_path);
+        range.__set_num_of_columns_from_file(COLUMN_NUMBERS - 1);
+        range.path = "./be/test/exec/test_data/parquet_scanner/localfile.parquet";
+        range.file_type = TFileType::FILE_LOCAL;
+        broker_scan_range.ranges.push_back(range);
+        scan_range_params.scan_range.__set_broker_scan_range(broker_scan_range);
+        scan_ranges.push_back(scan_range_params);
+    }
+
+    scan_node.set_scan_ranges(scan_ranges);
+    status = scan_node.open(&_runtime_state);
+    ASSERT_TRUE(status.ok());
+
+    constexpr int kFullBlocks = 14;
+    constexpr int kBlockRows = 2048;
+    constexpr int kLastBlockRows = 1328;
+
+    vectorized::Block block;
+    bool eof = false;
+    for (int i = 0; i < kFullBlocks; i++) {
+        status = scan_node.get_next(&_runtime_state, &block, &eof);
+        EXPECT_TRUE(status.ok());
+        EXPECT_EQ(kBlockRows, block.rows());
+        EXPECT_FALSE(eof);
+        block.clear();
+    }
+
+    status = scan_node.get_next(&_runtime_state, &block, &eof);
+    EXPECT_TRUE(status.ok());
+    EXPECT_EQ(kLastBlockRows, block.rows());
+    EXPECT_TRUE(eof);
+    block.clear();
+    // Reading past the end yields an empty block and keeps eof set.
+    status = scan_node.get_next(&_runtime_state, &block, &eof);
+    EXPECT_TRUE(status.ok());
+    EXPECT_EQ(0, block.rows());
+    EXPECT_TRUE(eof);
+
+    scan_node.close(&_runtime_state);
+    {
+        std::stringstream ss;
+        scan_node.runtime_profile()->pretty_print(&ss);
+        LOG(INFO) << ss.str();
+    }
+}
+
+} // namespace vectorized
+} // namespace doris
\ No newline at end of file
diff --git a/thirdparty/build-thirdparty.sh b/thirdparty/build-thirdparty.sh
index d29ae4bf8e..6b96aa271c 100755
--- a/thirdparty/build-thirdparty.sh
+++ b/thirdparty/build-thirdparty.sh
@@ -360,9 +360,12 @@ build_gtest() {
# rapidjson
build_rapidjson() {
check_if_source_exist $RAPIDJSON_SOURCE
-
- rm -rf $TP_INSTALL_DIR/rapidjson
- cp -r $TP_SOURCE_DIR/$RAPIDJSON_SOURCE/include/rapidjson $TP_INCLUDE_DIR/
+ cd $TP_SOURCE_DIR/$RAPIDJSON_SOURCE
+ mkdir -p $BUILD_DIR && cd $BUILD_DIR
+ rm -rf CMakeCache.txt CMakeFiles/  # start from a clean CMake cache
+ ${CMAKE_CMD} -G "${GENERATOR}" ../ -DCMAKE_INSTALL_PREFIX=$TP_INSTALL_DIR -DRAPIDJSON_BUILD_DOC=OFF \
+ -DRAPIDJSON_BUILD_EXAMPLES=OFF -DRAPIDJSON_BUILD_TESTS=OFF
+ ${BUILD_SYSTEM} -j $PARALLEL && ${BUILD_SYSTEM} install  # same generator/build tool pairing as the other build_* functions
}
# snappy
@@ -373,7 +376,7 @@ build_snappy() {
mkdir -p $BUILD_DIR && cd $BUILD_DIR
rm -rf CMakeCache.txt CMakeFiles/
CFLAGS="-O3" CXXFLAGS="-O3" ${CMAKE_CMD} -G "${GENERATOR}" -DCMAKE_INSTALL_PREFIX=$TP_INSTALL_DIR \
- -DCMAKE_POSITION_INDEPENDENT_CODE=On \
+ -DCMAKE_POSITION_INDEPENDENT_CODE=ON \
-DCMAKE_INSTALL_INCLUDEDIR=$TP_INCLUDE_DIR/snappy \
-DSNAPPY_BUILD_TESTS=0 ../
${BUILD_SYSTEM} -j $PARALLEL && ${BUILD_SYSTEM} install
@@ -643,20 +646,23 @@ build_arrow() {
export ARROW_SNAPPY_URL=${TP_SOURCE_DIR}/${SNAPPY_NAME}
export ARROW_ZLIB_URL=${TP_SOURCE_DIR}/${ZLIB_NAME}
export ARROW_XSIMD_URL=${TP_SOURCE_DIR}/${XSIMD_NAME}
+ export ARROW_ORC_URL=${TP_SOURCE_DIR}/${ORC_NAME}
LDFLAGS="-L${TP_LIB_DIR} -static-libstdc++ -static-libgcc" \
${CMAKE_CMD} -G "${GENERATOR}" -DARROW_PARQUET=ON -DARROW_IPC=ON -DARROW_BUILD_SHARED=OFF \
-DARROW_BUILD_STATIC=ON -DARROW_WITH_BROTLI=ON -DARROW_WITH_LZ4=ON -DARROW_USE_GLOG=ON \
-DARROW_WITH_SNAPPY=ON -DARROW_WITH_ZLIB=ON -DARROW_WITH_ZSTD=ON -DARROW_JSON=ON \
- -DARROW_WITH_UTF8PROC=OFF -DARROW_WITH_RE2=OFF \
+ -DARROW_WITH_UTF8PROC=OFF -DARROW_WITH_RE2=ON -DARROW_ORC=ON \
-DCMAKE_INSTALL_PREFIX=$TP_INSTALL_DIR \
-DCMAKE_INSTALL_LIBDIR=lib64 \
-DARROW_BOOST_USE_SHARED=OFF \
-DARROW_GFLAGS_USE_SHARED=OFF \
-Dgflags_ROOT=$TP_INSTALL_DIR \
-DGLOG_ROOT=$TP_INSTALL_DIR \
+ -DRE2_ROOT=$TP_INSTALL_DIR \
-DZLIB_LIBRARY=$TP_INSTALL_DIR/lib/libz.a -DZLIB_INCLUDE_DIR=$TP_INSTALL_DIR/include \
-DRapidJSON_ROOT=$TP_INSTALL_DIR \
+ -DORC_ROOT=$TP_INSTALL_DIR \
-DBrotli_SOURCE=BUNDLED \
-DLZ4_LIB=$TP_INSTALL_DIR/lib/liblz4.a -DLZ4_INCLUDE_DIR=$TP_INSTALL_DIR/include/lz4 \
-DLz4_SOURCE=SYSTEM \
@@ -664,7 +670,6 @@ build_arrow() {
-Dzstd_SOURCE=SYSTEM \
-DSnappy_LIB=$TP_INSTALL_DIR/lib/libsnappy.a -DSnappy_INCLUDE_DIR=$TP_INSTALL_DIR/include \
-DSnappy_SOURCE=SYSTEM \
- -DBoost_INCLUDE_DIR=$TP_INSTALL_DIR/include \
-DThrift_ROOT=$TP_INSTALL_DIR ..
${BUILD_SYSTEM} -j $PARALLEL && ${BUILD_SYSTEM} install
@@ -1011,6 +1016,7 @@ build_rocksdb
build_cyrus_sasl
build_librdkafka
build_flatbuffers
+build_orc
build_arrow
build_s2
build_bitshuffle
@@ -1019,7 +1025,6 @@ build_fmt
build_parallel_hashmap
build_pdqsort
build_libdivide
-build_orc
build_cctz
build_tsan_header
build_mysql
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org