You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/10/10 01:56:45 UTC

[doris] branch master updated: [feature-wip](new-scan) Add new ES scanner and new ES scan node #13027

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 935ef5a598 [feature-wip](new-scan) Add new ES scanner and new ES scan node #13027
935ef5a598 is described below

commit 935ef5a59848f0b98c0c13d57f969ea3f60067f9
Author: Tiewei Fang <43...@users.noreply.github.com>
AuthorDate: Mon Oct 10 09:56:38 2022 +0800

    [feature-wip](new-scan) Add new ES scanner and new ES scan node #13027
---
 be/src/exec/exec_node.cpp                 |  10 +-
 be/src/runtime/plan_fragment_executor.cpp |   4 +-
 be/src/vec/CMakeLists.txt                 |   2 +
 be/src/vec/exec/scan/new_es_scan_node.cpp | 245 ++++++++++++++++++++++++++++++
 be/src/vec/exec/scan/new_es_scan_node.h   |  70 +++++++++
 be/src/vec/exec/scan/new_es_scanner.cpp   | 200 ++++++++++++++++++++++++
 be/src/vec/exec/scan/new_es_scanner.h     |  67 ++++++++
 be/src/vec/exec/scan/new_odbc_scanner.cpp |   2 +-
 8 files changed, 596 insertions(+), 4 deletions(-)

diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index a70db3f510..39e917ec6b 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -61,6 +61,7 @@
 #include "vec/core/block.h"
 #include "vec/exec/file_scan_node.h"
 #include "vec/exec/join/vhash_join_node.h"
+#include "vec/exec/scan/new_es_scan_node.h"
 #include "vec/exec/scan/new_file_scan_node.h"
 #include "vec/exec/scan/new_jdbc_scan_node.h"
 #include "vec/exec/scan/new_odbc_scan_node.h"
@@ -477,7 +478,11 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN
 
     case TPlanNodeType::ES_HTTP_SCAN_NODE:
         if (state->enable_vectorized_exec()) {
-            *node = pool->add(new vectorized::VEsHttpScanNode(pool, tnode, descs));
+            if (config::enable_new_scan_node) {
+                *node = pool->add(new vectorized::NewEsScanNode(pool, tnode, descs));
+            } else {
+                *node = pool->add(new vectorized::VEsHttpScanNode(pool, tnode, descs));
+            }
         } else {
             *node = pool->add(new EsHttpScanNode(pool, tnode, descs));
         }
@@ -735,7 +740,8 @@ void ExecNode::try_do_aggregate_serde_improve() {
     ExecNode* child0 = agg_node[0]->_children[0];
     if (typeid(*child0) == typeid(vectorized::NewOlapScanNode) ||
         typeid(*child0) == typeid(vectorized::NewFileScanNode) ||
-        typeid(*child0) == typeid(vectorized::NewOdbcScanNode)
+        typeid(*child0) == typeid(vectorized::NewOdbcScanNode) ||
+        typeid(*child0) == typeid(vectorized::NewEsScanNode)
 #ifdef LIBJVM
         || typeid(*child0) == typeid(vectorized::NewJdbcScanNode)
 #endif
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index cff6630c23..14baf8ae06 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -46,6 +46,7 @@
 #include "util/telemetry/telemetry.h"
 #include "util/uid_util.h"
 #include "vec/core/block.h"
+#include "vec/exec/scan/new_es_scan_node.h"
 #include "vec/exec/scan/new_file_scan_node.h"
 #include "vec/exec/scan/new_jdbc_scan_node.h"
 #include "vec/exec/scan/new_odbc_scan_node.h"
@@ -171,7 +172,8 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
         ExecNode* node = scan_nodes[i];
         if (typeid(*node) == typeid(vectorized::NewOlapScanNode) ||
             typeid(*node) == typeid(vectorized::NewFileScanNode) ||
-            typeid(*node) == typeid(vectorized::NewOdbcScanNode)
+            typeid(*node) == typeid(vectorized::NewOdbcScanNode) ||
+            typeid(*node) == typeid(vectorized::NewEsScanNode)
 #ifdef LIBJVM
             || typeid(*node) == typeid(vectorized::NewJdbcScanNode)
 #endif
diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index 87900dcd0b..d91eec109f 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -257,6 +257,8 @@ set(VEC_FILES
   exec/scan/new_odbc_scan_node.cpp
   exec/scan/new_jdbc_scanner.cpp
   exec/scan/new_jdbc_scan_node.cpp
+  exec/scan/new_es_scanner.cpp
+  exec/scan/new_es_scan_node.cpp
   )
 
 add_library(Vec STATIC
diff --git a/be/src/vec/exec/scan/new_es_scan_node.cpp b/be/src/vec/exec/scan/new_es_scan_node.cpp
new file mode 100644
index 0000000000..31e439281d
--- /dev/null
+++ b/be/src/vec/exec/scan/new_es_scan_node.cpp
@@ -0,0 +1,245 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/scan/new_es_scan_node.h"
+
+#include "exec/es/es_query_builder.h"
+#include "exec/es/es_scroll_query.h"
+#include "vec/exec/scan/new_es_scanner.h"
+#include "vec/utils/util.hpp"
+
+// Node type label used as the prefix of this file's verbose log lines.
+static const std::string NEW_SCAN_NODE_TYPE = "NewEsScanNode";
+
+// Builds the "hostname:port" string of the ES host this backend should talk to.
+// An entry whose hostname equals the address returned by
+// BackendOptions::get_localhost() is preferred; otherwise the first entry is used.
+// Returns an empty string when `es_hosts` is empty, instead of reading
+// es_hosts[0] out of bounds.
+static std::string get_host_port(const std::vector<doris::TNetworkAddress>& es_hosts) {
+    if (es_hosts.empty()) {
+        return std::string();
+    }
+
+    const std::string localhost = doris::BackendOptions::get_localhost();
+
+    // Point at the chosen entry rather than copying a whole TNetworkAddress.
+    const doris::TNetworkAddress* host = &es_hosts[0];
+    for (const auto& es_host : es_hosts) {
+        if (es_host.hostname == localhost) {
+            host = &es_host;
+            break;
+        }
+    }
+
+    std::string host_port = host->hostname;
+    host_port += ":";
+    host_port += std::to_string(host->port);
+    return host_port;
+}
+
+namespace doris::vectorized {
+
+// Constructs the scan node from its thrift plan node. The tuple id found in
+// `tnode.es_scan_node` is stored both in `_tuple_id` and in `_output_tuple_id`.
+// The tuple descriptor, the scanner mem tracker and the ES profile start out
+// null; prepare() and _init_profile() assign them later.
+NewEsScanNode::NewEsScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
+        : VScanNode(pool, tnode, descs),
+          _tuple_id(tnode.es_scan_node.tuple_id),
+          _tuple_desc(nullptr),
+          _scanner_mem_tracker(nullptr),
+          _es_profile(nullptr) {
+    _output_tuple_id = tnode.es_scan_node.tuple_id;
+}
+
+// Returns the fixed display name of this node.
+std::string NewEsScanNode::get_name() {
+    return std::string("VNewEsScanNode");
+}
+
+// Initializes the base scan node, then copies the ES-specific settings out of
+// the thrift TEsScanNode: the properties map and, when the optional fields are
+// set, the docvalue and fields contexts.
+Status NewEsScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
+    RETURN_IF_ERROR(VScanNode::init(tnode, state));
+
+    const auto& es_node = tnode.es_scan_node;
+    _properties = es_node.properties;
+
+    // Both contexts are optional; the member is left untouched when unset.
+    const auto& isset = es_node.__isset;
+    if (isset.docvalue_context) {
+        _docvalue_context = es_node.docvalue_context;
+    }
+    if (isset.fields_context) {
+        _fields_context = es_node.fields_context;
+    }
+
+    return Status::OK();
+}
+
+// Prepares the node for execution: runs the base-class prepare, creates
+// `_scanner_mem_tracker`, resolves the tuple descriptor for `_tuple_id`, and
+// collects the column names of all materialized slots for ESScrollQueryBuilder.
+// Returns InternalError when the tuple descriptor cannot be found.
+Status NewEsScanNode::prepare(RuntimeState* state) {
+    VLOG_CRITICAL << NEW_SCAN_NODE_TYPE << "::prepare";
+    RETURN_IF_ERROR(VScanNode::prepare(state));
+    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
+    _scanner_mem_tracker = std::make_unique<MemTracker>("NewEsScanner");
+
+    _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
+    if (_tuple_desc == nullptr) {
+        return Status::InternalError("Failed to get tuple descriptor, _tuple_id={}", _tuple_id);
+    }
+
+    // set up column name vector for ESScrollQueryBuilder
+    for (auto slot_desc : _tuple_desc->slots()) {
+        if (slot_desc->is_materialized()) {
+            _column_names.push_back(slot_desc->col_name());
+        }
+    }
+
+    return Status::OK();
+}
+
+// Copies the ES scan range out of every TScanRangeParams and appends it to
+// `_scan_ranges`. Each entry is expected to carry an es_scan_range (DCHECKed).
+void NewEsScanNode::set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) {
+    _scan_ranges.reserve(_scan_ranges.size() + scan_ranges.size());
+    for (const auto& es_scan_range : scan_ranges) {
+        DCHECK(es_scan_range.scan_range.__isset.es_scan_range);
+        // make_unique gives the copy an owner before the vector is touched, so
+        // it cannot leak if the insertion throws.
+        _scan_ranges.push_back(
+                std::make_unique<TEsScanRange>(es_scan_range.scan_range.es_scan_range));
+    }
+}
+
+// Extends the base scan profile: attaches an "EsIterator" child profile to
+// `_scanner_profile` and registers the rows-read counter plus the read and
+// materialize timers on it.
+Status NewEsScanNode::_init_profile() {
+    RETURN_IF_ERROR(VScanNode::_init_profile());
+
+    _es_profile = std::make_unique<RuntimeProfile>("EsIterator");
+    _scanner_profile->add_child(_es_profile.get(), true, nullptr);
+
+    // Registration order is kept as-is: counter first, then the two timers.
+    _rows_read_counter = ADD_COUNTER(_es_profile, "RowsRead", TUnit::UNIT);
+    _read_timer = ADD_TIMER(_es_profile, "TotalRawReadTime(*)");
+    _materialize_timer = ADD_TIMER(_es_profile, "MaterializeTupleTime(*)");
+
+    return Status::OK();
+}
+
+// Decides which conjuncts are handed to Elasticsearch and removes them from the
+// filters this node keeps for itself.
+//
+// Steps, in order:
+//   1. run the base-class processing and stop if `_eos` is already set;
+//   2. stop if the properties already contain ESScanReader::KEY_QUERY_DSL;
+//   3. evaluate constant conjuncts, setting `_eos` when one is null or false;
+//   4. build one EsPredicate per conjunct (build_conjuncts_list) and drop the
+//      predicates BooleanQueryBuilder::validate() rejects;
+//   5. close and erase the conjunct contexts whose predicates remain;
+//   6. peel the same conjuncts out of the vectorized conjunct tree.
+Status NewEsScanNode::_process_conjuncts() {
+    RETURN_IF_ERROR(VScanNode::_process_conjuncts());
+    if (_eos) {
+        return Status::OK();
+    }
+
+    // FE decides (via enable_new_es_dsl) whether it generates the query DSL itself,
+    // which keeps rollback easy. When the DSL is already present in the properties
+    // there is nothing to build here. Once that path is stable, the BE-side
+    // generation logic below can be deleted.
+    if (_properties.find(ESScanReader::KEY_QUERY_DSL) != _properties.end()) {
+        return Status::OK();
+    }
+
+    // If a conjunct is constant, evaluate it directly; a null or false result
+    // sets _eos = true. Processing still continues with the steps below.
+    for (int conj_idx = 0; conj_idx < _conjunct_ctxs.size(); ++conj_idx) {
+        if (_conjunct_ctxs[conj_idx]->root()->is_constant()) {
+            void* value = _conjunct_ctxs[conj_idx]->get_value(nullptr);
+            if (value == nullptr || *reinterpret_cast<bool*>(value) == false) {
+                _eos = true;
+            }
+        }
+    }
+    RETURN_IF_ERROR(build_conjuncts_list());
+    // remove those predicates which ES cannot support
+    // `list[i]` is the validation result for `_predicates[i]` as it was before any
+    // erase; it is not modified afterwards, so `checker` below can still index it
+    // with the original predicate positions stored in `_conjunct_to_predicate`.
+    std::vector<bool> list;
+    BooleanQueryBuilder::validate(_predicates, &list);
+
+    DCHECK(list.size() == _predicate_to_conjunct.size());
+    // Walk backwards so erasing element i does not shift the indices still to visit.
+    for (int i = list.size() - 1; i >= 0; i--) {
+        if (!list[i]) {
+            _predicate_to_conjunct.erase(_predicate_to_conjunct.begin() + i);
+            _predicates.erase(_predicates.begin() + i);
+        }
+    }
+
+    // filter the conjuncts and ES will process them later
+    // Backwards again: `_predicate_to_conjunct` was filled in ascending conjunct
+    // order, so the highest conjunct index is erased first.
+    for (int i = _predicate_to_conjunct.size() - 1; i >= 0; i--) {
+        int conjunct_index = _predicate_to_conjunct[i];
+        _conjunct_ctxs[conjunct_index]->close(_state);
+        _conjunct_ctxs.erase(_conjunct_ctxs.begin() + conjunct_index);
+    }
+
+    // True when the conjunct at `index` got a predicate and that predicate passed
+    // validation.
+    auto checker = [&](int index) {
+        return _conjunct_to_predicate[index] != -1 && list[_conjunct_to_predicate[index]];
+    };
+
+    // _peel_pushed_vconjunct
+    if (_vconjunct_ctx_ptr == nullptr) {
+        return Status::OK();
+    }
+    int leaf_index = 0;
+    vectorized::VExpr* conjunct_expr_root = (*_vconjunct_ctx_ptr)->root();
+    if (conjunct_expr_root != nullptr) {
+        vectorized::VExpr* new_conjunct_expr_root = vectorized::VectorizedUtils::dfs_peel_conjunct(
+                _state, *_vconjunct_ctx_ptr, conjunct_expr_root, leaf_index, checker);
+        if (new_conjunct_expr_root == nullptr) {
+            // Nothing is left of the tree: close the context and drop it.
+            (*_vconjunct_ctx_ptr)->close(_state);
+            _vconjunct_ctx_ptr.reset(nullptr);
+        } else {
+            (*_vconjunct_ctx_ptr)->set_root(new_conjunct_expr_root);
+        }
+    }
+    return Status::OK();
+}
+
+// Creates one NewEsScanner per ES scan range, prepares it and appends it to
+// `scanners`. Sets `_eos` and returns early when there are no scan ranges.
+// Every scanner is also added to `_scanner_pool`.
+Status NewEsScanNode::_init_scanners(std::list<VScanner*>* scanners) {
+    if (_scan_ranges.empty()) {
+        _eos = true;
+        return Status::OK();
+    }
+
+    for (auto& es_scan_range : _scan_ranges) {
+        // Collect the information from scan range to properties
+        // `properties` is a per-range copy of the node-level map and lives only
+        // for this loop iteration.
+        std::map<std::string, std::string> properties(_properties);
+        properties[ESScanReader::KEY_INDEX] = es_scan_range->index;
+        if (es_scan_range->__isset.type) {
+            properties[ESScanReader::KEY_TYPE] = es_scan_range->type;
+        }
+        properties[ESScanReader::KEY_SHARD] = std::to_string(es_scan_range->shard_id);
+        properties[ESScanReader::KEY_BATCH_SIZE] = std::to_string(_state->batch_size());
+        properties[ESScanReader::KEY_HOST_PORT] = get_host_port(es_scan_range->es_hosts);
+        // push down limit to Elasticsearch
+        // if predicate in _conjunct_ctxs can not be processed by Elasticsearch, we can not push down limit operator to Elasticsearch
+        if (limit() != -1 && limit() <= _state->batch_size() && _conjunct_ctxs.empty()) {
+            properties[ESScanReader::KEY_TERMINATE_AFTER] = std::to_string(limit());
+        }
+
+        // The builder reads the properties filled in above and reports through
+        // `doc_value_mode` which mode the scanner should use.
+        bool doc_value_mode = false;
+        properties[ESScanReader::KEY_QUERY] = ESScrollQueryBuilder::build(
+                properties, _column_names, _predicates, _docvalue_context, &doc_value_mode);
+
+        // NOTE(review): the scanner is given `_mem_tracker`, although prepare()
+        // creates `_scanner_mem_tracker`, which nothing else in this file uses.
+        // Confirm which tracker is intended.
+        NewEsScanner* scanner =
+                new NewEsScanner(_state, this, _limit_per_scanner, _mem_tracker.get(), _tuple_id,
+                                 properties, _docvalue_context, doc_value_mode);
+
+        // Add to the pool before prepare() so the scanner is already tracked
+        // there if prepare() fails.
+        _scanner_pool.add(scanner);
+        RETURN_IF_ERROR(scanner->prepare(_state));
+        scanners->push_back(static_cast<VScanner*>(scanner));
+    }
+    return Status::OK();
+}
+
+// Builds one EsPredicate per conjunct context and records the mapping between
+// conjunct indices and predicate indices in both directions.
+// A conjunct whose disjunct list cannot be built is mapped to -1 and skipped,
+// unless the predicate also reports a failed ES query status, in which case that
+// status is logged and returned.
+Status NewEsScanNode::build_conjuncts_list() {
+    _conjunct_to_predicate.resize(_conjunct_ctxs.size());
+
+    for (int idx = 0; idx < _conjunct_ctxs.size(); ++idx) {
+        EsPredicate* es_predicate =
+                _pool->add(new EsPredicate(_conjunct_ctxs[idx], _tuple_desc, _pool));
+        es_predicate->set_field_context(_fields_context);
+
+        Status build_status = es_predicate->build_disjuncts_list();
+        if (build_status.ok()) {
+            // The predicate's position is the current length of the reverse map.
+            _conjunct_to_predicate[idx] = _predicate_to_conjunct.size();
+            _predicate_to_conjunct.push_back(idx);
+            _predicates.push_back(es_predicate);
+            continue;
+        }
+
+        _conjunct_to_predicate[idx] = -1;
+        VLOG_CRITICAL << build_status.get_error_msg();
+
+        Status query_status = es_predicate->get_es_query_status();
+        if (!query_status.ok()) {
+            LOG(WARNING) << query_status.get_error_msg();
+            return query_status;
+        }
+    }
+
+    return Status::OK();
+}
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/new_es_scan_node.h b/be/src/vec/exec/scan/new_es_scan_node.h
new file mode 100644
index 0000000000..88586f7383
--- /dev/null
+++ b/be/src/vec/exec/scan/new_es_scan_node.h
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "exec/es/es_predicate.h"
+#include "vec/exec/scan/new_es_scanner.h"
+#include "vec/exec/scan/vscan_node.h"
+
+namespace doris::vectorized {
+
+// Scan node for Elasticsearch built on VScanNode. It keeps the ES properties and
+// contexts taken from the plan node, turns conjuncts into EsPredicates, and
+// creates one NewEsScanner per ES scan range.
+class NewEsScanNode : public VScanNode {
+public:
+    // NewEsScanner updates the profile counters declared below.
+    friend class NewEsScanner;
+
+public:
+    NewEsScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
+    ~NewEsScanNode() override = default;
+
+    std::string get_name() override;
+    Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override;
+    Status prepare(RuntimeState* state) override;
+    void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override;
+
+protected:
+    Status _init_profile() override;
+    Status _process_conjuncts() override;
+    Status _init_scanners(std::list<VScanner*>* scanners) override;
+
+private:
+    // Builds `_predicates` and the two index maps from `_conjunct_ctxs`.
+    Status build_conjuncts_list();
+
+private:
+    TupleId _tuple_id;
+    TupleDescriptor* _tuple_desc;
+
+    // Settings copied from the thrift TEsScanNode in init().
+    std::map<std::string, std::string> _properties;
+    std::map<std::string, std::string> _fields_context;
+    std::map<std::string, std::string> _docvalue_context;
+
+    std::vector<std::unique_ptr<TEsScanRange>> _scan_ranges;
+    // Column names of the materialized slots, filled in prepare().
+    std::vector<std::string> _column_names;
+
+    std::vector<EsPredicate*> _predicates;
+    // predicate index -> conjunct index
+    std::vector<int> _predicate_to_conjunct;
+    // conjunct index -> predicate index, or -1 when no predicate was built
+    std::vector<int> _conjunct_to_predicate;
+
+    std::unique_ptr<MemTracker> _scanner_mem_tracker;
+
+    // Profile
+    // The counters are assigned in _init_profile(); they start as nullptr so
+    // they never hold an indeterminate pointer before that.
+    std::unique_ptr<RuntimeProfile> _es_profile;
+    RuntimeProfile::Counter* _rows_read_counter = nullptr;
+    RuntimeProfile::Counter* _read_timer = nullptr;
+    RuntimeProfile::Counter* _materialize_timer = nullptr;
+};
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/new_es_scanner.cpp b/be/src/vec/exec/scan/new_es_scanner.cpp
new file mode 100644
index 0000000000..03f4526a23
--- /dev/null
+++ b/be/src/vec/exec/scan/new_es_scanner.cpp
@@ -0,0 +1,200 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/scan/new_es_scanner.h"
+
+#include "vec/exec/scan/new_es_scan_node.h"
+
+// Scanner type label used as the prefix of this file's verbose log lines.
+static const std::string NEW_SCANNER_TYPE = "NewEsScanner";
+
+namespace doris::vectorized {
+
+// Constructs an uninitialized scanner; prepare() must succeed before open() or
+// _get_block_impl() will do any work. `properties` and `docvalue_context`
+// initialize the members of the same name, all EOF flags start false, and the
+// tuple descriptor, mem pool, ES reader and scroll parser start out null.
+NewEsScanner::NewEsScanner(RuntimeState* state, NewEsScanNode* parent, int64_t limit,
+                           MemTracker* mem_tracker, TupleId tuple_id,
+                           const std::map<std::string, std::string>& properties,
+                           const std::map<std::string, std::string>& docvalue_context,
+                           bool doc_value_mode)
+        : VScanner(state, static_cast<VScanNode*>(parent), limit, mem_tracker),
+          _is_init(false),
+          _es_eof(false),
+          _properties(properties),
+          _line_eof(false),
+          _batch_eof(false),
+          _tuple_id(tuple_id),
+          _tuple_desc(nullptr),
+          _mem_pool(nullptr),
+          _es_reader(nullptr),
+          _es_scroll_parser(nullptr),
+          _docvalue_context(docvalue_context),
+          _doc_value_mode(doc_value_mode) {}
+
+// One-time initialization: resolves the tuple descriptor for `_tuple_id` and
+// constructs the ESScanReader from the scanner properties. Calling it again
+// after a successful run is a no-op.
+// Returns InternalError when `state` is null, when the tuple descriptor cannot
+// be found, or when the properties carry no ESScanReader::KEY_HOST_PORT entry.
+Status NewEsScanner::prepare(RuntimeState* state) {
+    VLOG_CRITICAL << NEW_SCANNER_TYPE << "::prepare";
+
+    if (_is_init) {
+        return Status::OK();
+    }
+
+    if (nullptr == state) {
+        return Status::InternalError("input pointer is null.");
+    }
+
+    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
+
+    _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
+    if (nullptr == _tuple_desc) {
+        return Status::InternalError("Failed to get tuple descriptor, tuple_id={}", _tuple_id);
+    }
+
+    // Look the key up explicitly: std::map::at() would throw std::out_of_range
+    // for a missing key, while this function reports failures through Status.
+    const auto host_it = _properties.find(ESScanReader::KEY_HOST_PORT);
+    if (host_it == _properties.end()) {
+        return Status::InternalError("Es scanner property is missing: {}",
+                                     ESScanReader::KEY_HOST_PORT);
+    }
+
+    // No null check afterwards: a failed allocation throws rather than
+    // returning nullptr.
+    _es_reader = std::make_unique<ESScanReader>(host_it->second, _properties, _doc_value_mode);
+
+    _is_init = true;
+    return Status::OK();
+}
+
+// Opens the scanner: after the null, initialization and cancellation checks it
+// opens the base VScanner and the ES reader, then creates the MemPool that
+// _get_next() later passes to the scroll parser.
+Status NewEsScanner::open(RuntimeState* state) {
+    VLOG_CRITICAL << NEW_SCANNER_TYPE << "::open";
+
+    if (state == nullptr) {
+        return Status::InternalError("input pointer is null.");
+    }
+    if (!_is_init) {
+        return Status::InternalError("used before initialize.");
+    }
+
+    RETURN_IF_CANCELLED(state);
+    RETURN_IF_ERROR(VScanner::open(state));
+    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
+
+    RETURN_IF_ERROR(_es_reader->open());
+    _mem_pool = std::make_unique<MemPool>(_mem_tracker);
+
+    return Status::OK();
+}
+
+// Fills `block` with up to state->batch_size() rows read from Elasticsearch.
+//
+// `*eof` is set to true only when the ES stream is exhausted and this call
+// produced no rows. A final partial batch is returned with `*eof` untouched,
+// and the following call reports EOF.
+// Returns InternalError on null arguments or when prepare() has not run, and
+// propagates cancellation and reader/parser errors.
+Status NewEsScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof) {
+    VLOG_CRITICAL << NEW_SCANNER_TYPE << "::_get_block_impl";
+    if (nullptr == state || nullptr == block || nullptr == eof) {
+        return Status::InternalError("input is NULL pointer");
+    }
+
+    if (!_is_init) {
+        return Status::InternalError("used before initialize.");
+    }
+
+    RETURN_IF_CANCELLED(state);
+
+    if (_es_eof) {
+        *eof = true;
+        return Status::OK();
+    }
+
+    const size_t column_size = _tuple_desc->slots().size();
+    std::vector<MutableColumnPtr> columns(column_size);
+
+    const bool mem_reuse = block->mem_reuse();
+    const int batch_size = state->batch_size();
+    // only empty block should be here
+    DCHECK(block->rows() == 0);
+
+    do {
+        columns.resize(column_size);
+        for (size_t i = 0; i < column_size; i++) {
+            if (mem_reuse) {
+                columns[i] = std::move(*block->get_by_position(i).column).mutate();
+            } else {
+                columns[i] = _tuple_desc->slots()[i]->get_empty_mutable_column();
+            }
+        }
+
+        while (columns[0]->size() < batch_size && !_es_eof) {
+            RETURN_IF_CANCELLED(state);
+            // Get from scanner
+            RETURN_IF_ERROR(_get_next(columns));
+        }
+
+        // Before really use the Block, must clear other ptr of column in block
+        // So here need do std::move and clear in `columns`
+        //
+        // This hand-over happens before the EOF check on purpose: without block
+        // memory reuse the rows live only in `columns`, so breaking out first
+        // would drop the final partial batch and report EOF instead.
+        if (!mem_reuse) {
+            // If EOF arrived before any row was read there is nothing to
+            // publish, so the block is left untouched.
+            if (!_es_eof || columns[0]->size() > 0) {
+                int column_index = 0;
+                for (const auto slot_desc : _tuple_desc->slots()) {
+                    block->insert(ColumnWithTypeAndName(std::move(columns[column_index++]),
+                                                        slot_desc->get_data_type_ptr(),
+                                                        slot_desc->col_name()));
+                }
+            }
+        } else {
+            columns.clear();
+        }
+        VLOG_ROW << "NewEsScanner output rows: " << block->rows();
+
+        if (_es_eof) {
+            if (block->rows() == 0) {
+                *eof = true;
+            }
+            break;
+        }
+    } while (block->rows() == 0 && !(*eof));
+    return Status::OK();
+}
+
+// Advances the ES scroll by one fill_columns() step, fetching the next batch
+// from the reader whenever the current parser is exhausted or not yet created.
+//
+// State flags:
+//   _line_eof  - set by fill_columns() when the current parser has no more data
+//   _batch_eof - set by the reader's get_next() when no further batch exists
+//   _es_eof    - set here once the whole scan is finished
+// Time spent in this function is charged to the parent node's `_read_timer`.
+Status NewEsScanner::_get_next(std::vector<vectorized::MutableColumnPtr>& columns) {
+    // The counters live on the parent scan node.
+    NewEsScanNode* new_es_scan_node = static_cast<NewEsScanNode*>(_parent);
+    SCOPED_TIMER(new_es_scan_node->_read_timer);
+    if (_line_eof && _batch_eof) {
+        _es_eof = true;
+        return Status::OK();
+    }
+
+    while (!_batch_eof) {
+        // Fetch a new batch when the current one is used up or none was read yet.
+        if (_line_eof || _es_scroll_parser == nullptr) {
+            RETURN_IF_ERROR(_es_reader->get_next(&_batch_eof, _es_scroll_parser));
+            if (_batch_eof) {
+                _es_eof = true;
+                return Status::OK();
+            }
+        }
+
+        // The counter is bumped before fill_columns() runs, so a call that only
+        // reports `_line_eof` is counted as well.
+        COUNTER_UPDATE(new_es_scan_node->_rows_read_counter, 1);
+        SCOPED_TIMER(new_es_scan_node->_materialize_timer);
+        RETURN_IF_ERROR(_es_scroll_parser->fill_columns(_tuple_desc, columns, _mem_pool.get(),
+                                                        &_line_eof, _docvalue_context));
+        // Data was produced: return to the caller. Otherwise loop to fetch the
+        // next batch.
+        if (!_line_eof) {
+            break;
+        }
+    }
+
+    return Status::OK();
+}
+
+// Closes the ES reader (if one was created) and then the base VScanner.
+// Does nothing when the scanner is already closed. The reader's own close()
+// result is not inspected.
+Status NewEsScanner::close(RuntimeState* state) {
+    if (!_is_closed) {
+        if (_es_reader != nullptr) {
+            _es_reader->close();
+        }
+        return VScanner::close(state);
+    }
+    return Status::OK();
+}
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/new_es_scanner.h b/be/src/vec/exec/scan/new_es_scanner.h
new file mode 100644
index 0000000000..2e776f08c9
--- /dev/null
+++ b/be/src/vec/exec/scan/new_es_scanner.h
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "exec/es/es_scan_reader.h"
+#include "exec/es/es_scroll_parser.h"
+#include "runtime/runtime_state.h"
+#include "vec/exec/scan/vscanner.h"
+
+namespace doris::vectorized {
+
+class NewEsScanNode;
+
+// Scanner that reads one Elasticsearch scan range through ESScanReader and
+// materializes the results into vectorized columns via ScrollParser.
+// Lifecycle: construct -> prepare() -> open() -> _get_block_impl()* -> close().
+class NewEsScanner : public VScanner {
+public:
+    // `properties` is copied into the scanner. `docvalue_context` is kept by
+    // reference and must outlive the scanner.
+    NewEsScanner(RuntimeState* state, NewEsScanNode* parent, int64_t limit, MemTracker* mem_tracker,
+                 TupleId tuple_id, const std::map<std::string, std::string>& properties,
+                 const std::map<std::string, std::string>& docvalue_context, bool doc_value_mode);
+
+    Status open(RuntimeState* state) override;
+    Status close(RuntimeState* state) override;
+
+public:
+    Status prepare(RuntimeState* state);
+
+protected:
+    Status _get_block_impl(RuntimeState* state, Block* block, bool* eof) override;
+
+private:
+    Status _get_next(std::vector<vectorized::MutableColumnPtr>& columns);
+
+private:
+    bool _is_init;
+    bool _es_eof;
+
+    // Held by value: NewEsScanNode::_init_scanners() passes a map that is local
+    // to one loop iteration, so a reference member would dangle as soon as that
+    // iteration ends.
+    const std::map<std::string, std::string> _properties;
+
+    bool _line_eof;
+    bool _batch_eof;
+
+    TupleId _tuple_id;
+    const TupleDescriptor* _tuple_desc;
+
+    std::unique_ptr<MemPool> _mem_pool;
+
+    std::unique_ptr<ESScanReader> _es_reader;
+    std::unique_ptr<ScrollParser> _es_scroll_parser;
+
+    // Not owned; refers to the map passed to the constructor.
+    const std::map<std::string, std::string>& _docvalue_context;
+    bool _doc_value_mode;
+};
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/new_odbc_scanner.cpp b/be/src/vec/exec/scan/new_odbc_scanner.cpp
index 118c35b76c..ee383ac636 100644
--- a/be/src/vec/exec/scan/new_odbc_scanner.cpp
+++ b/be/src/vec/exec/scan/new_odbc_scanner.cpp
@@ -200,7 +200,7 @@ Status NewOdbcScanner::_get_block_impl(RuntimeState* state, Block* block, bool*
         } else {
             columns.clear();
         }
-        VLOG_ROW << "VOdbcScanNode output rows: " << block->rows();
+        VLOG_ROW << "NewOdbcScanner output rows: " << block->rows();
     } while (block->rows() == 0 && !(*eof));
 
     return Status::OK();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org