You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by pa...@apache.org on 2022/12/12 16:28:42 UTC

[doris] branch master updated: [Pipeline](load) support pipeline broker load (#14940)

This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c25a7235f9 [Pipeline](load) support pipeline broker load (#14940)
c25a7235f9 is described below

commit c25a7235f9b07acdd0637d5c7145ee5a5c81f614
Author: Pxl <px...@qq.com>
AuthorDate: Tue Dec 13 00:28:36 2022 +0800

    [Pipeline](load) support pipeline broker load (#14940)
    
    support pipeline broker load
---
 be/src/exec/exec_node.cpp                          |  8 ++-
 be/src/exec/exec_node.h                            |  6 +--
 be/src/pipeline/exec/broker_scan_operator.h        | 59 ++++++++++++++++++++++
 be/src/pipeline/exec/operator.h                    |  1 +
 be/src/pipeline/pipeline.h                         | 11 ++--
 be/src/pipeline/pipeline_fragment_context.cpp      | 37 +++++++++-----
 be/src/pipeline/pipeline_fragment_context.h        | 13 ++---
 be/src/pipeline/pipeline_task.cpp                  |  5 +-
 be/src/pipeline/pipeline_task.h                    |  1 -
 be/src/runtime/fragment_mgr.cpp                    |  8 +--
 .../runtime/stream_load/stream_load_executor.cpp   |  2 -
 be/src/vec/exec/scan/new_file_scan_node.cpp        |  3 --
 be/src/vec/exec/scan/new_file_scan_node.h          |  1 -
 be/src/vec/exec/vbroker_scan_node.h                |  6 ++-
 be/src/vec/exec/vbroker_scanner.cpp                |  7 +--
 15 files changed, 108 insertions(+), 60 deletions(-)

diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index 5849b455cb..939954dd01 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -38,7 +38,6 @@
 #include "exec/hash_join_node.h"
 #include "exec/intersect_node.h"
 #include "exec/merge_node.h"
-#include "exec/mysql_scan_node.h"
 #include "exec/odbc_scan_node.h"
 #include "exec/olap_scan_node.h"
 #include "exec/partitioned_aggregation_node.h"
@@ -122,7 +121,7 @@ int ExecNode::RowBatchQueue::Cleanup() {
     //   delete batch;
     // }
 
-    lock_guard<std::mutex> l(lock_);
+    std::lock_guard<std::mutex> l(lock_);
     for (std::list<RowBatch*>::iterator it = cleanup_queue_.begin(); it != cleanup_queue_.end();
          ++it) {
         // num_io_buffers += (*it)->num_io_buffers();
@@ -144,7 +143,6 @@ ExecNode::ExecNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl
           _rows_returned_counter(nullptr),
           _rows_returned_rate(nullptr),
           _memory_used_counter(nullptr),
-          _get_next_span(),
           _is_closed(false),
           _ref(0) {
     if (tnode.__isset.output_tuple_id) {
@@ -321,7 +319,7 @@ Status ExecNode::close(RuntimeState* state) {
 }
 
 void ExecNode::add_runtime_exec_option(const std::string& str) {
-    lock_guard<mutex> l(_exec_options_lock);
+    std::lock_guard<std::mutex> l(_exec_options_lock);
 
     if (_runtime_exec_options.empty()) {
         _runtime_exec_options = str;
@@ -656,7 +654,7 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN
         }
 
     default:
-        map<int, const char*>::const_iterator i =
+        std::map<int, const char*>::const_iterator i =
                 _TPlanNodeType_VALUES_TO_NAMES.find(tnode.node_type);
         const char* str = "unknown node type";
 
diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h
index 3af9967fc0..82951632e4 100644
--- a/be/src/exec/exec_node.h
+++ b/be/src/exec/exec_node.h
@@ -44,7 +44,6 @@ class RowBatch;
 class RuntimeState;
 class TPlan;
 class TupleRow;
-class DataSink;
 class MemTracker;
 
 namespace vectorized {
@@ -61,9 +60,6 @@ class OperatorBase;
 using std::string;
 using std::stringstream;
 using std::vector;
-using std::map;
-using std::lock_guard;
-using std::mutex;
 
 // Superclass of all executor nodes.
 // All subclasses need to make sure to check RuntimeState::is_cancelled()
@@ -196,7 +192,7 @@ public:
     // This improve is cautious, we ensure the correctness firstly.
     void try_do_aggregate_serde_improve();
 
-    typedef bool (*EvalConjunctsFn)(ExprContext* const* ctxs, int num_ctxs, TupleRow* row);
+    using EvalConjunctsFn = bool (*)(ExprContext* const*, int, TupleRow*);
     // Evaluate exprs over row.  Returns true if all exprs return true.
     // TODO: This doesn't use the vector<Expr*> signature because I haven't figured
     // out how to deal with declaring a templated std:vector type in IR
diff --git a/be/src/pipeline/exec/broker_scan_operator.h b/be/src/pipeline/exec/broker_scan_operator.h
new file mode 100644
index 0000000000..584ad8c472
--- /dev/null
+++ b/be/src/pipeline/exec/broker_scan_operator.h
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "operator.h"
+#include "vec/exec/vbroker_scan_node.h"
+
+namespace doris::pipeline {
+
+class BrokerScanOperatorBuilder : public OperatorBuilder<vectorized::VBrokerScanNode> {
+public:
+    BrokerScanOperatorBuilder(int32_t id, ExecNode* node)
+            : OperatorBuilder(id, "BrokerScanOperator", node) {}
+    bool is_source() const override { return true; }
+    OperatorPtr build_operator() override;
+};
+
+class BrokerScanOperator : public SourceOperator<BrokerScanOperatorBuilder> {
+public:
+    BrokerScanOperator(OperatorBuilderBase* operator_builder, ExecNode* scan_node)
+            : SourceOperator(operator_builder, scan_node) {}
+
+    bool can_read() override { return _node->can_read(); }
+
+    bool is_pending_finish() const override { return !_node->can_finish(); }
+
+    Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        RETURN_IF_ERROR(SourceOperator::open(state));
+        return _node->open(state);
+    }
+
+    Status close(RuntimeState* state) override {
+        RETURN_IF_ERROR(SourceOperator::close(state));
+        _node->close(state);
+        return Status::OK();
+    }
+};
+
+OperatorPtr BrokerScanOperatorBuilder::build_operator() {
+    return std::make_shared<BrokerScanOperator>(this, _node);
+}
+
+} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 68870fc57b..46c79c8470 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -20,6 +20,7 @@
 #include <utility>
 
 #include "common/status.h"
+#include "exec/data_sink.h"
 #include "exec/exec_node.h"
 #include "runtime/runtime_state.h"
 #include "vec/core/block.h"
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index 0addd4464b..595f16ba63 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -18,11 +18,11 @@
 #pragma once
 
 #include <atomic>
+#include <memory>
 #include <vector>
 
 #include "common/status.h"
 #include "exec/operator.h"
-#include "vec/core/block.h"
 
 namespace doris::pipeline {
 
@@ -31,16 +31,13 @@ using PipelinePtr = std::shared_ptr<Pipeline>;
 using Pipelines = std::vector<PipelinePtr>;
 using PipelineId = uint32_t;
 
-class PipelineTask;
-class PipelineFragmentContext;
-
 class Pipeline : public std::enable_shared_from_this<Pipeline> {
     friend class PipelineTask;
 
 public:
     Pipeline() = delete;
-    explicit Pipeline(PipelineId pipeline_id, std::shared_ptr<PipelineFragmentContext> context)
-            : _complete_dependency(0), _pipeline_id(pipeline_id), _context(std::move(context)) {}
+    explicit Pipeline(PipelineId pipeline_id, std::weak_ptr<PipelineFragmentContext> context)
+            : _complete_dependency(0), _pipeline_id(pipeline_id), _context(context) {}
 
     Status prepare(RuntimeState* state);
 
@@ -82,7 +79,7 @@ private:
     std::vector<std::shared_ptr<Pipeline>> _dependencies;
 
     PipelineId _pipeline_id;
-    std::shared_ptr<PipelineFragmentContext> _context;
+    std::weak_ptr<PipelineFragmentContext> _context;
 
     std::unique_ptr<RuntimeProfile> _pipeline_profile;
 };
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index a034b2a9a0..ecb79bb2d0 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -17,6 +17,7 @@
 
 #include "pipeline_fragment_context.h"
 
+#include <gen_cpp/DataSinks_types.h>
 #include <thrift/protocol/TDebugProtocol.h>
 
 #include "exec/agg_context.h"
@@ -49,6 +50,7 @@
 #include "gen_cpp/FrontendService.h"
 #include "gen_cpp/HeartbeatService_types.h"
 #include "pipeline/exec/assert_num_rows_operator.h"
+#include "pipeline/exec/broker_scan_operator.h"
 #include "pipeline/exec/const_value_operator.h"
 #include "pipeline/exec/nested_loop_join_build_operator.h"
 #include "pipeline/exec/nested_loop_join_probe_operator.h"
@@ -81,22 +83,21 @@ using apache::thrift::TException;
 
 namespace doris::pipeline {
 
-PipelineFragmentContext::PipelineFragmentContext(const TUniqueId& query_id,
-                                                 const TUniqueId& instance_id, int backend_num,
-                                                 std::shared_ptr<QueryFragmentsCtx> query_ctx,
-                                                 ExecEnv* exec_env)
+PipelineFragmentContext::PipelineFragmentContext(
+        const TUniqueId& query_id, const TUniqueId& instance_id, int backend_num,
+        std::shared_ptr<QueryFragmentsCtx> query_ctx, ExecEnv* exec_env,
+        std::function<void(RuntimeState*, Status*)> call_back)
         : _query_id(query_id),
-          _fragment_instance_id(instance_id),
+          _fragment_id(instance_id),
           _backend_num(backend_num),
           _exec_env(exec_env),
           _cancel_reason(PPlanFragmentCancelReason::INTERNAL_ERROR),
           _closed_pipeline_cnt(0),
-          _query_ctx(std::move(query_ctx)) {
+          _query_ctx(std::move(query_ctx)),
+          _call_back(call_back) {
     _fragment_watcher.start();
 }
 
-PipelineFragmentContext::~PipelineFragmentContext() = default;
-
 void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
                                      const std::string& msg) {
     if (!_runtime_state->is_cancelled()) {
@@ -114,7 +115,7 @@ void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
         _query_ctx->set_ready_to_execute(true);
 
         // must close stream_mgr to avoid dead lock in Exchange Node
-        _exec_env->vstream_mgr()->cancel(_fragment_instance_id);
+        _exec_env->vstream_mgr()->cancel(_fragment_id);
         // Cancel the result queue manager used by spark doris connector
         // TODO pipeline incomp
         // _exec_env->result_queue_mgr()->update_queue_status(id, Status::Aborted(msg));
@@ -124,7 +125,7 @@ void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
 PipelinePtr PipelineFragmentContext::add_pipeline() {
     // _prepared、_submitted, _canceled should do not add pipeline
     PipelineId id = _next_pipeline_id++;
-    auto pipeline = std::make_shared<Pipeline>(id, shared_from_this());
+    auto pipeline = std::make_shared<Pipeline>(id, weak_from_this());
     _pipelines.emplace_back(pipeline);
     return pipeline;
 }
@@ -301,9 +302,17 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
     auto node_type = node->type();
     switch (node_type) {
     // for source
+    case TPlanNodeType::BROKER_SCAN_NODE: {
+        OperatorBuilderPtr operator_t = std::make_shared<BrokerScanOperatorBuilder>(
+                fragment_context->next_operator_builder_id(), node);
+        RETURN_IF_ERROR(cur_pipe->add_operator(operator_t));
+        break;
+    }
     case TPlanNodeType::OLAP_SCAN_NODE:
     case TPlanNodeType::JDBC_SCAN_NODE:
-    case TPlanNodeType::ODBC_SCAN_NODE: {
+    case TPlanNodeType::ODBC_SCAN_NODE:
+    case TPlanNodeType::FILE_SCAN_NODE:
+    case TPlanNodeType::ES_SCAN_NODE: {
         OperatorBuilderPtr operator_t = std::make_shared<ScanOperatorBuilder>(
                 fragment_context->next_operator_builder_id(), node);
         RETURN_IF_ERROR(cur_pipe->add_operator(operator_t));
@@ -600,7 +609,7 @@ void PipelineFragmentContext::send_report(bool done) {
     params.protocol_version = FrontendServiceVersion::V1;
     params.__set_query_id(_query_id);
     params.__set_backend_num(_backend_num);
-    params.__set_fragment_instance_id(_fragment_instance_id);
+    params.__set_fragment_instance_id(_fragment_id);
     exec_status.set_t_status(&params);
     params.__set_done(true);
 
@@ -685,8 +694,8 @@ void PipelineFragmentContext::send_report(bool done) {
             coord->reportExecStatus(res, params);
         } catch (TTransportException& e) {
             LOG(WARNING) << "Retrying ReportExecStatus. query id: " << print_id(_query_id)
-                         << ", instance id: " << print_id(_fragment_instance_id) << " to "
-                         << coord_addr << ", err: " << e.what();
+                         << ", instance id: " << print_id(_fragment_id) << " to " << coord_addr
+                         << ", err: " << e.what();
             rpc_status = coord.reopen();
 
             if (!rpc_status.ok()) {
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index fd1d50ebff..5302cc83b5 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -17,7 +17,8 @@
 
 #pragma once
 
-#include "pipeline.h"
+#include "pipeline/pipeline.h"
+#include "pipeline/pipeline_task.h"
 #include "runtime/runtime_state.h"
 
 namespace doris {
@@ -37,9 +38,10 @@ class PipelineFragmentContext : public std::enable_shared_from_this<PipelineFrag
 public:
     PipelineFragmentContext(const TUniqueId& query_id, const TUniqueId& instance_id,
                             int backend_num, std::shared_ptr<QueryFragmentsCtx> query_ctx,
-                            ExecEnv* exec_env);
+                            ExecEnv* exec_env,
+                            std::function<void(RuntimeState*, Status*)> call_back);
 
-    virtual ~PipelineFragmentContext();
+    ~PipelineFragmentContext() { _call_back(_runtime_state.get(), &_exec_status); }
 
     PipelinePtr add_pipeline();
 
@@ -80,13 +82,11 @@ public:
 private:
     // Id of this query
     TUniqueId _query_id;
-    // Id of this instance
-    TUniqueId _fragment_instance_id;
+    TUniqueId _fragment_id;
 
     int _backend_num;
 
     ExecEnv* _exec_env;
-    TUniqueId _fragment_id;
 
     bool _prepared = false;
     bool _submitted = false;
@@ -129,6 +129,7 @@ private:
 
     template <bool is_intersect>
     Status _build_operators_for_set_operation_node(ExecNode*, PipelinePtr);
+    std::function<void(RuntimeState*, Status*)> _call_back;
 };
 } // namespace pipeline
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 6e867d5959..250dfc2bab 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -75,9 +75,8 @@ bool PipelineTask::has_dependency() {
     if (_pipeline->has_dependency()) {
         return true;
     }
-    // FE do not call execute
-    if (!_state->get_query_fragments_ctx()
-                 ->is_ready_to_execute()) { // TODO pipeline config::s_ready_to_execute
+
+    if (!query_fragments_context()->is_ready_to_execute()) {
         return true;
     }
 
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index a7f0dffec0..34cce073e6 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -167,7 +167,6 @@ private:
     void _init_profile();
     void _init_state();
 
-private:
     uint32_t _index;
     PipelinePtr _pipeline;
     bool _dependency_finish = false;
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 1ed44838a4..f1354140ac 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -74,7 +74,6 @@ std::string to_load_error_http_path(const std::string& file_name) {
 }
 
 using apache::thrift::TException;
-using apache::thrift::TProcessor;
 using apache::thrift::transport::TTransportException;
 
 class RuntimeProfile;
@@ -547,8 +546,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params) {
         set_pipe(params.params.fragment_instance_id, pipe);
         return Status::OK();
     } else {
-        return exec_plan_fragment(params, std::bind<void>(&empty_function, std::placeholders::_1,
-                                                          std::placeholders::_2));
+        return exec_plan_fragment(params, empty_function);
     }
 }
 
@@ -750,11 +748,10 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi
         if (!params.__isset.need_wait_execution_trigger || !params.need_wait_execution_trigger) {
             fragments_ctx->set_ready_to_execute_only();
         }
-
         std::shared_ptr<pipeline::PipelineFragmentContext> context =
                 std::make_shared<pipeline::PipelineFragmentContext>(
                         fragments_ctx->query_id, fragment_instance_id, params.backend_num,
-                        fragments_ctx, _exec_env);
+                        fragments_ctx, _exec_env, cb);
         {
             SCOPED_RAW_TIMER(&duration_ns);
             RETURN_IF_ERROR(context->prepare(params));
@@ -767,7 +764,6 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi
             _cv.notify_all();
         }
         auto st = context->submit();
-        cb(context->get_runtime_state(), &st);
         if (!st.ok()) {
             context->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, "submit context fail");
             remove_pipeline_context(context);
diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp
index 34336ed427..61244bbd9f 100644
--- a/be/src/runtime/stream_load/stream_load_executor.cpp
+++ b/be/src/runtime/stream_load/stream_load_executor.cpp
@@ -50,8 +50,6 @@ Status StreamLoadExecutor::execute_plan_fragment(StreamLoadContext* ctx) {
             ctx->put_result.params, [ctx, this](RuntimeState* state, Status* status) {
                 ctx->commit_infos = std::move(state->tablet_commit_infos());
                 if (status->ok()) {
-                    LOG(WARNING) << "MYTEST " << int64_t(state) << " "
-                                 << state->num_rows_load_total();
                     ctx->number_total_rows = state->num_rows_load_total();
                     ctx->number_loaded_rows = state->num_rows_load_success();
                     ctx->number_filtered_rows = state->num_rows_load_filtered();
diff --git a/be/src/vec/exec/scan/new_file_scan_node.cpp b/be/src/vec/exec/scan/new_file_scan_node.cpp
index 49319f1e6b..8da34c5b65 100644
--- a/be/src/vec/exec/scan/new_file_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_file_scan_node.cpp
@@ -17,10 +17,7 @@
 
 #include "vec/exec/scan/new_file_scan_node.h"
 
-#include "vec/columns/column_const.h"
-#include "vec/exec/scan/new_olap_scanner.h"
 #include "vec/exec/scan/vfile_scanner.h"
-#include "vec/functions/in.h"
 
 namespace doris::vectorized {
 
diff --git a/be/src/vec/exec/scan/new_file_scan_node.h b/be/src/vec/exec/scan/new_file_scan_node.h
index 53b11e408d..12148da925 100644
--- a/be/src/vec/exec/scan/new_file_scan_node.h
+++ b/be/src/vec/exec/scan/new_file_scan_node.h
@@ -39,7 +39,6 @@ protected:
 private:
     VScanner* _create_scanner(const TFileScanRange& scan_range);
 
-private:
     std::vector<TScanRangeParams> _scan_ranges;
 };
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/vbroker_scan_node.h b/be/src/vec/exec/vbroker_scan_node.h
index 98b4afc677..452415014f 100644
--- a/be/src/vec/exec/vbroker_scan_node.h
+++ b/be/src/vec/exec/vbroker_scan_node.h
@@ -24,6 +24,7 @@
 #include "exec/scan_node.h"
 #include "gen_cpp/PaloInternalService_types.h"
 #include "runtime/descriptors.h"
+
 namespace doris {
 
 class RuntimeState;
@@ -45,7 +46,7 @@ public:
     Status open(RuntimeState* state) override;
 
     // Fill the next row batch by calling next() on the scanner,
-    virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) override {
+    Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) override {
         return Status::NotSupported("Not Implemented VBrokerScanNode::get_next.");
     }
 
@@ -57,6 +58,9 @@ public:
     // No use
     Status set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override;
 
+    bool can_read() { return true; }
+    bool can_finish() const { return _num_running_scanners == 0; }
+
 private:
     // Write debug string of this into out.
     void debug_string(int indentation_level, std::stringstream* out) const override;
diff --git a/be/src/vec/exec/vbroker_scanner.cpp b/be/src/vec/exec/vbroker_scanner.cpp
index eaddb6ff10..df9f7e79ce 100644
--- a/be/src/vec/exec/vbroker_scanner.cpp
+++ b/be/src/vec/exec/vbroker_scanner.cpp
@@ -19,14 +19,9 @@
 
 #include <fmt/format.h>
 
-#include <iostream>
-
-#include "exec/exec_node.h"
-#include "exec/plain_text_line_reader.h"
+#include "exec/line_reader.h"
 #include "exec/text_converter.h"
 #include "exec/text_converter.hpp"
-#include "exprs/expr_context.h"
-#include "util/utf8_check.h"
 
 namespace doris::vectorized {
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org