You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/05/14 01:50:13 UTC

[incubator-doris] branch master updated: [feature] add vectorized vjson_scanner (#9311)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b817efd652 [feature] add vectorized vjson_scanner (#9311)
b817efd652 is described below

commit b817efd6527163bc637d3ae1ced82d9015b69f91
Author: carlvinhust2012 <hu...@126.com>
AuthorDate: Sat May 14 09:50:05 2022 +0800

    [feature] add vectorized vjson_scanner (#9311)
    
    This PR adds the vectorized VJsonScanner, which supports vectorized JSON import in the stream load flow.
---
 be/src/exec/base_scanner.cpp            |  53 ++-
 be/src/exec/base_scanner.h              |   8 +
 be/src/exec/broker_scan_node.cpp        |  13 +-
 be/src/exec/json_scanner.cpp            |  61 ++-
 be/src/exec/json_scanner.h              |  13 +-
 be/src/vec/CMakeLists.txt               |   1 +
 be/src/vec/exec/vjson_scanner.cpp       | 522 ++++++++++++++++++++++
 be/src/vec/exec/vjson_scanner.h         | 120 +++++
 be/test/CMakeLists.txt                  |   1 +
 be/test/vec/exec/vjson_scanner_test.cpp | 767 ++++++++++++++++++++++++++++++++
 10 files changed, 1529 insertions(+), 30 deletions(-)

diff --git a/be/src/exec/base_scanner.cpp b/be/src/exec/base_scanner.cpp
index 8621cf75f8..c4e1b5c056 100644
--- a/be/src/exec/base_scanner.cpp
+++ b/be/src/exec/base_scanner.cpp
@@ -156,7 +156,6 @@ Status BaseScanner::init_expr_ctxes() {
             RETURN_IF_ERROR(ctx->open(_state));
             _dest_expr_ctx.emplace_back(ctx);
         }
-
         if (has_slot_id_map) {
             auto it = _params.dest_sid_to_src_sid_without_trans.find(slot_desc->id());
             if (it == std::end(_params.dest_sid_to_src_sid_without_trans)) {
@@ -279,6 +278,58 @@ Status BaseScanner::_fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool) {
     return Status::OK();
 }
 
+// Runs every pre-filter expression context over `temp_block`, in order, and adds the
+// number of rows each filter removed to the scanner's unselected-row counter.
+// `slot_num` is forwarded unchanged to VExprContext::filter_block.
+// Returns the first non-OK status produced by a filter, otherwise OK.
+Status BaseScanner::filter_block(vectorized::Block* temp_block, size_t slot_num) {
+    // An empty context list simply skips the loop.
+    for (auto pre_filter : _vpre_filter_ctxs) {
+        const auto rows_before = temp_block->rows();
+        RETURN_IF_ERROR(vectorized::VExprContext::filter_block(pre_filter, temp_block, slot_num));
+        const auto rows_after = temp_block->rows();
+        _counter->num_rows_unselected += rows_before - rows_after;
+    }
+    return Status::OK();
+}
+
+// Evaluates the destination expressions (_dest_vexpr_ctx) over `temp_block` and stores
+// the resulting block in `output_block`. Does nothing when there are no destination
+// expressions.
+// Returns the status reported by the expression evaluation when the resulting block has
+// no rows; otherwise returns OK.
+// NOTE(review): when the result has rows, `status` is never inspected — confirm that
+// get_output_block_after_execute_exprs always yields an empty block on failure.
+Status BaseScanner::execute_exprs(vectorized::Block* output_block, vectorized::Block* temp_block) {
+    // Do vectorized expr here
+    Status status;
+    if (!_dest_vexpr_ctx.empty()) {
+        *output_block = vectorized::VExprContext::get_output_block_after_execute_exprs(
+                _dest_vexpr_ctx, *temp_block, status);
+        // An empty result is the only case in which `status` is propagated to the caller.
+        if (UNLIKELY(output_block->rows() == 0)) {
+            return status;
+        }
+    }
+
+    return Status::OK();
+}
+
+// Turns the freshly read source columns into the destination block.
+// Steps: wrap `columns` (one per source slot, moved out of the vector) into a temporary
+// block, apply the pre-filters, then either copy the temporary block into `dest_block`
+// (no destination expressions) or let execute_exprs() produce `dest_block`.
+// Returns OK without touching `dest_block` when `columns` is empty or holds no rows.
+Status BaseScanner::fill_dest_block(vectorized::Block* dest_block,
+                                    std::vector<vectorized::MutableColumnPtr>& columns) {
+    const bool nothing_read = columns.empty() || columns[0]->size() == 0;
+    if (nothing_read) {
+        return Status::OK();
+    }
+
+    // Column i belongs to source slot i.
+    vectorized::Block src_block;
+    size_t col_idx = 0;
+    for (const auto slot_desc : _src_slot_descs) {
+        src_block.insert(vectorized::ColumnWithTypeAndName(std::move(columns[col_idx]),
+                                                           slot_desc->get_data_type_ptr(),
+                                                           slot_desc->col_name()));
+        ++col_idx;
+    }
+
+    RETURN_IF_ERROR(BaseScanner::filter_block(&src_block, _dest_tuple_desc->slots().size()));
+
+    if (!_dest_vexpr_ctx.empty()) {
+        RETURN_IF_ERROR(BaseScanner::execute_exprs(dest_block, &src_block));
+        return Status::OK();
+    }
+
+    *dest_block = src_block;
+    return Status::OK();
+}
+
 void BaseScanner::fill_slots_of_columns_from_path(
         int start, const std::vector<std::string>& columns_from_path) {
     // values of columns from path can not be null
diff --git a/be/src/exec/base_scanner.h b/be/src/exec/base_scanner.h
index 9c4179874e..02c2f56880 100644
--- a/be/src/exec/base_scanner.h
+++ b/be/src/exec/base_scanner.h
@@ -22,6 +22,7 @@
 #include "runtime/tuple.h"
 #include "util/runtime_profile.h"
 #include "vec/exprs/vexpr.h"
+#include "vec/exprs/vexpr_context.h"
 
 namespace doris {
 
@@ -75,11 +76,18 @@ public:
     virtual void close() = 0;
     Status fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool, bool* fill_tuple);
 
+    Status fill_dest_block(vectorized::Block* dest_block,
+                           std::vector<vectorized::MutableColumnPtr>& columns);
+
     void fill_slots_of_columns_from_path(int start,
                                          const std::vector<std::string>& columns_from_path);
 
     void free_expr_local_allocations();
 
+    Status filter_block(vectorized::Block* temp_block, size_t slot_num);
+
+    Status execute_exprs(vectorized::Block* output_block, vectorized::Block* temp_block);
+
 protected:
     RuntimeState* _state;
     const TBrokerScanRangeParams& _params;
diff --git a/be/src/exec/broker_scan_node.cpp b/be/src/exec/broker_scan_node.cpp
index 513e653fb7..c1144495c2 100644
--- a/be/src/exec/broker_scan_node.cpp
+++ b/be/src/exec/broker_scan_node.cpp
@@ -32,6 +32,7 @@
 #include "util/runtime_profile.h"
 #include "util/thread.h"
 #include "vec/exec/vbroker_scanner.h"
+#include "vec/exec/vjson_scanner.h"
 
 namespace doris {
 
@@ -234,9 +235,15 @@ std::unique_ptr<BaseScanner> BrokerScanNode::create_scanner(const TBrokerScanRan
                               counter);
         break;
     case TFileFormatType::FORMAT_JSON:
-        scan = new JsonScanner(_runtime_state, runtime_profile(), scan_range.params,
-                               scan_range.ranges, scan_range.broker_addresses, _pre_filter_texprs,
-                               counter);
+        if (_vectorized) {
+            scan = new vectorized::VJsonScanner(
+                    _runtime_state, runtime_profile(), scan_range.params, scan_range.ranges,
+                    scan_range.broker_addresses, _pre_filter_texprs, counter);
+        } else {
+            scan = new JsonScanner(_runtime_state, runtime_profile(), scan_range.params,
+                                   scan_range.ranges, scan_range.broker_addresses,
+                                   _pre_filter_texprs, counter);
+        }
         break;
     default:
         if (_vectorized) {
diff --git a/be/src/exec/json_scanner.cpp b/be/src/exec/json_scanner.cpp
index 5e002ac44e..a23ce44b03 100644
--- a/be/src/exec/json_scanner.cpp
+++ b/be/src/exec/json_scanner.cpp
@@ -112,14 +112,17 @@ Status JsonScanner::open_next_reader() {
         _scanner_eof = true;
         return Status::OK();
     }
+    RETURN_IF_ERROR(open_based_reader());
+    RETURN_IF_ERROR(open_json_reader());
+    _next_range++;
+    return Status::OK();
+}
 
+Status JsonScanner::open_based_reader() {
     RETURN_IF_ERROR(open_file_reader());
     if (_read_json_by_line) {
         RETURN_IF_ERROR(open_line_reader());
     }
-    RETURN_IF_ERROR(open_json_reader());
-    _next_range++;
-
     return Status::OK();
 }
 
@@ -215,6 +218,25 @@ Status JsonScanner::open_json_reader() {
     bool num_as_string = false;
     bool fuzzy_parse = false;
 
+    RETURN_IF_ERROR(
+            get_range_params(jsonpath, json_root, strip_outer_array, num_as_string, fuzzy_parse));
+    if (_read_json_by_line) {
+        _cur_json_reader =
+                new JsonReader(_state, _counter, _profile, strip_outer_array, num_as_string,
+                               fuzzy_parse, &_scanner_eof, nullptr, _cur_line_reader);
+    } else {
+        _cur_json_reader =
+                new JsonReader(_state, _counter, _profile, strip_outer_array, num_as_string,
+                               fuzzy_parse, &_scanner_eof, _cur_file_reader);
+    }
+
+    RETURN_IF_ERROR(_cur_json_reader->init(jsonpath, json_root));
+    return Status::OK();
+}
+
+Status JsonScanner::get_range_params(std::string& jsonpath, std::string& json_root,
+                                     bool& strip_outer_array, bool& num_as_string,
+                                     bool& fuzzy_parse) {
     const TBrokerRangeDesc& range = _ranges[_next_range];
 
     if (range.__isset.jsonpaths) {
@@ -232,17 +254,6 @@ Status JsonScanner::open_json_reader() {
     if (range.__isset.fuzzy_parse) {
         fuzzy_parse = range.fuzzy_parse;
     }
-    if (_read_json_by_line) {
-        _cur_json_reader =
-                new JsonReader(_state, _counter, _profile, strip_outer_array, num_as_string,
-                               fuzzy_parse, &_scanner_eof, nullptr, _cur_line_reader);
-    } else {
-        _cur_json_reader =
-                new JsonReader(_state, _counter, _profile, strip_outer_array, num_as_string,
-                               fuzzy_parse, &_scanner_eof, _cur_file_reader);
-    }
-
-    RETURN_IF_ERROR(_cur_json_reader->init(jsonpath, json_root));
     return Status::OK();
 }
 
@@ -308,14 +319,8 @@ JsonReader::~JsonReader() {
 }
 
 Status JsonReader::init(const std::string& jsonpath, const std::string& json_root) {
-    // parse jsonpath
-    if (!jsonpath.empty()) {
-        Status st = _generate_json_paths(jsonpath, &_parsed_jsonpaths);
-        RETURN_IF_ERROR(st);
-    }
-    if (!json_root.empty()) {
-        JsonFunctions::parse_json_paths(json_root, &_parsed_json_root);
-    }
+    // generate _parsed_jsonpaths and _parsed_json_root
+    RETURN_IF_ERROR(_parse_jsonpath_and_json_root(jsonpath, json_root));
 
     //improve performance
     if (_parsed_jsonpaths.empty()) { // input is a simple json-string
@@ -330,6 +335,18 @@ Status JsonReader::init(const std::string& jsonpath, const std::string& json_roo
     return Status::OK();
 }
 
+// Fills _parsed_jsonpaths from `jsonpath` and _parsed_json_root from `json_root`.
+// An empty argument leaves the corresponding member untouched.
+// Returns the error from _generate_json_paths if it fails, otherwise OK.
+Status JsonReader::_parse_jsonpath_and_json_root(const std::string& jsonpath,
+                                                 const std::string& json_root) {
+    const bool has_jsonpath = !jsonpath.empty();
+    const bool has_json_root = !json_root.empty();
+
+    if (has_jsonpath) {
+        Status st = _generate_json_paths(jsonpath, &_parsed_jsonpaths);
+        RETURN_IF_ERROR(st);
+    }
+    if (has_json_root) {
+        JsonFunctions::parse_json_paths(json_root, &_parsed_json_root);
+    }
+    return Status::OK();
+}
+
 Status JsonReader::_generate_json_paths(const std::string& jsonpath,
                                         std::vector<std::vector<JsonPath>>* vect) {
     rapidjson::Document jsonpaths_doc;
diff --git a/be/src/exec/json_scanner.h b/be/src/exec/json_scanner.h
index b12c96f396..276b2dd077 100644
--- a/be/src/exec/json_scanner.h
+++ b/be/src/exec/json_scanner.h
@@ -67,13 +67,17 @@ public:
     // Close this scanner
     void close() override;
 
-private:
+protected:
     Status open_file_reader();
     Status open_line_reader();
     Status open_json_reader();
     Status open_next_reader();
 
-private:
+    Status open_based_reader();
+    Status get_range_params(std::string& jsonpath, std::string& json_root, bool& strip_outer_array,
+                            bool& num_as_string, bool& fuzzy_parse);
+
+protected:
     const std::vector<TBrokerRangeDesc>& _ranges;
     const std::vector<TNetworkAddress>& _broker_addresses;
 
@@ -129,7 +133,7 @@ public:
     Status read_json_row(Tuple* tuple, const std::vector<SlotDescriptor*>& slot_descs,
                          MemPool* tuple_pool, bool* is_empty_row, bool* eof);
 
-private:
+protected:
     Status (JsonReader::*_handle_json_callback)(Tuple* tuple,
                                                 const std::vector<SlotDescriptor*>& slot_descs,
                                                 MemPool* tuple_pool, bool* is_empty_row, bool* eof);
@@ -158,8 +162,9 @@ private:
     void _close();
     Status _generate_json_paths(const std::string& jsonpath,
                                 std::vector<std::vector<JsonPath>>* vect);
+    Status _parse_jsonpath_and_json_root(const std::string& jsonpath, const std::string& json_root);
 
-private:
+protected:
     int _next_line;
     int _total_lines;
     RuntimeState* _state;
diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index 7555e9d0ca..22fa489f46 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -101,6 +101,7 @@ set(VEC_FILES
   exec/vtable_function_node.cpp
   exec/vbroker_scan_node.cpp
   exec/vbroker_scanner.cpp
+  exec/vjson_scanner.cpp
   exec/join/vhash_join_node.cpp
   exprs/vectorized_agg_fn.cpp
   exprs/vectorized_fn_call.cpp
diff --git a/be/src/vec/exec/vjson_scanner.cpp b/be/src/vec/exec/vjson_scanner.cpp
new file mode 100644
index 0000000000..b46d16e80e
--- /dev/null
+++ b/be/src/vec/exec/vjson_scanner.cpp
@@ -0,0 +1,522 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/vjson_scanner.h"
+
+#include <fmt/format.h>
+
+#include <algorithm>
+
+#include "env/env.h"
+#include "exec/broker_reader.h"
+#include "exec/buffered_reader.h"
+#include "exec/local_file_reader.h"
+#include "exec/plain_text_line_reader.h"
+#include "exec/s3_reader.h"
+#include "exprs/expr.h"
+#include "exprs/json_functions.h"
+#include "gutil/strings/split.h"
+#include "runtime/exec_env.h"
+#include "runtime/runtime_state.h"
+#include "util/time.h"
+
+namespace doris::vectorized {
+
+// Constructs a vectorized JSON scanner. All arguments are forwarded unchanged to the
+// JsonScanner base class; the vectorized reader starts out unset and is created later
+// by open_vjson_reader().
+VJsonScanner::VJsonScanner(RuntimeState* state, RuntimeProfile* profile,
+                           const TBrokerScanRangeParams& params,
+                           const std::vector<TBrokerRangeDesc>& ranges,
+                           const std::vector<TNetworkAddress>& broker_addresses,
+                           const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter)
+        : JsonScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs, counter),
+          _cur_vjson_reader(nullptr) {}
+
+// Nothing to release explicitly here; the destructor body is intentionally empty.
+VJsonScanner::~VJsonScanner() {}
+
+// Reads rows from the current (and, when exhausted, the next) range into one nullable
+// string column per source slot, until the first column holds batch_size rows or the
+// scanner reaches its end, then hands the columns to BaseScanner::fill_dest_block().
+// `output_block` receives the result; `*eof` is set to the scanner-level EOF flag.
+Status VJsonScanner::get_next(vectorized::Block* output_block, bool* eof) {
+    SCOPED_TIMER(_read_timer);
+    const int batch_size = _state->batch_size();
+    size_t slot_num = _src_slot_descs.size();
+
+    // Every source column is created as Nullable(String).
+    auto string_type = make_nullable(std::make_shared<DataTypeString>());
+    std::vector<vectorized::MutableColumnPtr> columns(slot_num);
+    for (auto& column : columns) {
+        column = string_type->create_column();
+    }
+
+    while (columns[0]->size() < batch_size && !_scanner_eof) {
+        // Open the next range when there is no reader yet or the current one is drained.
+        const bool need_reader = (_cur_file_reader == nullptr) || _cur_reader_eof;
+        if (need_reader) {
+            RETURN_IF_ERROR(open_next_reader());
+            if (_scanner_eof) {
+                // No more ranges to read.
+                break;
+            }
+        }
+
+        // In line mode, consume and discard one line when a skip was requested.
+        if (_read_json_by_line && _skip_next_line) {
+            const uint8_t* skipped_line = nullptr;
+            size_t skipped_size = 0;
+            RETURN_IF_ERROR(
+                    _cur_line_reader->read_line(&skipped_line, &skipped_size, &_cur_reader_eof));
+            _skip_next_line = false;
+            continue;
+        }
+
+        // An empty row needs no extra handling: the loop just goes around again.
+        bool is_empty_row = false;
+        RETURN_IF_ERROR(_cur_vjson_reader->read_json_column(columns, _src_slot_descs, &is_empty_row,
+                                                            &_cur_reader_eof));
+    }
+
+    COUNTER_UPDATE(_rows_read_counter, columns[0]->size());
+    SCOPED_TIMER(_materialize_timer);
+    RETURN_IF_ERROR(BaseScanner::fill_dest_block(output_block, columns));
+
+    *eof = _scanner_eof;
+    return Status::OK();
+}
+
+// Opens the readers for the range at index _next_range and advances the index.
+// Sets _scanner_eof and returns OK when every range has already been consumed.
+Status VJsonScanner::open_next_reader() {
+    const bool ranges_exhausted = _next_range >= _ranges.size();
+    if (ranges_exhausted) {
+        _scanner_eof = true;
+        return Status::OK();
+    }
+
+    // File (and, in line mode, line) reader first, then the vectorized JSON reader on top.
+    RETURN_IF_ERROR(JsonScanner::open_based_reader());
+    RETURN_IF_ERROR(open_vjson_reader());
+    ++_next_range;
+    return Status::OK();
+}
+
+// Replaces the current vectorized JSON reader with a new one configured from the
+// parameters of the current range, then initializes it with the range's jsonpath and
+// json_root. In line mode the reader is given the line reader (and no file reader);
+// otherwise it is given the file reader.
+Status VJsonScanner::open_vjson_reader() {
+    // Drop the previous reader, if any; resetting an empty pointer does nothing.
+    _cur_vjson_reader.reset();
+
+    std::string jsonpath;
+    std::string json_root;
+    bool strip_outer_array = false;
+    bool num_as_string = false;
+    bool fuzzy_parse = false;
+    RETURN_IF_ERROR(JsonScanner::get_range_params(jsonpath, json_root, strip_outer_array,
+                                                  num_as_string, fuzzy_parse));
+
+    if (!_read_json_by_line) {
+        _cur_vjson_reader.reset(new VJsonReader(_state, _counter, _profile, strip_outer_array,
+                                                num_as_string, fuzzy_parse, &_scanner_eof,
+                                                _cur_file_reader));
+    } else {
+        _cur_vjson_reader.reset(new VJsonReader(_state, _counter, _profile, strip_outer_array,
+                                                num_as_string, fuzzy_parse, &_scanner_eof, nullptr,
+                                                _cur_line_reader));
+    }
+
+    return _cur_vjson_reader->init(jsonpath, json_root);
+}
+
+// Constructs a vectorized JSON reader. All arguments are forwarded unchanged to the
+// JsonReader base class; the row-handling callback starts out unset and is chosen
+// later by init().
+VJsonReader::VJsonReader(RuntimeState* state, ScannerCounter* counter, RuntimeProfile* profile,
+                         bool strip_outer_array, bool num_as_string, bool fuzzy_parse,
+                         bool* scanner_eof, FileReader* file_reader, LineReader* line_reader)
+        : JsonReader(state, counter, profile, strip_outer_array, num_as_string, fuzzy_parse,
+                     scanner_eof, file_reader, line_reader),
+          _vhandle_json_callback(nullptr) {}
+
+// Nothing to release explicitly here; the destructor body is intentionally empty.
+VJsonReader::~VJsonReader() {}
+
+// Parses `jsonpath` / `json_root` and selects the row handler used by read_json_column():
+//   - no jsonpaths                      -> _vhandle_simple_json
+//   - jsonpaths and strip_outer_array   -> _vhandle_flat_array_complex_json
+//   - jsonpaths without strip_outer_array -> _vhandle_nested_complex_json
+// Returns the parse error, if any, otherwise OK.
+Status VJsonReader::init(const std::string& jsonpath, const std::string& json_root) {
+    // generate _parsed_jsonpaths and _parsed_json_root
+    RETURN_IF_ERROR(JsonReader::_parse_jsonpath_and_json_root(jsonpath, json_root));
+
+    // Picking the handler once here avoids re-deciding the format for every row.
+    if (_parsed_jsonpaths.empty()) {
+        // input is a simple json-string
+        _vhandle_json_callback = &VJsonReader::_vhandle_simple_json;
+        return Status::OK();
+    }
+
+    // input is a complex json-string and a json-path
+    _vhandle_json_callback = _strip_outer_array
+                                     ? &VJsonReader::_vhandle_flat_array_complex_json
+                                     : &VJsonReader::_vhandle_nested_complex_json;
+    return Status::OK();
+}
+
+// Reads one row into `columns` by dispatching to the handler selected in init().
+// All arguments are passed through unchanged to that handler.
+// NOTE(review): _vhandle_json_callback is nullptr until init() has run — callers
+// presumably always call init() first; confirm.
+Status VJsonReader::read_json_column(std::vector<MutableColumnPtr>& columns,
+                                     const std::vector<SlotDescriptor*>& slot_descs,
+                                     bool* is_empty_row, bool* eof) {
+    return (this->*_vhandle_json_callback)(columns, slot_descs, is_empty_row, eof);
+}
+
+// Row handler for simple JSON (no jsonpaths configured).
+// The parsed document is either an array of objects (case 1, one row per element) or
+// a single object (case 2, exactly one row). A new document is parsed whenever all
+// rows of the previous one have been consumed (_next_line >= _total_lines).
+// On return, *is_empty_row is false if one valid row was appended to `columns`, and
+// true if nothing was produced (end of data, or the scan was stopped).
+// Note: `continue` inside this do/while jumps to the loop condition at the bottom.
+Status VJsonReader::_vhandle_simple_json(std::vector<MutableColumnPtr>& columns,
+                                         const std::vector<SlotDescriptor*>& slot_descs,
+                                         bool* is_empty_row, bool* eof) {
+    do {
+        bool valid = false;
+        if (_next_line >= _total_lines) { // parse json and generic document
+            Status st = _parse_json(is_empty_row, eof);
+            if (st.is_data_quality_error()) {
+                continue; // continue to read next
+            }
+            RETURN_IF_ERROR(st);
+            if (*is_empty_row == true) {
+                return Status::OK();
+            }
+            // The name -> member-index cache is only meaningful for the current document.
+            _name_map.clear();
+            rapidjson::Value* objectValue = nullptr;
+            if (_json_doc->IsArray()) {
+                _total_lines = _json_doc->Size();
+                if (_total_lines == 0) {
+                    // may be passing an empty json, such as "[]"
+                    RETURN_IF_ERROR(_append_error_msg(*_json_doc, "Empty json line", "", nullptr));
+                    if (*_scanner_eof) {
+                        *is_empty_row = true;
+                        return Status::OK();
+                    }
+                    continue;
+                }
+                objectValue = &(*_json_doc)[0];
+            } else {
+                _total_lines = 1; // only one row
+                objectValue = _json_doc;
+            }
+            _next_line = 0;
+            if (_fuzzy_parse) {
+                // Record, from the first object only, the member position of every slot's
+                // column name; _set_column_value() reuses these positions for later rows.
+                for (auto v : slot_descs) {
+                    for (int i = 0; i < objectValue->MemberCount(); ++i) {
+                        auto it = objectValue->MemberBegin() + i;
+                        if (v->col_name() == it->name.GetString()) {
+                            _name_map[v->col_name()] = i;
+                            break;
+                        }
+                    }
+                }
+            }
+        }
+
+        if (_json_doc->IsArray()) {                                   // handle case 1
+            rapidjson::Value& objectValue = (*_json_doc)[_next_line]; // json object
+            RETURN_IF_ERROR(_set_column_value(objectValue, columns, slot_descs, &valid));
+        } else { // handle case 2
+            RETURN_IF_ERROR(_set_column_value(*_json_doc, columns, slot_descs, &valid));
+        }
+        _next_line++;
+        if (!valid) {
+            if (*_scanner_eof) {
+                // When _scanner_eof is true and valid is false, it means that we have encountered
+                // unqualified data and decided to stop the scan.
+                *is_empty_row = true;
+                return Status::OK();
+            }
+            continue;
+        }
+        *is_empty_row = false;
+        break; // get a valid row, then break
+    } while (_next_line <= _total_lines);
+    return Status::OK();
+}
+
+// Fills one row of `columns` from `objectValue` for simple-format JSON (no jsonpaths).
+// Each materialized slot is looked up by column name in the JSON object (or, with
+// fuzzy parse, by the member position cached in _name_map).
+//   - Sets *valid to true and returns OK if the row was written.
+//   - Sets *valid to false and returns OK if the row is invalid (the error is recorded
+//     through _append_error_msg).
+//   - Returns another status if any other problem is encountered.
+// NOTE(review): when a row is rejected part-way through, the columns filled before the
+// failing slot already contain a value for this row — confirm how callers cope with
+// the resulting uneven column sizes.
+Status VJsonReader::_set_column_value(rapidjson::Value& objectValue,
+                                      std::vector<MutableColumnPtr>& columns,
+                                      const std::vector<SlotDescriptor*>& slot_descs, bool* valid) {
+    if (!objectValue.IsObject()) {
+        // Here we expect the incoming `objectValue` to be a Json Object, such as {"key" : "value"},
+        // not other type of Json format.
+        RETURN_IF_ERROR(_append_error_msg(objectValue, "Expect json object value", "", valid));
+        return Status::OK();
+    }
+
+    size_t nullcount = 0;
+    int ctx_idx = 0;
+    for (auto slot_desc : slot_descs) {
+        if (!slot_desc->is_materialized()) {
+            continue;
+        }
+
+        // Columns are indexed by the position among the materialized slots.
+        int dest_index = ctx_idx++;
+        auto* column_ptr = columns[dest_index].get();
+        rapidjson::Value::ConstMemberIterator it = objectValue.MemberEnd();
+
+        if (_fuzzy_parse) {
+            auto idx_it = _name_map.find(slot_desc->col_name());
+            if (idx_it != _name_map.end() && idx_it->second < objectValue.MemberCount()) {
+                it = objectValue.MemberBegin() + idx_it->second;
+            }
+        } else {
+            it = objectValue.FindMember(
+                    rapidjson::Value(slot_desc->col_name().c_str(), slot_desc->col_name().size()));
+        }
+
+        if (it != objectValue.MemberEnd()) {
+            const rapidjson::Value& value = it->value;
+            RETURN_IF_ERROR(_write_data_to_column(&value, slot_desc, column_ptr, valid));
+            if (!(*valid)) {
+                return Status::OK();
+            }
+        } else { // not found
+            if (slot_desc->is_nullable()) {
+                auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(column_ptr);
+                nullable_column->insert_default();
+                nullcount++;
+            } else {
+                RETURN_IF_ERROR(_append_error_msg(
+                        objectValue,
+                        "The column `{}` is not nullable, but it's not found in jsondata.",
+                        slot_desc->col_name(), valid));
+                // _append_error_msg has already set *valid to false. Return right away:
+                // merely leaving the loop would fall through to `*valid = true` below and
+                // report the rejected row as valid.
+                return Status::OK();
+            }
+        }
+    }
+
+    if (nullcount == slot_descs.size()) {
+        RETURN_IF_ERROR(_append_error_msg(objectValue, "All fields is null, this is a invalid row.",
+                                          "", valid));
+        return Status::OK();
+    }
+    *valid = true;
+    return Status::OK();
+}
+
+// Appends one JSON value, converted to its string form, to `column_ptr`.
+//   value      : the JSON value to write.
+//   slot_desc  : descriptor of the destination slot (must be TYPE_VARCHAR for now).
+//   column_ptr : destination column; a ColumnNullable wrapper when the slot is nullable,
+//                otherwise the string column itself.
+//   valid      : set to true when a value was appended, false when the value was
+//                rejected (JSON null for a non-nullable slot).
+// Strings are copied as-is, numbers are printed in decimal, booleans become "1"/"0",
+// and arrays/objects are serialized through JsonReader::_print_json_value.
+Status VJsonReader::_write_data_to_column(rapidjson::Value::ConstValueIterator value,
+                                          SlotDescriptor* slot_desc,
+                                          vectorized::IColumn* column_ptr, bool* valid) {
+    // JSON null is handled before the nullable wrapper is unwrapped. Previously the
+    // null map had already received a 0 (not null), the nested column was then cast to
+    // ColumnNullable, and an extra empty string was appended after the switch.
+    if (value->GetType() == rapidjson::Type::kNullType) {
+        if (slot_desc->is_nullable()) {
+            // Same call that is used when a nullable field is missing from the row.
+            auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(column_ptr);
+            nullable_column->insert_default();
+            *valid = true;
+            return Status::OK();
+        }
+        RETURN_IF_ERROR(_append_error_msg(
+                *value, "Json value is null, but the column `{}` is not nullable.",
+                slot_desc->col_name(), valid));
+        return Status::OK();
+    }
+
+    if (slot_desc->is_nullable()) {
+        // Non-null value: mark the row as not null and write into the nested column.
+        auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(column_ptr);
+        nullable_column->get_null_map_data().push_back(0);
+        column_ptr = &nullable_column->get_nested_column();
+    }
+
+    const char* str_value = nullptr;
+    char tmp_buf[128] = {0};
+    int32_t wbytes = 0;
+    // Backing storage for values that do not fit in tmp_buf. It is declared here, not
+    // inside the switch, so that str_value stays valid until insert_data() below.
+    std::string json_str;
+
+    switch (value->GetType()) {
+    case rapidjson::Type::kStringType:
+        str_value = value->GetString();
+        // Use the stored length so strings containing '\0' are not truncated.
+        wbytes = static_cast<int32_t>(value->GetStringLength());
+        break;
+    case rapidjson::Type::kNumberType:
+        str_value = tmp_buf;
+        if (value->IsUint()) {
+            wbytes = snprintf(tmp_buf, sizeof(tmp_buf), "%u", value->GetUint());
+        } else if (value->IsInt()) {
+            wbytes = snprintf(tmp_buf, sizeof(tmp_buf), "%d", value->GetInt());
+        } else if (value->IsUint64()) {
+            wbytes = snprintf(tmp_buf, sizeof(tmp_buf), "%llu",
+                              static_cast<unsigned long long>(value->GetUint64()));
+        } else if (value->IsInt64()) {
+            wbytes = snprintf(tmp_buf, sizeof(tmp_buf), "%lld",
+                              static_cast<long long>(value->GetInt64()));
+        } else {
+            // "%f" of a large double (e.g. 1e300) needs far more than 128 bytes, so fall
+            // back to a heap buffer when the stack buffer is too small.
+            const double double_value = value->GetDouble();
+            wbytes = snprintf(tmp_buf, sizeof(tmp_buf), "%f", double_value);
+            if (wbytes >= static_cast<int32_t>(sizeof(tmp_buf))) {
+                json_str.resize(wbytes + 1);
+                wbytes = snprintf(&json_str[0], json_str.size(), "%f", double_value);
+                str_value = json_str.c_str();
+            }
+        }
+        break;
+    case rapidjson::Type::kFalseType:
+        wbytes = 1;
+        str_value = "0";
+        break;
+    case rapidjson::Type::kTrueType:
+        wbytes = 1;
+        str_value = "1";
+        break;
+    default:
+        // for other type like array or object. we convert it to string to save
+        json_str = JsonReader::_print_json_value(*value);
+        wbytes = json_str.size();
+        str_value = json_str.c_str();
+        break;
+    }
+
+    // TODO: if the vexpr can support another 'slot_desc type' than 'TYPE_VARCHAR',
+    // we need use a function to support these types to insert data in columns.
+    DCHECK(slot_desc->type().type == TYPE_VARCHAR);
+    assert_cast<ColumnString*>(column_ptr)->insert_data(str_value, wbytes);
+
+    *valid = true;
+    return Status::OK();
+}
+
+// Row handler used when jsonpaths are configured and strip_outer_array is set: the
+// parsed document is indexed as an array and each element yields one row through
+// _write_columns_by_jsonpath(). A new document is parsed whenever all elements of the
+// previous one have been consumed (_next_line >= _total_lines).
+// On return, *is_empty_row is false if one valid row was appended to `columns`.
+// Note: `continue` inside this do/while jumps to the loop condition at the bottom.
+Status VJsonReader::_vhandle_flat_array_complex_json(std::vector<MutableColumnPtr>& columns,
+                                                     const std::vector<SlotDescriptor*>& slot_descs,
+                                                     bool* is_empty_row, bool* eof) {
+    do {
+        if (_next_line >= _total_lines) {
+            Status st = _parse_json(is_empty_row, eof);
+            if (st.is_data_quality_error()) {
+                continue; // continue to read next
+            }
+            RETURN_IF_ERROR(st);
+            if (*is_empty_row == true) {
+                // NOTE(review): RETURN_IF_ERROR above presumably already returned for any
+                // non-OK `st`, which would make this comparison always true and the
+                // `_total_lines == 0` branch below unreachable — confirm.
+                if (st == Status::OK()) {
+                    return Status::OK();
+                }
+                if (_total_lines == 0) {
+                    continue;
+                }
+            }
+        }
+        // Take the next array element and advance the cursor.
+        rapidjson::Value& objectValue = (*_json_doc)[_next_line++];
+        bool valid = true;
+        RETURN_IF_ERROR(_write_columns_by_jsonpath(objectValue, slot_descs, columns, &valid));
+        if (!valid) {
+            continue; // process next line
+        }
+        *is_empty_row = false;
+        break; // get a valid row, then break
+    } while (_next_line <= _total_lines);
+    return Status::OK();
+}
+
+// Row handler used when jsonpaths are configured without strip_outer_array: every
+// parsed document yields exactly one row, extracted through the jsonpaths.
+// Documents that fail with a data-quality error are skipped. *is_empty_row is set to
+// true when no row was produced (nothing left to read, or the row was rejected).
+Status VJsonReader::_vhandle_nested_complex_json(std::vector<MutableColumnPtr>& columns,
+                                                 const std::vector<SlotDescriptor*>& slot_descs,
+                                                 bool* is_empty_row, bool* eof) {
+    // Keep parsing until a document is read successfully or there is nothing left.
+    for (;;) {
+        Status parse_status = _parse_json(is_empty_row, eof);
+        if (parse_status.is_data_quality_error()) {
+            continue; // skip the bad document and read the next one
+        }
+        RETURN_IF_ERROR(parse_status);
+        if (*is_empty_row == true) {
+            return Status::OK();
+        }
+        break; // read a valid row
+    }
+    *is_empty_row = false;
+
+    bool row_valid = true;
+    RETURN_IF_ERROR(_write_columns_by_jsonpath(*_json_doc, slot_descs, columns, &row_valid));
+    // There is only one row per document here, so a rejected row is reported as an
+    // empty row and the caller simply continues with the next document.
+    if (!row_valid) {
+        *is_empty_row = true;
+    }
+    return Status::OK();
+}
+
+// Fills one row of `columns` from `objectValue` using the parsed jsonpaths: slot i is
+// extracted with _parsed_jsonpaths[i] and written to columns[i].
+// A slot with no jsonpath or no match gets a default value when it is nullable;
+// otherwise the row is rejected. A row in which every slot was unmatched is rejected
+// as well. Rejections are recorded through _append_error_msg, which sets *valid to
+// false; callers initialize *valid before the call.
+Status VJsonReader::_write_columns_by_jsonpath(rapidjson::Value& objectValue,
+                                               const std::vector<SlotDescriptor*>& slot_descs,
+                                               std::vector<MutableColumnPtr>& columns,
+                                               bool* valid) {
+    const size_t column_num = slot_descs.size();
+    size_t unmatched_columns = 0;
+
+    for (size_t idx = 0; idx < column_num; ++idx) {
+        SlotDescriptor* slot_desc = slot_descs[idx];
+        auto* column_ptr = columns[idx].get();
+
+        bool wrap_explicitly = false;
+        rapidjson::Value* matched = nullptr;
+        if (LIKELY(idx < _parsed_jsonpaths.size())) {
+            matched = JsonFunctions::get_json_array_from_parsed_json(
+                    _parsed_jsonpaths[idx], &objectValue, _origin_json_doc.GetAllocator(),
+                    &wrap_explicitly);
+        }
+
+        if (matched != nullptr) {
+            CHECK(matched->IsArray());
+            // NOTICE1: JsonFunctions::get_json_array_from_parsed_json() will wrap the single json object with an array.
+            // so here we unwrap the array to get the real element.
+            // if the size is > 1, it means we just match an array, not a wrapped one, so no need to unwrap.
+            if (wrap_explicitly && matched->Size() == 1) {
+                matched = &((*matched)[0]);
+            }
+            RETURN_IF_ERROR(_write_data_to_column(matched, slot_desc, column_ptr, valid));
+            if (!(*valid)) {
+                break;
+            }
+            continue;
+        }
+
+        // not match in jsondata.
+        if (!slot_desc->is_nullable()) {
+            RETURN_IF_ERROR(_append_error_msg(
+                    objectValue,
+                    "The column `{}` is not nullable, but it's not found in jsondata.",
+                    slot_desc->col_name(), valid));
+            break;
+        }
+        reinterpret_cast<vectorized::ColumnNullable*>(column_ptr)->insert_default();
+        ++unmatched_columns;
+    }
+
+    if (unmatched_columns == column_num) {
+        RETURN_IF_ERROR(_append_error_msg(
+                objectValue, "All fields is null or not matched, this is a invalid row.", "",
+                valid));
+    }
+    return Status::OK();
+}
+
+// Parses the next JSON document through JsonReader::_parse_json_doc.
+// *is_empty_row is set to true when nothing was read (size 0 or *eof), or when
+// jsonpaths plus strip_outer_array are in use and the document is an empty array.
+// In the jsonpaths + strip_outer_array mode, _total_lines and _next_line are reset for
+// the new document. Errors from the underlying parse are returned unchanged.
+Status VJsonReader::_parse_json(bool* is_empty_row, bool* eof) {
+    size_t doc_size = 0;
+    // terminate if encounter other errors
+    RETURN_IF_ERROR(JsonReader::_parse_json_doc(&doc_size, eof));
+
+    // read all data, then return
+    const bool nothing_read = (doc_size == 0) || *eof;
+    if (nothing_read) {
+        *is_empty_row = true;
+        return Status::OK();
+    }
+
+    const bool flat_array_mode = !_parsed_jsonpaths.empty() && _strip_outer_array;
+    if (!flat_array_mode) {
+        return Status::OK();
+    }
+
+    _total_lines = _json_doc->Size();
+    _next_line = 0;
+    if (_total_lines == 0) {
+        // meet an empty json array.
+        *is_empty_row = true;
+    }
+    return Status::OK();
+}
+
+Status VJsonReader::_append_error_msg(const rapidjson::Value& objectValue, std::string error_msg,
+                                      std::string col_name, bool* valid) {
+    std::string err_msg;
+    if (!col_name.empty()) {
+        fmt::memory_buffer error_buf;
+        fmt::format_to(error_buf, error_msg, col_name);
+        err_msg = fmt::to_string(error_buf);
+    } else {
+        err_msg = error_msg;
+    }
+
+    RETURN_IF_ERROR(_state->append_error_msg_to_file(
+            [&]() -> std::string { return JsonReader::_print_json_value(objectValue); },
+            [&]() -> std::string { return err_msg; }, _scanner_eof));
+
+    _counter->num_rows_filtered++;
+    if (valid != nullptr) {
+        // current row is invalid
+        *valid = false;
+    }
+    return Status::OK();
+}
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/vjson_scanner.h b/be/src/vec/exec/vjson_scanner.h
new file mode 100644
index 0000000000..0da3b96710
--- /dev/null
+++ b/be/src/vec/exec/vjson_scanner.h
@@ -0,0 +1,120 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef BE_SRC_VJSON_SCANNER_H_
+#define BE_SRC_VJSON_SCANNER_H_
+
+#include <rapidjson/document.h>
+#include <rapidjson/error/en.h>
+#include <rapidjson/stringbuffer.h>
+#include <rapidjson/writer.h>
+
+#include <map>
+#include <memory>
+#include <sstream>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include "common/status.h"
+#include "exec/base_scanner.h"
+#include "exec/exec_node.h"
+#include "exec/json_scanner.h"
+#include "exprs/expr_context.h"
+#include "runtime/descriptors.h"
+#include "runtime/mem_pool.h"
+#include "runtime/mem_tracker.h"
+#include "runtime/row_batch.h"
+#include "runtime/tuple.h"
+#include "util/runtime_profile.h"
+
+namespace doris {
+class ExprContext;
+
+namespace vectorized {
+class VJsonReader;
+
+class VJsonScanner : public JsonScanner { // JsonScanner whose get_next() fills a vectorized::Block
+public:
+    VJsonScanner(RuntimeState* state, RuntimeProfile* profile, const TBrokerScanRangeParams& params,
+                 const std::vector<TBrokerRangeDesc>& ranges,
+                 const std::vector<TNetworkAddress>& broker_addresses,
+                 const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter);
+
+    ~VJsonScanner();
+
+    Status get_next(vectorized::Block* output_block, bool* eof) override; // block-based override
+
+private:
+    Status open_vjson_reader(); // NOTE(review): presumably (re)creates _cur_vjson_reader — confirm
+    Status open_next_reader();
+
+private:
+    std::unique_ptr<VJsonReader> _cur_vjson_reader; // uniquely owned VJsonReader
+};
+
+class VJsonReader : public JsonReader { // JsonReader extension that reads JSON into columns
+public:
+    VJsonReader(RuntimeState* state, ScannerCounter* counter, RuntimeProfile* profile,
+                bool strip_outer_array, bool num_as_string, bool fuzzy_parse, bool* scanner_eof,
+                FileReader* file_reader = nullptr, LineReader* line_reader = nullptr);
+
+    ~VJsonReader();
+
+    Status init(const std::string& jsonpath, const std::string& json_root);
+
+    Status read_json_column(std::vector<MutableColumnPtr>& columns,
+                            const std::vector<SlotDescriptor*>& slot_descs, bool* is_empty_row,
+                            bool* eof); // NOTE(review): presumably dispatches via _vhandle_json_callback
+
+private:
+    Status (VJsonReader::*_vhandle_json_callback)( // member-function pointer, same signature as _vhandle_* below
+            std::vector<vectorized::MutableColumnPtr>& columns,
+            const std::vector<SlotDescriptor*>& slot_descs, bool* is_empty_row, bool* eof);
+
+    Status _vhandle_simple_json(std::vector<MutableColumnPtr>& columns,
+                                const std::vector<SlotDescriptor*>& slot_descs, bool* is_empty_row,
+                                bool* eof);
+
+    Status _vhandle_flat_array_complex_json(std::vector<MutableColumnPtr>& columns,
+                                            const std::vector<SlotDescriptor*>& slot_descs,
+                                            bool* is_empty_row, bool* eof);
+
+    Status _vhandle_nested_complex_json(std::vector<MutableColumnPtr>& columns,
+                                        const std::vector<SlotDescriptor*>& slot_descs,
+                                        bool* is_empty_row, bool* eof);
+
+    Status _write_columns_by_jsonpath(rapidjson::Value& objectValue,
+                                      const std::vector<SlotDescriptor*>& slot_descs,
+                                      std::vector<MutableColumnPtr>& columns, bool* valid);
+
+    Status _set_column_value(rapidjson::Value& objectValue, std::vector<MutableColumnPtr>& columns,
+                             const std::vector<SlotDescriptor*>& slot_descs, bool* valid);
+
+    Status _write_data_to_column(rapidjson::Value::ConstValueIterator value,
+                                 SlotDescriptor* slot_desc, vectorized::IColumn* column_ptr,
+                                 bool* valid);
+
+    Status _parse_json(bool* is_empty_row, bool* eof); // sets *is_empty_row on no data, eof, or an empty array
+
+    Status _append_error_msg(const rapidjson::Value& objectValue, std::string error_msg,
+                             std::string col_name, bool* valid); // logs row + message, counts it filtered, clears *valid
+};
+
+} // namespace vectorized
+} // namespace doris
+#endif
diff --git a/be/test/CMakeLists.txt b/be/test/CMakeLists.txt
index 5abf9cceb2..6b79813060 100644
--- a/be/test/CMakeLists.txt
+++ b/be/test/CMakeLists.txt
@@ -338,6 +338,7 @@ set(VEC_TEST_FILES
     vec/exec/vgeneric_iterators_test.cpp
     vec/exec/vbroker_scan_node_test.cpp
     vec/exec/vbroker_scanner_test.cpp
+    vec/exec/vjson_scanner_test.cpp
     vec/exec/vtablet_sink_test.cpp
     vec/exprs/vexpr_test.cpp
     vec/function/function_array_element_test.cpp
diff --git a/be/test/vec/exec/vjson_scanner_test.cpp b/be/test/vec/exec/vjson_scanner_test.cpp
new file mode 100644
index 0000000000..c96772a011
--- /dev/null
+++ b/be/test/vec/exec/vjson_scanner_test.cpp
@@ -0,0 +1,767 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/vjson_scanner.h"
+
+#include <gtest/gtest.h>
+#include <time.h>
+
+#include <map>
+#include <string>
+#include <vector>
+
+#include "common/object_pool.h"
+#include "exec/broker_scan_node.h"
+#include "exec/local_file_reader.h"
+#include "exprs/cast_functions.h"
+#include "exprs/decimalv2_operators.h"
+#include "gen_cpp/Descriptors_types.h"
+#include "gen_cpp/PlanNodes_types.h"
+#include "runtime/descriptors.h"
+#include "runtime/exec_env.h"
+#include "runtime/row_batch.h"
+#include "runtime/runtime_state.h"
+#include "runtime/tuple.h"
+#include "runtime/user_function_cache.h"
+#include "vec/exec/vbroker_scan_node.h"
+
+namespace doris {
+namespace vectorized {
+
+class VJsonScannerTest : public testing::Test { // fixture: builds descriptors, scan params and plan node
+public:
+    VJsonScannerTest() : _runtime_state(TQueryGlobals()) {
+        init(); // fills _params, _desc_tbl and _tnode before the runtime state is completed below
+        _runtime_state._instance_mem_tracker.reset(new MemTracker());
+        _runtime_state._exec_env = ExecEnv::GetInstance();
+    }
+    void init();
+    static void SetUpTestCase() { // one-time setup: function cache plus the cast/decimal builtins
+        UserFunctionCache::instance()->init(
+                "./be/test/runtime/test_data/user_function_cache/normal");
+        CastFunctions::init();
+        DecimalV2Operators::init();
+    }
+
+protected:
+    virtual void SetUp() {}
+    virtual void TearDown() {}
+
+private: // NOTE(review): TEST_F bodies use these directly; presumably the test build relaxes access control — confirm
+    int create_src_tuple(TDescriptorTable& t_desc_table, int next_slot_id);
+    int create_dst_tuple(TDescriptorTable& t_desc_table, int next_slot_id);
+    void create_expr_info();
+    void init_desc_table();
+    RuntimeState _runtime_state;
+    ObjectPool _obj_pool;
+    std::map<std::string, SlotDescriptor*> _slots_map; // NOTE(review): not referenced anywhere in this test file
+    TBrokerScanRangeParams _params; // copied into every broker scan range by the tests
+    DescriptorTbl* _desc_tbl; // set by init_desc_table()
+    TPlanNode _tnode; // plan node each test passes to VBrokerScanNode
+};
+
+#define TUPLE_ID_DST 0 // id of the destination tuple descriptor
+#define TUPLE_ID_SRC 1 // id of the source tuple descriptor
+#define COLUMN_NUMBERS 6 // columns per tuple: category, author, title, price, largeint, decimal
+#define DST_TUPLE_SLOT_ID_START 1 // dst slots are created first and take ids 1..6
+#define SRC_TUPLE_SLOT_ID_START 7 // src slots are created next and take ids 7..12
+
+int VJsonScannerTest::create_src_tuple(TDescriptorTable& t_desc_table, int next_slot_id) {
+    // Every source column is declared as VARCHAR(65535), so the type is built once up front.
+    TTypeDesc varchar_type;
+    {
+        TScalarType scalar;
+        scalar.__set_type(TPrimitiveType::VARCHAR);
+        scalar.__set_len(65535);
+        TTypeNode node;
+        node.__set_type(TTypeNodeType::SCALAR);
+        node.__set_scalar_type(scalar);
+        varchar_type.types.push_back(node);
+    }
+
+    // One slot per source column, laid out 16 bytes apart starting at byte 8.
+    const char* names[] = {"category", "author", "title", "price", "largeint", "decimal"};
+    for (int col = 0; col < COLUMN_NUMBERS; ++col) {
+        TSlotDescriptor slot;
+        slot.id = next_slot_id++;
+        slot.parent = 1;
+        slot.slotType = varchar_type;
+        slot.colName = names[col];
+        slot.columnPos = col;
+        slot.slotIdx = col + 1;
+        slot.byteOffset = col * 16 + 8;
+        slot.nullIndicatorByte = col / 8;
+        slot.nullIndicatorBit = col % 8;
+        slot.isMaterialized = true;
+        t_desc_table.slotDescriptors.push_back(slot);
+    }
+
+    // Source tuple descriptor, sized for all slots plus the leading 8 bytes.
+    TTupleDescriptor src_tuple;
+    src_tuple.id = TUPLE_ID_SRC;
+    src_tuple.tableId = 0;
+    src_tuple.__isset.tableId = true;
+    src_tuple.numNullBytes = 0;
+    src_tuple.byteSize = COLUMN_NUMBERS * 16 + 8;
+    t_desc_table.tupleDescriptors.push_back(src_tuple);
+
+    return next_slot_id;
+}
+
+int VJsonScannerTest::create_dst_tuple(TDescriptorTable& t_desc_table, int next_slot_id) {
+    int32_t byteOffset = 8; // running slot offset: the first slot starts at byte 8
+    { // category
+        TSlotDescriptor slot_desc;
+        slot_desc.id = next_slot_id++;
+        slot_desc.parent = 0;
+        TTypeDesc type;
+        {
+            TTypeNode node;
+            node.__set_type(TTypeNodeType::SCALAR);
+            TScalarType scalar_type;
+            scalar_type.__set_type(TPrimitiveType::VARCHAR);
+            scalar_type.__set_len(65535);
+            node.__set_scalar_type(scalar_type);
+            type.types.push_back(node);
+        }
+        slot_desc.slotType = type;
+        slot_desc.columnPos = 0;
+        slot_desc.byteOffset = byteOffset;
+        slot_desc.nullIndicatorByte = 0;
+        slot_desc.nullIndicatorBit = 0;
+        slot_desc.colName = "category";
+        slot_desc.slotIdx = 1;
+        slot_desc.isMaterialized = true;
+
+        t_desc_table.slotDescriptors.push_back(slot_desc);
+    }
+    byteOffset += 16;
+    { // author
+        TSlotDescriptor slot_desc;
+
+        slot_desc.id = next_slot_id++;
+        slot_desc.parent = 0;
+        TTypeDesc type;
+        {
+            TTypeNode node;
+            node.__set_type(TTypeNodeType::SCALAR);
+            TScalarType scalar_type;
+            scalar_type.__set_type(TPrimitiveType::VARCHAR);
+            scalar_type.__set_len(65535);
+            node.__set_scalar_type(scalar_type);
+            type.types.push_back(node);
+        }
+        slot_desc.slotType = type;
+        slot_desc.columnPos = 1;
+        slot_desc.byteOffset = byteOffset;
+        slot_desc.nullIndicatorByte = 0;
+        slot_desc.nullIndicatorBit = 1;
+        slot_desc.colName = "author";
+        slot_desc.slotIdx = 2;
+        slot_desc.isMaterialized = true;
+
+        t_desc_table.slotDescriptors.push_back(slot_desc);
+    }
+    byteOffset += 16;
+    { // title
+        TSlotDescriptor slot_desc;
+
+        slot_desc.id = next_slot_id++;
+        slot_desc.parent = 0;
+        TTypeDesc type;
+        {
+            TTypeNode node;
+            node.__set_type(TTypeNodeType::SCALAR);
+            TScalarType scalar_type;
+            scalar_type.__set_type(TPrimitiveType::VARCHAR);
+            scalar_type.__set_len(65535);
+            node.__set_scalar_type(scalar_type);
+            type.types.push_back(node);
+        }
+        slot_desc.slotType = type;
+        slot_desc.columnPos = 2;
+        slot_desc.byteOffset = byteOffset;
+        slot_desc.nullIndicatorByte = 0;
+        slot_desc.nullIndicatorBit = 2;
+        slot_desc.colName = "title";
+        slot_desc.slotIdx = 3;
+        slot_desc.isMaterialized = true;
+
+        t_desc_table.slotDescriptors.push_back(slot_desc);
+    }
+    byteOffset += 16;
+    { // price
+        TSlotDescriptor slot_desc;
+
+        slot_desc.id = next_slot_id++;
+        slot_desc.parent = 0;
+        TTypeDesc type;
+        {
+            TTypeNode node;
+            node.__set_type(TTypeNodeType::SCALAR);
+            TScalarType scalar_type;
+            scalar_type.__set_type(TPrimitiveType::DOUBLE);
+            node.__set_scalar_type(scalar_type);
+            type.types.push_back(node);
+        }
+        slot_desc.slotType = type;
+        slot_desc.columnPos = 3;
+        slot_desc.byteOffset = byteOffset;
+        slot_desc.nullIndicatorByte = 0;
+        slot_desc.nullIndicatorBit = 3;
+        slot_desc.colName = "price";
+        slot_desc.slotIdx = 4;
+        slot_desc.isMaterialized = true;
+
+        t_desc_table.slotDescriptors.push_back(slot_desc);
+    }
+    byteOffset += 8; // the DOUBLE slot advances the offset by 8; every other slot by 16
+    { // largeint
+        TSlotDescriptor slot_desc;
+
+        slot_desc.id = next_slot_id++;
+        slot_desc.parent = 0;
+        TTypeDesc type;
+        {
+            TTypeNode node;
+            node.__set_type(TTypeNodeType::SCALAR);
+            TScalarType scalar_type;
+            scalar_type.__set_type(TPrimitiveType::LARGEINT);
+            node.__set_scalar_type(scalar_type);
+            type.types.push_back(node);
+        }
+        slot_desc.slotType = type;
+        slot_desc.columnPos = 4;
+        slot_desc.byteOffset = byteOffset;
+        slot_desc.nullIndicatorByte = 0;
+        slot_desc.nullIndicatorBit = 4;
+        slot_desc.colName = "largeint"; // same spelling as the source column and the jsonpath
+        slot_desc.slotIdx = 5;
+        slot_desc.isMaterialized = true;
+
+        t_desc_table.slotDescriptors.push_back(slot_desc);
+    }
+    byteOffset += 16;
+    { // decimal
+        TSlotDescriptor slot_desc;
+
+        slot_desc.id = next_slot_id++;
+        slot_desc.parent = 0;
+        TTypeDesc type;
+        {
+            TTypeNode node;
+            node.__set_type(TTypeNodeType::SCALAR);
+            TScalarType scalar_type;
+            scalar_type.__isset.precision = true;
+            scalar_type.__isset.scale = true;
+            scalar_type.__set_precision(-1);
+            scalar_type.__set_scale(-1);
+            scalar_type.__set_type(TPrimitiveType::DECIMALV2);
+            node.__set_scalar_type(scalar_type);
+            type.types.push_back(node);
+        }
+        slot_desc.slotType = type;
+        slot_desc.columnPos = 5;
+        slot_desc.byteOffset = byteOffset;
+        slot_desc.nullIndicatorByte = 0;
+        slot_desc.nullIndicatorBit = 5;
+        slot_desc.colName = "decimal";
+        slot_desc.slotIdx = 6;
+        slot_desc.isMaterialized = true;
+
+        t_desc_table.slotDescriptors.push_back(slot_desc);
+    }
+
+    t_desc_table.__isset.slotDescriptors = true;
+    {
+        // TTupleDescriptor dest
+        TTupleDescriptor t_tuple_desc;
+        t_tuple_desc.id = TUPLE_ID_DST;
+        t_tuple_desc.byteSize = byteOffset + 8; // NOTE(review): only +8 past the start of the last slot — confirm
+        t_tuple_desc.numNullBytes = 0;
+        t_tuple_desc.tableId = 0;
+        t_tuple_desc.__isset.tableId = true;
+        t_desc_table.tupleDescriptors.push_back(t_tuple_desc);
+    }
+    return next_slot_id; // first slot id not consumed by the dst tuple
+}
+
+void VJsonScannerTest::init_desc_table() {
+    // Thrift-side descriptor table: one broker table, then the dst and src tuples.
+    TDescriptorTable thrift_tbl;
+
+    // The single table descriptor (id 0) declares no columns of its own.
+    TTableDescriptor broker_table;
+    broker_table.numClusteringCols = 0;
+    broker_table.numCols = 0;
+    broker_table.tableType = TTableType::BROKER_TABLE;
+    broker_table.id = 0;
+    thrift_tbl.__isset.tableDescriptors = true;
+    thrift_tbl.tableDescriptors.push_back(broker_table);
+
+    // Slot ids are handed out consecutively from 1: dst slots first, src slots after.
+    int next_slot_id = create_dst_tuple(thrift_tbl, 1);
+    next_slot_id = create_src_tuple(thrift_tbl, next_slot_id);
+
+    // Build the runtime DescriptorTbl from the thrift table, passing the fixture's pool.
+    DescriptorTbl::create(&_obj_pool, thrift_tbl, &_desc_tbl);
+
+    // Register the resulting table on the runtime state.
+    _runtime_state.set_desc_tbl(_desc_tbl);
+}
+
+void VJsonScannerTest::create_expr_info() { // fills _params: one dst-slot expr per column + src slot ids
+    TTypeDesc varchar_type;
+    {
+        TTypeNode node;
+        node.__set_type(TTypeNodeType::SCALAR);
+        TScalarType scalar_type;
+        scalar_type.__set_type(TPrimitiveType::VARCHAR);
+        scalar_type.__set_len(5000);
+        node.__set_scalar_type(scalar_type);
+        varchar_type.types.push_back(node);
+    }
+    // category VARCHAR --> VARCHAR
+    {
+        TExprNode slot_ref;
+        slot_ref.node_type = TExprNodeType::SLOT_REF;
+        slot_ref.type = varchar_type;
+        slot_ref.num_children = 0;
+        slot_ref.__isset.slot_ref = true;
+        slot_ref.slot_ref.slot_id = SRC_TUPLE_SLOT_ID_START; // category id in src tuple
+        slot_ref.slot_ref.tuple_id = 1;
+
+        TExpr expr;
+        expr.nodes.push_back(slot_ref);
+
+        _params.expr_of_dest_slot.emplace(DST_TUPLE_SLOT_ID_START, expr);
+        _params.src_slot_ids.push_back(SRC_TUPLE_SLOT_ID_START);
+    }
+    // author VARCHAR --> VARCHAR
+    {
+        TExprNode slot_ref;
+        slot_ref.node_type = TExprNodeType::SLOT_REF;
+        slot_ref.type = varchar_type;
+        slot_ref.num_children = 0;
+        slot_ref.__isset.slot_ref = true;
+        slot_ref.slot_ref.slot_id = SRC_TUPLE_SLOT_ID_START + 1; // author id in src tuple
+        slot_ref.slot_ref.tuple_id = 1;
+
+        TExpr expr;
+        expr.nodes.push_back(slot_ref);
+
+        _params.expr_of_dest_slot.emplace(DST_TUPLE_SLOT_ID_START + 1, expr);
+        _params.src_slot_ids.push_back(SRC_TUPLE_SLOT_ID_START + 1);
+    }
+    // title VARCHAR --> VARCHAR
+    {
+        TExprNode slot_ref;
+        slot_ref.node_type = TExprNodeType::SLOT_REF;
+        slot_ref.type = varchar_type;
+        slot_ref.num_children = 0;
+        slot_ref.__isset.slot_ref = true;
+        slot_ref.slot_ref.slot_id = SRC_TUPLE_SLOT_ID_START + 2; // title id in src tuple
+        slot_ref.slot_ref.tuple_id = 1;
+
+        TExpr expr;
+        expr.nodes.push_back(slot_ref);
+
+        _params.expr_of_dest_slot.emplace(DST_TUPLE_SLOT_ID_START + 2, expr);
+        _params.src_slot_ids.push_back(SRC_TUPLE_SLOT_ID_START + 2);
+    }
+
+    // price VARCHAR --> DOUBLE (NOTE(review): the cast node below is typed BIGINT; confirm)
+    {
+        TTypeDesc int_type;
+        {
+            TTypeNode node;
+            node.__set_type(TTypeNodeType::SCALAR);
+            TScalarType scalar_type;
+            scalar_type.__set_type(TPrimitiveType::BIGINT);
+            node.__set_scalar_type(scalar_type);
+            int_type.types.push_back(node);
+        }
+        TExprNode cast_expr;
+        cast_expr.node_type = TExprNodeType::CAST_EXPR;
+        cast_expr.type = int_type;
+        cast_expr.__set_opcode(TExprOpcode::CAST);
+        cast_expr.__set_num_children(1);
+        cast_expr.__set_output_scale(-1);
+        cast_expr.__isset.fn = true;
+        cast_expr.fn.name.function_name = "casttodouble";
+        cast_expr.fn.binary_type = TFunctionBinaryType::BUILTIN;
+        cast_expr.fn.arg_types.push_back(varchar_type);
+        cast_expr.fn.ret_type = int_type;
+        cast_expr.fn.has_var_args = false;
+        cast_expr.fn.__set_signature("casttodouble(VARCHAR(*))");
+        cast_expr.fn.__isset.scalar_fn = true;
+        cast_expr.fn.scalar_fn.symbol = "doris::CastFunctions::cast_to_double_val";
+
+        TExprNode slot_ref;
+        slot_ref.node_type = TExprNodeType::SLOT_REF;
+        slot_ref.type = varchar_type;
+        slot_ref.num_children = 0;
+        slot_ref.__isset.slot_ref = true;
+        slot_ref.slot_ref.slot_id = SRC_TUPLE_SLOT_ID_START + 3; // price id in src tuple
+        slot_ref.slot_ref.tuple_id = 1;
+
+        TExpr expr;
+        expr.nodes.push_back(cast_expr); // cast node first, followed by its single child
+        expr.nodes.push_back(slot_ref);
+
+        _params.expr_of_dest_slot.emplace(DST_TUPLE_SLOT_ID_START + 3, expr);
+        _params.src_slot_ids.push_back(SRC_TUPLE_SLOT_ID_START + 3);
+    }
+    // largeint VARCHAR --> LargeInt
+    {
+        TTypeDesc int_type;
+        {
+            TTypeNode node;
+            node.__set_type(TTypeNodeType::SCALAR);
+            TScalarType scalar_type;
+            scalar_type.__set_type(TPrimitiveType::LARGEINT);
+            node.__set_scalar_type(scalar_type);
+            int_type.types.push_back(node);
+        }
+        TExprNode cast_expr;
+        cast_expr.node_type = TExprNodeType::CAST_EXPR;
+        cast_expr.type = int_type;
+        cast_expr.__set_opcode(TExprOpcode::CAST);
+        cast_expr.__set_num_children(1);
+        cast_expr.__set_output_scale(-1);
+        cast_expr.__isset.fn = true;
+        cast_expr.fn.name.function_name = "casttolargeint";
+        cast_expr.fn.binary_type = TFunctionBinaryType::BUILTIN;
+        cast_expr.fn.arg_types.push_back(varchar_type);
+        cast_expr.fn.ret_type = int_type;
+        cast_expr.fn.has_var_args = false;
+        cast_expr.fn.__set_signature("casttolargeint(VARCHAR(*))");
+        cast_expr.fn.__isset.scalar_fn = true;
+        cast_expr.fn.scalar_fn.symbol = "doris::CastFunctions::cast_to_large_int_val";
+
+        TExprNode slot_ref;
+        slot_ref.node_type = TExprNodeType::SLOT_REF;
+        slot_ref.type = varchar_type;
+        slot_ref.num_children = 0;
+        slot_ref.__isset.slot_ref = true;
+        slot_ref.slot_ref.slot_id = SRC_TUPLE_SLOT_ID_START + 4; // largeint id in src tuple
+        slot_ref.slot_ref.tuple_id = 1;
+
+        TExpr expr;
+        expr.nodes.push_back(cast_expr);
+        expr.nodes.push_back(slot_ref);
+
+        _params.expr_of_dest_slot.emplace(DST_TUPLE_SLOT_ID_START + 4, expr);
+        _params.src_slot_ids.push_back(SRC_TUPLE_SLOT_ID_START + 4);
+    }
+    // decimal VARCHAR --> Decimal
+    {
+        TTypeDesc int_type;
+        {
+            TTypeNode node;
+            node.__set_type(TTypeNodeType::SCALAR);
+            TScalarType scalar_type;
+            scalar_type.__isset.precision = true;
+            scalar_type.__isset.scale = true;
+            scalar_type.__set_precision(-1);
+            scalar_type.__set_scale(-1);
+            scalar_type.__set_type(TPrimitiveType::DECIMALV2);
+            node.__set_scalar_type(scalar_type);
+            int_type.types.push_back(node);
+        }
+        TExprNode cast_expr;
+        cast_expr.node_type = TExprNodeType::CAST_EXPR;
+        cast_expr.type = int_type;
+        cast_expr.__set_opcode(TExprOpcode::CAST);
+        cast_expr.__set_num_children(1);
+        cast_expr.__set_output_scale(-1);
+        cast_expr.__isset.fn = true;
+        cast_expr.fn.name.function_name = "casttodecimalv2";
+        cast_expr.fn.binary_type = TFunctionBinaryType::BUILTIN;
+        cast_expr.fn.arg_types.push_back(varchar_type);
+        cast_expr.fn.ret_type = int_type;
+        cast_expr.fn.has_var_args = false;
+        cast_expr.fn.__set_signature("casttodecimalv2(VARCHAR(*))");
+        cast_expr.fn.__isset.scalar_fn = true;
+        cast_expr.fn.scalar_fn.symbol = "doris::DecimalV2Operators::cast_to_decimalv2_val";
+
+        TExprNode slot_ref;
+        slot_ref.node_type = TExprNodeType::SLOT_REF;
+        slot_ref.type = varchar_type;
+        slot_ref.num_children = 0;
+        slot_ref.__isset.slot_ref = true;
+        slot_ref.slot_ref.slot_id = SRC_TUPLE_SLOT_ID_START + 5; // decimal id in src tuple
+        slot_ref.slot_ref.tuple_id = 1;
+
+        TExpr expr;
+        expr.nodes.push_back(cast_expr);
+        expr.nodes.push_back(slot_ref);
+
+        _params.expr_of_dest_slot.emplace(DST_TUPLE_SLOT_ID_START + 5, expr);
+        _params.src_slot_ids.push_back(SRC_TUPLE_SLOT_ID_START + 5);
+    }
+    // _params.__isset.expr_of_dest_slot = true;
+    _params.__set_dest_tuple_id(TUPLE_ID_DST);
+    _params.__set_src_tuple_id(TUPLE_ID_SRC);
+}
+
+void VJsonScannerTest::init() {
+    // Scan params first, then the descriptor table holding the tuples they name by id.
+    create_expr_info();
+    init_desc_table();
+    // Plan node handed to VBrokerScanNode by each test: id 0, no children, limit -1.
+    _tnode.limit = -1;
+    _tnode.num_children = 0;
+    _tnode.node_type = TPlanNodeType::SCHEMA_SCAN_NODE;
+    _tnode.node_id = 0;
+    _tnode.nullable_tuples.push_back(false);
+    _tnode.row_tuples.push_back(0);
+    _tnode.__isset.broker_scan_node = true;
+    _tnode.broker_scan_node.tuple_id = 0;
+}
+
+TEST_F(VJsonScannerTest, simple_array_json) { // strip_outer_array only, no jsonpaths
+    VBrokerScanNode scan_node(&_obj_pool, _tnode, *_desc_tbl);
+    scan_node.init(_tnode);
+    auto status = scan_node.prepare(&_runtime_state);
+    EXPECT_TRUE(status.ok());
+
+    // set scan range
+    std::vector<TScanRangeParams> scan_ranges;
+    {
+        TScanRangeParams scan_range_params;
+
+        TBrokerScanRange broker_scan_range;
+        broker_scan_range.params = _params;
+        TBrokerRangeDesc range;
+        range.start_offset = 0;
+        range.size = -1;
+        range.format_type = TFileFormatType::FORMAT_JSON;
+        range.strip_outer_array = true;
+        range.__isset.strip_outer_array = true;
+        range.splittable = true;
+        range.path = "./be/test/exec/test_data/json_scanner/test_simple2.json";
+        range.file_type = TFileType::FILE_LOCAL;
+        broker_scan_range.ranges.push_back(range);
+        scan_range_params.scan_range.__set_broker_scan_range(broker_scan_range);
+        scan_ranges.push_back(scan_range_params);
+    }
+
+    scan_node.set_scan_ranges(scan_ranges);
+    status = scan_node.open(&_runtime_state);
+    EXPECT_TRUE(status.ok());
+
+    bool eof = false;
+    vectorized::Block block;
+    status = scan_node.get_next(&_runtime_state, &block, &eof);
+    EXPECT_TRUE(status.ok());
+    EXPECT_EQ(2, block.rows()); // both rows are expected in the first block
+    EXPECT_EQ(6, block.columns());
+
+    auto columns = block.get_columns_with_type_and_name(); // one entry per dst column, in slot order
+    ASSERT_EQ(columns.size(), 6);
+    ASSERT_EQ(columns[0].to_string(0), "reference");
+    ASSERT_EQ(columns[0].to_string(1), "fiction");
+    ASSERT_EQ(columns[1].to_string(0), "NigelRees");
+    ASSERT_EQ(columns[1].to_string(1), "EvelynWaugh");
+    ASSERT_EQ(columns[2].to_string(0), "SayingsoftheCentury");
+    ASSERT_EQ(columns[2].to_string(1), "SwordofHonour");
+    ASSERT_EQ(columns[3].to_string(0), "8.950000");
+    ASSERT_EQ(columns[3].to_string(1), "12.990000");
+    ASSERT_EQ(columns[4].to_string(0), "1234");
+    ASSERT_EQ(columns[4].to_string(1), "1180591620717411303424.000000");
+    ASSERT_EQ(columns[5].to_string(0), "1234.123400");
+    ASSERT_EQ(columns[5].to_string(1), "10000000000000.001953");
+
+    block.clear();
+    status = scan_node.get_next(&_runtime_state, &block, &eof); // second call: no rows left, eof expected
+    ASSERT_EQ(0, block.rows());
+    ASSERT_TRUE(eof);
+    scan_node.close(&_runtime_state);
+}
+
+TEST_F(VJsonScannerTest, use_jsonpaths_with_file_reader) { // jsonpaths set, read_json_by_line left unset
+    VBrokerScanNode scan_node(&_obj_pool, _tnode, *_desc_tbl);
+    scan_node.init(_tnode);
+    auto status = scan_node.prepare(&_runtime_state);
+    EXPECT_TRUE(status.ok());
+
+    // set scan range
+    std::vector<TScanRangeParams> scan_ranges;
+    {
+        TScanRangeParams scan_range_params;
+
+        TBrokerScanRange broker_scan_range;
+        broker_scan_range.params = _params;
+        TBrokerRangeDesc range;
+        range.start_offset = 0;
+        range.size = -1;
+        range.format_type = TFileFormatType::FORMAT_JSON;
+        range.strip_outer_array = true;
+        range.__isset.strip_outer_array = true;
+        range.splittable = true;
+        range.path = "./be/test/exec/test_data/json_scanner/test_simple2.json";
+        range.file_type = TFileType::FILE_LOCAL;
+        range.jsonpaths = // one path per source column, in column order
+                "[\"$.category\", \"$.author\", \"$.title\", \"$.price\", \"$.largeint\", "
+                "\"$.decimal\"]";
+        range.__isset.jsonpaths = true;
+        broker_scan_range.ranges.push_back(range);
+        scan_range_params.scan_range.__set_broker_scan_range(broker_scan_range);
+        scan_ranges.push_back(scan_range_params);
+    }
+
+    scan_node.set_scan_ranges(scan_ranges);
+    status = scan_node.open(&_runtime_state);
+    EXPECT_TRUE(status.ok());
+
+    bool eof = false;
+    vectorized::Block block;
+    status = scan_node.get_next(&_runtime_state, &block, &eof);
+    EXPECT_TRUE(status.ok());
+    EXPECT_EQ(2, block.rows());
+    EXPECT_EQ(6, block.columns());
+
+    auto columns = block.get_columns_with_type_and_name(); // only the three string columns are checked
+    ASSERT_EQ(columns.size(), 6);
+    ASSERT_EQ(columns[0].to_string(0), "reference");
+    ASSERT_EQ(columns[0].to_string(1), "fiction");
+    ASSERT_EQ(columns[1].to_string(0), "NigelRees");
+    ASSERT_EQ(columns[1].to_string(1), "EvelynWaugh");
+    ASSERT_EQ(columns[2].to_string(0), "SayingsoftheCentury");
+    ASSERT_EQ(columns[2].to_string(1), "SwordofHonour");
+
+    block.clear();
+    status = scan_node.get_next(&_runtime_state, &block, &eof); // second call: no rows left, eof expected
+    ASSERT_EQ(0, block.rows());
+    ASSERT_TRUE(eof);
+    scan_node.close(&_runtime_state);
+}
+
+TEST_F(VJsonScannerTest, use_jsonpaths_with_line_reader) { // jsonpaths plus read_json_by_line
+    VBrokerScanNode scan_node(&_obj_pool, _tnode, *_desc_tbl);
+    scan_node.init(_tnode);
+    auto status = scan_node.prepare(&_runtime_state);
+    EXPECT_TRUE(status.ok());
+
+    std::vector<TScanRangeParams> scan_ranges;
+    {
+        TScanRangeParams scan_range_params;
+
+        TBrokerScanRange broker_scan_range;
+        broker_scan_range.params = _params;
+        TBrokerRangeDesc range;
+        range.start_offset = 0;
+        range.size = -1;
+        range.format_type = TFileFormatType::FORMAT_JSON;
+        range.splittable = true;
+        range.strip_outer_array = true;
+        range.__isset.strip_outer_array = true;
+        range.path = "./be/test/exec/test_data/json_scanner/test_simple2.json";
+        range.file_type = TFileType::FILE_LOCAL;
+        range.jsonpaths = // one path per source column, in column order
+                "[\"$.category\", \"$.author\", \"$.title\", \"$.price\", \"$.largeint\", "
+                "\"$.decimal\"]";
+        range.__isset.jsonpaths = true;
+        range.read_json_by_line = true; // the setting that distinguishes this case from the file-reader one
+        range.__isset.read_json_by_line = true;
+        broker_scan_range.ranges.push_back(range);
+        scan_range_params.scan_range.__set_broker_scan_range(broker_scan_range);
+        scan_ranges.push_back(scan_range_params);
+    }
+
+    scan_node.set_scan_ranges(scan_ranges);
+    status = scan_node.open(&_runtime_state);
+    EXPECT_TRUE(status.ok());
+
+    bool eof = false;
+    vectorized::Block block;
+    status = scan_node.get_next(&_runtime_state, &block, &eof);
+    EXPECT_TRUE(status.ok());
+    EXPECT_EQ(2, block.rows());
+    EXPECT_EQ(6, block.columns());
+
+    auto columns = block.get_columns_with_type_and_name(); // only the three string columns are checked
+    ASSERT_EQ(columns.size(), 6);
+    ASSERT_EQ(columns[0].to_string(0), "reference");
+    ASSERT_EQ(columns[0].to_string(1), "fiction");
+    ASSERT_EQ(columns[1].to_string(0), "NigelRees");
+    ASSERT_EQ(columns[1].to_string(1), "EvelynWaugh");
+    ASSERT_EQ(columns[2].to_string(0), "SayingsoftheCentury");
+    ASSERT_EQ(columns[2].to_string(1), "SwordofHonour");
+
+    block.clear();
+    status = scan_node.get_next(&_runtime_state, &block, &eof); // second call: no rows left, eof expected
+    ASSERT_EQ(0, block.rows());
+    ASSERT_TRUE(eof);
+    scan_node.close(&_runtime_state);
+}
+
+TEST_F(VJsonScannerTest, use_jsonpaths_mismatch) { // jsonpaths that are expected to match nothing
+    VBrokerScanNode scan_node(&_obj_pool, _tnode, *_desc_tbl);
+    scan_node.init(_tnode);
+    auto status = scan_node.prepare(&_runtime_state);
+    EXPECT_TRUE(status.ok());
+
+    // set scan range
+    std::vector<TScanRangeParams> scan_ranges;
+    {
+        TScanRangeParams scan_range_params;
+
+        TBrokerScanRange broker_scan_range;
+        broker_scan_range.params = _params;
+        TBrokerRangeDesc range;
+        range.start_offset = 0;
+        range.size = -1;
+        range.format_type = TFileFormatType::FORMAT_JSON;
+        range.strip_outer_array = true;
+        range.__isset.strip_outer_array = true;
+        range.splittable = true;
+        range.path = "./be/test/exec/test_data/json_scanner/test_simple2.json";
+        range.file_type = TFileType::FILE_LOCAL;
+        range.jsonpaths = "[\"$.k1\", \"$.k2\", \"$.k3\", \"$.k4\", \"$.k5\", \"$.k6\"]"; // presumably absent from the file
+        range.__isset.jsonpaths = true;
+        broker_scan_range.ranges.push_back(range);
+        scan_range_params.scan_range.__set_broker_scan_range(broker_scan_range);
+        scan_ranges.push_back(scan_range_params);
+    }
+
+    scan_node.set_scan_ranges(scan_ranges);
+    status = scan_node.open(&_runtime_state);
+    EXPECT_TRUE(status.ok());
+
+    bool eof = false;
+    vectorized::Block block;
+    status = scan_node.get_next(&_runtime_state, &block, &eof);
+    EXPECT_TRUE(status.ok());
+    EXPECT_EQ(2, block.rows()); // the rows are still produced, not filtered
+    EXPECT_EQ(6, block.columns());
+
+    auto columns = block.get_columns_with_type_and_name(); // the checked cells must print as \N
+    ASSERT_EQ(columns.size(), 6);
+    ASSERT_EQ(columns[0].to_string(0), "\\N");
+    ASSERT_EQ(columns[0].to_string(1), "\\N");
+    ASSERT_EQ(columns[1].to_string(0), "\\N");
+    ASSERT_EQ(columns[1].to_string(1), "\\N");
+    ASSERT_EQ(columns[2].to_string(0), "\\N");
+    ASSERT_EQ(columns[2].to_string(1), "\\N");
+    block.clear();
+    scan_node.close(&_runtime_state);
+}
+
+} // namespace vectorized
+} // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org