You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/05/14 01:50:13 UTC
[incubator-doris] branch master updated: [feature] add vectorized vjson_scanner (#9311)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new b817efd652 [feature] add vectorized vjson_scanner (#9311)
b817efd652 is described below
commit b817efd6527163bc637d3ae1ced82d9015b69f91
Author: carlvinhust2012 <hu...@126.com>
AuthorDate: Sat May 14 09:50:05 2022 +0800
[feature] add vectorized vjson_scanner (#9311)
This PR adds the vectorized VJsonScanner, which supports vectorized JSON import in the stream load flow.
---
be/src/exec/base_scanner.cpp | 53 ++-
be/src/exec/base_scanner.h | 8 +
be/src/exec/broker_scan_node.cpp | 13 +-
be/src/exec/json_scanner.cpp | 61 ++-
be/src/exec/json_scanner.h | 13 +-
be/src/vec/CMakeLists.txt | 1 +
be/src/vec/exec/vjson_scanner.cpp | 522 ++++++++++++++++++++++
be/src/vec/exec/vjson_scanner.h | 120 +++++
be/test/CMakeLists.txt | 1 +
be/test/vec/exec/vjson_scanner_test.cpp | 767 ++++++++++++++++++++++++++++++++
10 files changed, 1529 insertions(+), 30 deletions(-)
diff --git a/be/src/exec/base_scanner.cpp b/be/src/exec/base_scanner.cpp
index 8621cf75f8..c4e1b5c056 100644
--- a/be/src/exec/base_scanner.cpp
+++ b/be/src/exec/base_scanner.cpp
@@ -156,7 +156,6 @@ Status BaseScanner::init_expr_ctxes() {
RETURN_IF_ERROR(ctx->open(_state));
_dest_expr_ctx.emplace_back(ctx);
}
-
if (has_slot_id_map) {
auto it = _params.dest_sid_to_src_sid_without_trans.find(slot_desc->id());
if (it == std::end(_params.dest_sid_to_src_sid_without_trans)) {
@@ -279,6 +278,58 @@ Status BaseScanner::_fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool) {
return Status::OK();
}
+// Runs every vectorized pre-filter expression over `temp_block` in turn. Rows rejected by a
+// filter are removed from the block and added to `_counter->num_rows_unselected`.
+// `slot_num` is passed through unchanged to VExprContext::filter_block.
+Status BaseScanner::filter_block(vectorized::Block* temp_block, size_t slot_num) {
+    for (auto pre_filter_ctx : _vpre_filter_ctxs) {
+        const auto rows_before = temp_block->rows();
+        RETURN_IF_ERROR(
+                vectorized::VExprContext::filter_block(pre_filter_ctx, temp_block, slot_num));
+        _counter->num_rows_unselected += rows_before - temp_block->rows();
+    }
+    return Status::OK();
+}
+
+// Evaluates the destination expressions (`_dest_vexpr_ctx`) over `temp_block` and stores the
+// produced block in `output_block`. When there are no destination expressions, `output_block`
+// is left untouched and OK is returned.
+//
+// The status reported through the out-parameter of
+// VExprContext::get_output_block_after_execute_exprs is always returned, whatever the size of
+// the produced block, so an evaluation failure can never be silently ignored.
+Status BaseScanner::execute_exprs(vectorized::Block* output_block, vectorized::Block* temp_block) {
+    if (_dest_vexpr_ctx.empty()) {
+        return Status::OK();
+    }
+
+    Status status;
+    *output_block = vectorized::VExprContext::get_output_block_after_execute_exprs(
+            _dest_vexpr_ctx, *temp_block, status);
+    return status;
+}
+
+// Assembles the source columns read by a scanner into a block, applies the pre-filter
+// expressions and then the destination expressions, and writes the result into `dest_block`.
+//
+// `columns` must hold one column per entry of `_src_slot_descs`, in the same order; the columns
+// are moved out of the vector. Nothing is written when no rows were read.
+Status BaseScanner::fill_dest_block(vectorized::Block* dest_block,
+                                    std::vector<vectorized::MutableColumnPtr>& columns) {
+    if (columns.empty() || columns[0]->size() == 0) {
+        return Status::OK();
+    }
+
+    auto temp_block = std::make_unique<vectorized::Block>();
+    size_t n_columns = 0;
+    for (const auto slot_desc : _src_slot_descs) {
+        temp_block->insert(vectorized::ColumnWithTypeAndName(std::move(columns[n_columns++]),
+                                                             slot_desc->get_data_type_ptr(),
+                                                             slot_desc->col_name()));
+    }
+
+    // Pre-filter. The second argument is forwarded to VExprContext::filter_block as the number
+    // of leading columns to keep (assumed semantics - confirm against VExprContext). The block
+    // above was built from the *source* slots, so its own width is the value to keep; the
+    // number of destination slots can differ from it.
+    RETURN_IF_ERROR(BaseScanner::filter_block(temp_block.get(), temp_block->columns()));
+
+    if (_dest_vexpr_ctx.empty()) {
+        *dest_block = *temp_block;
+    } else {
+        RETURN_IF_ERROR(BaseScanner::execute_exprs(dest_block, temp_block.get()));
+    }
+
+    return Status::OK();
+}
+
void BaseScanner::fill_slots_of_columns_from_path(
int start, const std::vector<std::string>& columns_from_path) {
// values of columns from path can not be null
diff --git a/be/src/exec/base_scanner.h b/be/src/exec/base_scanner.h
index 9c4179874e..02c2f56880 100644
--- a/be/src/exec/base_scanner.h
+++ b/be/src/exec/base_scanner.h
@@ -22,6 +22,7 @@
#include "runtime/tuple.h"
#include "util/runtime_profile.h"
#include "vec/exprs/vexpr.h"
+#include "vec/exprs/vexpr_context.h"
namespace doris {
@@ -75,11 +76,18 @@ public:
virtual void close() = 0;
Status fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool, bool* fill_tuple);
+ Status fill_dest_block(vectorized::Block* dest_block,
+ std::vector<vectorized::MutableColumnPtr>& columns);
+
void fill_slots_of_columns_from_path(int start,
const std::vector<std::string>& columns_from_path);
void free_expr_local_allocations();
+ Status filter_block(vectorized::Block* temp_block, size_t slot_num);
+
+ Status execute_exprs(vectorized::Block* output_block, vectorized::Block* temp_block);
+
protected:
RuntimeState* _state;
const TBrokerScanRangeParams& _params;
diff --git a/be/src/exec/broker_scan_node.cpp b/be/src/exec/broker_scan_node.cpp
index 513e653fb7..c1144495c2 100644
--- a/be/src/exec/broker_scan_node.cpp
+++ b/be/src/exec/broker_scan_node.cpp
@@ -32,6 +32,7 @@
#include "util/runtime_profile.h"
#include "util/thread.h"
#include "vec/exec/vbroker_scanner.h"
+#include "vec/exec/vjson_scanner.h"
namespace doris {
@@ -234,9 +235,15 @@ std::unique_ptr<BaseScanner> BrokerScanNode::create_scanner(const TBrokerScanRan
counter);
break;
case TFileFormatType::FORMAT_JSON:
- scan = new JsonScanner(_runtime_state, runtime_profile(), scan_range.params,
- scan_range.ranges, scan_range.broker_addresses, _pre_filter_texprs,
- counter);
+ if (_vectorized) {
+ scan = new vectorized::VJsonScanner(
+ _runtime_state, runtime_profile(), scan_range.params, scan_range.ranges,
+ scan_range.broker_addresses, _pre_filter_texprs, counter);
+ } else {
+ scan = new JsonScanner(_runtime_state, runtime_profile(), scan_range.params,
+ scan_range.ranges, scan_range.broker_addresses,
+ _pre_filter_texprs, counter);
+ }
break;
default:
if (_vectorized) {
diff --git a/be/src/exec/json_scanner.cpp b/be/src/exec/json_scanner.cpp
index 5e002ac44e..a23ce44b03 100644
--- a/be/src/exec/json_scanner.cpp
+++ b/be/src/exec/json_scanner.cpp
@@ -112,14 +112,17 @@ Status JsonScanner::open_next_reader() {
_scanner_eof = true;
return Status::OK();
}
+ RETURN_IF_ERROR(open_based_reader());
+ RETURN_IF_ERROR(open_json_reader());
+ _next_range++;
+ return Status::OK();
+}
+Status JsonScanner::open_based_reader() {
RETURN_IF_ERROR(open_file_reader());
if (_read_json_by_line) {
RETURN_IF_ERROR(open_line_reader());
}
- RETURN_IF_ERROR(open_json_reader());
- _next_range++;
-
return Status::OK();
}
@@ -215,6 +218,25 @@ Status JsonScanner::open_json_reader() {
bool num_as_string = false;
bool fuzzy_parse = false;
+ RETURN_IF_ERROR(
+ get_range_params(jsonpath, json_root, strip_outer_array, num_as_string, fuzzy_parse));
+ if (_read_json_by_line) {
+ _cur_json_reader =
+ new JsonReader(_state, _counter, _profile, strip_outer_array, num_as_string,
+ fuzzy_parse, &_scanner_eof, nullptr, _cur_line_reader);
+ } else {
+ _cur_json_reader =
+ new JsonReader(_state, _counter, _profile, strip_outer_array, num_as_string,
+ fuzzy_parse, &_scanner_eof, _cur_file_reader);
+ }
+
+ RETURN_IF_ERROR(_cur_json_reader->init(jsonpath, json_root));
+ return Status::OK();
+}
+
+Status JsonScanner::get_range_params(std::string& jsonpath, std::string& json_root,
+ bool& strip_outer_array, bool& num_as_string,
+ bool& fuzzy_parse) {
const TBrokerRangeDesc& range = _ranges[_next_range];
if (range.__isset.jsonpaths) {
@@ -232,17 +254,6 @@ Status JsonScanner::open_json_reader() {
if (range.__isset.fuzzy_parse) {
fuzzy_parse = range.fuzzy_parse;
}
- if (_read_json_by_line) {
- _cur_json_reader =
- new JsonReader(_state, _counter, _profile, strip_outer_array, num_as_string,
- fuzzy_parse, &_scanner_eof, nullptr, _cur_line_reader);
- } else {
- _cur_json_reader =
- new JsonReader(_state, _counter, _profile, strip_outer_array, num_as_string,
- fuzzy_parse, &_scanner_eof, _cur_file_reader);
- }
-
- RETURN_IF_ERROR(_cur_json_reader->init(jsonpath, json_root));
return Status::OK();
}
@@ -308,14 +319,8 @@ JsonReader::~JsonReader() {
}
Status JsonReader::init(const std::string& jsonpath, const std::string& json_root) {
- // parse jsonpath
- if (!jsonpath.empty()) {
- Status st = _generate_json_paths(jsonpath, &_parsed_jsonpaths);
- RETURN_IF_ERROR(st);
- }
- if (!json_root.empty()) {
- JsonFunctions::parse_json_paths(json_root, &_parsed_json_root);
- }
+ // generate _parsed_jsonpaths and _parsed_json_root
+ RETURN_IF_ERROR(_parse_jsonpath_and_json_root(jsonpath, json_root));
//improve performance
if (_parsed_jsonpaths.empty()) { // input is a simple json-string
@@ -330,6 +335,18 @@ Status JsonReader::init(const std::string& jsonpath, const std::string& json_roo
return Status::OK();
}
+// Parses `jsonpath` into `_parsed_jsonpaths` and `json_root` into `_parsed_json_root`.
+// An empty string leaves the corresponding member untouched. A malformed `jsonpath` is
+// reported before `json_root` is looked at.
+Status JsonReader::_parse_jsonpath_and_json_root(const std::string& jsonpath,
+                                                 const std::string& json_root) {
+    if (!jsonpath.empty()) {
+        RETURN_IF_ERROR(_generate_json_paths(jsonpath, &_parsed_jsonpaths));
+    }
+    if (json_root.empty()) {
+        return Status::OK();
+    }
+    JsonFunctions::parse_json_paths(json_root, &_parsed_json_root);
+    return Status::OK();
+}
+
Status JsonReader::_generate_json_paths(const std::string& jsonpath,
std::vector<std::vector<JsonPath>>* vect) {
rapidjson::Document jsonpaths_doc;
diff --git a/be/src/exec/json_scanner.h b/be/src/exec/json_scanner.h
index b12c96f396..276b2dd077 100644
--- a/be/src/exec/json_scanner.h
+++ b/be/src/exec/json_scanner.h
@@ -67,13 +67,17 @@ public:
// Close this scanner
void close() override;
-private:
+protected:
Status open_file_reader();
Status open_line_reader();
Status open_json_reader();
Status open_next_reader();
-private:
+ Status open_based_reader();
+ Status get_range_params(std::string& jsonpath, std::string& json_root, bool& strip_outer_array,
+ bool& num_as_string, bool& fuzzy_parse);
+
+protected:
const std::vector<TBrokerRangeDesc>& _ranges;
const std::vector<TNetworkAddress>& _broker_addresses;
@@ -129,7 +133,7 @@ public:
Status read_json_row(Tuple* tuple, const std::vector<SlotDescriptor*>& slot_descs,
MemPool* tuple_pool, bool* is_empty_row, bool* eof);
-private:
+protected:
Status (JsonReader::*_handle_json_callback)(Tuple* tuple,
const std::vector<SlotDescriptor*>& slot_descs,
MemPool* tuple_pool, bool* is_empty_row, bool* eof);
@@ -158,8 +162,9 @@ private:
void _close();
Status _generate_json_paths(const std::string& jsonpath,
std::vector<std::vector<JsonPath>>* vect);
+ Status _parse_jsonpath_and_json_root(const std::string& jsonpath, const std::string& json_root);
-private:
+protected:
int _next_line;
int _total_lines;
RuntimeState* _state;
diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index 7555e9d0ca..22fa489f46 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -101,6 +101,7 @@ set(VEC_FILES
exec/vtable_function_node.cpp
exec/vbroker_scan_node.cpp
exec/vbroker_scanner.cpp
+ exec/vjson_scanner.cpp
exec/join/vhash_join_node.cpp
exprs/vectorized_agg_fn.cpp
exprs/vectorized_fn_call.cpp
diff --git a/be/src/vec/exec/vjson_scanner.cpp b/be/src/vec/exec/vjson_scanner.cpp
new file mode 100644
index 0000000000..b46d16e80e
--- /dev/null
+++ b/be/src/vec/exec/vjson_scanner.cpp
@@ -0,0 +1,522 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/vjson_scanner.h"
+
+#include <fmt/format.h>
+
+#include <algorithm>
+
+#include "env/env.h"
+#include "exec/broker_reader.h"
+#include "exec/buffered_reader.h"
+#include "exec/local_file_reader.h"
+#include "exec/plain_text_line_reader.h"
+#include "exec/s3_reader.h"
+#include "exprs/expr.h"
+#include "exprs/json_functions.h"
+#include "gutil/strings/split.h"
+#include "runtime/exec_env.h"
+#include "runtime/runtime_state.h"
+#include "util/time.h"
+
+namespace doris::vectorized {
+
+VJsonScanner::VJsonScanner(RuntimeState* state, RuntimeProfile* profile,
+ const TBrokerScanRangeParams& params,
+ const std::vector<TBrokerRangeDesc>& ranges,
+ const std::vector<TNetworkAddress>& broker_addresses,
+ const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter)
+ : JsonScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs, counter),
+ _cur_vjson_reader(nullptr) {}
+
+VJsonScanner::~VJsonScanner() {}
+
+// Fills `output_block` with up to `batch_size` rows parsed from the current scan range(s),
+// opening the next range whenever the current reader is exhausted. Every source slot is read
+// into a nullable string column, and fill_dest_block() then builds the output block.
+// `*eof` mirrors `_scanner_eof`, which is set once there are no more ranges to open.
+Status VJsonScanner::get_next(vectorized::Block* output_block, bool* eof) {
+    SCOPED_TIMER(_read_timer);
+    const int batch_size = _state->batch_size();
+    const size_t slot_num = _src_slot_descs.size();
+
+    auto string_type = make_nullable(std::make_shared<DataTypeString>());
+    std::vector<vectorized::MutableColumnPtr> columns;
+    columns.reserve(slot_num);
+    for (size_t i = 0; i < slot_num; ++i) {
+        columns.emplace_back(string_type->create_column());
+    }
+
+    while (columns[0]->size() < batch_size && !_scanner_eof) {
+        const bool need_new_reader = _cur_file_reader == nullptr || _cur_reader_eof;
+        if (need_new_reader) {
+            RETURN_IF_ERROR(open_next_reader());
+            if (_scanner_eof) {
+                break; // every range has been consumed
+            }
+        }
+
+        if (_read_json_by_line && _skip_next_line) {
+            // Discard the next line unparsed, as requested by _skip_next_line.
+            size_t size = 0;
+            const uint8_t* line_ptr = nullptr;
+            RETURN_IF_ERROR(_cur_line_reader->read_line(&line_ptr, &size, &_cur_reader_eof));
+            _skip_next_line = false;
+            continue;
+        }
+
+        // An empty row adds nothing to `columns`; the loop simply keeps reading.
+        bool is_empty_row = false;
+        RETURN_IF_ERROR(_cur_vjson_reader->read_json_column(columns, _src_slot_descs,
+                                                            &is_empty_row, &_cur_reader_eof));
+    }
+
+    COUNTER_UPDATE(_rows_read_counter, columns[0]->size());
+    SCOPED_TIMER(_materialize_timer);
+    RETURN_IF_ERROR(BaseScanner::fill_dest_block(output_block, columns));
+
+    *eof = _scanner_eof;
+    return Status::OK();
+}
+
+// Opens the file/line readers (shared JsonScanner logic) and a fresh VJsonReader for the next
+// scan range, then advances `_next_range`. Sets `_scanner_eof` and returns OK when every range
+// has already been consumed.
+Status VJsonScanner::open_next_reader() {
+    const bool no_more_ranges = _next_range >= _ranges.size();
+    if (no_more_ranges) {
+        _scanner_eof = true;
+        return Status::OK();
+    }
+
+    RETURN_IF_ERROR(JsonScanner::open_based_reader());
+    RETURN_IF_ERROR(open_vjson_reader());
+    ++_next_range;
+    return Status::OK();
+}
+
+// Replaces `_cur_vjson_reader` with a reader configured from the current range's parameters
+// (jsonpaths, json_root, strip_outer_array, num_as_string, fuzzy_parse). Line-delimited input
+// is read through the line reader; otherwise the whole file reader is handed over.
+Status VJsonScanner::open_vjson_reader() {
+    // Drop the previous range's reader (a no-op when there is none).
+    _cur_vjson_reader.reset();
+
+    std::string json_root;
+    std::string jsonpath;
+    bool strip_outer_array = false;
+    bool num_as_string = false;
+    bool fuzzy_parse = false;
+    RETURN_IF_ERROR(JsonScanner::get_range_params(jsonpath, json_root, strip_outer_array,
+                                                  num_as_string, fuzzy_parse));
+
+    auto* file_reader = _read_json_by_line ? nullptr : _cur_file_reader;
+    auto* line_reader = _read_json_by_line ? _cur_line_reader : nullptr;
+    _cur_vjson_reader = std::make_unique<VJsonReader>(_state, _counter, _profile,
+                                                      strip_outer_array, num_as_string,
+                                                      fuzzy_parse, &_scanner_eof, file_reader,
+                                                      line_reader);
+
+    RETURN_IF_ERROR(_cur_vjson_reader->init(jsonpath, json_root));
+    return Status::OK();
+}
+
+VJsonReader::VJsonReader(RuntimeState* state, ScannerCounter* counter, RuntimeProfile* profile,
+ bool strip_outer_array, bool num_as_string, bool fuzzy_parse,
+ bool* scanner_eof, FileReader* file_reader, LineReader* line_reader)
+ : JsonReader(state, counter, profile, strip_outer_array, num_as_string, fuzzy_parse,
+ scanner_eof, file_reader, line_reader),
+ _vhandle_json_callback(nullptr) {}
+
+VJsonReader::~VJsonReader() {}
+
+// Parses `jsonpath`/`json_root` and picks the row-handling strategy used by read_json_column():
+//  - no jsonpaths: members are matched to slots by name (_vhandle_simple_json);
+//  - jsonpaths with strip_outer_array: every element of the outer array is one row
+//    (_vhandle_flat_array_complex_json);
+//  - jsonpaths otherwise: the whole document is one row (_vhandle_nested_complex_json).
+Status VJsonReader::init(const std::string& jsonpath, const std::string& json_root) {
+    RETURN_IF_ERROR(JsonReader::_parse_jsonpath_and_json_root(jsonpath, json_root));
+
+    if (_parsed_jsonpaths.empty()) {
+        _vhandle_json_callback = &VJsonReader::_vhandle_simple_json;
+    } else if (_strip_outer_array) {
+        _vhandle_json_callback = &VJsonReader::_vhandle_flat_array_complex_json;
+    } else {
+        _vhandle_json_callback = &VJsonReader::_vhandle_nested_complex_json;
+    }
+    return Status::OK();
+}
+
+// Reads the next row into `columns` with the strategy selected by init().
+Status VJsonReader::read_json_column(std::vector<MutableColumnPtr>& columns,
+                                     const std::vector<SlotDescriptor*>& slot_descs,
+                                     bool* is_empty_row, bool* eof) {
+    const auto handler = _vhandle_json_callback;
+    return (this->*handler)(columns, slot_descs, is_empty_row, eof);
+}
+
+// Reads the next row when no jsonpaths are configured: the members of a JSON object are
+// matched to the slots by column name (see _set_column_value).
+// A parsed document is either a single object (one row) or an array of objects (one row per
+// element). A new document is parsed once every row of the current one has been consumed.
+// With fuzzy_parse, member positions are looked up once per document, from its first object,
+// and stored in `_name_map` for reuse on the remaining rows.
+// On return, *is_empty_row is true when no row was produced: end of input, an empty array at
+// end of scan, or invalid data after the scan was asked to stop. Otherwise invalid rows are
+// skipped and the next row is tried.
+Status VJsonReader::_vhandle_simple_json(std::vector<MutableColumnPtr>& columns,
+ const std::vector<SlotDescriptor*>& slot_descs,
+ bool* is_empty_row, bool* eof) {
+ do {
+ bool valid = false;
+ if (_next_line >= _total_lines) { // parse json and generic document
+ Status st = _parse_json(is_empty_row, eof);
+ if (st.is_data_quality_error()) {
+ continue; // continue to read next
+ }
+ RETURN_IF_ERROR(st);
+ if (*is_empty_row == true) {
+ return Status::OK();
+ }
+ // Field positions cached for the previous document are no longer valid.
+ _name_map.clear();
+ rapidjson::Value* objectValue = nullptr;
+ if (_json_doc->IsArray()) {
+ _total_lines = _json_doc->Size();
+ if (_total_lines == 0) {
+ // may be passing an empty json, such as "[]"
+ RETURN_IF_ERROR(_append_error_msg(*_json_doc, "Empty json line", "", nullptr));
+ if (*_scanner_eof) {
+ *is_empty_row = true;
+ return Status::OK();
+ }
+ continue;
+ }
+ // The first element's member order is used to build the fuzzy-parse name map.
+ objectValue = &(*_json_doc)[0];
+ } else {
+ _total_lines = 1; // only one row
+ objectValue = _json_doc;
+ }
+ _next_line = 0;
+ if (_fuzzy_parse) {
+ // Record, for each slot, the index of the member with the same name.
+ for (auto v : slot_descs) {
+ for (int i = 0; i < objectValue->MemberCount(); ++i) {
+ auto it = objectValue->MemberBegin() + i;
+ if (v->col_name() == it->name.GetString()) {
+ _name_map[v->col_name()] = i;
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ // Case 1: the document is an array and the row is its `_next_line`-th element.
+ // Case 2: the document is a single object and is itself the row.
+ if (_json_doc->IsArray()) { // handle case 1
+ rapidjson::Value& objectValue = (*_json_doc)[_next_line]; // json object
+ RETURN_IF_ERROR(_set_column_value(objectValue, columns, slot_descs, &valid));
+ } else { // handle case 2
+ RETURN_IF_ERROR(_set_column_value(*_json_doc, columns, slot_descs, &valid));
+ }
+ _next_line++;
+ if (!valid) {
+ if (*_scanner_eof) {
+ // When _scanner_eof is true and valid is false, it means that we have encountered
+ // unqualified data and decided to stop the scan.
+ *is_empty_row = true;
+ return Status::OK();
+ }
+ continue;
+ }
+ *is_empty_row = false;
+ break; // get a valid row, then break
+ } while (_next_line <= _total_lines);
+ return Status::OK();
+}
+
+// For simple-format json (no jsonpaths): writes the members of `objectValue` whose names match
+// the materialized slots into `columns`, one value per slot.
+// Sets *valid to true and returns OK on success.
+// Sets *valid to false and returns OK for an invalid row. In that case every column is restored
+// to the size it had before the call, so all columns keep the same number of rows.
+// Returns another status if other problems are encountered.
+Status VJsonReader::_set_column_value(rapidjson::Value& objectValue,
+                                      std::vector<MutableColumnPtr>& columns,
+                                      const std::vector<SlotDescriptor*>& slot_descs, bool* valid) {
+    if (!objectValue.IsObject()) {
+        // Here we expect the incoming `objectValue` to be a Json Object, such as {"key" : "value"},
+        // not other type of Json format.
+        RETURN_IF_ERROR(_append_error_msg(objectValue, "Expect json object value", "", valid));
+        return Status::OK();
+    }
+
+    // Remember each column's size so a row rejected half-way can be undone. Otherwise the
+    // columns written before the failure would be one row longer than the others.
+    std::vector<size_t> origin_sizes;
+    origin_sizes.reserve(columns.size());
+    for (const auto& column : columns) {
+        origin_sizes.push_back(column->size());
+    }
+    auto discard_partial_row = [&]() {
+        for (size_t i = 0; i < columns.size(); ++i) {
+            const size_t appended = columns[i]->size() - origin_sizes[i];
+            if (appended > 0) {
+                columns[i]->pop_back(appended);
+            }
+        }
+    };
+
+    size_t nullcount = 0;
+    int ctx_idx = 0;
+    for (auto slot_desc : slot_descs) {
+        if (!slot_desc->is_materialized()) {
+            continue;
+        }
+
+        int dest_index = ctx_idx++;
+        auto* column_ptr = columns[dest_index].get();
+        rapidjson::Value::ConstMemberIterator it = objectValue.MemberEnd();
+
+        if (_fuzzy_parse) {
+            // Use the member position cached in `_name_map` for this document.
+            auto idx_it = _name_map.find(slot_desc->col_name());
+            if (idx_it != _name_map.end() && idx_it->second < objectValue.MemberCount()) {
+                it = objectValue.MemberBegin() + idx_it->second;
+            }
+        } else {
+            it = objectValue.FindMember(
+                    rapidjson::Value(slot_desc->col_name().c_str(), slot_desc->col_name().size()));
+        }
+
+        if (it != objectValue.MemberEnd()) {
+            const rapidjson::Value& value = it->value;
+            RETURN_IF_ERROR(_write_data_to_column(&value, slot_desc, column_ptr, valid));
+            if (!(*valid)) {
+                discard_partial_row();
+                return Status::OK();
+            }
+        } else if (slot_desc->is_nullable()) {
+            // A missing member is loaded as NULL.
+            auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(column_ptr);
+            nullable_column->insert_default();
+            nullcount++;
+        } else {
+            RETURN_IF_ERROR(_append_error_msg(
+                    objectValue,
+                    "The column `{}` is not nullable, but it's not found in jsondata.",
+                    slot_desc->col_name(), valid));
+            // The row is invalid: undo what was written and stop here instead of falling
+            // through to the success path, which would report the row as valid.
+            discard_partial_row();
+            return Status::OK();
+        }
+    }
+
+    if (nullcount == slot_descs.size()) {
+        RETURN_IF_ERROR(_append_error_msg(objectValue, "All fields is null, this is a invalid row.",
+                                          "", valid));
+        // The row is filtered, so it must not stay in the columns as an all-NULL row.
+        discard_partial_row();
+        return Status::OK();
+    }
+    *valid = true;
+    return Status::OK();
+}
+
+// Appends the textual form of the JSON `value` to `column_ptr`. The column is a ColumnString,
+// wrapped in a ColumnNullable when the slot is nullable.
+//  - strings are stored as-is, numbers in decimal form (doubles formatted like printf "%f"),
+//    booleans as "1"/"0", and arrays/objects as their serialized JSON text;
+//  - JSON null becomes a NULL for nullable slots and an invalid row otherwise.
+// Sets *valid to true on success. When the value cannot be stored, it sets *valid to false,
+// records the error, and leaves the column unchanged.
+Status VJsonReader::_write_data_to_column(rapidjson::Value::ConstValueIterator value,
+                                          SlotDescriptor* slot_desc,
+                                          vectorized::IColumn* column_ptr, bool* valid) {
+    if (value->IsNull()) {
+        if (!slot_desc->is_nullable()) {
+            RETURN_IF_ERROR(_append_error_msg(
+                    *value, "Json value is null, but the column `{}` is not nullable.",
+                    slot_desc->col_name(), valid));
+            return Status::OK();
+        }
+        // Handled before anything else is written, so the null map and the nested column
+        // always grow together. insert_default() on the nullable column appends a NULL, the
+        // same way the callers handle a missing member.
+        column_ptr->insert_default();
+        *valid = true;
+        return Status::OK();
+    }
+
+    // `text` owns the bytes for numbers and nested values. It is declared outside the switch
+    // so that it outlives the insert_data() call below.
+    std::string text;
+    const char* str_value = nullptr;
+    size_t wbytes = 0;
+
+    switch (value->GetType()) {
+    case rapidjson::Type::kStringType:
+        str_value = value->GetString();
+        // GetStringLength() is exact even when the string contains embedded '\0' bytes.
+        wbytes = value->GetStringLength();
+        break;
+    case rapidjson::Type::kNumberType:
+        // std::to_string formats like printf "%u"/"%d"/"%lu"/"%ld"/"%f" but needs no
+        // fixed-size buffer, so very large doubles cannot overflow anything.
+        if (value->IsUint()) {
+            text = std::to_string(value->GetUint());
+        } else if (value->IsInt()) {
+            text = std::to_string(value->GetInt());
+        } else if (value->IsUint64()) {
+            text = std::to_string(value->GetUint64());
+        } else if (value->IsInt64()) {
+            text = std::to_string(value->GetInt64());
+        } else {
+            text = std::to_string(value->GetDouble());
+        }
+        str_value = text.data();
+        wbytes = text.size();
+        break;
+    case rapidjson::Type::kFalseType:
+        str_value = "0";
+        wbytes = 1;
+        break;
+    case rapidjson::Type::kTrueType:
+        str_value = "1";
+        wbytes = 1;
+        break;
+    default:
+        // for other type like array or object. we convert it to string to save
+        text = JsonReader::_print_json_value(*value);
+        str_value = text.data();
+        wbytes = text.size();
+        break;
+    }
+
+    // TODO: if the vexpr can support another 'slot_desc type' than 'TYPE_VARCHAR',
+    // we need use a function to support these types to insert data in columns.
+    DCHECK(slot_desc->type().type == TYPE_VARCHAR);
+    if (slot_desc->is_nullable()) {
+        auto* nullable_column = assert_cast<vectorized::ColumnNullable*>(column_ptr);
+        nullable_column->get_null_map_data().push_back(0);
+        column_ptr = &nullable_column->get_nested_column();
+    }
+    assert_cast<ColumnString*>(column_ptr)->insert_data(str_value, wbytes);
+
+    *valid = true;
+    return Status::OK();
+}
+
+// Reads the next row when jsonpaths are set and strip_outer_array is true. Every element of
+// the outer JSON array is one row, extracted with the parsed jsonpaths. A new document is
+// parsed once all elements of the current one are consumed, and invalid elements are skipped.
+// *is_empty_row is set when this call produced no row (end of input or an empty outer array);
+// get_next() just keeps reading in that case.
+Status VJsonReader::_vhandle_flat_array_complex_json(std::vector<MutableColumnPtr>& columns,
+                                                     const std::vector<SlotDescriptor*>& slot_descs,
+                                                     bool* is_empty_row, bool* eof) {
+    do {
+        if (_next_line >= _total_lines) {
+            Status st = _parse_json(is_empty_row, eof);
+            if (st.is_data_quality_error()) {
+                continue; // continue to read next
+            }
+            RETURN_IF_ERROR(st);
+            // `st` is OK here, so an empty row (end of input, or an empty outer array such as
+            // "[]") is reported straight back to the caller.
+            if (*is_empty_row) {
+                return Status::OK();
+            }
+        }
+        rapidjson::Value& objectValue = (*_json_doc)[_next_line++];
+        bool valid = true;
+        RETURN_IF_ERROR(_write_columns_by_jsonpath(objectValue, slot_descs, columns, &valid));
+        if (!valid) {
+            continue; // process next line
+        }
+        *is_empty_row = false;
+        break; // get a valid row, then break
+    } while (_next_line <= _total_lines);
+    return Status::OK();
+}
+
+// Reads the next row when jsonpaths are set and strip_outer_array is false. Each parsed JSON
+// document is exactly one row, extracted with the parsed jsonpaths, and documents with
+// data-quality errors are skipped. *is_empty_row is set when no row was produced (end of input,
+// or the document did not yield a valid row), so the caller moves on to the next document.
+Status VJsonReader::_vhandle_nested_complex_json(std::vector<MutableColumnPtr>& columns,
+                                                 const std::vector<SlotDescriptor*>& slot_descs,
+                                                 bool* is_empty_row, bool* eof) {
+    Status st;
+    do {
+        st = _parse_json(is_empty_row, eof);
+    } while (st.is_data_quality_error());
+    RETURN_IF_ERROR(st);
+    if (*is_empty_row) {
+        return Status::OK();
+    }
+
+    bool valid = true;
+    RETURN_IF_ERROR(_write_columns_by_jsonpath(*_json_doc, slot_descs, columns, &valid));
+    // The document holds a single row. An invalid one is reported as an empty row so the
+    // caller continues with the next document.
+    *is_empty_row = !valid;
+    return Status::OK();
+}
+
+// Extracts one row from `objectValue` using the parsed jsonpaths (the i-th path feeds the i-th
+// slot/column) and appends it to `columns`.
+// *valid is only ever set to false, so the caller must initialize it to true. On an invalid
+// row it is set to false, the error is recorded, and every column is restored to its size
+// before the call, so all columns keep the same number of rows.
+Status VJsonReader::_write_columns_by_jsonpath(rapidjson::Value& objectValue,
+                                               const std::vector<SlotDescriptor*>& slot_descs,
+                                               std::vector<MutableColumnPtr>& columns,
+                                               bool* valid) {
+    // Column sizes before this row, used to undo a partially written or rejected row.
+    std::vector<size_t> origin_sizes;
+    origin_sizes.reserve(columns.size());
+    for (const auto& column : columns) {
+        origin_sizes.push_back(column->size());
+    }
+    auto discard_partial_row = [&]() {
+        for (size_t i = 0; i < columns.size(); ++i) {
+            const size_t appended = columns[i]->size() - origin_sizes[i];
+            if (appended > 0) {
+                columns[i]->pop_back(appended);
+            }
+        }
+    };
+
+    size_t nullcount = 0;
+    const size_t column_num = slot_descs.size();
+    for (size_t i = 0; i < column_num; i++) {
+        auto* column_ptr = columns[i].get();
+        rapidjson::Value* json_values = nullptr;
+        bool wrap_explicitly = false;
+        if (LIKELY(i < _parsed_jsonpaths.size())) {
+            json_values = JsonFunctions::get_json_array_from_parsed_json(
+                    _parsed_jsonpaths[i], &objectValue, _origin_json_doc.GetAllocator(),
+                    &wrap_explicitly);
+        }
+
+        if (json_values == nullptr) {
+            // not match in jsondata.
+            if (slot_descs[i]->is_nullable()) {
+                assert_cast<vectorized::ColumnNullable*>(column_ptr)->insert_default();
+                nullcount++;
+            } else {
+                RETURN_IF_ERROR(_append_error_msg(
+                        objectValue,
+                        "The column `{}` is not nullable, but it's not found in jsondata.",
+                        slot_descs[i]->col_name(), valid));
+                discard_partial_row();
+                return Status::OK();
+            }
+        } else {
+            CHECK(json_values->IsArray());
+            if (json_values->Size() == 1 && wrap_explicitly) {
+                // NOTICE1: JsonFunctions::get_json_array_from_parsed_json() will wrap the single json object with an array.
+                // so here we unwrap the array to get the real element.
+                // if json_values' size > 1, it means we just match an array, not a wrapped one, so no need to unwrap.
+                json_values = &((*json_values)[0]);
+            }
+            RETURN_IF_ERROR(_write_data_to_column(json_values, slot_descs[i], column_ptr, valid));
+            if (!(*valid)) {
+                discard_partial_row();
+                return Status::OK();
+            }
+        }
+    }
+
+    if (nullcount == column_num) {
+        RETURN_IF_ERROR(_append_error_msg(
+                objectValue, "All fields is null or not matched, this is a invalid row.", "",
+                valid));
+        // The row is filtered, so its all-NULL placeholder values must not stay in the columns.
+        discard_partial_row();
+    }
+    return Status::OK();
+}
+
+Status VJsonReader::_parse_json(bool* is_empty_row, bool* eof) {
+ size_t size = 0;
+ Status st = JsonReader::_parse_json_doc(&size, eof);
+ // terminate if encounter other errors
+ RETURN_IF_ERROR(st);
+
+ // read all data, then return
+ if (size == 0 || *eof) {
+ *is_empty_row = true;
+ return Status::OK();
+ }
+
+ if (!_parsed_jsonpaths.empty() && _strip_outer_array) {
+ _total_lines = _json_doc->Size();
+ _next_line = 0;
+
+ if (_total_lines == 0) {
+ // meet an empty json array.
+ *is_empty_row = true;
+ }
+ }
+ return Status::OK();
+}
+
+// Records an invalid row. It passes the offending JSON and the message to
+// RuntimeState::append_error_msg_to_file (together with `_scanner_eof`, which that call may
+// update), increments `num_rows_filtered`, and sets *valid to false when `valid` is non-null.
+// A non-empty `col_name` is substituted into the "{}" placeholder of `error_msg`.
+Status VJsonReader::_append_error_msg(const rapidjson::Value& objectValue, std::string error_msg,
+                                      std::string col_name, bool* valid) {
+    std::string err_msg;
+    if (col_name.empty()) {
+        err_msg = error_msg;
+    } else {
+        fmt::memory_buffer error_buf;
+        fmt::format_to(error_buf, error_msg, col_name);
+        err_msg = fmt::to_string(error_buf);
+    }
+
+    auto json_text = [&]() -> std::string { return JsonReader::_print_json_value(objectValue); };
+    auto message = [&]() -> std::string { return err_msg; };
+    RETURN_IF_ERROR(_state->append_error_msg_to_file(json_text, message, _scanner_eof));
+
+    _counter->num_rows_filtered++;
+    if (valid) {
+        *valid = false; // current row is invalid
+    }
+    return Status::OK();
+}
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/vjson_scanner.h b/be/src/vec/exec/vjson_scanner.h
new file mode 100644
index 0000000000..0da3b96710
--- /dev/null
+++ b/be/src/vec/exec/vjson_scanner.h
@@ -0,0 +1,120 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef BE_SRC_VJSON_SCANNER_H_
+#define BE_SRC_VJSON_SCANNER_H_
+
+#include <rapidjson/document.h>
+#include <rapidjson/error/en.h>
+#include <rapidjson/stringbuffer.h>
+#include <rapidjson/writer.h>
+
+#include <map>
+#include <memory>
+#include <sstream>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include "common/status.h"
+#include "exec/base_scanner.h"
+#include "exec/exec_node.h"
+#include "exec/json_scanner.h"
+#include "exprs/expr_context.h"
+#include "runtime/descriptors.h"
+#include "runtime/mem_pool.h"
+#include "runtime/mem_tracker.h"
+#include "runtime/row_batch.h"
+#include "runtime/tuple.h"
+#include "util/runtime_profile.h"
+
+namespace doris {
+class ExprContext;
+
+namespace vectorized {
+class VJsonReader;
+
+// Vectorized JSON scanner. It reuses JsonScanner's range bookkeeping and file/line readers,
+// but parses rows through a VJsonReader straight into columns and returns vectorized::Block.
+class VJsonScanner : public JsonScanner {
+public:
+ VJsonScanner(RuntimeState* state, RuntimeProfile* profile, const TBrokerScanRangeParams& params,
+ const std::vector<TBrokerRangeDesc>& ranges,
+ const std::vector<TNetworkAddress>& broker_addresses,
+ const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter);
+
+ ~VJsonScanner();
+
+ // Reads up to one batch of rows into `output_block`; `*eof` is set once every range is consumed.
+ Status get_next(vectorized::Block* output_block, bool* eof) override;
+
+private:
+ // Creates `_cur_vjson_reader` for the range at `_next_range`.
+ Status open_vjson_reader();
+ // Hides (does not override: the base version is non-virtual) JsonScanner::open_next_reader;
+ // opens the next range with a VJsonReader instead of a JsonReader.
+ Status open_next_reader();
+
+private:
+ // Reader for the current range; replaced whenever a new range is opened.
+ std::unique_ptr<VJsonReader> _cur_vjson_reader;
+};
+
+// Column-oriented counterpart of JsonReader: parses JSON documents with JsonReader's machinery
+// and appends each row's values to a vector of columns instead of filling a Tuple.
+class VJsonReader : public JsonReader {
+public:
+ VJsonReader(RuntimeState* state, ScannerCounter* counter, RuntimeProfile* profile,
+ bool strip_outer_array, bool num_as_string, bool fuzzy_parse, bool* scanner_eof,
+ FileReader* file_reader = nullptr, LineReader* line_reader = nullptr);
+
+ ~VJsonReader();
+
+ // Parses jsonpath/json_root and selects the row-handling callback.
+ Status init(const std::string& jsonpath, const std::string& json_root);
+
+ // Appends the next row to `columns`; *is_empty_row reports that no row was produced.
+ Status read_json_column(std::vector<MutableColumnPtr>& columns,
+ const std::vector<SlotDescriptor*>& slot_descs, bool* is_empty_row,
+ bool* eof);
+
+private:
+ // One of the three _vhandle_* strategies below, chosen in init().
+ Status (VJsonReader::*_vhandle_json_callback)(
+ std::vector<vectorized::MutableColumnPtr>& columns,
+ const std::vector<SlotDescriptor*>& slot_descs, bool* is_empty_row, bool* eof);
+
+ // No jsonpaths: members are matched to slots by name.
+ Status _vhandle_simple_json(std::vector<MutableColumnPtr>& columns,
+ const std::vector<SlotDescriptor*>& slot_descs, bool* is_empty_row,
+ bool* eof);
+
+ // jsonpaths + strip_outer_array: every element of the outer array is one row.
+ Status _vhandle_flat_array_complex_json(std::vector<MutableColumnPtr>& columns,
+ const std::vector<SlotDescriptor*>& slot_descs,
+ bool* is_empty_row, bool* eof);
+
+ // jsonpaths without strip_outer_array: the whole document is one row.
+ Status _vhandle_nested_complex_json(std::vector<MutableColumnPtr>& columns,
+ const std::vector<SlotDescriptor*>& slot_descs,
+ bool* is_empty_row, bool* eof);
+
+ // Extracts one row from `objectValue` with the parsed jsonpaths.
+ Status _write_columns_by_jsonpath(rapidjson::Value& objectValue,
+ const std::vector<SlotDescriptor*>& slot_descs,
+ std::vector<MutableColumnPtr>& columns, bool* valid);
+
+ // Extracts one row from `objectValue` by matching member names to slot names.
+ Status _set_column_value(rapidjson::Value& objectValue, std::vector<MutableColumnPtr>& columns,
+ const std::vector<SlotDescriptor*>& slot_descs, bool* valid);
+
+ // Appends the textual form of one JSON value to a single column.
+ Status _write_data_to_column(rapidjson::Value::ConstValueIterator value,
+ SlotDescriptor* slot_desc, vectorized::IColumn* column_ptr,
+ bool* valid);
+
+ // Parses the next JSON document and updates the row cursor where applicable.
+ Status _parse_json(bool* is_empty_row, bool* eof);
+
+ // Records an invalid row (error file + filtered counter) and clears *valid if non-null.
+ Status _append_error_msg(const rapidjson::Value& objectValue, std::string error_msg,
+ std::string col_name, bool* valid);
+};
+
+} // namespace vectorized
+} // namespace doris
+#endif
diff --git a/be/test/CMakeLists.txt b/be/test/CMakeLists.txt
index 5abf9cceb2..6b79813060 100644
--- a/be/test/CMakeLists.txt
+++ b/be/test/CMakeLists.txt
@@ -338,6 +338,7 @@ set(VEC_TEST_FILES
vec/exec/vgeneric_iterators_test.cpp
vec/exec/vbroker_scan_node_test.cpp
vec/exec/vbroker_scanner_test.cpp
+ vec/exec/vjson_scanner_test.cpp
vec/exec/vtablet_sink_test.cpp
vec/exprs/vexpr_test.cpp
vec/function/function_array_element_test.cpp
diff --git a/be/test/vec/exec/vjson_scanner_test.cpp b/be/test/vec/exec/vjson_scanner_test.cpp
new file mode 100644
index 0000000000..c96772a011
--- /dev/null
+++ b/be/test/vec/exec/vjson_scanner_test.cpp
@@ -0,0 +1,767 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/vjson_scanner.h"
+
+#include <gtest/gtest.h>
+#include <time.h>
+
+#include <map>
+#include <string>
+#include <vector>
+
+#include "common/object_pool.h"
+#include "exec/broker_scan_node.h"
+#include "exec/local_file_reader.h"
+#include "exprs/cast_functions.h"
+#include "exprs/decimalv2_operators.h"
+#include "gen_cpp/Descriptors_types.h"
+#include "gen_cpp/PlanNodes_types.h"
+#include "runtime/descriptors.h"
+#include "runtime/exec_env.h"
+#include "runtime/row_batch.h"
+#include "runtime/runtime_state.h"
+#include "runtime/tuple.h"
+#include "runtime/user_function_cache.h"
+#include "vec/exec/vbroker_scan_node.h"
+
+namespace doris {
+namespace vectorized {
+
+// Test fixture that drives a VBrokerScanNode over a local JSON file.
+//
+// The descriptor table holds a destination tuple with typed slots and a source tuple with one
+// VARCHAR slot per JSON field; `_params` maps every destination slot to an expression over the
+// matching source slot.
+//
+// NOTE(review): the TEST_F bodies and this constructor touch private members (of this fixture
+// and of RuntimeState); this presumably relies on the BE test build's -fno-access-control.
+class VJsonScannerTest : public testing::Test {
+public:
+    VJsonScannerTest() : _runtime_state(TQueryGlobals()) {
+        // Build scan params and descriptors first, then attach a memory tracker and the
+        // global ExecEnv to the runtime state.
+        init();
+        _runtime_state._instance_mem_tracker.reset(new MemTracker());
+        _runtime_state._exec_env = ExecEnv::GetInstance();
+    }
+    void init();
+    static void SetUpTestCase() {
+        UserFunctionCache::instance()->init(
+                "./be/test/runtime/test_data/user_function_cache/normal");
+        CastFunctions::init();
+        DecimalV2Operators::init();
+    }
+
+protected:
+    void SetUp() override {}
+    void TearDown() override {}
+
+private:
+    int create_src_tuple(TDescriptorTable& t_desc_table, int next_slot_id);
+    int create_dst_tuple(TDescriptorTable& t_desc_table, int next_slot_id);
+    void create_expr_info();
+    void init_desc_table();
+    RuntimeState _runtime_state;
+    ObjectPool _obj_pool;
+    std::map<std::string, SlotDescriptor*> _slots_map;
+    TBrokerScanRangeParams _params;
+    // Filled in by init_desc_table(); null until then.
+    DescriptorTbl* _desc_tbl = nullptr;
+    TPlanNode _tnode;
+};
+
+// Tuple ids shared by the descriptor table, the plan node and the scan params.
+constexpr int TUPLE_ID_DST = 0;
+constexpr int TUPLE_ID_SRC = 1;
+// Number of columns in each of the source and destination tuples.
+constexpr int COLUMN_NUMBERS = 6;
+// Slot ids: destination slots are created first (1..6), source slots follow (7..12).
+constexpr int DST_TUPLE_SLOT_ID_START = 1;
+constexpr int SRC_TUPLE_SLOT_ID_START = 7;
+
+// Appends the source tuple (TUPLE_ID_SRC) to `t_desc_table`: one materialized VARCHAR(65535)
+// slot per JSON field, placed at byte offsets 8, 24, 40, ... (16-byte stride), each with its
+// own null indicator bit. Returns the next unused slot id.
+int VJsonScannerTest::create_src_tuple(TDescriptorTable& t_desc_table, int next_slot_id) {
+    const char* columnNames[] = {"category", "author", "title", "price", "largeint", "decimal"};
+    // Keep the name table and the column count from drifting apart.
+    static_assert(sizeof(columnNames) / sizeof(columnNames[0]) == COLUMN_NUMBERS,
+                  "columnNames must have COLUMN_NUMBERS entries");
+    for (int i = 0; i < COLUMN_NUMBERS; i++) {
+        TSlotDescriptor slot_desc;
+
+        slot_desc.id = next_slot_id++;
+        slot_desc.parent = TUPLE_ID_SRC;
+        TTypeDesc type;
+        {
+            TTypeNode node;
+            node.__set_type(TTypeNodeType::SCALAR);
+            TScalarType scalar_type;
+            scalar_type.__set_type(TPrimitiveType::VARCHAR);
+            scalar_type.__set_len(65535);
+            node.__set_scalar_type(scalar_type);
+            type.types.push_back(node);
+        }
+        slot_desc.slotType = type;
+        slot_desc.columnPos = i;
+        slot_desc.byteOffset = i * 16 + 8;
+        slot_desc.nullIndicatorByte = i / 8;
+        slot_desc.nullIndicatorBit = i % 8;
+        slot_desc.colName = columnNames[i];
+        slot_desc.slotIdx = i + 1;
+        slot_desc.isMaterialized = true;
+
+        t_desc_table.slotDescriptors.push_back(slot_desc);
+    }
+
+    {
+        // TTupleDescriptor source: large enough to hold the last 16-byte slot.
+        TTupleDescriptor t_tuple_desc;
+        t_tuple_desc.id = TUPLE_ID_SRC;
+        t_tuple_desc.byteSize = COLUMN_NUMBERS * 16 + 8;
+        t_tuple_desc.numNullBytes = 0;
+        t_tuple_desc.tableId = 0;
+        t_tuple_desc.__isset.tableId = true;
+        t_desc_table.tupleDescriptors.push_back(t_tuple_desc);
+    }
+    return next_slot_id;
+}
+
+// Appends the destination tuple (TUPLE_ID_DST) to `t_desc_table`: three VARCHAR(65535) columns
+// (category, author, title) followed by DOUBLE price, LARGEINT largeint and DECIMALV2 decimal.
+// Returns the next unused slot id.
+int VJsonScannerTest::create_dst_tuple(TDescriptorTable& t_desc_table, int next_slot_id) {
+    // Appends one materialized scalar slot. Its null indicator bit is the column position and
+    // its slotIdx is the column position + 1.
+    auto add_slot = [&](const TScalarType& scalar_type, int column_pos, int32_t byte_offset,
+                        const char* col_name) {
+        TTypeNode node;
+        node.__set_type(TTypeNodeType::SCALAR);
+        node.__set_scalar_type(scalar_type);
+        TTypeDesc type;
+        type.types.push_back(node);
+
+        TSlotDescriptor slot_desc;
+        slot_desc.id = next_slot_id++;
+        slot_desc.parent = TUPLE_ID_DST;
+        slot_desc.slotType = type;
+        slot_desc.columnPos = column_pos;
+        slot_desc.byteOffset = byte_offset;
+        slot_desc.nullIndicatorByte = 0;
+        slot_desc.nullIndicatorBit = column_pos;
+        slot_desc.colName = col_name;
+        slot_desc.slotIdx = column_pos + 1;
+        slot_desc.isMaterialized = true;
+        t_desc_table.slotDescriptors.push_back(slot_desc);
+    };
+
+    TScalarType varchar_type;
+    varchar_type.__set_type(TPrimitiveType::VARCHAR);
+    varchar_type.__set_len(65535);
+
+    TScalarType double_type;
+    double_type.__set_type(TPrimitiveType::DOUBLE);
+
+    TScalarType largeint_type;
+    largeint_type.__set_type(TPrimitiveType::LARGEINT);
+
+    TScalarType decimal_type;
+    decimal_type.__set_precision(-1);
+    decimal_type.__set_scale(-1);
+    decimal_type.__set_type(TPrimitiveType::DECIMALV2);
+
+    // Slots start at byte 8; VARCHAR, LARGEINT and DECIMALV2 slots are given 16 bytes each,
+    // DOUBLE is given 8.
+    int32_t byte_offset = 8;
+    add_slot(varchar_type, 0, byte_offset, "category");
+    byte_offset += 16;
+    add_slot(varchar_type, 1, byte_offset, "author");
+    byte_offset += 16;
+    add_slot(varchar_type, 2, byte_offset, "title");
+    byte_offset += 16;
+    add_slot(double_type, 3, byte_offset, "price");
+    byte_offset += 8;
+    add_slot(largeint_type, 4, byte_offset, "largeint");
+    byte_offset += 16;
+    add_slot(decimal_type, 5, byte_offset, "decimal");
+    // The tuple must cover the whole 16-byte DECIMALV2 slot.
+    byte_offset += 16;
+
+    t_desc_table.__isset.slotDescriptors = true;
+    {
+        // TTupleDescriptor dest
+        TTupleDescriptor t_tuple_desc;
+        t_tuple_desc.id = TUPLE_ID_DST;
+        t_tuple_desc.byteSize = byte_offset;
+        t_tuple_desc.numNullBytes = 0;
+        t_tuple_desc.tableId = 0;
+        t_tuple_desc.__isset.tableId = true;
+        t_desc_table.tupleDescriptors.push_back(t_tuple_desc);
+    }
+    return next_slot_id;
+}
+
+// Builds the thrift descriptor table (one broker table, the destination tuple, then the source
+// tuple), materializes it into `_desc_tbl` and installs it on the runtime state.
+void VJsonScannerTest::init_desc_table() {
+    TDescriptorTable t_desc_table;
+
+    // table descriptors
+    TTableDescriptor t_table_desc;
+
+    t_table_desc.id = 0;
+    t_table_desc.tableType = TTableType::BROKER_TABLE;
+    t_table_desc.numCols = 0;
+    t_table_desc.numClusteringCols = 0;
+    t_desc_table.tableDescriptors.push_back(t_table_desc);
+    t_desc_table.__isset.tableDescriptors = true;
+
+    // Destination slots are numbered first; source slot ids continue after them.
+    int next_slot_id = DST_TUPLE_SLOT_ID_START;
+    next_slot_id = create_dst_tuple(t_desc_table, next_slot_id);
+    create_src_tuple(t_desc_table, next_slot_id);
+
+    // Every test dereferences `_desc_tbl`, so a failure here must not pass silently.
+    auto status = DescriptorTbl::create(&_obj_pool, t_desc_table, &_desc_tbl);
+    EXPECT_TRUE(status.ok());
+
+    _runtime_state.set_desc_tbl(_desc_tbl);
+}
+
+// Fills `_params` with one expression per destination slot and records the tuple ids.
+// category, author and title are plain slot references into the source tuple. price,
+// largeint and decimal wrap the source slot in a builtin cast.
+void VJsonScannerTest::create_expr_info() {
+    // Wraps a scalar type into a single-node TTypeDesc.
+    auto make_type_desc = [](const TScalarType& scalar_type) {
+        TTypeNode node;
+        node.__set_type(TTypeNodeType::SCALAR);
+        node.__set_scalar_type(scalar_type);
+        TTypeDesc type_desc;
+        type_desc.types.push_back(node);
+        return type_desc;
+    };
+
+    TScalarType varchar_scalar;
+    varchar_scalar.__set_type(TPrimitiveType::VARCHAR);
+    varchar_scalar.__set_len(5000);
+    const TTypeDesc varchar_type = make_type_desc(varchar_scalar);
+
+    // SLOT_REF node reading the `offset`-th slot of the source tuple.
+    auto make_slot_ref = [&varchar_type](int offset) {
+        TExprNode slot_ref;
+        slot_ref.node_type = TExprNodeType::SLOT_REF;
+        slot_ref.type = varchar_type;
+        slot_ref.num_children = 0;
+        slot_ref.__isset.slot_ref = true;
+        slot_ref.slot_ref.slot_id = SRC_TUPLE_SLOT_ID_START + offset;
+        slot_ref.slot_ref.tuple_id = TUPLE_ID_SRC;
+        return slot_ref;
+    };
+
+    // Registers `expr` as the producer of the `offset`-th destination slot and lists the
+    // matching source slot as an input.
+    auto bind_dest_slot = [this](int offset, const TExpr& expr) {
+        _params.expr_of_dest_slot.emplace(DST_TUPLE_SLOT_ID_START + offset, expr);
+        _params.src_slot_ids.push_back(SRC_TUPLE_SLOT_ID_START + offset);
+    };
+
+    // category, author, title: VARCHAR --> VARCHAR, read straight from the source slot.
+    for (int offset = 0; offset < 3; ++offset) {
+        TExpr expr;
+        expr.nodes.push_back(make_slot_ref(offset));
+        bind_dest_slot(offset, expr);
+    }
+
+    // Binds a builtin CAST over the `offset`-th source slot, typed as `ret_type`.
+    auto bind_cast = [&](int offset, const TTypeDesc& ret_type, const char* function_name,
+                         const char* signature, const char* symbol) {
+        TExprNode cast_expr;
+        cast_expr.node_type = TExprNodeType::CAST_EXPR;
+        cast_expr.type = ret_type;
+        cast_expr.__set_opcode(TExprOpcode::CAST);
+        cast_expr.__set_num_children(1);
+        cast_expr.__set_output_scale(-1);
+        cast_expr.__isset.fn = true;
+        cast_expr.fn.name.function_name = function_name;
+        cast_expr.fn.binary_type = TFunctionBinaryType::BUILTIN;
+        cast_expr.fn.arg_types.push_back(varchar_type);
+        cast_expr.fn.ret_type = ret_type;
+        cast_expr.fn.has_var_args = false;
+        cast_expr.fn.__set_signature(signature);
+        cast_expr.fn.__isset.scalar_fn = true;
+        cast_expr.fn.scalar_fn.symbol = symbol;
+
+        // The CAST node precedes its single child in the flattened node list.
+        TExpr expr;
+        expr.nodes.push_back(cast_expr);
+        expr.nodes.push_back(make_slot_ref(offset));
+        bind_dest_slot(offset, expr);
+    };
+
+    // price VARCHAR --> DOUBLE
+    // NOTE(review): the cast node's type/ret_type is BIGINT although the function is
+    // casttodouble and the destination slot is DOUBLE. Kept unchanged because the tests'
+    // expected values may depend on it; confirm intent.
+    TScalarType bigint_scalar;
+    bigint_scalar.__set_type(TPrimitiveType::BIGINT);
+    bind_cast(3, make_type_desc(bigint_scalar), "casttodouble", "casttodouble(VARCHAR(*))",
+              "doris::CastFunctions::cast_to_double_val");
+
+    // largeint VARCHAR --> LARGEINT
+    TScalarType largeint_scalar;
+    largeint_scalar.__set_type(TPrimitiveType::LARGEINT);
+    bind_cast(4, make_type_desc(largeint_scalar), "casttolargeint",
+              "casttolargeint(VARCHAR(*))", "doris::CastFunctions::cast_to_large_int_val");
+
+    // decimal VARCHAR --> DECIMALV2
+    TScalarType decimal_scalar;
+    decimal_scalar.__set_precision(-1);
+    decimal_scalar.__set_scale(-1);
+    decimal_scalar.__set_type(TPrimitiveType::DECIMALV2);
+    bind_cast(5, make_type_desc(decimal_scalar), "casttodecimalv2",
+              "casttodecimalv2(VARCHAR(*))",
+              "doris::DecimalV2Operators::cast_to_decimalv2_val");
+
+    _params.__set_dest_tuple_id(TUPLE_ID_DST);
+    _params.__set_src_tuple_id(TUPLE_ID_SRC);
+}
+
+// Prepares scan params and descriptors, then describes a single childless broker scan plan
+// node whose output row is the destination tuple.
+void VJsonScannerTest::init() {
+    create_expr_info();
+    init_desc_table();
+
+    // Node Id
+    _tnode.node_id = 0;
+    // The tests drive a (V)BrokerScanNode, so label the plan node accordingly.
+    _tnode.node_type = TPlanNodeType::BROKER_SCAN_NODE;
+    _tnode.num_children = 0;
+    _tnode.limit = -1;
+    _tnode.row_tuples.push_back(TUPLE_ID_DST);
+    _tnode.nullable_tuples.push_back(false);
+    _tnode.broker_scan_node.tuple_id = TUPLE_ID_DST;
+    _tnode.__isset.broker_scan_node = true;
+}
+
+namespace {
+
+// JSON fixture read by every test below.
+const char* const kTestJsonFile = "./be/test/exec/test_data/json_scanner/test_simple2.json";
+
+// jsonpaths selecting every fixture field, in destination-column order.
+const char* const kAllFieldsJsonPaths =
+        "[\"$.category\", \"$.author\", \"$.title\", \"$.price\", \"$.largeint\", "
+        "\"$.decimal\"]";
+
+// Builds the scan ranges for reading kTestJsonFile as a local JSON file with its outer array
+// stripped. `jsonpaths` is set on the range only when non-empty; `read_json_by_line` is set
+// only when true.
+std::vector<TScanRangeParams> make_json_scan_ranges(const TBrokerScanRangeParams& params,
+                                                    const std::string& jsonpaths = "",
+                                                    bool read_json_by_line = false) {
+    TBrokerRangeDesc range;
+    range.start_offset = 0;
+    range.size = -1;
+    range.format_type = TFileFormatType::FORMAT_JSON;
+    range.strip_outer_array = true;
+    range.__isset.strip_outer_array = true;
+    range.splittable = true;
+    range.path = kTestJsonFile;
+    range.file_type = TFileType::FILE_LOCAL;
+    if (!jsonpaths.empty()) {
+        range.jsonpaths = jsonpaths;
+        range.__isset.jsonpaths = true;
+    }
+    if (read_json_by_line) {
+        range.read_json_by_line = true;
+        range.__isset.read_json_by_line = true;
+    }
+
+    TBrokerScanRange broker_scan_range;
+    broker_scan_range.params = params;
+    broker_scan_range.ranges.push_back(range);
+
+    TScanRangeParams scan_range_params;
+    scan_range_params.scan_range.__set_broker_scan_range(broker_scan_range);
+
+    std::vector<TScanRangeParams> scan_ranges;
+    scan_ranges.push_back(scan_range_params);
+    return scan_ranges;
+}
+
+} // namespace
+
+TEST_F(VJsonScannerTest, simple_array_json) {
+    VBrokerScanNode scan_node(&_obj_pool, _tnode, *_desc_tbl);
+    EXPECT_TRUE(scan_node.init(_tnode).ok());
+    auto status = scan_node.prepare(&_runtime_state);
+    ASSERT_TRUE(status.ok());
+
+    auto scan_ranges = make_json_scan_ranges(_params);
+    scan_node.set_scan_ranges(scan_ranges);
+    status = scan_node.open(&_runtime_state);
+    ASSERT_TRUE(status.ok());
+
+    bool eof = false;
+    vectorized::Block block;
+    status = scan_node.get_next(&_runtime_state, &block, &eof);
+    ASSERT_TRUE(status.ok());
+    // Rows are indexed below, so stop here if the shape is wrong.
+    ASSERT_EQ(2, block.rows());
+    ASSERT_EQ(6, block.columns());
+
+    auto columns = block.get_columns_with_type_and_name();
+    ASSERT_EQ(columns.size(), 6);
+    ASSERT_EQ(columns[0].to_string(0), "reference");
+    ASSERT_EQ(columns[0].to_string(1), "fiction");
+    ASSERT_EQ(columns[1].to_string(0), "NigelRees");
+    ASSERT_EQ(columns[1].to_string(1), "EvelynWaugh");
+    ASSERT_EQ(columns[2].to_string(0), "SayingsoftheCentury");
+    ASSERT_EQ(columns[2].to_string(1), "SwordofHonour");
+    ASSERT_EQ(columns[3].to_string(0), "8.950000");
+    ASSERT_EQ(columns[3].to_string(1), "12.990000");
+    ASSERT_EQ(columns[4].to_string(0), "1234");
+    ASSERT_EQ(columns[4].to_string(1), "1180591620717411303424.000000");
+    ASSERT_EQ(columns[5].to_string(0), "1234.123400");
+    ASSERT_EQ(columns[5].to_string(1), "10000000000000.001953");
+
+    // A second read must report end of input with no rows.
+    block.clear();
+    status = scan_node.get_next(&_runtime_state, &block, &eof);
+    EXPECT_TRUE(status.ok());
+    ASSERT_EQ(0, block.rows());
+    ASSERT_TRUE(eof);
+    scan_node.close(&_runtime_state);
+}
+
+TEST_F(VJsonScannerTest, use_jsonpaths_with_file_reader) {
+    VBrokerScanNode scan_node(&_obj_pool, _tnode, *_desc_tbl);
+    EXPECT_TRUE(scan_node.init(_tnode).ok());
+    auto status = scan_node.prepare(&_runtime_state);
+    ASSERT_TRUE(status.ok());
+
+    auto scan_ranges = make_json_scan_ranges(_params, kAllFieldsJsonPaths);
+    scan_node.set_scan_ranges(scan_ranges);
+    status = scan_node.open(&_runtime_state);
+    ASSERT_TRUE(status.ok());
+
+    bool eof = false;
+    vectorized::Block block;
+    status = scan_node.get_next(&_runtime_state, &block, &eof);
+    ASSERT_TRUE(status.ok());
+    ASSERT_EQ(2, block.rows());
+    ASSERT_EQ(6, block.columns());
+
+    auto columns = block.get_columns_with_type_and_name();
+    ASSERT_EQ(columns.size(), 6);
+    ASSERT_EQ(columns[0].to_string(0), "reference");
+    ASSERT_EQ(columns[0].to_string(1), "fiction");
+    ASSERT_EQ(columns[1].to_string(0), "NigelRees");
+    ASSERT_EQ(columns[1].to_string(1), "EvelynWaugh");
+    ASSERT_EQ(columns[2].to_string(0), "SayingsoftheCentury");
+    ASSERT_EQ(columns[2].to_string(1), "SwordofHonour");
+
+    block.clear();
+    status = scan_node.get_next(&_runtime_state, &block, &eof);
+    EXPECT_TRUE(status.ok());
+    ASSERT_EQ(0, block.rows());
+    ASSERT_TRUE(eof);
+    scan_node.close(&_runtime_state);
+}
+
+TEST_F(VJsonScannerTest, use_jsonpaths_with_line_reader) {
+    VBrokerScanNode scan_node(&_obj_pool, _tnode, *_desc_tbl);
+    EXPECT_TRUE(scan_node.init(_tnode).ok());
+    auto status = scan_node.prepare(&_runtime_state);
+    ASSERT_TRUE(status.ok());
+
+    auto scan_ranges =
+            make_json_scan_ranges(_params, kAllFieldsJsonPaths, /*read_json_by_line=*/true);
+    scan_node.set_scan_ranges(scan_ranges);
+    status = scan_node.open(&_runtime_state);
+    ASSERT_TRUE(status.ok());
+
+    bool eof = false;
+    vectorized::Block block;
+    status = scan_node.get_next(&_runtime_state, &block, &eof);
+    ASSERT_TRUE(status.ok());
+    ASSERT_EQ(2, block.rows());
+    ASSERT_EQ(6, block.columns());
+
+    auto columns = block.get_columns_with_type_and_name();
+    ASSERT_EQ(columns.size(), 6);
+    ASSERT_EQ(columns[0].to_string(0), "reference");
+    ASSERT_EQ(columns[0].to_string(1), "fiction");
+    ASSERT_EQ(columns[1].to_string(0), "NigelRees");
+    ASSERT_EQ(columns[1].to_string(1), "EvelynWaugh");
+    ASSERT_EQ(columns[2].to_string(0), "SayingsoftheCentury");
+    ASSERT_EQ(columns[2].to_string(1), "SwordofHonour");
+
+    block.clear();
+    status = scan_node.get_next(&_runtime_state, &block, &eof);
+    EXPECT_TRUE(status.ok());
+    ASSERT_EQ(0, block.rows());
+    ASSERT_TRUE(eof);
+    scan_node.close(&_runtime_state);
+}
+
+TEST_F(VJsonScannerTest, use_jsonpaths_mismatch) {
+    VBrokerScanNode scan_node(&_obj_pool, _tnode, *_desc_tbl);
+    EXPECT_TRUE(scan_node.init(_tnode).ok());
+    auto status = scan_node.prepare(&_runtime_state);
+    ASSERT_TRUE(status.ok());
+
+    // Unlike kAllFieldsJsonPaths, these paths do not name the fixture's fields.
+    auto scan_ranges = make_json_scan_ranges(
+            _params, "[\"$.k1\", \"$.k2\", \"$.k3\", \"$.k4\", \"$.k5\", \"$.k6\"]");
+    scan_node.set_scan_ranges(scan_ranges);
+    status = scan_node.open(&_runtime_state);
+    ASSERT_TRUE(status.ok());
+
+    bool eof = false;
+    vectorized::Block block;
+    status = scan_node.get_next(&_runtime_state, &block, &eof);
+    ASSERT_TRUE(status.ok());
+    ASSERT_EQ(2, block.rows());
+    ASSERT_EQ(6, block.columns());
+
+    // Every row still appears, with the string columns printed as NULL (\N).
+    auto columns = block.get_columns_with_type_and_name();
+    ASSERT_EQ(columns.size(), 6);
+    ASSERT_EQ(columns[0].to_string(0), "\\N");
+    ASSERT_EQ(columns[0].to_string(1), "\\N");
+    ASSERT_EQ(columns[1].to_string(0), "\\N");
+    ASSERT_EQ(columns[1].to_string(1), "\\N");
+    ASSERT_EQ(columns[2].to_string(0), "\\N");
+    ASSERT_EQ(columns[2].to_string(1), "\\N");
+    block.clear();
+    scan_node.close(&_runtime_state);
+}
+
+} // namespace vectorized
+} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org