You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by li...@apache.org on 2022/05/26 13:39:17 UTC

[incubator-doris] branch master updated: [feature-wip](parquet-orc) Support orc scanner in vectorized engine (#9541)

This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new cbbda7857b [feature-wip](parquet-orc) Support orc scanner in vectorized engine (#9541)
cbbda7857b is described below

commit cbbda7857bbdb308d5d24f9af16aaba8dc8d9b47
Author: yinzhijian <37...@qq.com>
AuthorDate: Thu May 26 21:39:12 2022 +0800

    [feature-wip](parquet-orc) Support orc scanner in vectorized engine (#9541)
---
 be/src/exec/CMakeLists.txt                         |   4 +-
 be/src/exec/arrow/arrow_reader.cpp                 | 156 ++++
 be/src/exec/arrow/arrow_reader.h                   | 101 +++
 be/src/exec/arrow/orc_reader.cpp                   | 115 +++
 be/src/exec/arrow/orc_reader.h                     |  51 ++
 be/src/exec/{ => arrow}/parquet_reader.cpp         | 147 +---
 be/src/exec/{ => arrow}/parquet_reader.h           |  53 +-
 be/src/exec/broker_scan_node.cpp                   |  13 +-
 be/src/exec/parquet_scanner.cpp                    |  12 +-
 be/src/http/action/stream_load.cpp                 |   2 +
 be/src/vec/CMakeLists.txt                          |   2 +
 be/src/vec/data_types/data_type_factory.cpp        |   8 +-
 be/src/vec/data_types/data_type_factory.hpp        |   2 +-
 .../{vparquet_scanner.cpp => varrow_scanner.cpp}   | 143 +++-
 .../exec/{vparquet_scanner.h => varrow_scanner.h}  |  42 +-
 be/src/vec/exec/vorc_scanner.cpp                   |  37 +
 .../exec/{vparquet_scanner.h => vorc_scanner.h}    |  37 +-
 be/src/vec/exec/vparquet_scanner.cpp               | 191 +----
 be/src/vec/exec/vparquet_scanner.h                 |  24 +-
 be/test/CMakeLists.txt                             |   2 +
 be/test/olap/hll_test.cpp                          |   2 +-
 be/test/vec/exec/vorc_scanner_test.cpp             | 892 +++++++++++++++++++++
 be/test/vec/exec/vparquet_scanner_test.cpp         | 499 ++++++++++++
 thirdparty/build-thirdparty.sh                     |  19 +-
 24 files changed, 2098 insertions(+), 456 deletions(-)

diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index 202ae767b0..d93c394047 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -22,6 +22,9 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_DIR}/src/exec")
 set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/src/exec")
 
 set(EXEC_FILES
+    arrow/arrow_reader.cpp
+    arrow/orc_reader.cpp
+    arrow/parquet_reader.cpp
     analytic_eval_node.cpp
     blocking_join_node.cpp
     broker_scan_node.cpp
@@ -94,7 +97,6 @@ set(EXEC_FILES
     local_file_writer.cpp
     broker_writer.cpp
     parquet_scanner.cpp
-    parquet_reader.cpp
     parquet_writer.cpp
     orc_scanner.cpp
     odbc_connector.cpp
diff --git a/be/src/exec/arrow/arrow_reader.cpp b/be/src/exec/arrow/arrow_reader.cpp
new file mode 100644
index 0000000000..789c66be2c
--- /dev/null
+++ b/be/src/exec/arrow/arrow_reader.cpp
@@ -0,0 +1,156 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "exec/arrow/arrow_reader.h"
+
+#include <arrow/array.h>
+#include <arrow/status.h>
+#include <time.h>
+
+#include "common/logging.h"
+#include "exec/file_reader.h"
+#include "gen_cpp/PaloBrokerService_types.h"
+#include "gen_cpp/TPaloBrokerService.h"
+#include "runtime/broker_mgr.h"
+#include "runtime/client_cache.h"
+#include "runtime/descriptors.h"
+#include "runtime/exec_env.h"
+#include "runtime/mem_pool.h"
+#include "runtime/tuple.h"
+#include "util/thrift_util.h"
+
+namespace doris {
+
+// Broker
+
+// Wraps `file_reader` in an ArrowFile; no batch reader yet, group counters start at zero.
+ArrowReaderWrap::ArrowReaderWrap(FileReader* file_reader, int64_t batch_size,
+                                 int32_t num_of_columns_from_file)
+        : _batch_size(batch_size),
+          _num_of_columns_from_file(num_of_columns_from_file),
+          _arrow_file(new ArrowFile(file_reader)),
+          _rb_reader(nullptr),
+          _total_groups(0), _current_group(0) {}
+
+ArrowReaderWrap::~ArrowReaderWrap() {
+    ArrowReaderWrap::close(); // a destructor never dispatches to a derived close() override
+}
+
+// Closes the wrapped ArrowFile; a failed close is only logged.
+void ArrowReaderWrap::close() {
+    const arrow::Status close_status = _arrow_file->Close();
+    if (close_status.ok()) return;
+    LOG(WARNING) << "close file error: " << close_status.ToString();
+}
+
+// Rebuilds _include_column_ids: for each of the first _num_of_columns_from_file slots, looks
+// the slot's column name up in _map_column. Returns InvalidArgument on the first unknown name.
+Status ArrowReaderWrap::column_indices(const std::vector<SlotDescriptor*>& tuple_slot_descs) {
+    DCHECK(_num_of_columns_from_file <= tuple_slot_descs.size());
+    _include_column_ids.clear();
+    for (int slot_idx = 0; slot_idx < _num_of_columns_from_file; ++slot_idx) {
+        auto* desc = tuple_slot_descs.at(slot_idx);
+        const auto found = _map_column.find(desc->col_name());
+        if (found == _map_column.end()) {
+            std::stringstream msg;
+            msg << "Invalid Column Name:" << desc->col_name();
+            LOG(WARNING) << msg.str();
+            return Status::InvalidArgument(msg.str());
+        }
+        _include_column_ids.emplace_back(found->second);
+    }
+    return Status::OK();
+}
+
+ArrowFile::ArrowFile(FileReader* file) : _file(file) {} // owns `file`: Close() deletes it
+
+// Releases the underlying FileReader if Close() has not already done so.
+ArrowFile::~ArrowFile() {
+    const arrow::Status close_status = ArrowFile::Close();
+    if (close_status.ok()) return;
+    LOG(WARNING) << "close file error: " << close_status.ToString();
+}
+
+// Closes and deletes the owned FileReader; later calls are no-ops. Always returns OK.
+arrow::Status ArrowFile::Close() {
+    if (_file == nullptr) return arrow::Status::OK();
+    _file->close();
+    delete _file;
+    _file = nullptr;
+    return arrow::Status::OK();
+}
+
+// A released reader (after Close()) counts as closed; otherwise the FileReader decides.
+bool ArrowFile::closed() const {
+    if (_file == nullptr) {
+        return true;
+    }
+    return _file->closed();
+}
+
+arrow::Result<int64_t> ArrowFile::Read(int64_t nbytes, void* buffer) {
+    return ReadAt(_pos, nbytes, buffer); // sequential read: ReadAt() advances _pos itself
+}
+
+// Reads up to `nbytes` bytes at `position` into `out`, retrying short reads until done or the
+// reader returns 0 bytes. Returns the count read; _pos ends just past the last byte read.
+arrow::Result<int64_t> ArrowFile::ReadAt(int64_t position, int64_t nbytes, void* out) {
+    if (_file == nullptr) return arrow::Status::IOError("Readat failed: file is closed.");
+    _pos = position;
+    int64_t chunk = 0;
+    int64_t total_read = 0;
+    while (nbytes > 0) {
+        if (!_file->readat(_pos, nbytes, &chunk, out).ok()) {
+            return arrow::Status::IOError("Readat failed.");
+        }
+        if (chunk == 0) break; // reader has no more data
+        total_read += chunk;
+        nbytes -= chunk;
+        _pos += chunk;
+        out = static_cast<char*>(out) + chunk;
+    }
+    return total_read;
+}
+
+arrow::Result<int64_t> ArrowFile::GetSize() {
+    return _file->size(); // NOTE(review): dereferences _file, which Close() resets to nullptr
+}
+
+arrow::Status ArrowFile::Seek(int64_t position) {
+    // Records the offset only; reads use readat(pos, ...), so _file itself is never seeked.
+    _pos = position;
+    return arrow::Status::OK();
+}
+
+arrow::Result<int64_t> ArrowFile::Tell() const {
+    return _pos; // logical offset maintained by Seek() and ReadAt(); no I/O is performed
+}
+
+// Fills a freshly allocated `nbytes` buffer from _pos; a short read returns a shorter slice.
+arrow::Result<std::shared_ptr<arrow::Buffer>> ArrowFile::Read(int64_t nbytes) {
+    auto allocated = arrow::AllocateBuffer(nbytes, arrow::default_memory_pool());
+    ARROW_RETURN_NOT_OK(allocated);
+    std::shared_ptr<arrow::Buffer> data = std::move(allocated.ValueOrDie());
+    auto read_result = ReadAt(_pos, nbytes, data->mutable_data());
+    ARROW_RETURN_NOT_OK(read_result);
+    const int64_t got = read_result.ValueOrDie();
+    if (got != nbytes) {
+        return arrow::SliceBuffer(data, 0, got);
+    }
+    return std::move(data);
+}
+
+} // namespace doris
diff --git a/be/src/exec/arrow/arrow_reader.h b/be/src/exec/arrow/arrow_reader.h
new file mode 100644
index 0000000000..4149f02493
--- /dev/null
+++ b/be/src/exec/arrow/arrow_reader.h
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <arrow/api.h>
+#include <arrow/buffer.h>
+#include <arrow/io/api.h>
+#include <arrow/io/file.h>
+#include <arrow/io/interfaces.h>
+#include <parquet/api/reader.h>
+#include <parquet/api/writer.h>
+#include <parquet/arrow/reader.h>
+#include <parquet/arrow/writer.h>
+#include <parquet/exception.h>
+#include <stdint.h>
+
+#include <map>
+#include <string>
+
+#include "common/status.h"
+#include "gen_cpp/PaloBrokerService_types.h"
+#include "gen_cpp/PlanNodes_types.h"
+#include "gen_cpp/Types_types.h"
+
+namespace doris {
+
+class ExecEnv;
+class TBrokerRangeDesc;
+class TNetworkAddress;
+class RuntimeState;
+class Tuple;
+class SlotDescriptor;
+class MemPool;
+class FileReader;
+
+class ArrowFile : public arrow::io::RandomAccessFile { // Arrow file view over a FileReader
+public:
+    ArrowFile(FileReader* file); // takes ownership of `file`: Close() deletes it
+    virtual ~ArrowFile();        // calls Close()
+    arrow::Result<int64_t> Read(int64_t nbytes, void* buffer) override; // reads at _pos
+    arrow::Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) override;
+    arrow::Result<int64_t> GetSize() override;
+    arrow::Status Seek(int64_t position) override; // only updates _pos
+    arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) override;
+    arrow::Result<int64_t> Tell() const override; // returns _pos
+    arrow::Status Close() override;               // safe to call more than once
+    bool closed() const override;
+
+private:
+    FileReader* _file; // owned; nullptr once closed
+    int64_t _pos = 0;  // logical offset used by Read()/Seek()/Tell(); ReadAt() also moves it
+};
+
+// Common base of the arrow-backed file readers: owns the ArrowFile and the column mapping.
+class ArrowReaderWrap {
+public:
+    ArrowReaderWrap(FileReader* file_reader, int64_t batch_size, int32_t num_of_columns_from_file);
+    virtual ~ArrowReaderWrap();
+
+    virtual Status init_reader(const std::vector<SlotDescriptor*>& tuple_slot_descs,
+                               const std::string& timezone) = 0;
+    // Row-at-a-time read; this base implementation only returns NotSupported.
+    virtual Status read(Tuple* tuple, const std::vector<SlotDescriptor*>& tuple_slot_descs,
+                        MemPool* mem_pool, bool* eof) {
+        return Status::NotSupported("Not Implemented read");
+    }
+    // Batch read for the vectorized engine.
+    virtual Status next_batch(std::shared_ptr<arrow::RecordBatch>* batch, bool* eof) = 0;
+    virtual void close(); // closes _arrow_file, logging (not returning) any error
+    virtual Status size(int64_t* size) { return Status::NotSupported("Not Implemented size"); }
+
+protected: // column_indices() fills _include_column_ids from _map_column
+    virtual Status column_indices(const std::vector<SlotDescriptor*>& tuple_slot_descs);
+
+protected:
+    const int64_t _batch_size;
+    const int32_t _num_of_columns_from_file;
+    std::shared_ptr<ArrowFile> _arrow_file;
+    std::shared_ptr<::arrow::RecordBatchReader> _rb_reader;
+    int _total_groups;                      // number of groups (stripes) in a parquet (orc) file
+    int _current_group;                     // current group (stripe)
+    std::map<std::string, int> _map_column; // column name -> column index
+    std::vector<int> _include_column_ids;   // indices of the columns to read from the file
+};
+
+} // namespace doris
diff --git a/be/src/exec/arrow/orc_reader.cpp b/be/src/exec/arrow/orc_reader.cpp
new file mode 100644
index 0000000000..5815f008df
--- /dev/null
+++ b/be/src/exec/arrow/orc_reader.cpp
@@ -0,0 +1,115 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "exec/arrow/orc_reader.h"
+
+#include <arrow/array.h>
+#include <arrow/status.h>
+#include <time.h>
+
+#include "common/logging.h"
+#include "exec/file_reader.h"
+#include "runtime/mem_pool.h"
+#include "runtime/tuple.h"
+
+namespace doris {
+
+// Forwards everything to ArrowReaderWrap; the ORC reader itself is created in init_reader().
+ORCReaderWrap::ORCReaderWrap(FileReader* file_reader, int64_t batch_size,
+                             int32_t num_of_columns_from_file)
+        : ArrowReaderWrap(file_reader, batch_size, num_of_columns_from_file),
+          _reader(nullptr),
+          _cur_file_eof(false) {}
+
+// Opens the ORC file, records its stripe count and column-name -> index map, positions on the
+// first stripe and resolves the columns to read. `timezone` is not used by this reader.
+Status ORCReaderWrap::init_reader(const std::vector<SlotDescriptor*>& tuple_slot_descs,
+                                  const std::string& timezone) {
+    auto maybe_reader =
+            arrow::adapters::orc::ORCFileReader::Open(_arrow_file, arrow::default_memory_pool());
+    if (!maybe_reader.ok()) {
+        LOG(WARNING) << "failed to create orc file reader, errmsg=" << maybe_reader.status();
+        return Status::InternalError("Failed to create orc file reader");
+    }
+    _reader = std::move(maybe_reader.ValueOrDie());
+    _total_groups = _reader->NumberOfStripes();
+    if (_total_groups == 0) {
+        return Status::EndOfFile("Empty Orc File");
+    }
+
+    // Build the column-name -> schema-index map consumed by column_indices().
+    arrow::Result<std::shared_ptr<arrow::Schema>> maybe_schema = _reader->ReadSchema();
+    if (!maybe_schema.ok()) {
+        LOG(WARNING) << "failed to read schema, errmsg=" << maybe_schema.status();
+        return Status::InternalError("Failed to read orc file schema");
+    }
+    std::shared_ptr<arrow::Schema> schema = maybe_schema.ValueOrDie();
+    for (int i = 0; i < schema->num_fields(); ++i) { // num_fields() returns int
+        _map_column.emplace(schema->field(i)->name(), i);
+    }
+
+    // NOTE(review): the first stripe reader is created before column_indices() fills
+    // _include_column_ids, so on a fresh reader it is opened with an empty column list.
+    bool eof = false;
+    RETURN_IF_ERROR(_next_stripe_reader(&eof));
+    if (eof) {
+        return Status::EndOfFile("end of file");
+    }
+    RETURN_IF_ERROR(column_indices(tuple_slot_descs));
+    return Status::OK();
+}
+
+// Advances to the next stripe: sets *eof once every stripe has been handed out, otherwise
+// replaces _rb_reader with a reader for the next stripe. *eof is left untouched on success.
+Status ORCReaderWrap::_next_stripe_reader(bool* eof) {
+    const bool exhausted = _current_group >= _total_groups;
+    if (exhausted) {
+        *eof = true;
+        return Status::OK();
+    }
+    // NextStripeReader yields batches of up to _batch_size rows from a single stripe, which
+    // avoids materialising the whole stripe in memory the way ReadStripe would.
+    // Only rows of the current stripe are read, not the entire file.
+    auto stripe_reader = _reader->NextStripeReader(_batch_size, _include_column_ids);
+    if (!stripe_reader.ok()) {
+        LOG(WARNING) << "Get RecordBatch Failed. " << stripe_reader.status();
+        return Status::InternalError(stripe_reader.status().ToString());
+    }
+    _rb_reader = stripe_reader.ValueOrDie();
+    ++_current_group;
+    return Status::OK();
+}
+
+// Fetches the next non-null record batch, moving on to following stripes as each one is
+// drained. On return *eof is true (and *batch is null) when no stripes remain.
+Status ORCReaderWrap::next_batch(std::shared_ptr<arrow::RecordBatch>* batch, bool* eof) {
+    *eof = false;
+    while (true) {
+        const arrow::Status read_status = _rb_reader->ReadNext(batch);
+        if (!read_status.ok()) {
+            LOG(WARNING) << "failed to get next batch, errmsg=" << read_status;
+            return Status::InternalError(read_status.ToString());
+        }
+        if (*batch != nullptr) return Status::OK();
+        // Current stripe is drained: switch to the next one, or stop at end of file.
+        RETURN_IF_ERROR(_next_stripe_reader(eof));
+        if (*eof) {
+            return Status::OK();
+        }
+    }
+}
+
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/exec/arrow/orc_reader.h b/be/src/exec/arrow/orc_reader.h
new file mode 100644
index 0000000000..5213a18dcf
--- /dev/null
+++ b/be/src/exec/arrow/orc_reader.h
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <arrow/adapters/orc/adapter.h>
+#include <arrow/api.h>
+#include <arrow/buffer.h>
+#include <stdint.h>
+
+#include <map>
+#include <string>
+
+#include "common/status.h"
+#include "exec/arrow/arrow_reader.h"
+namespace doris {
+
+// ArrowReaderWrap implementation for ORC files; reads stripe by stripe into record batches.
+class ORCReaderWrap final : public ArrowReaderWrap {
+public:
+    ORCReaderWrap(FileReader* file_reader, int64_t batch_size, int32_t num_of_columns_from_file);
+    ~ORCReaderWrap() override = default;
+
+    Status init_reader(const std::vector<SlotDescriptor*>& tuple_slot_descs,
+                       const std::string& timezone) override;
+    Status next_batch(std::shared_ptr<arrow::RecordBatch>* batch, bool* eof) override;
+
+private:
+    Status _next_stripe_reader(bool* eof); // opens the reader for the next stripe, or sets *eof
+
+private:
+    // Arrow ORC file reader; created by init_reader()
+    std::unique_ptr<arrow::adapters::orc::ORCFileReader> _reader;
+    bool _cur_file_eof; // NOTE(review): only ever set to false in the visible code - unused?
+};
+
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/exec/parquet_reader.cpp b/be/src/exec/arrow/parquet_reader.cpp
similarity index 84%
rename from be/src/exec/parquet_reader.cpp
rename to be/src/exec/arrow/parquet_reader.cpp
index 53880cdbb3..5c57efc4be 100644
--- a/be/src/exec/parquet_reader.cpp
+++ b/be/src/exec/arrow/parquet_reader.cpp
@@ -14,7 +14,7 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-#include "exec/parquet_reader.h"
+#include "exec/arrow/parquet_reader.h"
 
 #include <arrow/array.h>
 #include <arrow/status.h>
@@ -42,21 +42,15 @@ namespace doris {
 
 // Broker
 
-ParquetReaderWrap::ParquetReaderWrap(FileReader* file_reader, int32_t num_of_columns_from_file)
-        : _num_of_columns_from_file(num_of_columns_from_file),
-          _total_groups(0),
-          _current_group(0),
+ParquetReaderWrap::ParquetReaderWrap(FileReader* file_reader, int64_t batch_size,
+                                     int32_t num_of_columns_from_file)
+        : ArrowReaderWrap(file_reader, batch_size, num_of_columns_from_file),
           _rows_of_group(0),
           _current_line_of_group(0),
-          _current_line_of_batch(0) {
-    _parquet = std::shared_ptr<ParquetFile>(new ParquetFile(file_reader));
-}
+          _current_line_of_batch(0) {}
 
-ParquetReaderWrap::~ParquetReaderWrap() {
-    close();
-}
-Status ParquetReaderWrap::init_parquet_reader(const std::vector<SlotDescriptor*>& tuple_slot_descs,
-                                              const std::string& timezone) {
+Status ParquetReaderWrap::init_reader(const std::vector<SlotDescriptor*>& tuple_slot_descs,
+                                      const std::string& timezone) {
     try {
         parquet::ArrowReaderProperties arrow_reader_properties =
                 parquet::default_arrow_reader_properties();
@@ -66,7 +60,7 @@ Status ParquetReaderWrap::init_parquet_reader(const std::vector<SlotDescriptor*>
         auto reader_builder = parquet::arrow::FileReaderBuilder();
         reader_builder.properties(arrow_reader_properties);
 
-        auto st = reader_builder.Open(_parquet);
+        auto st = reader_builder.Open(_arrow_file);
 
         if (!st.ok()) {
             LOG(WARNING) << "failed to create parquet file reader, errmsg=" << st.ToString();
@@ -111,7 +105,7 @@ Status ParquetReaderWrap::init_parquet_reader(const std::vector<SlotDescriptor*>
         _current_line_of_batch = 0;
         //save column type
         std::shared_ptr<arrow::Schema> field_schema = _batch->schema();
-        for (int i = 0; i < _parquet_column_ids.size(); i++) {
+        for (int i = 0; i < _include_column_ids.size(); i++) {
             std::shared_ptr<arrow::Field> field = field_schema->field(i);
             if (!field) {
                 LOG(WARNING) << "Get field schema failed. Column order:" << i;
@@ -131,14 +125,11 @@ Status ParquetReaderWrap::init_parquet_reader(const std::vector<SlotDescriptor*>
 void ParquetReaderWrap::close() {
     _closed = true;
     _queue_writer_cond.notify_one();
-    arrow::Status st = _parquet->Close();
-    if (!st.ok()) {
-        LOG(WARNING) << "close parquet file error: " << st.ToString();
-    }
+    ArrowReaderWrap::close();
 }
 
 Status ParquetReaderWrap::size(int64_t* size) {
-    arrow::Result<int64_t> result = _parquet->GetSize();
+    arrow::Result<int64_t> result = _arrow_file->GetSize();
     if (result.ok()) {
         *size = result.ValueOrDie();
         return Status::OK();
@@ -158,24 +149,6 @@ inline void ParquetReaderWrap::fill_slot(Tuple* tuple, SlotDescriptor* slot_desc
     return;
 }
 
-Status ParquetReaderWrap::column_indices(const std::vector<SlotDescriptor*>& tuple_slot_descs) {
-    _parquet_column_ids.clear();
-    for (int i = 0; i < _num_of_columns_from_file; i++) {
-        auto slot_desc = tuple_slot_descs.at(i);
-        // Get the Column Reader for the boolean column
-        auto iter = _map_column.find(slot_desc->col_name());
-        if (iter != _map_column.end()) {
-            _parquet_column_ids.emplace_back(iter->second);
-        } else {
-            std::stringstream str_error;
-            str_error << "Invalid Column Name:" << slot_desc->col_name();
-            LOG(WARNING) << str_error.str();
-            return Status::InvalidArgument(str_error.str());
-        }
-    }
-    return Status::OK();
-}
-
 inline Status ParquetReaderWrap::set_field_null(Tuple* tuple, const SlotDescriptor* slot_desc) {
     if (!slot_desc->is_nullable()) {
         std::stringstream str_error;
@@ -188,8 +161,7 @@ inline Status ParquetReaderWrap::set_field_null(Tuple* tuple, const SlotDescript
     return Status::OK();
 }
 
-Status ParquetReaderWrap::read_record_batch(const std::vector<SlotDescriptor*>& tuple_slot_descs,
-                                            bool* eof) {
+Status ParquetReaderWrap::read_record_batch(bool* eof) {
     if (_current_line_of_group >= _rows_of_group) { // read next row group
         VLOG_DEBUG << "read_record_batch, current group id:" << _current_group
                    << " current line of group:" << _current_line_of_group
@@ -197,7 +169,7 @@ Status ParquetReaderWrap::read_record_batch(const std::vector<SlotDescriptor*>&
                    << ". start to read next row group";
         _current_group++;
         if (_current_group >= _total_groups) { // read completed.
-            _parquet_column_ids.clear();
+            _include_column_ids.clear();
             *eof = true;
             return Status::OK();
         }
@@ -219,11 +191,9 @@ Status ParquetReaderWrap::read_record_batch(const std::vector<SlotDescriptor*>&
     return Status::OK();
 }
 
-Status ParquetReaderWrap::next_batch(std::shared_ptr<arrow::RecordBatch>* batch,
-                                     const std::vector<SlotDescriptor*>& tuple_slot_descs,
-                                     bool* eof) {
+Status ParquetReaderWrap::next_batch(std::shared_ptr<arrow::RecordBatch>* batch, bool* eof) {
     if (_batch->num_rows() == 0 || _current_line_of_batch != 0 || _current_line_of_group != 0) {
-        RETURN_IF_ERROR(read_record_batch(tuple_slot_descs, eof));
+        RETURN_IF_ERROR(read_record_batch(eof));
     }
     *batch = get_batch();
     return Status::OK();
@@ -281,7 +251,7 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector<SlotDescriptor*>&
     const uint8_t* value = nullptr;
     int column_index = 0;
     try {
-        size_t slots = _parquet_column_ids.size();
+        size_t slots = _include_column_ids.size();
         for (size_t i = 0; i < slots; ++i) {
             auto slot_desc = tuple_slot_descs[i];
             column_index = i; // column index in batch record
@@ -550,7 +520,7 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector<SlotDescriptor*>&
     // update data value
     ++_current_line_of_group;
     ++_current_line_of_batch;
-    return read_record_batch(tuple_slot_descs, eof);
+    return read_record_batch(eof);
 }
 
 void ParquetReaderWrap::prefetch_batch() {
@@ -570,13 +540,13 @@ void ParquetReaderWrap::prefetch_batch() {
         if (_closed || current_group >= _total_groups) {
             return;
         }
-        _status = _reader->GetRecordBatchReader({current_group}, _parquet_column_ids, &_rb_batch);
+        _status = _reader->GetRecordBatchReader({current_group}, _include_column_ids, &_rb_reader);
         if (!_status.ok()) {
             _closed = true;
             return;
         }
         arrow::RecordBatchVector batches;
-        _status = _rb_batch->ReadAll(&batches);
+        _status = _rb_reader->ReadAll(&batches);
         if (!_status.ok()) {
             _closed = true;
             return;
@@ -602,83 +572,4 @@ Status ParquetReaderWrap::read_next_batch() {
     return Status::OK();
 }
 
-ParquetFile::ParquetFile(FileReader* file) : _file(file) {}
-
-ParquetFile::~ParquetFile() {
-    arrow::Status st = Close();
-    if (!st.ok()) {
-        LOG(WARNING) << "close parquet file error: " << st.ToString();
-    }
-}
-
-arrow::Status ParquetFile::Close() {
-    if (_file != nullptr) {
-        _file->close();
-        delete _file;
-        _file = nullptr;
-    }
-    return arrow::Status::OK();
-}
-
-bool ParquetFile::closed() const {
-    if (_file != nullptr) {
-        return _file->closed();
-    } else {
-        return true;
-    }
-}
-
-arrow::Result<int64_t> ParquetFile::Read(int64_t nbytes, void* buffer) {
-    return ReadAt(_pos, nbytes, buffer);
-}
-
-arrow::Result<int64_t> ParquetFile::ReadAt(int64_t position, int64_t nbytes, void* out) {
-    int64_t reads = 0;
-    int64_t bytes_read = 0;
-    _pos = position;
-    while (nbytes > 0) {
-        Status result = _file->readat(_pos, nbytes, &reads, out);
-        if (!result.ok()) {
-            bytes_read = 0;
-            return arrow::Status::IOError("Readat failed.");
-        }
-        if (reads == 0) {
-            break;
-        }
-        bytes_read += reads; // total read bytes
-        nbytes -= reads;     // remained bytes
-        _pos += reads;
-        out = (char*)out + reads;
-    }
-    return bytes_read;
-}
-
-arrow::Result<int64_t> ParquetFile::GetSize() {
-    return _file->size();
-}
-
-arrow::Status ParquetFile::Seek(int64_t position) {
-    _pos = position;
-    // NOTE: Only readat operation is used, so _file seek is not called here.
-    return arrow::Status::OK();
-}
-
-arrow::Result<int64_t> ParquetFile::Tell() const {
-    return _pos;
-}
-
-arrow::Result<std::shared_ptr<arrow::Buffer>> ParquetFile::Read(int64_t nbytes) {
-    auto buffer = arrow::AllocateBuffer(nbytes, arrow::default_memory_pool());
-    ARROW_RETURN_NOT_OK(buffer);
-    std::shared_ptr<arrow::Buffer> read_buf = std::move(buffer.ValueOrDie());
-    auto bytes_read = ReadAt(_pos, nbytes, read_buf->mutable_data());
-    ARROW_RETURN_NOT_OK(bytes_read);
-    // If bytes_read is equal with read_buf's capacity, we just assign
-    if (bytes_read.ValueOrDie() == nbytes) {
-        return std::move(read_buf);
-    } else {
-        return arrow::SliceBuffer(read_buf, 0, bytes_read.ValueOrDie());
-    }
-}
-
 } // namespace doris
diff --git a/be/src/exec/parquet_reader.h b/be/src/exec/arrow/parquet_reader.h
similarity index 64%
rename from be/src/exec/parquet_reader.h
rename to be/src/exec/arrow/parquet_reader.h
index 77b946b691..597dea2feb 100644
--- a/be/src/exec/parquet_reader.h
+++ b/be/src/exec/arrow/parquet_reader.h
@@ -40,6 +40,7 @@
 
 #include "common/config.h"
 #include "common/status.h"
+#include "exec/arrow/arrow_reader.h"
 #include "gen_cpp/PaloBrokerService_types.h"
 #include "gen_cpp/PlanNodes_types.h"
 #include "gen_cpp/Types_types.h"
@@ -55,46 +56,28 @@ class SlotDescriptor;
 class MemPool;
 class FileReader;
 
-class ParquetFile : public arrow::io::RandomAccessFile {
+// Reader of parquet file
+class ParquetReaderWrap final : public ArrowReaderWrap {
 public:
-    ParquetFile(FileReader* file);
-    ~ParquetFile() override;
-    arrow::Result<int64_t> Read(int64_t nbytes, void* buffer) override;
-    arrow::Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) override;
-    arrow::Result<int64_t> GetSize() override;
-    arrow::Status Seek(int64_t position) override;
-    arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) override;
-    arrow::Result<int64_t> Tell() const override;
-    arrow::Status Close() override;
-    bool closed() const override;
-
-private:
-    FileReader* _file;
-    int64_t _pos = 0;
-};
-
-// Reader of broker parquet file
-class ParquetReaderWrap {
-public:
-    ParquetReaderWrap(FileReader* file_reader, int32_t num_of_columns_from_file);
-    virtual ~ParquetReaderWrap();
+    // batch_size is not used here
+    ParquetReaderWrap(FileReader* file_reader, int64_t batch_size,
+                      int32_t num_of_columns_from_file);
+    ~ParquetReaderWrap() override = default;
 
     // Read
     Status read(Tuple* tuple, const std::vector<SlotDescriptor*>& tuple_slot_descs,
-                MemPool* mem_pool, bool* eof);
-    void close();
-    Status size(int64_t* size);
-    Status init_parquet_reader(const std::vector<SlotDescriptor*>& tuple_slot_descs,
-                               const std::string& timezone);
-    Status next_batch(std::shared_ptr<arrow::RecordBatch>* batch,
-                      const std::vector<SlotDescriptor*>& tuple_slot_descs, bool* eof);
+                MemPool* mem_pool, bool* eof) override;
+    Status size(int64_t* size) override;
+    Status init_reader(const std::vector<SlotDescriptor*>& tuple_slot_descs,
+                       const std::string& timezone) override;
+    Status next_batch(std::shared_ptr<arrow::RecordBatch>* batch, bool* eof) override;
+    void close() override;
 
 private:
     void fill_slot(Tuple* tuple, SlotDescriptor* slot_desc, MemPool* mem_pool, const uint8_t* value,
                    int32_t len);
-    Status column_indices(const std::vector<SlotDescriptor*>& tuple_slot_descs);
     Status set_field_null(Tuple* tuple, const SlotDescriptor* slot_desc);
-    Status read_record_batch(const std::vector<SlotDescriptor*>& tuple_slot_descs, bool* eof);
+    Status read_record_batch(bool* eof);
     const std::shared_ptr<arrow::RecordBatch>& get_batch();
     Status handle_timestamp(const std::shared_ptr<arrow::TimestampArray>& ts_array, uint8_t* buf,
                             int32_t* wbtyes);
@@ -104,19 +87,11 @@ private:
     Status read_next_batch();
 
 private:
-    const int32_t _num_of_columns_from_file;
-    std::shared_ptr<ParquetFile> _parquet;
-
     // parquet file reader object
-    std::unique_ptr<::arrow::RecordBatchReader> _rb_batch;
     std::shared_ptr<arrow::RecordBatch> _batch;
     std::unique_ptr<parquet::arrow::FileReader> _reader;
     std::shared_ptr<parquet::FileMetaData> _file_metadata;
-    std::map<std::string, int> _map_column; // column-name <---> column-index
-    std::vector<int> _parquet_column_ids;
     std::vector<arrow::Type::type> _parquet_column_type;
-    int _total_groups; // groups in a parquet file
-    int _current_group;
 
     int _rows_of_group; // rows in a group.
     int _current_line_of_group;
diff --git a/be/src/exec/broker_scan_node.cpp b/be/src/exec/broker_scan_node.cpp
index f2f3742cf8..a378c41203 100644
--- a/be/src/exec/broker_scan_node.cpp
+++ b/be/src/exec/broker_scan_node.cpp
@@ -33,6 +33,7 @@
 #include "util/thread.h"
 #include "vec/exec/vbroker_scanner.h"
 #include "vec/exec/vjson_scanner.h"
+#include "vec/exec/vorc_scanner.h"
 #include "vec/exec/vparquet_scanner.h"
 
 namespace doris {
@@ -237,9 +238,15 @@ std::unique_ptr<BaseScanner> BrokerScanNode::create_scanner(const TBrokerScanRan
         }
         break;
     case TFileFormatType::FORMAT_ORC:
-        scan = new ORCScanner(_runtime_state, runtime_profile(), scan_range.params,
-                              scan_range.ranges, scan_range.broker_addresses, _pre_filter_texprs,
-                              counter);
+        if (_vectorized) {
+            scan = new vectorized::VORCScanner(_runtime_state, runtime_profile(), scan_range.params,
+                                               scan_range.ranges, scan_range.broker_addresses,
+                                               _pre_filter_texprs, counter);
+        } else {
+            scan = new ORCScanner(_runtime_state, runtime_profile(), scan_range.params,
+                                  scan_range.ranges, scan_range.broker_addresses,
+                                  _pre_filter_texprs, counter);
+        }
         break;
     case TFileFormatType::FORMAT_JSON:
         if (_vectorized) {
diff --git a/be/src/exec/parquet_scanner.cpp b/be/src/exec/parquet_scanner.cpp
index c6cb02e8c2..880313b6f0 100644
--- a/be/src/exec/parquet_scanner.cpp
+++ b/be/src/exec/parquet_scanner.cpp
@@ -17,12 +17,12 @@
 
 #include "exec/parquet_scanner.h"
 
+#include "exec/arrow/parquet_reader.h"
 #include "exec/broker_reader.h"
 #include "exec/buffered_reader.h"
 #include "exec/decompressor.h"
 #include "exec/hdfs_reader_writer.h"
 #include "exec/local_file_reader.h"
-#include "exec/parquet_reader.h"
 #include "exec/s3_reader.h"
 #include "exec/text_converter.h"
 #include "runtime/exec_env.h"
@@ -141,14 +141,14 @@ Status ParquetScanner::open_next_reader() {
             file_reader->close();
             continue;
         }
+        int32_t num_of_columns_from_file = _src_slot_descs.size();
         if (range.__isset.num_of_columns_from_file) {
-            _cur_file_reader =
-                    new ParquetReaderWrap(file_reader.release(), range.num_of_columns_from_file);
-        } else {
-            _cur_file_reader = new ParquetReaderWrap(file_reader.release(), _src_slot_descs.size());
+            num_of_columns_from_file = range.num_of_columns_from_file;
         }
+        _cur_file_reader = new ParquetReaderWrap(file_reader.release(), _state->batch_size(),
+                                                 num_of_columns_from_file);
 
-        Status status = _cur_file_reader->init_parquet_reader(_src_slot_descs, _state->timezone());
+        Status status = _cur_file_reader->init_reader(_src_slot_descs, _state->timezone());
 
         if (status.is_end_of_file()) {
             continue;
diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp
index b9087990ca..28bfafedf6 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -101,6 +101,8 @@ static TFileFormatType::type parse_format(const std::string& format_str,
         }
     } else if (iequal(format_str, "PARQUET")) {
         format_type = TFileFormatType::FORMAT_PARQUET;
+    } else if (iequal(format_str, "ORC")) {
+        format_type = TFileFormatType::FORMAT_ORC;
     }
     return format_type;
 }
diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index 265d6fd884..49f92ddf79 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -77,6 +77,7 @@ set(VEC_FILES
   data_types/data_type_date.cpp
   data_types/data_type_date_time.cpp
   exec/vaggregation_node.cpp
+  exec/varrow_scanner.cpp
   exec/ves_http_scan_node.cpp
   exec/ves_http_scanner.cpp
   exec/volap_scan_node.cpp
@@ -103,6 +104,7 @@ set(VEC_FILES
   exec/vbroker_scanner.cpp
   exec/vjson_scanner.cpp
   exec/vparquet_scanner.cpp
+  exec/vorc_scanner.cpp
   exec/join/vhash_join_node.cpp
   exprs/vectorized_agg_fn.cpp
   exprs/vectorized_fn_call.cpp
diff --git a/be/src/vec/data_types/data_type_factory.cpp b/be/src/vec/data_types/data_type_factory.cpp
index 557b978c8e..2071b8cedb 100644
--- a/be/src/vec/data_types/data_type_factory.cpp
+++ b/be/src/vec/data_types/data_type_factory.cpp
@@ -260,9 +260,9 @@ DataTypePtr DataTypeFactory::create_data_type(const PColumnMeta& pcolumn) {
     return nested;
 }
 
-DataTypePtr DataTypeFactory::create_data_type(const arrow::Type::type& type, bool is_nullable) {
+DataTypePtr DataTypeFactory::create_data_type(const arrow::DataType* type, bool is_nullable) {
     DataTypePtr nested = nullptr;
-    switch (type) {
+    switch (type->id()) {
     case ::arrow::Type::BOOL:
         nested = std::make_shared<vectorized::DataTypeUInt8>();
         break;
@@ -310,10 +310,10 @@ DataTypePtr DataTypeFactory::create_data_type(const arrow::Type::type& type, boo
         nested = std::make_shared<vectorized::DataTypeString>();
         break;
     case ::arrow::Type::DECIMAL:
-        nested = std::make_shared<vectorized::DataTypeDecimal<vectorized::Decimal128>>(27, 9);
+        nested = std::make_shared<vectorized::DataTypeDecimal<vectorized::Decimal128>>();
         break;
     default:
-        DCHECK(false) << "invalid arrow type:" << (int)type;
+        DCHECK(false) << "invalid arrow type:" << (int)(type->id());
         break;
     }
 
diff --git a/be/src/vec/data_types/data_type_factory.hpp b/be/src/vec/data_types/data_type_factory.hpp
index 3b667c6f72..968617414c 100644
--- a/be/src/vec/data_types/data_type_factory.hpp
+++ b/be/src/vec/data_types/data_type_factory.hpp
@@ -88,7 +88,7 @@ public:
 
     DataTypePtr create_data_type(const PColumnMeta& pcolumn);
 
-    DataTypePtr create_data_type(const arrow::Type::type& type, bool is_nullable);
+    DataTypePtr create_data_type(const arrow::DataType* type, bool is_nullable);
 
 private:
     DataTypePtr _create_primitive_data_type(const FieldType& type) const;
diff --git a/be/src/vec/exec/vparquet_scanner.cpp b/be/src/vec/exec/varrow_scanner.cpp
similarity index 61%
copy from be/src/vec/exec/vparquet_scanner.cpp
copy to be/src/vec/exec/varrow_scanner.cpp
index 037bc15028..b44475ce19 100644
--- a/be/src/vec/exec/vparquet_scanner.cpp
+++ b/be/src/vec/exec/varrow_scanner.cpp
@@ -15,33 +15,118 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "vec/exec/vparquet_scanner.h"
-
-#include "exec/parquet_reader.h"
+#include "exec/arrow/parquet_reader.h"
+#include "exec/broker_reader.h"
+#include "exec/buffered_reader.h"
+#include "exec/hdfs_reader_writer.h"
+#include "exec/local_file_reader.h"
+#include "exec/s3_reader.h"
 #include "exprs/expr.h"
 #include "runtime/descriptors.h"
 #include "runtime/exec_env.h"
 #include "vec/data_types/data_type_factory.hpp"
+#include "vec/exec/vorc_scanner.h"
 #include "vec/functions/simple_function_factory.h"
 #include "vec/utils/arrow_column_to_doris_column.h"
 
 namespace doris::vectorized {
 
-VParquetScanner::VParquetScanner(RuntimeState* state, RuntimeProfile* profile,
-                                 const TBrokerScanRangeParams& params,
-                                 const std::vector<TBrokerRangeDesc>& ranges,
-                                 const std::vector<TNetworkAddress>& broker_addresses,
-                                 const std::vector<TExpr>& pre_filter_texprs,
-                                 ScannerCounter* counter)
-        : ParquetScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs,
-                         counter),
+VArrowScanner::VArrowScanner(RuntimeState* state, RuntimeProfile* profile,
+                             const TBrokerScanRangeParams& params,
+                             const std::vector<TBrokerRangeDesc>& ranges,
+                             const std::vector<TNetworkAddress>& broker_addresses,
+                             const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter)
+        : BaseScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs, counter),
+          // _splittable(params.splittable),
+          _cur_file_reader(nullptr),
+          _cur_file_eof(false),
           _batch(nullptr),
           _arrow_batch_cur_idx(0) {}
 
-VParquetScanner::~VParquetScanner() = default;
+VArrowScanner::~VArrowScanner() {
+    close();
+}
+
+Status VArrowScanner::_open_next_reader() {
+    // release the reader left over from the previous file, if any
+    if (_cur_file_reader != nullptr) {
+        delete _cur_file_reader;
+        _cur_file_reader = nullptr;
+    }
+
+    while (true) {
+        if (_next_range >= _ranges.size()) {
+            _scanner_eof = true;
+            return Status::OK();
+        }
+        const TBrokerRangeDesc& range = _ranges[_next_range++];
+        std::unique_ptr<FileReader> file_reader;
+        switch (range.file_type) {
+        case TFileType::FILE_LOCAL: {
+            file_reader.reset(new LocalFileReader(range.path, range.start_offset));
+            break;
+        }
+        case TFileType::FILE_HDFS: {
+            FileReader* reader;
+            RETURN_IF_ERROR(HdfsReaderWriter::create_reader(range.hdfs_params, range.path,
+                                                            range.start_offset, &reader));
+            file_reader.reset(reader);
+            break;
+        }
+        case TFileType::FILE_BROKER: {
+            int64_t file_size = 0;
+            // for compatibility: file_size may be unset in the range descriptor, so default to 0
+            if (range.__isset.file_size) {
+                file_size = range.file_size;
+            }
+            file_reader.reset(new BufferedReader(
+                    _profile,
+                    new BrokerReader(_state->exec_env(), _broker_addresses, _params.properties,
+                                     range.path, range.start_offset, file_size)));
+            break;
+        }
+        case TFileType::FILE_S3: {
+            file_reader.reset(new BufferedReader(
+                    _profile, new S3Reader(_params.properties, range.path, range.start_offset)));
+            break;
+        }
+        default: {
+            std::stringstream ss;
+            ss << "Unknown file type, type=" << range.file_type;
+            return Status::InternalError(ss.str());
+        }
+        }
+        RETURN_IF_ERROR(file_reader->open());
+        if (file_reader->size() == 0) {
+            file_reader->close();
+            continue;
+        }
+
+        int32_t num_of_columns_from_file = _src_slot_descs.size();
+        if (range.__isset.num_of_columns_from_file) {
+            num_of_columns_from_file = range.num_of_columns_from_file;
+        }
+        _cur_file_reader = _new_arrow_reader(file_reader.release(), _state->batch_size(),
+                                             num_of_columns_from_file);
+
+        Status status = _cur_file_reader->init_reader(_src_slot_descs, _state->timezone());
+
+        if (status.is_end_of_file()) {
+            continue;
+        } else {
+            if (!status.ok()) {
+                std::stringstream ss;
+                ss << " file: " << range.path << " error:" << status.get_error_msg();
+                return Status::InternalError(ss.str());
+            } else {
+                return status;
+            }
+        }
+    }
+}
 
-Status VParquetScanner::open() {
-    RETURN_IF_ERROR(ParquetScanner::open());
+Status VArrowScanner::open() {
+    RETURN_IF_ERROR(BaseScanner::open());
     if (_ranges.empty()) {
         return Status::OK();
     }
@@ -49,18 +134,18 @@ Status VParquetScanner::open() {
 }
 
 // get next available arrow batch
-Status VParquetScanner::_next_arrow_batch() {
+Status VArrowScanner::_next_arrow_batch() {
     _arrow_batch_cur_idx = 0;
     // first, init file reader
     if (_cur_file_reader == nullptr || _cur_file_eof) {
-        RETURN_IF_ERROR(open_next_reader());
+        RETURN_IF_ERROR(_open_next_reader());
         _cur_file_eof = false;
     }
     // second, loop until find available arrow batch or EOF
     while (!_scanner_eof) {
-        RETURN_IF_ERROR(_cur_file_reader->next_batch(&_batch, _src_slot_descs, &_cur_file_eof));
+        RETURN_IF_ERROR(_cur_file_reader->next_batch(&_batch, &_cur_file_eof));
         if (_cur_file_eof) {
-            RETURN_IF_ERROR(open_next_reader());
+            RETURN_IF_ERROR(_open_next_reader());
             _cur_file_eof = false;
             continue;
         }
@@ -72,7 +157,7 @@ Status VParquetScanner::_next_arrow_batch() {
     return Status::EndOfFile("EOF");
 }
 
-Status VParquetScanner::_init_arrow_batch_if_necessary() {
+Status VArrowScanner::_init_arrow_batch_if_necessary() {
     // 1. init batch if first time
     // 2. reset reader if end of file
     Status status;
@@ -85,7 +170,7 @@ Status VParquetScanner::_init_arrow_batch_if_necessary() {
     return status;
 }
 
-Status VParquetScanner::_init_src_block() {
+Status VArrowScanner::_init_src_block() {
     size_t batch_pos = 0;
     _src_block.clear();
     for (auto i = 0; i < _num_of_columns_from_file; ++i) {
@@ -98,7 +183,7 @@ Status VParquetScanner::_init_src_block() {
         // TODO, support not nullable for exec efficiently
         auto is_nullable = true;
         DataTypePtr data_type =
-                DataTypeFactory::instance().create_data_type(array->type()->id(), is_nullable);
+                DataTypeFactory::instance().create_data_type(array->type().get(), is_nullable);
         if (data_type == nullptr) {
             return Status::NotSupported(
                     fmt::format("Not support arrow type:{}", array->type()->name()));
@@ -110,7 +195,7 @@ Status VParquetScanner::_init_src_block() {
     return Status::OK();
 }
 
-Status VParquetScanner::get_next(vectorized::Block* block, bool* eof) {
+Status VArrowScanner::get_next(vectorized::Block* block, bool* eof) {
     // overall of type converting:
     // arrow type ==arrow_column_to_doris_column==> primitive type(PT0) ==cast_src_block==>
     // primitive type(PT1) ==materialize_block==> dest primitive type
@@ -171,7 +256,7 @@ Status VParquetScanner::get_next(vectorized::Block* block, bool* eof) {
 
 // arrow type ==arrow_column_to_doris_column==> primitive type(PT0) ==cast_src_block==>
 // primitive type(PT1) ==materialize_block==> dest primitive type
-Status VParquetScanner::_cast_src_block(Block* block) {
+Status VArrowScanner::_cast_src_block(Block* block) {
     // cast primitive type(PT0) to primitive type(PT1)
     for (size_t i = 0; i < _num_of_columns_from_file; ++i) {
         SlotDescriptor* slot_desc = _src_slot_descs[i];
@@ -194,7 +279,7 @@ Status VParquetScanner::_cast_src_block(Block* block) {
     return Status::OK();
 }
 
-Status VParquetScanner::_append_batch_to_src_block(Block* block) {
+Status VArrowScanner::_append_batch_to_src_block(Block* block) {
     size_t num_elements = std::min<size_t>((_state->batch_size() - block->rows()),
                                            (_batch->num_rows() - _arrow_batch_cur_idx));
     size_t column_pos = 0;
@@ -214,4 +299,12 @@ Status VParquetScanner::_append_batch_to_src_block(Block* block) {
     return Status::OK();
 }
 
-} // namespace doris::vectorized
+void VArrowScanner::close() {
+    BaseScanner::close();
+    if (_cur_file_reader != nullptr) {
+        delete _cur_file_reader;
+        _cur_file_reader = nullptr;
+    }
+}
+
+} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/exec/vparquet_scanner.h b/be/src/vec/exec/varrow_scanner.h
similarity index 53%
copy from be/src/vec/exec/vparquet_scanner.h
copy to be/src/vec/exec/varrow_scanner.h
index 72ac280989..c3740e0d3e 100644
--- a/be/src/vec/exec/vparquet_scanner.h
+++ b/be/src/vec/exec/varrow_scanner.h
@@ -18,7 +18,8 @@
 #pragma once
 
 #include <arrow/array.h>
-#include <exec/parquet_scanner.h>
+#include <exec/arrow/arrow_reader.h>
+#include <exec/arrow/orc_reader.h>
 
 #include <map>
 #include <memory>
@@ -28,37 +29,52 @@
 #include <vector>
 
 #include "common/status.h"
-#include "gen_cpp/PlanNodes_types.h"
+#include "exec/base_scanner.h"
 #include "gen_cpp/Types_types.h"
 #include "runtime/mem_pool.h"
 #include "util/runtime_profile.h"
 
 namespace doris::vectorized {
 
-// VParquet scanner convert the data read from Parquet to doris's columns.
-class VParquetScanner : public ParquetScanner {
+// VArrow scanner converts the data read from ORC or Parquet files into Doris columns.
+class VArrowScanner : public BaseScanner {
 public:
-    VParquetScanner(RuntimeState* state, RuntimeProfile* profile,
-                    const TBrokerScanRangeParams& params,
-                    const std::vector<TBrokerRangeDesc>& ranges,
-                    const std::vector<TNetworkAddress>& broker_addresses,
-                    const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter);
+    VArrowScanner(RuntimeState* state, RuntimeProfile* profile,
+                  const TBrokerScanRangeParams& params, const std::vector<TBrokerRangeDesc>& ranges,
+                  const std::vector<TNetworkAddress>& broker_addresses,
+                  const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter);
 
-    ~VParquetScanner() override;
+    virtual ~VArrowScanner();
 
     // Open this scanner, will initialize information need to
-    Status open() override;
+    virtual Status open() override;
 
-    Status get_next(Block* block, bool* eof) override;
+    virtual Status get_next(doris::Tuple* tuple, MemPool* tuple_pool, bool* eof,
+                            bool* fill_tuple) override {
+        return Status::NotSupported("Not Implemented get next");
+    }
+
+    virtual Status get_next(Block* block, bool* eof) override;
+
+    virtual void close() override;
+
+protected:
+    virtual ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, int64_t batch_size,
+                                               int32_t num_of_columns_from_file) = 0;
 
 private:
+    // Open a reader for the next non-empty file range; sets _scanner_eof when no ranges remain
+    Status _open_next_reader();
     Status _next_arrow_batch();
     Status _init_arrow_batch_if_necessary();
-    Status _init_src_block() override;
+    Status _init_src_block();
     Status _append_batch_to_src_block(Block* block);
     Status _cast_src_block(Block* block);
 
 private:
+    // Reader
+    ArrowReaderWrap* _cur_file_reader;
+    bool _cur_file_eof; // is read over?
     std::shared_ptr<arrow::RecordBatch> _batch;
     size_t _arrow_batch_cur_idx;
 };
diff --git a/be/src/vec/exec/vorc_scanner.cpp b/be/src/vec/exec/vorc_scanner.cpp
new file mode 100644
index 0000000000..7521634183
--- /dev/null
+++ b/be/src/vec/exec/vorc_scanner.cpp
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/vorc_scanner.h"
+
+#include <exec/arrow/orc_reader.h>
+
+namespace doris::vectorized {
+
+VORCScanner::VORCScanner(RuntimeState* state, RuntimeProfile* profile,
+                         const TBrokerScanRangeParams& params,
+                         const std::vector<TBrokerRangeDesc>& ranges,
+                         const std::vector<TNetworkAddress>& broker_addresses,
+                         const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter)
+        : VArrowScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs,
+                        counter) {}
+
+ArrowReaderWrap* VORCScanner::_new_arrow_reader(FileReader* file_reader, int64_t batch_size,
+                                                int32_t num_of_columns_from_file) {
+    return new ORCReaderWrap(file_reader, batch_size, num_of_columns_from_file);
+}
+
+} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/exec/vparquet_scanner.h b/be/src/vec/exec/vorc_scanner.h
similarity index 52%
copy from be/src/vec/exec/vparquet_scanner.h
copy to be/src/vec/exec/vorc_scanner.h
index 72ac280989..12510e9731 100644
--- a/be/src/vec/exec/vparquet_scanner.h
+++ b/be/src/vec/exec/vorc_scanner.h
@@ -18,7 +18,7 @@
 #pragma once
 
 #include <arrow/array.h>
-#include <exec/parquet_scanner.h>
+#include <vec/exec/varrow_scanner.h>
 
 #include <map>
 #include <memory>
@@ -28,39 +28,26 @@
 #include <vector>
 
 #include "common/status.h"
-#include "gen_cpp/PlanNodes_types.h"
+#include "exec/base_scanner.h"
 #include "gen_cpp/Types_types.h"
 #include "runtime/mem_pool.h"
 #include "util/runtime_profile.h"
 
 namespace doris::vectorized {
 
-// VParquet scanner convert the data read from Parquet to doris's columns.
-class VParquetScanner : public ParquetScanner {
+// VORC scanner converts the data read from ORC files into Doris columns.
+class VORCScanner final : public VArrowScanner {
 public:
-    VParquetScanner(RuntimeState* state, RuntimeProfile* profile,
-                    const TBrokerScanRangeParams& params,
-                    const std::vector<TBrokerRangeDesc>& ranges,
-                    const std::vector<TNetworkAddress>& broker_addresses,
-                    const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter);
+    VORCScanner(RuntimeState* state, RuntimeProfile* profile, const TBrokerScanRangeParams& params,
+                const std::vector<TBrokerRangeDesc>& ranges,
+                const std::vector<TNetworkAddress>& broker_addresses,
+                const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter);
 
-    ~VParquetScanner() override;
+    ~VORCScanner() override = default;
 
-    // Open this scanner, will initialize information need to
-    Status open() override;
-
-    Status get_next(Block* block, bool* eof) override;
-
-private:
-    Status _next_arrow_batch();
-    Status _init_arrow_batch_if_necessary();
-    Status _init_src_block() override;
-    Status _append_batch_to_src_block(Block* block);
-    Status _cast_src_block(Block* block);
-
-private:
-    std::shared_ptr<arrow::RecordBatch> _batch;
-    size_t _arrow_batch_cur_idx;
+protected:
+    ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, int64_t batch_size,
+                                       int32_t num_of_columns_from_file) override;
 };
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/vparquet_scanner.cpp b/be/src/vec/exec/vparquet_scanner.cpp
index 037bc15028..cb59ae60bc 100644
--- a/be/src/vec/exec/vparquet_scanner.cpp
+++ b/be/src/vec/exec/vparquet_scanner.cpp
@@ -17,13 +17,7 @@
 
 #include "vec/exec/vparquet_scanner.h"
 
-#include "exec/parquet_reader.h"
-#include "exprs/expr.h"
-#include "runtime/descriptors.h"
-#include "runtime/exec_env.h"
-#include "vec/data_types/data_type_factory.hpp"
-#include "vec/functions/simple_function_factory.h"
-#include "vec/utils/arrow_column_to_doris_column.h"
+#include "exec/arrow/parquet_reader.h"
 
 namespace doris::vectorized {
 
@@ -33,185 +27,12 @@ VParquetScanner::VParquetScanner(RuntimeState* state, RuntimeProfile* profile,
                                  const std::vector<TNetworkAddress>& broker_addresses,
                                  const std::vector<TExpr>& pre_filter_texprs,
                                  ScannerCounter* counter)
-        : ParquetScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs,
-                         counter),
-          _batch(nullptr),
-          _arrow_batch_cur_idx(0) {}
+        : VArrowScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs,
+                        counter) {}
 
-VParquetScanner::~VParquetScanner() = default;
-
-Status VParquetScanner::open() {
-    RETURN_IF_ERROR(ParquetScanner::open());
-    if (_ranges.empty()) {
-        return Status::OK();
-    }
-    return Status::OK();
-}
-
-// get next available arrow batch
-Status VParquetScanner::_next_arrow_batch() {
-    _arrow_batch_cur_idx = 0;
-    // first, init file reader
-    if (_cur_file_reader == nullptr || _cur_file_eof) {
-        RETURN_IF_ERROR(open_next_reader());
-        _cur_file_eof = false;
-    }
-    // second, loop until find available arrow batch or EOF
-    while (!_scanner_eof) {
-        RETURN_IF_ERROR(_cur_file_reader->next_batch(&_batch, _src_slot_descs, &_cur_file_eof));
-        if (_cur_file_eof) {
-            RETURN_IF_ERROR(open_next_reader());
-            _cur_file_eof = false;
-            continue;
-        }
-        if (_batch->num_rows() == 0) {
-            continue;
-        }
-        return Status::OK();
-    }
-    return Status::EndOfFile("EOF");
-}
-
-Status VParquetScanner::_init_arrow_batch_if_necessary() {
-    // 1. init batch if first time
-    // 2. reset reader if end of file
-    Status status;
-    if (_scanner_eof) {
-        return Status::EndOfFile("EOF");
-    }
-    if (_batch == nullptr || _arrow_batch_cur_idx >= _batch->num_rows()) {
-        return _next_arrow_batch();
-    }
-    return status;
-}
-
-Status VParquetScanner::_init_src_block() {
-    size_t batch_pos = 0;
-    _src_block.clear();
-    for (auto i = 0; i < _num_of_columns_from_file; ++i) {
-        SlotDescriptor* slot_desc = _src_slot_descs[i];
-        if (slot_desc == nullptr) {
-            continue;
-        }
-        auto* array = _batch->column(batch_pos++).get();
-        // let src column be nullable for simplify converting
-        // TODO, support not nullable for exec efficiently
-        auto is_nullable = true;
-        DataTypePtr data_type =
-                DataTypeFactory::instance().create_data_type(array->type()->id(), is_nullable);
-        if (data_type == nullptr) {
-            return Status::NotSupported(
-                    fmt::format("Not support arrow type:{}", array->type()->name()));
-        }
-        MutableColumnPtr data_column = data_type->create_column();
-        _src_block.insert(
-                ColumnWithTypeAndName(std::move(data_column), data_type, slot_desc->col_name()));
-    }
-    return Status::OK();
-}
-
-Status VParquetScanner::get_next(vectorized::Block* block, bool* eof) {
-    // overall of type converting:
-    // arrow type ==arrow_column_to_doris_column==> primitive type(PT0) ==cast_src_block==>
-    // primitive type(PT1) ==materialize_block==> dest primitive type
-
-    // first, we need to convert the arrow type to the corresponding internal type,
-    // such as arrow::INT16 to TYPE_SMALLINT(PT0).
-    // why need first step? we cannot convert the arrow type to type in src desc directly,
-    // it's too hard to achieve.
-
-    // second, convert PT0 to the type in src desc, such as TYPE_SMALLINT to TYPE_VARCHAR.(PT1)
-    // why need second step? the materialize step only accepts types specified in src desc.
-
-    // finally, through the materialized, convert to the type in dest desc, such as TYPE_DATETIME.
-    SCOPED_TIMER(_read_timer);
-    // init arrow batch
-    {
-        Status st = _init_arrow_batch_if_necessary();
-        if (!st.ok()) {
-            if (!st.is_end_of_file()) {
-                return st;
-            }
-            *eof = true;
-            return Status::OK();
-        }
-    }
-
-    RETURN_IF_ERROR(_init_src_block());
-    // convert arrow batch to block until reach the batch_size
-    while (!_scanner_eof) {
-        // cast arrow type to PT0 and append it to src block
-        // for example: arrow::Type::INT16 => TYPE_SMALLINT
-        RETURN_IF_ERROR(_append_batch_to_src_block(&_src_block));
-        // finalize the src block if full
-        if (_src_block.rows() >= _state->batch_size()) {
-            break;
-        }
-        auto status = _next_arrow_batch();
-        // if ok, append the batch to the src columns
-        if (status.ok()) {
-            continue;
-        }
-        // return error if not EOF
-        if (!status.is_end_of_file()) {
-            return status;
-        }
-        _cur_file_eof = true;
-        break;
-    }
-    COUNTER_UPDATE(_rows_read_counter, _src_block.rows());
-    SCOPED_TIMER(_materialize_timer);
-    // cast PT0 => PT1
-    // for example: TYPE_SMALLINT => TYPE_VARCHAR
-    RETURN_IF_ERROR(_cast_src_block(&_src_block));
-
-    // materialize, src block => dest columns
-    return _fill_dest_block(block, eof);
-}
-
-// arrow type ==arrow_column_to_doris_column==> primitive type(PT0) ==cast_src_block==>
-// primitive type(PT1) ==materialize_block==> dest primitive type
-Status VParquetScanner::_cast_src_block(Block* block) {
-    // cast primitive type(PT0) to primitive type(PT1)
-    for (size_t i = 0; i < _num_of_columns_from_file; ++i) {
-        SlotDescriptor* slot_desc = _src_slot_descs[i];
-        if (slot_desc == nullptr) {
-            continue;
-        }
-        auto& arg = block->get_by_name(slot_desc->col_name());
-        // remove nullable here, let the get_function decide whether nullable
-        auto return_type = slot_desc->get_data_type_ptr();
-        ColumnsWithTypeAndName arguments {
-                arg,
-                {DataTypeString().create_column_const(
-                         arg.column->size(), remove_nullable(return_type)->get_family_name()),
-                 std::make_shared<DataTypeString>(), ""}};
-        auto func_cast =
-                SimpleFunctionFactory::instance().get_function("CAST", arguments, return_type);
-        RETURN_IF_ERROR(func_cast->execute(nullptr, *block, {i}, i, arg.column->size()));
-        block->get_by_position(i).type = std::move(return_type);
-    }
-    return Status::OK();
-}
-
-Status VParquetScanner::_append_batch_to_src_block(Block* block) {
-    size_t num_elements = std::min<size_t>((_state->batch_size() - block->rows()),
-                                           (_batch->num_rows() - _arrow_batch_cur_idx));
-    size_t column_pos = 0;
-    for (auto i = 0; i < _num_of_columns_from_file; ++i) {
-        SlotDescriptor* slot_desc = _src_slot_descs[i];
-        if (slot_desc == nullptr) {
-            continue;
-        }
-        auto* array = _batch->column(column_pos++).get();
-        auto& column_with_type_and_name = block->get_by_name(slot_desc->col_name());
-        RETURN_IF_ERROR(arrow_column_to_doris_column(array, _arrow_batch_cur_idx,
-                                                     column_with_type_and_name, num_elements,
-                                                     _state->timezone()));
-    }
-
-    _arrow_batch_cur_idx += num_elements;
-    return Status::OK();
+ArrowReaderWrap* VParquetScanner::_new_arrow_reader(FileReader* file_reader, int64_t batch_size,
+                                                    int32_t num_of_columns_from_file) {
+    return new ParquetReaderWrap(file_reader, batch_size, num_of_columns_from_file);
 }
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/vparquet_scanner.h b/be/src/vec/exec/vparquet_scanner.h
index 72ac280989..367e2e7472 100644
--- a/be/src/vec/exec/vparquet_scanner.h
+++ b/be/src/vec/exec/vparquet_scanner.h
@@ -18,7 +18,7 @@
 #pragma once
 
 #include <arrow/array.h>
-#include <exec/parquet_scanner.h>
+#include <vec/exec/varrow_scanner.h>
 
 #include <map>
 #include <memory>
@@ -36,7 +36,7 @@
 namespace doris::vectorized {
 
 // VParquet scanner convert the data read from Parquet to doris's columns.
-class VParquetScanner : public ParquetScanner {
+class VParquetScanner final : public VArrowScanner {
 public:
     VParquetScanner(RuntimeState* state, RuntimeProfile* profile,
                     const TBrokerScanRangeParams& params,
@@ -44,23 +44,11 @@ public:
                     const std::vector<TNetworkAddress>& broker_addresses,
                     const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter);
 
-    ~VParquetScanner() override;
+    ~VParquetScanner() override = default;
 
-    // Open this scanner, will initialize information need to
-    Status open() override;
-
-    Status get_next(Block* block, bool* eof) override;
-
-private:
-    Status _next_arrow_batch();
-    Status _init_arrow_batch_if_necessary();
-    Status _init_src_block() override;
-    Status _append_batch_to_src_block(Block* block);
-    Status _cast_src_block(Block* block);
-
-private:
-    std::shared_ptr<arrow::RecordBatch> _batch;
-    size_t _arrow_batch_cur_idx;
+protected:
+    ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, int64_t batch_size,
+                                       int32_t num_of_columns_from_file) override; // overrides the base-class reader factory; the definition returns a new ParquetReaderWrap
 };
 
 } // namespace doris::vectorized
diff --git a/be/test/CMakeLists.txt b/be/test/CMakeLists.txt
index 20052e823e..48d37e56a1 100644
--- a/be/test/CMakeLists.txt
+++ b/be/test/CMakeLists.txt
@@ -338,6 +338,8 @@ set(VEC_TEST_FILES
     vec/exec/vbroker_scanner_test.cpp
     vec/exec/vjson_scanner_test.cpp
     vec/exec/vtablet_sink_test.cpp
+    vec/exec/vorc_scanner_test.cpp
+    vec/exec/vparquet_scanner_test.cpp
     vec/exprs/vexpr_test.cpp
     vec/function/function_array_element_test.cpp
     vec/function/function_array_index_test.cpp
diff --git a/be/test/olap/hll_test.cpp b/be/test/olap/hll_test.cpp
index 3131efdeb2..5843a076cb 100644
--- a/be/test/olap/hll_test.cpp
+++ b/be/test/olap/hll_test.cpp
@@ -34,7 +34,7 @@ static uint64_t hash(uint64_t value) {
 }
 //  keep logic same with java version in fe when you change hll_test.cpp,see HllTest.java
 TEST_F(TestHll, Normal) {
-    uint8_t buf[HLL_REGISTERS_COUNT + 1];
+    uint8_t buf[HLL_REGISTERS_COUNT + 1] = {0};
 
     // empty
     {
diff --git a/be/test/vec/exec/vorc_scanner_test.cpp b/be/test/vec/exec/vorc_scanner_test.cpp
new file mode 100644
index 0000000000..f5f8bf522c
--- /dev/null
+++ b/be/test/vec/exec/vorc_scanner_test.cpp
@@ -0,0 +1,892 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/vorc_scanner.h"
+
+#include <gtest/gtest.h>
+#include <runtime/descriptor_helper.h>
+#include <time.h>
+
+#include <map>
+#include <string>
+#include <vector>
+
+#include "common/object_pool.h"
+#include "exec/local_file_reader.h"
+#include "exec/orc_scanner.h"
+#include "exprs/cast_functions.h"
+#include "exprs/decimalv2_operators.h"
+#include "gen_cpp/Descriptors_types.h"
+#include "gen_cpp/PlanNodes_types.h"
+#include "runtime/descriptors.h"
+#include "runtime/row_batch.h"
+#include "runtime/runtime_state.h"
+#include "runtime/tuple.h"
+#include "runtime/user_function_cache.h"
+#include "vec/exec/vbroker_scan_node.h"
+
+namespace doris {
+namespace vectorized {
+
+class VOrcScannerTest : public testing::Test { // gtest fixture holding the state the VORCScanner tests share
+public:
+    VOrcScannerTest() : _runtime_state(TQueryGlobals()) {
+        _profile = _runtime_state.runtime_profile();
+        _runtime_state._instance_mem_tracker.reset(new MemTracker());
+        _runtime_state._query_options.enable_vectorized_engine = true;
+    }
+    ~VOrcScannerTest() override = default;
+    // Suite-level setup: calls the init hooks of UserFunctionCache, CastFunctions and DecimalV2Operators.
+    static void SetUpTestCase() {
+        UserFunctionCache::instance()->init(
+                "./be/test/runtime/test_data/user_function_cache/normal");
+        CastFunctions::init();
+        DecimalV2Operators::init();
+    }
+
+protected:
+    void SetUp() override {}
+
+    void TearDown() override {}
+
+protected: // fixture state; TEST_F bodies live in generated subclasses and use these members directly
+    RuntimeState _runtime_state;
+    RuntimeProfile* _profile = nullptr; // assigned from _runtime_state in the constructor
+    ObjectPool _obj_pool;
+    DescriptorTbl* _desc_tbl = nullptr; // filled by DescriptorTbl::create() inside the tests
+    std::vector<TNetworkAddress> _addresses;
+    ScannerCounter _counter;
+    std::vector<TExpr> _pre_filter;
+    bool _fill_tuple = false; // NOTE(review): not read by the tests visible in this file section
+};
+
+TEST_F(VOrcScannerTest, normal) { // maps eight source columns (some through casts) and reads my-file.orc
+    TBrokerScanRangeParams params;
+    TTypeDesc varchar_type; // scalar VARCHAR with len 65535
+    {
+        TTypeNode node;
+        node.__set_type(TTypeNodeType::SCALAR);
+        TScalarType scalar_type;
+        scalar_type.__set_type(TPrimitiveType::VARCHAR);
+        scalar_type.__set_len(65535);
+        node.__set_scalar_type(scalar_type);
+        varchar_type.types.push_back(node);
+    }
+
+    TTypeDesc int_type;
+    {
+        TTypeNode node;
+        node.__set_type(TTypeNodeType::SCALAR);
+        TScalarType scalar_type;
+        scalar_type.__set_type(TPrimitiveType::INT);
+        node.__set_scalar_type(scalar_type);
+        int_type.types.push_back(node);
+    }
+
+    TTypeDesc big_int_type;
+    {
+        TTypeNode node;
+        node.__set_type(TTypeNodeType::SCALAR);
+        TScalarType scalar_type;
+        scalar_type.__set_type(TPrimitiveType::BIGINT);
+        node.__set_scalar_type(scalar_type);
+        big_int_type.types.push_back(node);
+    }
+
+    TTypeDesc float_type;
+    {
+        TTypeNode node;
+        node.__set_type(TTypeNodeType::SCALAR);
+        TScalarType scalar_type;
+        scalar_type.__set_type(TPrimitiveType::FLOAT);
+        node.__set_scalar_type(scalar_type);
+        float_type.types.push_back(node);
+    }
+
+    TTypeDesc double_type;
+    {
+        TTypeNode node;
+        node.__set_type(TTypeNodeType::SCALAR);
+        TScalarType scalar_type;
+        scalar_type.__set_type(TPrimitiveType::DOUBLE);
+        node.__set_scalar_type(scalar_type);
+        double_type.types.push_back(node);
+    }
+
+    TTypeDesc date_type; // NOTE(review): built but not referenced again in this test
+    {
+        TTypeNode node;
+        node.__set_type(TTypeNodeType::SCALAR);
+        TScalarType scalar_type;
+        scalar_type.__set_type(TPrimitiveType::DATE);
+        node.__set_scalar_type(scalar_type);
+        date_type.types.push_back(node);
+    }
+
+    // col1: src slot 0 (varchar) cast to bigint, registered under expr_of_dest_slot key 8
+    {
+        TExprNode cast_expr;
+        cast_expr.node_type = TExprNodeType::CAST_EXPR;
+        cast_expr.type = big_int_type;
+        cast_expr.__set_opcode(TExprOpcode::CAST);
+        cast_expr.__set_num_children(1);
+        cast_expr.__set_output_scale(-1);
+        cast_expr.__isset.fn = true;
+        cast_expr.fn.name.function_name = "casttobigint";
+        cast_expr.fn.binary_type = TFunctionBinaryType::BUILTIN;
+        cast_expr.fn.arg_types.push_back(varchar_type);
+        cast_expr.fn.ret_type = big_int_type;
+        cast_expr.fn.has_var_args = false;
+        cast_expr.fn.__set_signature("casttoint(VARCHAR(*))"); // NOTE(review): text says casttoint although the cast targets bigint
+        cast_expr.fn.__isset.scalar_fn = true;
+        cast_expr.fn.scalar_fn.symbol = "doris::CastFunctions::cast_to_big_int_val";
+
+        TExprNode slot_ref;
+        slot_ref.node_type = TExprNodeType::SLOT_REF;
+        slot_ref.type = varchar_type;
+        slot_ref.num_children = 0;
+        slot_ref.__isset.slot_ref = true;
+        slot_ref.slot_ref.slot_id = 0;
+        slot_ref.slot_ref.tuple_id = 0;
+
+        TExpr expr;
+        expr.nodes.push_back(cast_expr);
+        expr.nodes.push_back(slot_ref);
+
+        params.expr_of_dest_slot.emplace(8, expr);
+        params.src_slot_ids.push_back(0);
+    }
+    // col2, col3: src slots 1-2 passed through as plain slot refs under keys 9-10
+    for (int i = 1; i <= 2; i++) {
+        TExprNode slot_ref;
+        slot_ref.node_type = TExprNodeType::SLOT_REF;
+        slot_ref.type = varchar_type;
+        slot_ref.num_children = 0;
+        slot_ref.__isset.slot_ref = true;
+        slot_ref.slot_ref.slot_id = i;
+        slot_ref.slot_ref.tuple_id = 0;
+
+        TExpr expr;
+        expr.nodes.push_back(slot_ref);
+
+        params.expr_of_dest_slot.emplace(8 + i, expr);
+        params.src_slot_ids.push_back(i);
+    }
+
+    // col5: src slot 3 (varchar) cast to double, registered under key 11
+    {
+        TExprNode cast_expr;
+        cast_expr.node_type = TExprNodeType::CAST_EXPR;
+        cast_expr.type = double_type;
+        cast_expr.__set_opcode(TExprOpcode::CAST);
+        cast_expr.__set_num_children(1);
+        cast_expr.__set_output_scale(-1);
+        cast_expr.__isset.fn = true;
+        cast_expr.fn.name.function_name = "casttodouble";
+        cast_expr.fn.binary_type = TFunctionBinaryType::BUILTIN;
+        cast_expr.fn.arg_types.push_back(varchar_type);
+        cast_expr.fn.ret_type = double_type;
+        cast_expr.fn.has_var_args = false;
+        cast_expr.fn.__set_signature("casttoint(VARCHAR(*))"); // NOTE(review): text says casttoint although the cast targets double
+        cast_expr.fn.__isset.scalar_fn = true;
+        cast_expr.fn.scalar_fn.symbol = "doris::CastFunctions::cast_to_double_val";
+
+        TExprNode slot_ref;
+        slot_ref.node_type = TExprNodeType::SLOT_REF;
+        slot_ref.type = varchar_type;
+        slot_ref.num_children = 0;
+        slot_ref.__isset.slot_ref = true;
+        slot_ref.slot_ref.slot_id = 3;
+        slot_ref.slot_ref.tuple_id = 0;
+
+        TExpr expr;
+        expr.nodes.push_back(cast_expr);
+        expr.nodes.push_back(slot_ref);
+
+        params.expr_of_dest_slot.emplace(11, expr);
+        params.src_slot_ids.push_back(3);
+    }
+
+    // col6: src slot 4 (varchar) cast to float, registered under key 12
+    {
+        TExprNode cast_expr;
+        cast_expr.node_type = TExprNodeType::CAST_EXPR;
+        cast_expr.type = float_type;
+        cast_expr.__set_opcode(TExprOpcode::CAST);
+        cast_expr.__set_num_children(1);
+        cast_expr.__set_output_scale(-1);
+        cast_expr.__isset.fn = true;
+        cast_expr.fn.name.function_name = "casttofloat";
+        cast_expr.fn.binary_type = TFunctionBinaryType::BUILTIN;
+        cast_expr.fn.arg_types.push_back(varchar_type);
+        cast_expr.fn.ret_type = float_type;
+        cast_expr.fn.has_var_args = false;
+        cast_expr.fn.__set_signature("casttoint(VARCHAR(*))"); // NOTE(review): text says casttoint although the cast targets float
+        cast_expr.fn.__isset.scalar_fn = true;
+        cast_expr.fn.scalar_fn.symbol = "doris::CastFunctions::cast_to_float_val";
+
+        TExprNode slot_ref;
+        slot_ref.node_type = TExprNodeType::SLOT_REF;
+        slot_ref.type = varchar_type;
+        slot_ref.num_children = 0;
+        slot_ref.__isset.slot_ref = true;
+        slot_ref.slot_ref.slot_id = 4;
+        slot_ref.slot_ref.tuple_id = 0;
+
+        TExpr expr;
+        expr.nodes.push_back(cast_expr);
+        expr.nodes.push_back(slot_ref);
+
+        params.expr_of_dest_slot.emplace(12, expr);
+        params.src_slot_ids.push_back(4);
+    }
+    // col7, col8: src slots 5-6 (varchar) cast to int, registered under keys 13-14
+    for (int i = 5; i <= 6; i++) {
+        TExprNode cast_expr;
+        cast_expr.node_type = TExprNodeType::CAST_EXPR;
+        cast_expr.type = int_type;
+        cast_expr.__set_opcode(TExprOpcode::CAST);
+        cast_expr.__set_num_children(1);
+        cast_expr.__set_output_scale(-1);
+        cast_expr.__isset.fn = true;
+        cast_expr.fn.name.function_name = "casttoint";
+        cast_expr.fn.binary_type = TFunctionBinaryType::BUILTIN;
+        cast_expr.fn.arg_types.push_back(varchar_type);
+        cast_expr.fn.ret_type = int_type;
+        cast_expr.fn.has_var_args = false;
+        cast_expr.fn.__set_signature("casttoint(VARCHAR(*))");
+        cast_expr.fn.__isset.scalar_fn = true;
+        cast_expr.fn.scalar_fn.symbol = "doris::CastFunctions::cast_to_int_val";
+
+        TExprNode slot_ref;
+        slot_ref.node_type = TExprNodeType::SLOT_REF;
+        slot_ref.type = varchar_type;
+        slot_ref.num_children = 0;
+        slot_ref.__isset.slot_ref = true;
+        slot_ref.slot_ref.slot_id = i;
+        slot_ref.slot_ref.tuple_id = 0;
+
+        TExpr expr;
+        expr.nodes.push_back(cast_expr);
+        expr.nodes.push_back(slot_ref);
+
+        params.expr_of_dest_slot.emplace(8 + i, expr);
+        params.src_slot_ids.push_back(i);
+    }
+
+    // col9: src slot 7 (varchar) kept as a plain slot ref (no cast), registered under key 15
+    {
+        TExprNode slot_ref;
+        slot_ref.node_type = TExprNodeType::SLOT_REF;
+        slot_ref.type = varchar_type;
+        slot_ref.num_children = 0;
+        slot_ref.__isset.slot_ref = true;
+        slot_ref.slot_ref.slot_id = 7;
+        slot_ref.slot_ref.tuple_id = 0;
+
+        TExpr expr;
+        expr.nodes.push_back(slot_ref);
+
+        params.expr_of_dest_slot.emplace(15, expr);
+        params.src_slot_ids.push_back(7);
+    }
+
+    params.__set_src_tuple_id(0);
+    params.__set_dest_tuple_id(1);
+
+    // Build the descriptor table handed to the runtime state.
+    TDescriptorTable t_desc_table;
+
+    // table descriptors
+    TTableDescriptor t_table_desc;
+    // NOTE(review): t_desc_table is reassigned from dtb.desc_tbl() further down, replacing this entry.
+    t_table_desc.id = 0;
+    t_table_desc.tableType = TTableType::BROKER_TABLE;
+    t_table_desc.numCols = 0;
+    t_table_desc.numClusteringCols = 0;
+    t_desc_table.tableDescriptors.push_back(t_table_desc);
+    t_desc_table.__isset.tableDescriptors = true;
+    // Source tuple: eight nullable string slots named col1-col3 and col5-col9.
+    TDescriptorTableBuilder dtb;
+    TTupleDescriptorBuilder src_tuple_builder;
+    src_tuple_builder.add_slot(TSlotDescriptorBuilder()
+                                       .string_type(65535)
+                                       .nullable(true)
+                                       .column_name("col1")
+                                       .column_pos(1)
+                                       .build());
+    src_tuple_builder.add_slot(TSlotDescriptorBuilder()
+                                       .string_type(65535)
+                                       .nullable(true)
+                                       .column_name("col2")
+                                       .column_pos(2)
+                                       .build());
+    src_tuple_builder.add_slot(TSlotDescriptorBuilder()
+                                       .string_type(65535)
+                                       .nullable(true)
+                                       .column_name("col3")
+                                       .column_pos(3)
+                                       .build());
+    src_tuple_builder.add_slot(TSlotDescriptorBuilder()
+                                       .string_type(65535)
+                                       .nullable(true)
+                                       .column_name("col5")
+                                       .column_pos(4)
+                                       .build());
+    src_tuple_builder.add_slot(TSlotDescriptorBuilder()
+                                       .string_type(65535)
+                                       .nullable(true)
+                                       .column_name("col6")
+                                       .column_pos(5)
+                                       .build());
+    src_tuple_builder.add_slot(TSlotDescriptorBuilder()
+                                       .string_type(65535)
+                                       .nullable(true)
+                                       .column_name("col7")
+                                       .column_pos(6)
+                                       .build());
+    src_tuple_builder.add_slot(TSlotDescriptorBuilder()
+                                       .string_type(65535)
+                                       .nullable(true)
+                                       .column_name("col8")
+                                       .column_pos(7)
+                                       .build());
+    src_tuple_builder.add_slot(TSlotDescriptorBuilder()
+                                       .string_type(65535)
+                                       .nullable(true)
+                                       .column_name("col9")
+                                       .column_pos(8)
+                                       .build());
+    src_tuple_builder.build(&dtb);
+    // Destination tuple: col1 BIGINT, col2/col3/col9 strings, col5 DOUBLE, col6 FLOAT, col7/col8 INT.
+    TTupleDescriptorBuilder dest_tuple_builder;
+    dest_tuple_builder.add_slot(
+            TSlotDescriptorBuilder().type(TYPE_BIGINT).column_name("col1").column_pos(1).build());
+    dest_tuple_builder.add_slot(TSlotDescriptorBuilder()
+                                        .string_type(65535)
+                                        .nullable(true)
+                                        .column_name("col2")
+                                        .column_pos(2)
+                                        .build());
+    dest_tuple_builder.add_slot(
+            TSlotDescriptorBuilder().string_type(65535).column_name("col3").column_pos(3).build());
+    dest_tuple_builder.add_slot(
+            TSlotDescriptorBuilder().type(TYPE_DOUBLE).column_name("col5").column_pos(4).build());
+    dest_tuple_builder.add_slot(
+            TSlotDescriptorBuilder().type(TYPE_FLOAT).column_name("col6").column_pos(5).build());
+    dest_tuple_builder.add_slot(
+            TSlotDescriptorBuilder().type(TYPE_INT).column_name("col7").column_pos(6).build());
+    dest_tuple_builder.add_slot(
+            TSlotDescriptorBuilder().type(TYPE_INT).column_name("col8").column_pos(7).build());
+    dest_tuple_builder.add_slot(
+            TSlotDescriptorBuilder().string_type(65535).column_name("col9").column_pos(8).build());
+    dest_tuple_builder.build(&dtb);
+    t_desc_table = dtb.desc_tbl();
+
+    DescriptorTbl::create(&_obj_pool, t_desc_table, &_desc_tbl);
+    _runtime_state.set_desc_tbl(_desc_tbl);
+    // One non-splittable ORC range pointing at the local test file.
+    std::vector<TBrokerRangeDesc> ranges;
+    TBrokerRangeDesc rangeDesc;
+    rangeDesc.start_offset = 0;
+    rangeDesc.size = -1;
+    rangeDesc.format_type = TFileFormatType::FORMAT_ORC;
+    rangeDesc.splittable = false;
+
+    rangeDesc.path = "./be/test/exec/test_data/orc_scanner/my-file.orc";
+    rangeDesc.file_type = TFileType::FILE_LOCAL;
+    ranges.push_back(rangeDesc);
+
+    VORCScanner scanner(&_runtime_state, _profile, params, ranges, _addresses, _pre_filter,
+                        &_counter);
+    EXPECT_TRUE(scanner.open().ok());
+
+    //auto tracker = std::make_shared<MemTracker>();
+    //MemPool tuple_pool(tracker.get());
+
+    //Tuple* tuple = (Tuple*)tuple_pool.allocate(_desc_tbl->get_tuple_descriptor(1)->byte_size());
+    vectorized::Block block;
+    bool eof = false;
+    // Both reads must succeed with eof set; the block's contents are not inspected here.
+    EXPECT_TRUE(scanner.get_next(&block, &eof).ok());
+    EXPECT_TRUE(eof);
+    EXPECT_TRUE(scanner.get_next(&block, &eof).ok());
+    EXPECT_TRUE(eof);
+    scanner.close();
+}
+
+TEST_F(VOrcScannerTest, normal2) { // projects one source column into a single dest slot and expects 10 rows
+    TBrokerScanRangeParams params;
+    TTypeDesc varchar_type; // scalar VARCHAR with len 65535
+    {
+        TTypeNode node;
+        node.__set_type(TTypeNodeType::SCALAR);
+        TScalarType scalar_type;
+        scalar_type.__set_type(TPrimitiveType::VARCHAR);
+        scalar_type.__set_len(65535);
+        node.__set_scalar_type(scalar_type);
+        varchar_type.types.push_back(node);
+    }
+
+    TTypeDesc int_type; // NOTE(review): built but not referenced again in this test
+    {
+        TTypeNode node;
+        node.__set_type(TTypeNodeType::SCALAR);
+        TScalarType scalar_type;
+        scalar_type.__set_type(TPrimitiveType::INT);
+        node.__set_scalar_type(scalar_type);
+        int_type.types.push_back(node);
+    }
+    // Only one dest expression: key 3 is a plain reference to src slot 1; src slot ids 0-2 are all listed.
+    {
+        TExprNode slot_ref;
+        slot_ref.node_type = TExprNodeType::SLOT_REF;
+        slot_ref.type = varchar_type;
+        slot_ref.num_children = 0;
+        slot_ref.__isset.slot_ref = true;
+        slot_ref.slot_ref.slot_id = 1;
+        slot_ref.slot_ref.tuple_id = 0;
+
+        TExpr expr;
+        expr.nodes.push_back(slot_ref);
+
+        params.expr_of_dest_slot.emplace(3, expr);
+        params.src_slot_ids.push_back(0);
+        params.src_slot_ids.push_back(1);
+        params.src_slot_ids.push_back(2);
+    }
+    params.__set_src_tuple_id(0);
+    params.__set_dest_tuple_id(1);
+
+    // Build the descriptor table handed to the runtime state.
+    TDescriptorTable t_desc_table;
+
+    // table descriptors
+    TTableDescriptor t_table_desc;
+    // NOTE(review): t_desc_table is reassigned from dtb.desc_tbl() further down, replacing this entry.
+    t_table_desc.id = 0;
+    t_table_desc.tableType = TTableType::BROKER_TABLE;
+    t_table_desc.numCols = 0;
+    t_table_desc.numClusteringCols = 0;
+    t_desc_table.tableDescriptors.push_back(t_table_desc);
+    t_desc_table.__isset.tableDescriptors = true;
+    // Source tuple: three nullable string slots col1-col3.
+    TDescriptorTableBuilder dtb;
+    TTupleDescriptorBuilder src_tuple_builder;
+    src_tuple_builder.add_slot(TSlotDescriptorBuilder()
+                                       .string_type(65535)
+                                       .nullable(true)
+                                       .column_name("col1")
+                                       .column_pos(1)
+                                       .build());
+    src_tuple_builder.add_slot(TSlotDescriptorBuilder()
+                                       .string_type(65535)
+                                       .nullable(true)
+                                       .column_name("col2")
+                                       .column_pos(2)
+                                       .build());
+    src_tuple_builder.add_slot(TSlotDescriptorBuilder()
+                                       .string_type(65535)
+                                       .nullable(true)
+                                       .column_name("col3")
+                                       .column_pos(3)
+                                       .build());
+    src_tuple_builder.build(&dtb);
+    TTupleDescriptorBuilder dest_tuple_builder; // destination tuple: a single string slot, value_from_col2
+    dest_tuple_builder.add_slot(TSlotDescriptorBuilder()
+                                        .string_type(65535)
+                                        .column_name("value_from_col2")
+                                        .column_pos(1)
+                                        .build());
+
+    dest_tuple_builder.build(&dtb);
+    t_desc_table = dtb.desc_tbl();
+
+    DescriptorTbl::create(&_obj_pool, t_desc_table, &_desc_tbl);
+    _runtime_state.set_desc_tbl(_desc_tbl);
+    // One non-splittable ORC range pointing at the local test file.
+    std::vector<TBrokerRangeDesc> ranges;
+    TBrokerRangeDesc rangeDesc;
+    rangeDesc.start_offset = 0;
+    rangeDesc.size = -1;
+    rangeDesc.format_type = TFileFormatType::FORMAT_ORC;
+    rangeDesc.splittable = false;
+
+    rangeDesc.path = "./be/test/exec/test_data/orc_scanner/my-file.orc";
+    rangeDesc.file_type = TFileType::FILE_LOCAL;
+    ranges.push_back(rangeDesc);
+
+    VORCScanner scanner(&_runtime_state, _profile, params, ranges, _addresses, _pre_filter,
+                        &_counter);
+    EXPECT_TRUE(scanner.open().ok());
+    // A single read is expected to succeed, yield 10 rows and set eof.
+    bool eof = false;
+    vectorized::Block block;
+    EXPECT_TRUE(scanner.get_next(&block, &eof).ok());
+    EXPECT_EQ(10, block.rows());
+    EXPECT_TRUE(eof);
+    scanner.close();
+}
+
+TEST_F(VOrcScannerTest, normal3) {
+    TBrokerScanRangeParams params;
+    TTypeDesc varchar_type;
+    {
+        TTypeNode node;
+        node.__set_type(TTypeNodeType::SCALAR);
+        TScalarType scalar_type;
+        scalar_type.__set_type(TPrimitiveType::VARCHAR);
+        scalar_type.__set_len(65535);
+        node.__set_scalar_type(scalar_type);
+        varchar_type.types.push_back(node);
+    }
+
+    TTypeDesc decimal_type;
+    {
+        TTypeNode node;
+        node.__set_type(TTypeNodeType::SCALAR);
+        TScalarType scalar_type;
+        scalar_type.__set_type(TPrimitiveType::DECIMALV2);
+        scalar_type.__set_precision(64);
+        scalar_type.__set_scale(64);
+        node.__set_scalar_type(scalar_type);
+        decimal_type.types.push_back(node);
+    }
+
+    TTypeDesc tinyint_type;
+    {
+        TTypeNode node;
+        node.__set_type(TTypeNodeType::SCALAR);
+        TScalarType scalar_type;
+        scalar_type.__set_type(TPrimitiveType::TINYINT);
+        node.__set_scalar_type(scalar_type);
+        tinyint_type.types.push_back(node);
+    }
+
+    TTypeDesc datetime_type;
+    {
+        TTypeNode node;
+        node.__set_type(TTypeNodeType::SCALAR);
+        TScalarType scalar_type;
+        scalar_type.__set_type(TPrimitiveType::DATETIME);
+        node.__set_scalar_type(scalar_type);
+        datetime_type.types.push_back(node);
+    }
+
+    TTypeDesc date_type;
+    {
+        TTypeNode node;
+        node.__set_type(TTypeNodeType::SCALAR);
+        TScalarType scalar_type;
+        scalar_type.__set_type(TPrimitiveType::DATE);
+        node.__set_scalar_type(scalar_type);
+        date_type.types.push_back(node);
+    }
+
+    {
+        for (int i = 0; i < 5; ++i) {
+            TExprNode cast_expr;
+            cast_expr.node_type = TExprNodeType::CAST_EXPR;
+            cast_expr.type = decimal_type;
+            cast_expr.__set_opcode(TExprOpcode::CAST);
+            cast_expr.__set_num_children(1);
+            cast_expr.__set_output_scale(-1);
+            cast_expr.__isset.fn = true;
+            cast_expr.fn.name.function_name = "casttodecimalv2";
+            cast_expr.fn.binary_type = TFunctionBinaryType::BUILTIN;
+            cast_expr.fn.arg_types.push_back(varchar_type);
+            cast_expr.fn.ret_type = decimal_type;
+            cast_expr.fn.has_var_args = false;
+            cast_expr.fn.__set_signature("cast_to_decimalv2_val(VARCHAR(*))");
+            cast_expr.fn.__isset.scalar_fn = true;
+            cast_expr.fn.scalar_fn.symbol = "doris::DecimalV2Operators::cast_to_decimalv2_val";
+
+            TExprNode slot_ref;
+            slot_ref.node_type = TExprNodeType::SLOT_REF;
+            slot_ref.type = varchar_type;
+            slot_ref.num_children = 0;
+            slot_ref.__isset.slot_ref = true;
+            slot_ref.slot_ref.slot_id = i;
+            slot_ref.slot_ref.tuple_id = 0;
+
+            TExpr expr;
+            expr.nodes.push_back(cast_expr);
+            expr.nodes.push_back(slot_ref);
+
+            params.expr_of_dest_slot.emplace(9 + i, expr);
+            params.src_slot_ids.push_back(i);
+        }
+
+        {
+            TExprNode cast_expr;
+            cast_expr.node_type = TExprNodeType::CAST_EXPR;
+            cast_expr.type = tinyint_type;
+            cast_expr.__set_opcode(TExprOpcode::CAST);
+            cast_expr.__set_num_children(1);
+            cast_expr.__set_output_scale(-1);
+            cast_expr.__isset.fn = true;
+            cast_expr.fn.name.function_name = "casttotinyint";
+            cast_expr.fn.binary_type = TFunctionBinaryType::BUILTIN;
+            cast_expr.fn.arg_types.push_back(varchar_type);
+            cast_expr.fn.ret_type = tinyint_type;
+            cast_expr.fn.has_var_args = false;
+            cast_expr.fn.__set_signature("cast_to_tiny_int_val(VARCHAR(*))");
+            cast_expr.fn.__isset.scalar_fn = true;
+            cast_expr.fn.scalar_fn.symbol = "doris::CastFunctions::cast_to_tiny_int_val";
+
+            TExprNode slot_ref;
+            slot_ref.node_type = TExprNodeType::SLOT_REF;
+            slot_ref.type = varchar_type;
+            slot_ref.num_children = 0;
+            slot_ref.__isset.slot_ref = true;
+            slot_ref.slot_ref.slot_id = 5;
+            slot_ref.slot_ref.tuple_id = 0;
+
+            TExpr expr;
+            expr.nodes.push_back(cast_expr);
+            expr.nodes.push_back(slot_ref);
+
+            params.expr_of_dest_slot.emplace(14, expr);
+            params.src_slot_ids.push_back(5);
+        }
+
+        {
+            TExprNode cast_expr;
+            cast_expr.node_type = TExprNodeType::CAST_EXPR;
+            cast_expr.type = datetime_type;
+            cast_expr.__set_opcode(TExprOpcode::CAST);
+            cast_expr.__set_num_children(1);
+            cast_expr.__set_output_scale(-1);
+            cast_expr.__isset.fn = true;
+            cast_expr.fn.name.function_name = "casttodatetime";
+            cast_expr.fn.binary_type = TFunctionBinaryType::BUILTIN;
+            cast_expr.fn.arg_types.push_back(varchar_type);
+            cast_expr.fn.ret_type = datetime_type;
+            cast_expr.fn.has_var_args = false;
+            cast_expr.fn.__set_signature("cast_to_datetime_val(VARCHAR(*))");
+            cast_expr.fn.__isset.scalar_fn = true;
+            cast_expr.fn.scalar_fn.symbol = "doris::CastFunctions::cast_to_datetime_val";
+
+            TExprNode slot_ref;
+            slot_ref.node_type = TExprNodeType::SLOT_REF;
+            slot_ref.type = varchar_type;
+            slot_ref.num_children = 0;
+            slot_ref.__isset.slot_ref = true;
+            slot_ref.slot_ref.slot_id = 6;
+            slot_ref.slot_ref.tuple_id = 0;
+
+            TExpr expr;
+            expr.nodes.push_back(cast_expr);
+            expr.nodes.push_back(slot_ref);
+
+            params.expr_of_dest_slot.emplace(15, expr);
+            params.src_slot_ids.push_back(6);
+        }
+        {
+            TExprNode cast_expr;
+            cast_expr.node_type = TExprNodeType::CAST_EXPR;
+            cast_expr.type = date_type;
+            cast_expr.__set_opcode(TExprOpcode::CAST);
+            cast_expr.__set_num_children(1);
+            cast_expr.__set_output_scale(-1);
+            cast_expr.__isset.fn = true;
+            cast_expr.fn.name.function_name = "casttodate";
+            cast_expr.fn.binary_type = TFunctionBinaryType::BUILTIN;
+            cast_expr.fn.arg_types.push_back(varchar_type);
+            cast_expr.fn.ret_type = date_type;
+            cast_expr.fn.has_var_args = false;
+            cast_expr.fn.__set_signature("casttoint(VARCHAR(*))");
+            cast_expr.fn.__isset.scalar_fn = true;
+            cast_expr.fn.scalar_fn.symbol = "doris::CastFunctions::cast_to_date_val";
+
+            TExprNode slot_ref;
+            slot_ref.node_type = TExprNodeType::SLOT_REF;
+            slot_ref.type = varchar_type;
+            slot_ref.num_children = 0;
+            slot_ref.__isset.slot_ref = true;
+            slot_ref.slot_ref.slot_id = 7;
+            slot_ref.slot_ref.tuple_id = 0;
+
+            TExpr expr;
+            expr.nodes.push_back(cast_expr);
+            expr.nodes.push_back(slot_ref);
+
+            params.expr_of_dest_slot.emplace(16, expr);
+            params.src_slot_ids.push_back(7);
+        }
+        {
+            TExprNode cast_expr;
+            cast_expr.node_type = TExprNodeType::CAST_EXPR;
+            cast_expr.type = decimal_type;
+            cast_expr.__set_opcode(TExprOpcode::CAST);
+            cast_expr.__set_num_children(1);
+            cast_expr.__set_output_scale(-1);
+            cast_expr.__isset.fn = true;
+            cast_expr.fn.name.function_name = "casttodecimalv2";
+            cast_expr.fn.binary_type = TFunctionBinaryType::BUILTIN;
+            cast_expr.fn.arg_types.push_back(varchar_type);
+            cast_expr.fn.ret_type = decimal_type;
+            cast_expr.fn.has_var_args = false;
+            cast_expr.fn.__set_signature("cast_to_decimalv2_val(VARCHAR(*))");
+            cast_expr.fn.__isset.scalar_fn = true;
+            cast_expr.fn.scalar_fn.symbol = "doris::DecimalV2Operators::cast_to_decimalv2_val";
+
+            TExprNode slot_ref;
+            slot_ref.node_type = TExprNodeType::SLOT_REF;
+            slot_ref.type = varchar_type;
+            slot_ref.num_children = 0;
+            slot_ref.__isset.slot_ref = true;
+            slot_ref.slot_ref.slot_id = 8;
+            slot_ref.slot_ref.tuple_id = 0;
+
+            TExpr expr;
+            expr.nodes.push_back(cast_expr);
+            expr.nodes.push_back(slot_ref);
+
+            params.expr_of_dest_slot.emplace(17, expr);
+            params.src_slot_ids.push_back(8);
+        }
+    }
+    params.__set_src_tuple_id(0);
+    params.__set_dest_tuple_id(1);
+
+    //init_desc_table
+    TDescriptorTable t_desc_table;
+
+    // table descriptors
+    TTableDescriptor t_table_desc;
+
+    t_table_desc.id = 0;
+    t_table_desc.tableType = TTableType::BROKER_TABLE;
+    t_table_desc.numCols = 0;
+    t_table_desc.numClusteringCols = 0;
+    t_desc_table.tableDescriptors.push_back(t_table_desc);
+    t_desc_table.__isset.tableDescriptors = true;
+
+    TDescriptorTableBuilder dtb;
+    TTupleDescriptorBuilder src_tuple_builder;
+    src_tuple_builder.add_slot(TSlotDescriptorBuilder()
+                                       .string_type(65535)
+                                       .nullable(true)
+                                       .column_name("col1")
+                                       .column_pos(1)
+                                       .build());
+    src_tuple_builder.add_slot(TSlotDescriptorBuilder()
+                                       .string_type(65535)
+                                       .nullable(true)
+                                       .column_name("col2")
+                                       .column_pos(2)
+                                       .build());
+    src_tuple_builder.add_slot(TSlotDescriptorBuilder()
+                                       .string_type(65535)
+                                       .nullable(true)
+                                       .column_name("col3")
+                                       .column_pos(3)
+                                       .build());
+    src_tuple_builder.add_slot(TSlotDescriptorBuilder()
+                                       .string_type(65535)
+                                       .nullable(true)
+                                       .column_name("col4")
+                                       .column_pos(4)
+                                       .build());
+    src_tuple_builder.add_slot(TSlotDescriptorBuilder()
+                                       .string_type(65535)
+                                       .nullable(true)
+                                       .column_name("col5")
+                                       .column_pos(5)
+                                       .build());
+    src_tuple_builder.add_slot(TSlotDescriptorBuilder()
+                                       .string_type(65535)
+                                       .nullable(true)
+                                       .column_name("col6")
+                                       .column_pos(6)
+                                       .build());
+    src_tuple_builder.add_slot(TSlotDescriptorBuilder()
+                                       .string_type(65535)
+                                       .nullable(true)
+                                       .column_name("col7")
+                                       .column_pos(7)
+                                       .build());
+    src_tuple_builder.add_slot(TSlotDescriptorBuilder()
+                                       .string_type(65535)
+                                       .nullable(true)
+                                       .column_name("col8")
+                                       .column_pos(8)
+                                       .build());
+    src_tuple_builder.add_slot(TSlotDescriptorBuilder()
+                                       .string_type(65535)
+                                       .nullable(true)
+                                       .column_name("col9")
+                                       .column_pos(9)
+                                       .build());
+    src_tuple_builder.build(&dtb);
+
+    TTupleDescriptorBuilder dest_tuple_builder;
+    dest_tuple_builder.add_slot(
+            TSlotDescriptorBuilder().decimal_type(10, 9).column_name("col1").column_pos(1).build());
+    dest_tuple_builder.add_slot(
+            TSlotDescriptorBuilder().decimal_type(7, 5).column_name("col2").column_pos(2).build());
+    dest_tuple_builder.add_slot(
+            TSlotDescriptorBuilder().decimal_type(10, 9).column_name("col3").column_pos(3).build());
+    dest_tuple_builder.add_slot(
+            TSlotDescriptorBuilder().decimal_type(10, 5).column_name("col4").column_pos(4).build());
+    dest_tuple_builder.add_slot(
+            TSlotDescriptorBuilder().decimal_type(10, 5).column_name("col5").column_pos(5).build());
+    dest_tuple_builder.add_slot(
+            TSlotDescriptorBuilder().type(TYPE_TINYINT).column_name("col6").column_pos(6).build());
+    dest_tuple_builder.add_slot(
+            TSlotDescriptorBuilder().type(TYPE_DATETIME).column_name("col7").column_pos(7).build());
+    dest_tuple_builder.add_slot(TSlotDescriptorBuilder()
+                                        .type(TYPE_DATE)
+                                        .nullable(true)
+                                        .column_name("col8")
+                                        .column_pos(8)
+                                        .build());
+    dest_tuple_builder.add_slot(
+            TSlotDescriptorBuilder().decimal_type(27, 9).column_name("col9").column_pos(9).build());
+
+    dest_tuple_builder.build(&dtb);
+    t_desc_table = dtb.desc_tbl();
+
+    DescriptorTbl::create(&_obj_pool, t_desc_table, &_desc_tbl);
+    _runtime_state.set_desc_tbl(_desc_tbl);
+
+    std::vector<TBrokerRangeDesc> ranges;
+    TBrokerRangeDesc rangeDesc;
+    rangeDesc.start_offset = 0;
+    rangeDesc.size = -1;
+    rangeDesc.format_type = TFileFormatType::FORMAT_ORC;
+    rangeDesc.splittable = false;
+
+    rangeDesc.path = "./be/test/exec/test_data/orc_scanner/decimal_and_timestamp.orc";
+    rangeDesc.file_type = TFileType::FILE_LOCAL;
+    ranges.push_back(rangeDesc);
+
+    VORCScanner scanner(&_runtime_state, _profile, params, ranges, _addresses, _pre_filter,
+                        &_counter);
+    EXPECT_TRUE(scanner.open().ok());
+
+    bool eof = false;
+    vectorized::Block block;
+    EXPECT_TRUE(scanner.get_next(&block, &eof).ok());
+    EXPECT_EQ(1, block.rows());
+    EXPECT_TRUE(eof);
+    scanner.close();
+}
+
+} // namespace vectorized
+} // namespace doris
diff --git a/be/test/vec/exec/vparquet_scanner_test.cpp b/be/test/vec/exec/vparquet_scanner_test.cpp
new file mode 100644
index 0000000000..ba8f70ce70
--- /dev/null
+++ b/be/test/vec/exec/vparquet_scanner_test.cpp
@@ -0,0 +1,499 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+#include <time.h>
+
+#include <map>
+#include <string>
+#include <vector>
+
+#include "common/object_pool.h"
+#include "exec/local_file_reader.h"
+#include "exprs/cast_functions.h"
+#include "gen_cpp/Descriptors_types.h"
+#include "gen_cpp/PlanNodes_types.h"
+#include "runtime/descriptors.h"
+#include "runtime/row_batch.h"
+#include "runtime/runtime_state.h"
+#include "runtime/tuple.h"
+#include "runtime/user_function_cache.h"
+#include "vec/exec/vbroker_scan_node.h"
+
+namespace doris {
+namespace vectorized {
+
+// Test fixture for the vectorized parquet scan path.
+//
+// The constructor calls init(), which fills _params (create_expr_info), builds the
+// descriptor table (init_desc_table) and prepares _tnode. It then installs a fresh
+// MemTracker on the runtime state and switches the vectorized engine on.
+class VParquetScannerTest : public testing::Test {
+public:
+    VParquetScannerTest() : _runtime_state(TQueryGlobals()) {
+        init();
+        _runtime_state._instance_mem_tracker.reset(new MemTracker());
+        _runtime_state._query_options.enable_vectorized_engine = true;
+    }
+    ~VParquetScannerTest() override = default;
+
+    // Builds the expression map, the descriptor table and the plan node used by the test.
+    void init();
+
+    // Runs once for the whole test case: initializes the user function cache and the
+    // cast functions before any test in this fixture executes.
+    static void SetUpTestCase() {
+        UserFunctionCache::instance()->init(
+                "./be/test/runtime/test_data/user_function_cache/normal");
+        CastFunctions::init();
+    }
+
+protected:
+    // No per-test setup or teardown: everything is prepared in the constructor.
+    void SetUp() override {}
+    void TearDown() override {}
+
+private:
+    // Append the source / destination slot descriptors and their tuple descriptor to
+    // t_desc_table, numbering slots from next_slot_id. Return the next unused slot id.
+    int create_src_tuple(TDescriptorTable& t_desc_table, int next_slot_id);
+    int create_dst_tuple(TDescriptorTable& t_desc_table, int next_slot_id);
+    // Fills _params with one expression per destination slot.
+    void create_expr_info();
+    // Builds the thrift descriptor table and registers it with _runtime_state.
+    void init_desc_table();
+
+    RuntimeState _runtime_state;
+    ObjectPool _obj_pool;
+    std::map<std::string, SlotDescriptor*> _slots_map;
+    TBrokerScanRangeParams _params;
+    // Set by init_desc_table() through DescriptorTbl::create(&_obj_pool, ...). Starts as
+    // nullptr so a failed build is a deterministic null instead of an indeterminate pointer.
+    DescriptorTbl* _desc_tbl = nullptr;
+    TPlanNode _tnode;
+};
+
+// Ids shared by the descriptor-table builders and create_expr_info() in this file.
+// Id assigned to the destination tuple descriptor.
+#define TUPLE_ID_DST 0
+// Id assigned to the source tuple descriptor.
+#define TUPLE_ID_SRC 1
+// Number of columns (and therefore slots) in each of the two tuples.
+#define COLUMN_NUMBERS 20
+// Slot ids are handed out starting at 1 with the destination tuple built first, so
+// destination slots begin at 1 and source slots at 1 + COLUMN_NUMBERS = 21.
+#define DST_TUPLE_SLOT_ID_START 1
+#define SRC_TUPLE_SLOT_ID_START 21
+// Appends COLUMN_NUMBERS VARCHAR(65535) slot descriptors plus the source tuple descriptor
+// to t_desc_table. Slot ids are taken from next_slot_id; the first unused id is returned.
+int VParquetScannerTest::create_src_tuple(TDescriptorTable& t_desc_table, int next_slot_id) {
+    const char* src_columns[] = {
+            "log_version",       "log_time", "log_time_stamp", "js_version",
+            "vst_cookie",        "vst_ip",   "vst_user_id",    "vst_user_agent",
+            "device_resolution", "page_url", "page_refer_url", "page_yyid",
+            "page_type",         "pos_type", "content_id",     "media_id",
+            "spm_cnt",           "spm_pre",  "scm_cnt",        "partition_column"};
+
+    // Every source slot has the same type, so the descriptor is assembled once and copied.
+    TScalarType varchar;
+    varchar.__set_type(TPrimitiveType::VARCHAR);
+    varchar.__set_len(65535);
+    TTypeNode varchar_node;
+    varchar_node.__set_type(TTypeNodeType::SCALAR);
+    varchar_node.__set_scalar_type(varchar);
+    TTypeDesc varchar_desc;
+    varchar_desc.types.push_back(varchar_node);
+
+    for (int col = 0; col < COLUMN_NUMBERS; ++col) {
+        TSlotDescriptor slot;
+        slot.id = next_slot_id++;
+        slot.parent = 1;
+        slot.slotType = varchar_desc;
+        slot.columnPos = col;
+        // The first 8 bytes of the tuple are reserved for the null indicators; each slot
+        // then takes 16 bytes.
+        slot.byteOffset = 8 + col * 16;
+        slot.nullIndicatorByte = col / 8;
+        slot.nullIndicatorBit = col % 8;
+        slot.colName = src_columns[col];
+        slot.slotIdx = col + 1;
+        slot.isMaterialized = true;
+        t_desc_table.slotDescriptors.push_back(slot);
+    }
+
+    // Source tuple descriptor: 16 bytes per slot plus the 8 null-indicator bytes.
+    TTupleDescriptor src_tuple;
+    src_tuple.id = TUPLE_ID_SRC;
+    src_tuple.byteSize = 8 + COLUMN_NUMBERS * 16;
+    src_tuple.numNullBytes = 0;
+    src_tuple.tableId = 0;
+    src_tuple.__isset.tableId = true;
+    t_desc_table.tupleDescriptors.push_back(src_tuple);
+
+    return next_slot_id;
+}
+
+// Appends the COLUMN_NUMBERS destination slot descriptors and the destination tuple
+// descriptor to t_desc_table.
+//
+// Layout: log_version is VARCHAR(65535) (16 bytes), log_time and log_time_stamp are
+// BIGINT (8 bytes each), every remaining column is VARCHAR(65535) (16 bytes).
+// Slot ids are taken from next_slot_id; the first unused id is returned.
+int VParquetScannerTest::create_dst_tuple(TDescriptorTable& t_desc_table, int next_slot_id) {
+    const char* columnNames[] = {
+            "log_version",       "log_time", "log_time_stamp", "js_version",
+            "vst_cookie",        "vst_ip",   "vst_user_id",    "vst_user_agent",
+            "device_resolution", "page_url", "page_refer_url", "page_yyid",
+            "page_type",         "pos_type", "content_id",     "media_id",
+            "spm_cnt",           "spm_pre",  "scm_cnt",        "partition_column"};
+
+    // Skip the first 8 bytes: they are used to indicate whether a field is null.
+    int32_t byteOffset = 8;
+
+    // Appends the slot descriptor of column `pos` at the current byteOffset and then
+    // advances byteOffset by `slot_size`. `len` is only set on the scalar type when it is
+    // positive (the VARCHAR slots); BIGINT slots leave it unset.
+    auto add_slot = [&](int pos, TPrimitiveType::type primitive_type, int32_t len,
+                        int32_t slot_size) {
+        TTypeDesc type;
+        {
+            TTypeNode node;
+            node.__set_type(TTypeNodeType::SCALAR);
+            TScalarType scalar_type;
+            scalar_type.__set_type(primitive_type);
+            if (len > 0) {
+                scalar_type.__set_len(len);
+            }
+            node.__set_scalar_type(scalar_type);
+            type.types.push_back(node);
+        }
+
+        TSlotDescriptor slot_desc;
+        slot_desc.id = next_slot_id++;
+        slot_desc.parent = 0;
+        slot_desc.slotType = type;
+        slot_desc.columnPos = pos;
+        slot_desc.byteOffset = byteOffset;
+        slot_desc.nullIndicatorByte = pos / 8;
+        slot_desc.nullIndicatorBit = pos % 8;
+        slot_desc.colName = columnNames[pos];
+        slot_desc.slotIdx = pos + 1;
+        slot_desc.isMaterialized = true;
+        t_desc_table.slotDescriptors.push_back(slot_desc);
+
+        byteOffset += slot_size;
+    };
+
+    add_slot(0, TPrimitiveType::VARCHAR, 65535, 16); // log_version
+    add_slot(1, TPrimitiveType::BIGINT, 0, 8);       // log_time
+    add_slot(2, TPrimitiveType::BIGINT, 0, 8);       // log_time_stamp
+    for (int i = 3; i < COLUMN_NUMBERS; i++) {
+        add_slot(i, TPrimitiveType::VARCHAR, 65535, 16);
+    }
+
+    t_desc_table.__isset.slotDescriptors = true;
+    {
+        // TTupleDescriptor dest
+        TTupleDescriptor t_tuple_desc;
+        t_tuple_desc.id = TUPLE_ID_DST;
+        // 8 extra bytes on top of the running offset, kept for null handling.
+        t_tuple_desc.byteSize = byteOffset + 8;
+        t_tuple_desc.numNullBytes = 0;
+        t_tuple_desc.tableId = 0;
+        t_tuple_desc.__isset.tableId = true;
+        t_desc_table.tupleDescriptors.push_back(t_tuple_desc);
+    }
+    return next_slot_id;
+}
+
+// Builds the thrift descriptor table (one broker table, the destination tuple, then the
+// source tuple), materializes it into _desc_tbl and registers it with _runtime_state.
+// A failing DescriptorTbl::create is reported as a test failure instead of being ignored.
+void VParquetScannerTest::init_desc_table() {
+    TDescriptorTable t_desc_table;
+
+    // table descriptors
+    TTableDescriptor t_table_desc;
+
+    t_table_desc.id = 0;
+    t_table_desc.tableType = TTableType::BROKER_TABLE;
+    t_table_desc.numCols = 0;
+    t_table_desc.numClusteringCols = 0;
+    t_desc_table.tableDescriptors.push_back(t_table_desc);
+    t_desc_table.__isset.tableDescriptors = true;
+
+    // Destination slots are numbered first (starting at 1), source slots follow.
+    int next_slot_id = 1;
+    next_slot_id = create_dst_tuple(t_desc_table, next_slot_id);
+    create_src_tuple(t_desc_table, next_slot_id);
+
+    // Start from a known value so a failed build cannot leave an indeterminate pointer.
+    _desc_tbl = nullptr;
+    // Non-fatal check: this runs from the fixture constructor, where fatal assertions
+    // must not be used.
+    EXPECT_TRUE(DescriptorTbl::create(&_obj_pool, t_desc_table, &_desc_tbl).ok());
+
+    _runtime_state.set_desc_tbl(_desc_tbl);
+}
+
+// Fills _params with one expression per destination slot and records the matching source
+// slot ids, in column order:
+//   - log_time and log_time_stamp (columns 1 and 2): CAST(source VARCHAR slot AS BIGINT)
+//   - every other column: a plain reference to the source VARCHAR slot
+// Finally sets the source / destination tuple ids on _params.
+void VParquetScannerTest::create_expr_info() {
+    // Builds a single-node scalar type descriptor; `len` is only set when it is positive.
+    auto make_scalar_type = [](TPrimitiveType::type primitive_type, int32_t len) {
+        TTypeDesc type_desc;
+        TTypeNode node;
+        node.__set_type(TTypeNodeType::SCALAR);
+        TScalarType scalar_type;
+        scalar_type.__set_type(primitive_type);
+        if (len > 0) {
+            scalar_type.__set_len(len);
+        }
+        node.__set_scalar_type(scalar_type);
+        type_desc.types.push_back(node);
+        return type_desc;
+    };
+    const TTypeDesc varchar_type = make_scalar_type(TPrimitiveType::VARCHAR, 5000);
+    const TTypeDesc bigint_type = make_scalar_type(TPrimitiveType::BIGINT, 0);
+
+    // Root node of a VARCHAR --> BIGINT cast; it expects exactly one child node.
+    auto make_bigint_cast = [&]() {
+        TExprNode cast_expr;
+        cast_expr.node_type = TExprNodeType::CAST_EXPR;
+        cast_expr.type = bigint_type;
+        cast_expr.__set_opcode(TExprOpcode::CAST);
+        cast_expr.__set_num_children(1);
+        cast_expr.__set_output_scale(-1);
+        cast_expr.__isset.fn = true;
+        cast_expr.fn.name.function_name = "casttoint";
+        cast_expr.fn.binary_type = TFunctionBinaryType::BUILTIN;
+        cast_expr.fn.arg_types.push_back(varchar_type);
+        cast_expr.fn.ret_type = bigint_type;
+        cast_expr.fn.has_var_args = false;
+        cast_expr.fn.__set_signature("casttoint(VARCHAR(*))");
+        cast_expr.fn.__isset.scalar_fn = true;
+        cast_expr.fn.scalar_fn.symbol = "doris::CastFunctions::cast_to_big_int_val";
+        return cast_expr;
+    };
+
+    // Leaf node referencing slot `src_slot_id` of the source tuple (tuple id 1).
+    auto make_slot_ref = [&](int src_slot_id) {
+        TExprNode slot_ref;
+        slot_ref.node_type = TExprNodeType::SLOT_REF;
+        slot_ref.type = varchar_type;
+        slot_ref.num_children = 0;
+        slot_ref.__isset.slot_ref = true;
+        slot_ref.slot_ref.slot_id = src_slot_id;
+        slot_ref.slot_ref.tuple_id = 1;
+        return slot_ref;
+    };
+
+    for (int i = 0; i < COLUMN_NUMBERS; i++) {
+        TExpr expr;
+        // log_time (1) and log_time_stamp (2): VARCHAR --> BIGINT.
+        // All other columns stay VARCHAR --> VARCHAR and need no cast node.
+        if (i == 1 || i == 2) {
+            expr.nodes.push_back(make_bigint_cast());
+        }
+        expr.nodes.push_back(make_slot_ref(SRC_TUPLE_SLOT_ID_START + i));
+
+        _params.expr_of_dest_slot.emplace(DST_TUPLE_SLOT_ID_START + i, expr);
+        _params.src_slot_ids.push_back(SRC_TUPLE_SLOT_ID_START + i);
+    }
+
+    // _params.__isset.expr_of_dest_slot = true;
+    _params.__set_dest_tuple_id(TUPLE_ID_DST);
+    _params.__set_src_tuple_id(TUPLE_ID_SRC);
+}
+
+// Prepares everything the test body needs: the slot expressions in _params, the
+// descriptor table, and the plan node handed to the scan node.
+void VParquetScannerTest::init() {
+    create_expr_info();
+    init_desc_table();
+
+    // Plan node: id 0, no children, no row limit, one non-nullable row tuple (tuple 0),
+    // with the broker scan node section pointing at tuple 0.
+    TPlanNode& plan_node = _tnode;
+    plan_node.node_id = 0;
+    plan_node.node_type = TPlanNodeType::SCHEMA_SCAN_NODE;
+    plan_node.num_children = 0;
+    plan_node.limit = -1;
+    plan_node.row_tuples.push_back(0);
+    plan_node.nullable_tuples.push_back(false);
+    plan_node.broker_scan_node.tuple_id = 0;
+    plan_node.__isset.broker_scan_node = true;
+}
+
+// Scans the local parquet fixture through a VBrokerScanNode and checks the batching:
+// 14 blocks of 2048 rows, a final block of 1328 rows (30000 rows in total) flagged eof,
+// and an empty block on the following call.
+TEST_F(VParquetScannerTest, normal) {
+    VBrokerScanNode scan_node(&_obj_pool, _tnode, *_desc_tbl);
+    scan_node.init(_tnode);
+    auto status = scan_node.prepare(&_runtime_state);
+    EXPECT_TRUE(status.ok());
+
+    // Describe the single file range to scan.
+    TBrokerScanRange broker_scan_range;
+    broker_scan_range.params = _params;
+
+    TBrokerRangeDesc file_range;
+    file_range.start_offset = 0;
+    file_range.size = -1;
+    file_range.format_type = TFileFormatType::FORMAT_PARQUET;
+    file_range.splittable = true;
+
+    // 19 columns come from the file, one more value comes from the path.
+    std::vector<std::string> path_columns {"value"};
+    file_range.__set_columns_from_path(path_columns);
+    file_range.__set_num_of_columns_from_file(19);
+#if 1
+    file_range.path = "./be/test/exec/test_data/parquet_scanner/localfile.parquet";
+    file_range.file_type = TFileType::FILE_LOCAL;
+#else
+    file_range.path = "hdfs://ip:8020/user/xxxx.parq";
+    file_range.file_type = TFileType::FILE_BROKER;
+    TNetworkAddress addr;
+    addr.__set_hostname("127.0.0.1");
+    addr.__set_port(8000);
+    broker_scan_range.broker_addresses.push_back(addr);
+#endif
+    broker_scan_range.ranges.push_back(file_range);
+
+    TScanRangeParams scan_range_params;
+    scan_range_params.scan_range.__set_broker_scan_range(broker_scan_range);
+    std::vector<TScanRangeParams> scan_ranges;
+    scan_ranges.push_back(scan_range_params);
+
+    scan_node.set_scan_ranges(scan_ranges);
+    status = scan_node.open(&_runtime_state);
+    EXPECT_TRUE(status.ok());
+
+    vectorized::Block block;
+    bool eof = false;
+
+    // Full blocks first.
+    int full_blocks = 0;
+    while (full_blocks < 14) {
+        status = scan_node.get_next(&_runtime_state, &block, &eof);
+        EXPECT_TRUE(status.ok());
+        EXPECT_EQ(2048, block.rows());
+        EXPECT_FALSE(eof);
+        block.clear();
+        ++full_blocks;
+    }
+
+    // The remainder arrives together with eof.
+    status = scan_node.get_next(&_runtime_state, &block, &eof);
+    EXPECT_TRUE(status.ok());
+    EXPECT_EQ(1328, block.rows());
+    EXPECT_TRUE(eof);
+    block.clear();
+
+    // Reading past the end yields an empty block and keeps eof set.
+    status = scan_node.get_next(&_runtime_state, &block, &eof);
+    EXPECT_TRUE(status.ok());
+    EXPECT_EQ(0, block.rows());
+    EXPECT_TRUE(eof);
+
+    scan_node.close(&_runtime_state);
+
+    // Dump the runtime profile to the log.
+    std::stringstream profile_text;
+    scan_node.runtime_profile()->pretty_print(&profile_text);
+    LOG(INFO) << profile_text.str();
+}
+
+} // namespace vectorized
+} // namespace doris
\ No newline at end of file
diff --git a/thirdparty/build-thirdparty.sh b/thirdparty/build-thirdparty.sh
index d29ae4bf8e..6b96aa271c 100755
--- a/thirdparty/build-thirdparty.sh
+++ b/thirdparty/build-thirdparty.sh
@@ -360,9 +360,12 @@ build_gtest() {
 # rapidjson
 build_rapidjson() {
     check_if_source_exist $RAPIDJSON_SOURCE
-
-    rm -rf $TP_INSTALL_DIR/rapidjson
-    cp -r $TP_SOURCE_DIR/$RAPIDJSON_SOURCE/include/rapidjson $TP_INCLUDE_DIR/
+    cd $TP_SOURCE_DIR/$RAPIDJSON_SOURCE
+    mkdir -p $BUILD_DIR && cd $BUILD_DIR
+    rm -rf CMakeCache.txt CMakeFiles/
+    ${CMAKE_CMD} ../ -DCMAKE_INSTALL_PREFIX=$TP_INSTALL_DIR -DRAPIDJSON_BUILD_DOC=OFF \
+    -DRAPIDJSON_BUILD_EXAMPLES=OFF -DRAPIDJSON_BUILD_TESTS=OFF
+    make -j $PARALLEL && make install
 }
 
 # snappy
@@ -373,7 +376,7 @@ build_snappy() {
     mkdir -p $BUILD_DIR && cd $BUILD_DIR
     rm -rf CMakeCache.txt CMakeFiles/
     CFLAGS="-O3" CXXFLAGS="-O3" ${CMAKE_CMD} -G "${GENERATOR}" -DCMAKE_INSTALL_PREFIX=$TP_INSTALL_DIR \
-    -DCMAKE_POSITION_INDEPENDENT_CODE=On \
+    -DCMAKE_POSITION_INDEPENDENT_CODE=ON \
     -DCMAKE_INSTALL_INCLUDEDIR=$TP_INCLUDE_DIR/snappy \
     -DSNAPPY_BUILD_TESTS=0 ../
     ${BUILD_SYSTEM} -j $PARALLEL && ${BUILD_SYSTEM} install
@@ -643,20 +646,23 @@ build_arrow() {
     export ARROW_SNAPPY_URL=${TP_SOURCE_DIR}/${SNAPPY_NAME}
     export ARROW_ZLIB_URL=${TP_SOURCE_DIR}/${ZLIB_NAME}
     export ARROW_XSIMD_URL=${TP_SOURCE_DIR}/${XSIMD_NAME}
+    export ARROW_ORC_URL=${TP_SOURCE_DIR}/${ORC_NAME}
 
     LDFLAGS="-L${TP_LIB_DIR} -static-libstdc++ -static-libgcc" \
     ${CMAKE_CMD} -G "${GENERATOR}" -DARROW_PARQUET=ON -DARROW_IPC=ON -DARROW_BUILD_SHARED=OFF \
     -DARROW_BUILD_STATIC=ON -DARROW_WITH_BROTLI=ON -DARROW_WITH_LZ4=ON -DARROW_USE_GLOG=ON \
     -DARROW_WITH_SNAPPY=ON -DARROW_WITH_ZLIB=ON -DARROW_WITH_ZSTD=ON -DARROW_JSON=ON \
-    -DARROW_WITH_UTF8PROC=OFF -DARROW_WITH_RE2=OFF \
+    -DARROW_WITH_UTF8PROC=OFF -DARROW_WITH_RE2=ON -DARROW_ORC=ON \
     -DCMAKE_INSTALL_PREFIX=$TP_INSTALL_DIR \
     -DCMAKE_INSTALL_LIBDIR=lib64 \
     -DARROW_BOOST_USE_SHARED=OFF \
     -DARROW_GFLAGS_USE_SHARED=OFF \
     -Dgflags_ROOT=$TP_INSTALL_DIR \
     -DGLOG_ROOT=$TP_INSTALL_DIR \
+    -DRE2_ROOT=$TP_INSTALL_DIR \
     -DZLIB_LIBRARY=$TP_INSTALL_DIR/lib/libz.a -DZLIB_INCLUDE_DIR=$TP_INSTALL_DIR/include \
     -DRapidJSON_ROOT=$TP_INSTALL_DIR \
+    -DORC_ROOT=$TP_INSTALL_DIR \
     -DBrotli_SOURCE=BUNDLED \
     -DLZ4_LIB=$TP_INSTALL_DIR/lib/liblz4.a -DLZ4_INCLUDE_DIR=$TP_INSTALL_DIR/include/lz4 \
     -DLz4_SOURCE=SYSTEM \
@@ -664,7 +670,6 @@ build_arrow() {
     -Dzstd_SOURCE=SYSTEM \
     -DSnappy_LIB=$TP_INSTALL_DIR/lib/libsnappy.a -DSnappy_INCLUDE_DIR=$TP_INSTALL_DIR/include \
     -DSnappy_SOURCE=SYSTEM \
-    -DBoost_INCLUDE_DIR=$TP_INSTALL_DIR/include \
     -DThrift_ROOT=$TP_INSTALL_DIR ..
 
     ${BUILD_SYSTEM} -j $PARALLEL && ${BUILD_SYSTEM} install
@@ -1011,6 +1016,7 @@ build_rocksdb
 build_cyrus_sasl
 build_librdkafka
 build_flatbuffers
+build_orc
 build_arrow
 build_s2
 build_bitshuffle
@@ -1019,7 +1025,6 @@ build_fmt
 build_parallel_hashmap
 build_pdqsort
 build_libdivide
-build_orc
 build_cctz
 build_tsan_header
 build_mysql


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org