You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/10/10 01:56:45 UTC
[doris] branch master updated: [feature-wip](new-scan) Add new ES scanner and new ES scan node #13027
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 935ef5a598 [feature-wip](new-scan) Add new ES scanner and new ES scan node #13027
935ef5a598 is described below
commit 935ef5a59848f0b98c0c13d57f969ea3f60067f9
Author: Tiewei Fang <43...@users.noreply.github.com>
AuthorDate: Mon Oct 10 09:56:38 2022 +0800
[feature-wip](new-scan) Add new ES scanner and new ES scan node #13027
---
be/src/exec/exec_node.cpp | 10 +-
be/src/runtime/plan_fragment_executor.cpp | 4 +-
be/src/vec/CMakeLists.txt | 2 +
be/src/vec/exec/scan/new_es_scan_node.cpp | 245 ++++++++++++++++++++++++++++++
be/src/vec/exec/scan/new_es_scan_node.h | 70 +++++++++
be/src/vec/exec/scan/new_es_scanner.cpp | 200 ++++++++++++++++++++++++
be/src/vec/exec/scan/new_es_scanner.h | 67 ++++++++
be/src/vec/exec/scan/new_odbc_scanner.cpp | 2 +-
8 files changed, 596 insertions(+), 4 deletions(-)
diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index a70db3f510..39e917ec6b 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -61,6 +61,7 @@
#include "vec/core/block.h"
#include "vec/exec/file_scan_node.h"
#include "vec/exec/join/vhash_join_node.h"
+#include "vec/exec/scan/new_es_scan_node.h"
#include "vec/exec/scan/new_file_scan_node.h"
#include "vec/exec/scan/new_jdbc_scan_node.h"
#include "vec/exec/scan/new_odbc_scan_node.h"
@@ -477,7 +478,11 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN
case TPlanNodeType::ES_HTTP_SCAN_NODE:
if (state->enable_vectorized_exec()) {
- *node = pool->add(new vectorized::VEsHttpScanNode(pool, tnode, descs));
+ if (config::enable_new_scan_node) {
+ *node = pool->add(new vectorized::NewEsScanNode(pool, tnode, descs));
+ } else {
+ *node = pool->add(new vectorized::VEsHttpScanNode(pool, tnode, descs));
+ }
} else {
*node = pool->add(new EsHttpScanNode(pool, tnode, descs));
}
@@ -735,7 +740,8 @@ void ExecNode::try_do_aggregate_serde_improve() {
ExecNode* child0 = agg_node[0]->_children[0];
if (typeid(*child0) == typeid(vectorized::NewOlapScanNode) ||
typeid(*child0) == typeid(vectorized::NewFileScanNode) ||
- typeid(*child0) == typeid(vectorized::NewOdbcScanNode)
+ typeid(*child0) == typeid(vectorized::NewOdbcScanNode) ||
+ typeid(*child0) == typeid(vectorized::NewEsScanNode)
#ifdef LIBJVM
|| typeid(*child0) == typeid(vectorized::NewJdbcScanNode)
#endif
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index cff6630c23..14baf8ae06 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -46,6 +46,7 @@
#include "util/telemetry/telemetry.h"
#include "util/uid_util.h"
#include "vec/core/block.h"
+#include "vec/exec/scan/new_es_scan_node.h"
#include "vec/exec/scan/new_file_scan_node.h"
#include "vec/exec/scan/new_jdbc_scan_node.h"
#include "vec/exec/scan/new_odbc_scan_node.h"
@@ -171,7 +172,8 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
ExecNode* node = scan_nodes[i];
if (typeid(*node) == typeid(vectorized::NewOlapScanNode) ||
typeid(*node) == typeid(vectorized::NewFileScanNode) ||
- typeid(*node) == typeid(vectorized::NewOdbcScanNode)
+ typeid(*node) == typeid(vectorized::NewOdbcScanNode) ||
+ typeid(*node) == typeid(vectorized::NewEsScanNode)
#ifdef LIBJVM
|| typeid(*node) == typeid(vectorized::NewJdbcScanNode)
#endif
diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index 87900dcd0b..d91eec109f 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -257,6 +257,8 @@ set(VEC_FILES
exec/scan/new_odbc_scan_node.cpp
exec/scan/new_jdbc_scanner.cpp
exec/scan/new_jdbc_scan_node.cpp
+ exec/scan/new_es_scanner.cpp
+ exec/scan/new_es_scan_node.cpp
)
add_library(Vec STATIC
diff --git a/be/src/vec/exec/scan/new_es_scan_node.cpp b/be/src/vec/exec/scan/new_es_scan_node.cpp
new file mode 100644
index 0000000000..31e439281d
--- /dev/null
+++ b/be/src/vec/exec/scan/new_es_scan_node.cpp
@@ -0,0 +1,245 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/scan/new_es_scan_node.h"
+
+#include "exec/es/es_query_builder.h"
+#include "exec/es/es_scroll_query.h"
+#include "vec/exec/scan/new_es_scanner.h"
+#include "vec/utils/util.hpp"
+
+static const std::string NEW_SCAN_NODE_TYPE = "NewEsScanNode";
+
+// Builds the "hostname:port" string of the ES node a scanner should connect to.
+//
+// The entry whose hostname equals doris::BackendOptions::get_localhost() is preferred;
+// when no entry matches, the first entry of `es_hosts` is used. An empty `es_hosts`
+// yields an empty string instead of reading es_hosts[0] out of bounds.
+static std::string get_host_port(const std::vector<doris::TNetworkAddress>& es_hosts) {
+    if (es_hosts.empty()) {
+        return std::string();
+    }
+
+    const std::string& localhost = doris::BackendOptions::get_localhost();
+
+    // Point at the chosen entry rather than copying a TNetworkAddress around.
+    const doris::TNetworkAddress* host = &es_hosts[0];
+    for (const auto& es_host : es_hosts) {
+        if (es_host.hostname == localhost) {
+            host = &es_host;
+            break;
+        }
+    }
+
+    std::string host_port = host->hostname;
+    host_port += ":";
+    host_port += std::to_string(host->port);
+    return host_port;
+}
+
+namespace doris::vectorized {
+
+// Constructs the scan node from its thrift plan node. The tuple id comes from
+// tnode.es_scan_node and is also used as the output tuple id; the descriptor, the scanner
+// memory tracker and the ES profile start out null.
+NewEsScanNode::NewEsScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
+        : VScanNode(pool, tnode, descs),
+          _tuple_id(tnode.es_scan_node.tuple_id),
+          _tuple_desc(nullptr),
+          _scanner_mem_tracker(nullptr),
+          _es_profile(nullptr) {
+    // Same value as tnode.es_scan_node.tuple_id, which _tuple_id was just initialized from.
+    _output_tuple_id = _tuple_id;
+}
+
+// Returns the fixed display name of this node.
+std::string NewEsScanNode::get_name() {
+    return std::string("VNewEsScanNode");
+}
+
+// Runs the base-class init and then copies the ES-specific settings out of the thrift
+// TEsScanNode: the properties map always, the docvalue and fields contexts only when the
+// thrift struct marks them as set.
+Status NewEsScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
+    RETURN_IF_ERROR(VScanNode::init(tnode, state));
+
+    const auto& es_node = tnode.es_scan_node;
+    _properties = es_node.properties;
+
+    const bool has_docvalue_context = es_node.__isset.docvalue_context;
+    if (has_docvalue_context) {
+        _docvalue_context = es_node.docvalue_context;
+    }
+
+    const bool has_fields_context = es_node.__isset.fields_context;
+    if (has_fields_context) {
+        _fields_context = es_node.fields_context;
+    }
+
+    return Status::OK();
+}
+
+// Prepares the node: runs the base-class prepare, creates the scanner memory tracker,
+// resolves the tuple descriptor for `_tuple_id`, and records the column names of all
+// materialized slots (used later as input to ESScrollQueryBuilder).
+//
+// Returns the error of VScanNode::prepare(), or InternalError when `_tuple_id` cannot be
+// resolved in the descriptor table.
+Status NewEsScanNode::prepare(RuntimeState* state) {
+    VLOG_CRITICAL << NEW_SCAN_NODE_TYPE << "::prepare";
+    RETURN_IF_ERROR(VScanNode::prepare(state));
+    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
+    _scanner_mem_tracker = std::make_unique<MemTracker>("NewEsScanner");
+
+    _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
+    if (_tuple_desc == nullptr) {
+        // Fixed format string: the previous text had a stray 'i' in front of the placeholder.
+        return Status::InternalError("Failed to get tuple descriptor, _tuple_id={}", _tuple_id);
+    }
+
+    // set up column name vector for ESScrollQueryBuilder
+    for (auto slot_desc : _tuple_desc->slots()) {
+        if (!slot_desc->is_materialized()) {
+            continue;
+        }
+        _column_names.push_back(slot_desc->col_name());
+    }
+
+    return Status::OK();
+}
+
+// Appends an owned copy of the ES scan range of every entry in `scan_ranges` to
+// `_scan_ranges`. Each entry is expected to carry an es_scan_range (checked in debug
+// builds only).
+void NewEsScanNode::set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) {
+    _scan_ranges.reserve(_scan_ranges.size() + scan_ranges.size());
+    for (const auto& es_scan_range : scan_ranges) {
+        DCHECK(es_scan_range.scan_range.__isset.es_scan_range);
+        // make_unique keeps the copy owned at every point, so nothing leaks if growing
+        // the vector throws.
+        _scan_ranges.push_back(
+                std::make_unique<TEsScanRange>(es_scan_range.scan_range.es_scan_range));
+    }
+}
+
+// Initializes the base profile, then attaches an "EsIterator" child profile to the scanner
+// profile and registers the row counter and the two timers updated by NewEsScanner.
+Status NewEsScanNode::_init_profile() {
+    RETURN_IF_ERROR(VScanNode::_init_profile());
+
+    _es_profile = std::make_unique<RuntimeProfile>("EsIterator");
+    RuntimeProfile* es_profile = _es_profile.get();
+    _scanner_profile->add_child(es_profile, true, nullptr);
+
+    _rows_read_counter = ADD_COUNTER(es_profile, "RowsRead", TUnit::UNIT);
+    _read_timer = ADD_TIMER(es_profile, "TotalRawReadTime(*)");
+    _materialize_timer = ADD_TIMER(es_profile, "MaterializeTupleTime(*)");
+
+    return Status::OK();
+}
+
+// Decides which conjuncts are handed to Elasticsearch and removes those from the
+// conjuncts this node still evaluates itself.
+//
+// Steps, in order:
+// 1. Run the base-class processing; stop if it already set _eos.
+// 2. If the properties already contain ESScanReader::KEY_QUERY_DSL, do nothing more.
+// 3. Set _eos when a constant conjunct evaluates to null or false.
+// 4. Build one EsPredicate per convertible conjunct (build_conjuncts_list) and drop the
+// predicates that BooleanQueryBuilder::validate rejects.
+// 5. Close and erase the conjunct contexts of the remaining (pushed-down) predicates.
+// 6. Peel the same conjuncts out of the vectorized conjunct tree.
+Status NewEsScanNode::_process_conjuncts() {
+ RETURN_IF_ERROR(VScanNode::_process_conjuncts());
+ if (_eos) {
+ return Status::OK();
+ }
+
+ // FE uses enable_new_es_dsl to control whether it generates the DSL itself, for easy
+ // rollback. Once the code is stable, the BE-side generation logic below can be deleted.
+ if (_properties.find(ESScanReader::KEY_QUERY_DSL) != _properties.end()) {
+ return Status::OK();
+ }
+
+ // if conjunct is constant, compute direct and set eos = true
+ // (only a null or false result sets _eos; the loop does not stop early)
+ for (int conj_idx = 0; conj_idx < _conjunct_ctxs.size(); ++conj_idx) {
+ if (_conjunct_ctxs[conj_idx]->root()->is_constant()) {
+ void* value = _conjunct_ctxs[conj_idx]->get_value(nullptr);
+ if (value == nullptr || *reinterpret_cast<bool*>(value) == false) {
+ _eos = true;
+ }
+ }
+ }
+ RETURN_IF_ERROR(build_conjuncts_list());
+ // remove those predicates which ES cannot support
+ // `list` gets one flag per entry of _predicates (see the DCHECK below).
+ std::vector<bool> list;
+ BooleanQueryBuilder::validate(_predicates, &list);
+
+ DCHECK(list.size() == _predicate_to_conjunct.size());
+ // Walk backwards so that erasing entry i does not shift the entries still to be visited.
+ for (int i = list.size() - 1; i >= 0; i--) {
+ if (!list[i]) {
+ _predicate_to_conjunct.erase(_predicate_to_conjunct.begin() + i);
+ _predicates.erase(_predicates.begin() + i);
+ }
+ }
+
+ // filter the conjuncts and ES will process them later
+ // _predicate_to_conjunct holds ascending conjunct indexes, so iterating it backwards
+ // erases from the highest conjunct index down and keeps lower indexes valid.
+ for (int i = _predicate_to_conjunct.size() - 1; i >= 0; i--) {
+ int conjunct_index = _predicate_to_conjunct[i];
+ _conjunct_ctxs[conjunct_index]->close(_state);
+ _conjunct_ctxs.erase(_conjunct_ctxs.begin() + conjunct_index);
+ }
+
+ // True for a conjunct (by its original index) that produced a predicate and whose
+ // predicate passed validation. `list` keeps its original size, so the predicate index
+ // stored in _conjunct_to_predicate is still a valid position in it.
+ auto checker = [&](int index) {
+ return _conjunct_to_predicate[index] != -1 && list[_conjunct_to_predicate[index]];
+ };
+
+ // _peel_pushed_vconjunct
+ if (_vconjunct_ctx_ptr == nullptr) {
+ return Status::OK();
+ }
+ int leaf_index = 0;
+ vectorized::VExpr* conjunct_expr_root = (*_vconjunct_ctx_ptr)->root();
+ if (conjunct_expr_root != nullptr) {
+ vectorized::VExpr* new_conjunct_expr_root = vectorized::VectorizedUtils::dfs_peel_conjunct(
+ _state, *_vconjunct_ctx_ptr, conjunct_expr_root, leaf_index, checker);
+ // A null result is treated as "nothing left to evaluate locally": the context is
+ // closed and dropped.
+ if (new_conjunct_expr_root == nullptr) {
+ (*_vconjunct_ctx_ptr)->close(_state);
+ _vconjunct_ctx_ptr.reset(nullptr);
+ } else {
+ (*_vconjunct_ctx_ptr)->set_root(new_conjunct_expr_root);
+ }
+ }
+ return Status::OK();
+}
+
+// Creates and prepares one NewEsScanner per assigned ES scan range and appends it to
+// `scanners`. When no scan range was assigned, the node is marked as finished (_eos).
+//
+// Every scanner gets its own property map: the node-level properties plus index, optional
+// type, shard, batch size, host:port, optional terminate_after and the generated query.
+Status NewEsScanNode::_init_scanners(std::list<VScanner*>* scanners) {
+    if (_scan_ranges.empty()) {
+        _eos = true;
+        return Status::OK();
+    }
+
+    for (const auto& range : _scan_ranges) {
+        // Start from the node-level properties and add what is specific to this range.
+        std::map<std::string, std::string> scan_props(_properties);
+        scan_props[ESScanReader::KEY_INDEX] = range->index;
+        if (range->__isset.type) {
+            scan_props[ESScanReader::KEY_TYPE] = range->type;
+        }
+        scan_props[ESScanReader::KEY_SHARD] = std::to_string(range->shard_id);
+        scan_props[ESScanReader::KEY_BATCH_SIZE] = std::to_string(_state->batch_size());
+        scan_props[ESScanReader::KEY_HOST_PORT] = get_host_port(range->es_hosts);
+
+        // The limit is pushed down to Elasticsearch only when it fits into one batch and
+        // no conjunct is left for this node to evaluate itself; otherwise rows filtered
+        // out locally would count against the limit.
+        const bool can_push_limit =
+                limit() != -1 && limit() <= _state->batch_size() && _conjunct_ctxs.empty();
+        if (can_push_limit) {
+            scan_props[ESScanReader::KEY_TERMINATE_AFTER] = std::to_string(limit());
+        }
+
+        // Build the query from the properties collected so far, then store it with them.
+        bool doc_value_mode = false;
+        std::string query = ESScrollQueryBuilder::build(scan_props, _column_names, _predicates,
+                                                        _docvalue_context, &doc_value_mode);
+        scan_props[ESScanReader::KEY_QUERY] = std::move(query);
+
+        // The pool takes the scanner before prepare() runs, so it is tracked even when
+        // prepare() fails.
+        NewEsScanner* scanner = _scanner_pool.add(
+                new NewEsScanner(_state, this, _limit_per_scanner, _mem_tracker.get(), _tuple_id,
+                                 scan_props, _docvalue_context, doc_value_mode));
+        RETURN_IF_ERROR(scanner->prepare(_state));
+        scanners->push_back(static_cast<VScanner*>(scanner));
+    }
+
+    return Status::OK();
+}
+
+// Tries to turn every conjunct into an EsPredicate.
+//
+// For a conjunct whose disjunct list builds successfully, the predicate is appended to
+// _predicates and both index maps (_conjunct_to_predicate / _predicate_to_conjunct) are
+// updated. For a conjunct that cannot be converted, _conjunct_to_predicate[i] is set to -1;
+// if the predicate additionally reports a failed ES query status, that status is logged
+// and returned.
+Status NewEsScanNode::build_conjuncts_list() {
+    const int conjunct_count = _conjunct_ctxs.size();
+    _conjunct_to_predicate.resize(conjunct_count);
+
+    for (int idx = 0; idx < conjunct_count; ++idx) {
+        EsPredicate* es_predicate =
+                _pool->add(new EsPredicate(_conjunct_ctxs[idx], _tuple_desc, _pool));
+        es_predicate->set_field_context(_fields_context);
+
+        Status build_status = es_predicate->build_disjuncts_list();
+        if (build_status.ok()) {
+            // The new predicate's position is the current length of the reverse map.
+            _conjunct_to_predicate[idx] = _predicate_to_conjunct.size();
+            _predicate_to_conjunct.push_back(idx);
+            _predicates.push_back(es_predicate);
+            continue;
+        }
+
+        // Not convertible: the conjunct stays with this node.
+        _conjunct_to_predicate[idx] = -1;
+        VLOG_CRITICAL << build_status.get_error_msg();
+
+        Status query_status = es_predicate->get_es_query_status();
+        if (!query_status.ok()) {
+            LOG(WARNING) << query_status.get_error_msg();
+            return query_status;
+        }
+    }
+
+    return Status::OK();
+}
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/new_es_scan_node.h b/be/src/vec/exec/scan/new_es_scan_node.h
new file mode 100644
index 0000000000..88586f7383
--- /dev/null
+++ b/be/src/vec/exec/scan/new_es_scan_node.h
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "exec/es/es_predicate.h"
+#include "vec/exec/scan/new_es_scanner.h"
+#include "vec/exec/scan/vscan_node.h"
+
+namespace doris::vectorized {
+
+// Scan node for Elasticsearch tables built on the VScanNode framework. It collects the ES
+// properties and scan ranges from the plan, decides which conjuncts are pushed down to ES,
+// and creates one NewEsScanner per scan range.
+class NewEsScanNode : public VScanNode {
+public:
+    // NewEsScanner updates the profile counters declared below.
+    friend class NewEsScanner;
+
+public:
+    NewEsScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
+    ~NewEsScanNode() override = default;
+
+    std::string get_name() override;
+    Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override;
+    Status prepare(RuntimeState* state) override;
+    void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override;
+
+protected:
+    Status _init_profile() override;
+    Status _process_conjuncts() override;
+    Status _init_scanners(std::list<VScanner*>* scanners) override;
+
+private:
+    // Builds an EsPredicate for every conjunct that can be converted.
+    Status build_conjuncts_list();
+
+private:
+    TupleId _tuple_id;
+    // Resolved from _tuple_id in prepare().
+    TupleDescriptor* _tuple_desc;
+
+    // Copied from the thrift TEsScanNode in init().
+    std::map<std::string, std::string> _properties;
+    std::map<std::string, std::string> _fields_context;
+    std::map<std::string, std::string> _docvalue_context;
+
+    std::vector<std::unique_ptr<TEsScanRange>> _scan_ranges;
+    // Names of the materialized slots, filled in prepare().
+    std::vector<std::string> _column_names;
+
+    // Predicates built from the conjuncts, plus index maps in both directions.
+    // _conjunct_to_predicate[i] is -1 for a conjunct that produced no predicate.
+    std::vector<EsPredicate*> _predicates;
+    std::vector<int> _predicate_to_conjunct;
+    std::vector<int> _conjunct_to_predicate;
+
+    std::unique_ptr<MemTracker> _scanner_mem_tracker;
+
+    // Profile
+    std::unique_ptr<RuntimeProfile> _es_profile;
+    // Assigned in _init_profile(); null until then so that an early use is not a read of
+    // an indeterminate pointer.
+    RuntimeProfile::Counter* _rows_read_counter = nullptr;
+    RuntimeProfile::Counter* _read_timer = nullptr;
+    RuntimeProfile::Counter* _materialize_timer = nullptr;
+};
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/new_es_scanner.cpp b/be/src/vec/exec/scan/new_es_scanner.cpp
new file mode 100644
index 0000000000..03f4526a23
--- /dev/null
+++ b/be/src/vec/exec/scan/new_es_scanner.cpp
@@ -0,0 +1,200 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/scan/new_es_scanner.h"
+
+#include "vec/exec/scan/new_es_scan_node.h"
+
+static const std::string NEW_SCANNER_TYPE = "NewEsScanner";
+
+namespace doris::vectorized {
+
+// Records the scan parameters only; the body is empty, so no descriptor lookup and no ES
+// access happens here. The tuple descriptor and the ES reader are set up in prepare(),
+// the memory pool in open().
+//
+// `properties` and `docvalue_context` are stored in the members of the same name;
+// `parent`, `limit` and `mem_tracker` are forwarded to the VScanner base.
+NewEsScanner::NewEsScanner(RuntimeState* state, NewEsScanNode* parent, int64_t limit,
+ MemTracker* mem_tracker, TupleId tuple_id,
+ const std::map<std::string, std::string>& properties,
+ const std::map<std::string, std::string>& docvalue_context,
+ bool doc_value_mode)
+ : VScanner(state, static_cast<VScanNode*>(parent), limit, mem_tracker),
+ _is_init(false),
+ _es_eof(false),
+ _properties(properties),
+ _line_eof(false),
+ _batch_eof(false),
+ _tuple_id(tuple_id),
+ _tuple_desc(nullptr),
+ _mem_pool(nullptr),
+ _es_reader(nullptr),
+ _es_scroll_parser(nullptr),
+ _docvalue_context(docvalue_context),
+ _doc_value_mode(doc_value_mode) {}
+
+// Resolves the tuple descriptor and constructs the ES reader for this scanner.
+// Calling it again after a successful run is a no-op.
+//
+// Returns InternalError when `state` is null, when the tuple descriptor cannot be found,
+// or when the properties carry no host:port entry.
+Status NewEsScanner::prepare(RuntimeState* state) {
+    VLOG_CRITICAL << NEW_SCANNER_TYPE << "::prepare";
+
+    if (_is_init) {
+        return Status::OK();
+    }
+
+    if (nullptr == state) {
+        return Status::InternalError("input pointer is null.");
+    }
+
+    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
+
+    _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
+    if (nullptr == _tuple_desc) {
+        return Status::InternalError("Failed to get tuple descriptor, tuple_id={}", _tuple_id);
+    }
+
+    // Look the key up explicitly: map::at() would throw std::out_of_range on a missing key
+    // instead of reporting the problem through Status like the rest of this function.
+    const auto host_it = _properties.find(ESScanReader::KEY_HOST_PORT);
+    if (host_it == _properties.end()) {
+        return Status::InternalError("Es scanner properties do not contain key: {}",
+                                     ESScanReader::KEY_HOST_PORT);
+    }
+
+    // No null check afterwards: a failed allocation throws, it does not yield nullptr.
+    _es_reader = std::make_unique<ESScanReader>(host_it->second, _properties, _doc_value_mode);
+
+    _is_init = true;
+    return Status::OK();
+}
+
+// Opens the scanner: validates its state, opens the base scanner and the ES reader, and
+// creates the memory pool that _get_next() hands to the scroll parser.
+//
+// Returns InternalError for a null `state` or when prepare() has not completed, and
+// otherwise whatever the cancellation check, VScanner::open() or the ES reader reports.
+Status NewEsScanner::open(RuntimeState* state) {
+    VLOG_CRITICAL << NEW_SCANNER_TYPE << "::open";
+
+    if (state == nullptr) {
+        return Status::InternalError("input pointer is null.");
+    }
+    if (_is_init == false) {
+        return Status::InternalError("used before initialize.");
+    }
+
+    RETURN_IF_CANCELLED(state);
+    RETURN_IF_ERROR(VScanner::open(state));
+    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
+
+    // The pool is created only after the reader opened successfully.
+    RETURN_IF_ERROR(_es_reader->open());
+    _mem_pool.reset(new MemPool(_mem_tracker));
+
+    return Status::OK();
+}
+
+// Fills `block` with up to state->batch_size() rows read from Elasticsearch.
+//
+// `*eof` is set to true only when ES is exhausted and `block` holds no rows; a final,
+// partially filled batch is returned with `*eof` untouched and the following call then
+// reports eof.
+//
+// Returns InternalError for null arguments or when the scanner was not prepared, and
+// otherwise propagates cancellation and read errors.
+Status NewEsScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof) {
+    VLOG_CRITICAL << NEW_SCANNER_TYPE << "::_get_block_impl";
+    if (nullptr == state || nullptr == block || nullptr == eof) {
+        return Status::InternalError("input is NULL pointer");
+    }
+
+    if (!_is_init) {
+        return Status::InternalError("used before initialize.");
+    }
+
+    RETURN_IF_CANCELLED(state);
+
+    if (_es_eof) {
+        *eof = true;
+        return Status::OK();
+    }
+
+    const size_t column_size = _tuple_desc->slots().size();
+    std::vector<MutableColumnPtr> columns(column_size);
+
+    const bool mem_reuse = block->mem_reuse();
+    const int batch_size = state->batch_size();
+    // only empty block should be here
+    DCHECK(block->rows() == 0);
+
+    do {
+        columns.resize(column_size);
+        for (size_t i = 0; i < column_size; i++) {
+            if (mem_reuse) {
+                columns[i] = std::move(*block->get_by_position(i).column).mutate();
+            } else {
+                columns[i] = _tuple_desc->slots()[i]->get_empty_mutable_column();
+            }
+        }
+
+        while (columns[0]->size() < batch_size && !_es_eof) {
+            RETURN_IF_CANCELLED(state);
+            // Get from scanner
+            RETURN_IF_ERROR(_get_next(columns));
+        }
+
+        // Before really use the Block, must clear other ptr of column in block
+        // So here need do std::move and clear in `columns`
+        //
+        // This hand-over happens BEFORE the eof check. With a block that is not
+        // mem-reusable, the rows read so far live only in `columns`; checking eof first
+        // would break out with an empty block and drop the last partial batch.
+        if (!mem_reuse) {
+            int column_index = 0;
+            for (const auto slot_desc : _tuple_desc->slots()) {
+                block->insert(ColumnWithTypeAndName(std::move(columns[column_index++]),
+                                                    slot_desc->get_data_type_ptr(),
+                                                    slot_desc->col_name()));
+            }
+        } else {
+            columns.clear();
+        }
+
+        if (_es_eof) {
+            // Report eof only together with an empty block; otherwise hand out the rows
+            // now and let the next call report eof through the early return above.
+            if (block->rows() == 0) {
+                *eof = true;
+            }
+            break;
+        }
+
+        VLOG_ROW << "NewEsScanner output rows: " << block->rows();
+    } while (block->rows() == 0 && !(*eof));
+    return Status::OK();
+}
+
+// Makes one attempt to append data from the current scroll batch to `columns`, fetching
+// another batch from the ES reader when the current one is used up (or none was fetched
+// yet). Sets _es_eof once the reader reports that no batch is left.
+//
+// Flags used: _line_eof is written by ScrollParser::fill_columns, _batch_eof by
+// ESScanReader::get_next. NOTE(review): presumably _line_eof means "current batch fully
+// consumed" and _batch_eof means "no more batches" - confirm against those classes.
+Status NewEsScanner::_get_next(std::vector<vectorized::MutableColumnPtr>& columns) {
+ NewEsScanNode* new_es_scan_node = static_cast<NewEsScanNode*>(_parent);
+ // This timer spans the whole call, including the materialize time measured below.
+ SCOPED_TIMER(new_es_scan_node->_read_timer);
+ if (_line_eof && _batch_eof) {
+ _es_eof = true;
+ return Status::OK();
+ }
+
+ while (!_batch_eof) {
+ // Fetch a new batch on the first call (no parser yet) or after the previous batch
+ // was reported as finished.
+ if (_line_eof || _es_scroll_parser == nullptr) {
+ RETURN_IF_ERROR(_es_reader->get_next(&_batch_eof, _es_scroll_parser));
+ if (_batch_eof) {
+ _es_eof = true;
+ return Status::OK();
+ }
+ }
+
+ // The counter is bumped once per fill_columns call, including a call that only
+ // reports _line_eof.
+ COUNTER_UPDATE(new_es_scan_node->_rows_read_counter, 1);
+ SCOPED_TIMER(new_es_scan_node->_materialize_timer);
+ RETURN_IF_ERROR(_es_scroll_parser->fill_columns(_tuple_desc, columns, _mem_pool.get(),
+ &_line_eof, _docvalue_context));
+ // Leave after a call that did not hit the end of the batch; otherwise loop to fetch
+ // the next batch.
+ if (!_line_eof) {
+ break;
+ }
+ }
+
+ return Status::OK();
+}
+
+// Closes the ES reader (when one was created) and then the base scanner. A scanner that
+// is already closed is left alone and reports OK.
+Status NewEsScanner::close(RuntimeState* state) {
+    if (!_is_closed) {
+        if (_es_reader != nullptr) {
+            _es_reader->close();
+        }
+        RETURN_IF_ERROR(VScanner::close(state));
+    }
+    return Status::OK();
+}
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/new_es_scanner.h b/be/src/vec/exec/scan/new_es_scanner.h
new file mode 100644
index 0000000000..2e776f08c9
--- /dev/null
+++ b/be/src/vec/exec/scan/new_es_scanner.h
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "exec/es/es_scan_reader.h"
+#include "exec/es/es_scroll_parser.h"
+#include "runtime/runtime_state.h"
+#include "vec/exec/scan/vscanner.h"
+
+namespace doris::vectorized {
+
+class NewEsScanNode;
+
+// VScanner implementation that reads rows from Elasticsearch through an ESScanReader and
+// materializes them into vectorized columns with a ScrollParser.
+//
+// Lifecycle: construct -> prepare() -> open() -> _get_block_impl()... -> close().
+class NewEsScanner : public VScanner {
+public:
+    NewEsScanner(RuntimeState* state, NewEsScanNode* parent, int64_t limit, MemTracker* mem_tracker,
+                 TupleId tuple_id, const std::map<std::string, std::string>& properties,
+                 const std::map<std::string, std::string>& docvalue_context, bool doc_value_mode);
+
+    Status open(RuntimeState* state) override;
+    Status close(RuntimeState* state) override;
+
+public:
+    // Resolves the tuple descriptor and creates the ES reader; must run before open().
+    Status prepare(RuntimeState* state);
+
+protected:
+    Status _get_block_impl(RuntimeState* state, Block* block, bool* eof) override;
+
+private:
+    Status _get_next(std::vector<vectorized::MutableColumnPtr>& columns);
+
+private:
+    // Set by a successful prepare().
+    bool _is_init;
+    // Set once the ES reader reported that no more data is available.
+    bool _es_eof;
+
+    // Owned copy of the scan properties. The caller builds the map it passes to the
+    // constructor as a per-scan-range local, so holding a reference here would dangle as
+    // soon as that local goes out of scope.
+    const std::map<std::string, std::string> _properties;
+
+    // Written by ScrollParser::fill_columns / ESScanReader::get_next respectively.
+    bool _line_eof;
+    bool _batch_eof;
+
+    TupleId _tuple_id;
+    const TupleDescriptor* _tuple_desc;
+
+    std::unique_ptr<MemPool> _mem_pool;
+
+    std::unique_ptr<ESScanReader> _es_reader;
+    std::unique_ptr<ScrollParser> _es_scroll_parser;
+
+    // Bound to the map passed to the constructor; that map must outlive this scanner.
+    const std::map<std::string, std::string>& _docvalue_context;
+    bool _doc_value_mode;
+};
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/new_odbc_scanner.cpp b/be/src/vec/exec/scan/new_odbc_scanner.cpp
index 118c35b76c..ee383ac636 100644
--- a/be/src/vec/exec/scan/new_odbc_scanner.cpp
+++ b/be/src/vec/exec/scan/new_odbc_scanner.cpp
@@ -200,7 +200,7 @@ Status NewOdbcScanner::_get_block_impl(RuntimeState* state, Block* block, bool*
} else {
columns.clear();
}
- VLOG_ROW << "VOdbcScanNode output rows: " << block->rows();
+ VLOG_ROW << "NewOdbcScanner output rows: " << block->rows();
} while (block->rows() == 0 && !(*eof));
return Status::OK();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org