You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by pa...@apache.org on 2022/12/12 16:28:42 UTC
[doris] branch master updated: [Pipeline](load) support pipeline broker load (#14940)
This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c25a7235f9 [Pipeline](load) support pipeline broker load (#14940)
c25a7235f9 is described below
commit c25a7235f9b07acdd0637d5c7145ee5a5c81f614
Author: Pxl <px...@qq.com>
AuthorDate: Tue Dec 13 00:28:36 2022 +0800
[Pipeline](load) support pipeline broker load (#14940)
support pipeline broker load
---
be/src/exec/exec_node.cpp | 8 ++-
be/src/exec/exec_node.h | 6 +--
be/src/pipeline/exec/broker_scan_operator.h | 59 ++++++++++++++++++++++
be/src/pipeline/exec/operator.h | 1 +
be/src/pipeline/pipeline.h | 11 ++--
be/src/pipeline/pipeline_fragment_context.cpp | 37 +++++++++-----
be/src/pipeline/pipeline_fragment_context.h | 13 ++---
be/src/pipeline/pipeline_task.cpp | 5 +-
be/src/pipeline/pipeline_task.h | 1 -
be/src/runtime/fragment_mgr.cpp | 8 +--
.../runtime/stream_load/stream_load_executor.cpp | 2 -
be/src/vec/exec/scan/new_file_scan_node.cpp | 3 --
be/src/vec/exec/scan/new_file_scan_node.h | 1 -
be/src/vec/exec/vbroker_scan_node.h | 6 ++-
be/src/vec/exec/vbroker_scanner.cpp | 7 +--
15 files changed, 108 insertions(+), 60 deletions(-)
diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index 5849b455cb..939954dd01 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -38,7 +38,6 @@
#include "exec/hash_join_node.h"
#include "exec/intersect_node.h"
#include "exec/merge_node.h"
-#include "exec/mysql_scan_node.h"
#include "exec/odbc_scan_node.h"
#include "exec/olap_scan_node.h"
#include "exec/partitioned_aggregation_node.h"
@@ -122,7 +121,7 @@ int ExecNode::RowBatchQueue::Cleanup() {
// delete batch;
// }
- lock_guard<std::mutex> l(lock_);
+ std::lock_guard<std::mutex> l(lock_);
for (std::list<RowBatch*>::iterator it = cleanup_queue_.begin(); it != cleanup_queue_.end();
++it) {
// num_io_buffers += (*it)->num_io_buffers();
@@ -144,7 +143,6 @@ ExecNode::ExecNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl
_rows_returned_counter(nullptr),
_rows_returned_rate(nullptr),
_memory_used_counter(nullptr),
- _get_next_span(),
_is_closed(false),
_ref(0) {
if (tnode.__isset.output_tuple_id) {
@@ -321,7 +319,7 @@ Status ExecNode::close(RuntimeState* state) {
}
void ExecNode::add_runtime_exec_option(const std::string& str) {
- lock_guard<mutex> l(_exec_options_lock);
+ std::lock_guard<std::mutex> l(_exec_options_lock);
if (_runtime_exec_options.empty()) {
_runtime_exec_options = str;
@@ -656,7 +654,7 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN
}
default:
- map<int, const char*>::const_iterator i =
+ std::map<int, const char*>::const_iterator i =
_TPlanNodeType_VALUES_TO_NAMES.find(tnode.node_type);
const char* str = "unknown node type";
diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h
index 3af9967fc0..82951632e4 100644
--- a/be/src/exec/exec_node.h
+++ b/be/src/exec/exec_node.h
@@ -44,7 +44,6 @@ class RowBatch;
class RuntimeState;
class TPlan;
class TupleRow;
-class DataSink;
class MemTracker;
namespace vectorized {
@@ -61,9 +60,6 @@ class OperatorBase;
using std::string;
using std::stringstream;
using std::vector;
-using std::map;
-using std::lock_guard;
-using std::mutex;
// Superclass of all executor nodes.
// All subclasses need to make sure to check RuntimeState::is_cancelled()
@@ -196,7 +192,7 @@ public:
// This improve is cautious, we ensure the correctness firstly.
void try_do_aggregate_serde_improve();
- typedef bool (*EvalConjunctsFn)(ExprContext* const* ctxs, int num_ctxs, TupleRow* row);
+ using EvalConjunctsFn = bool (*)(ExprContext* const*, int, TupleRow*);
// Evaluate exprs over row. Returns true if all exprs return true.
// TODO: This doesn't use the vector<Expr*> signature because I haven't figured
// out how to deal with declaring a templated std:vector type in IR
diff --git a/be/src/pipeline/exec/broker_scan_operator.h b/be/src/pipeline/exec/broker_scan_operator.h
new file mode 100644
index 0000000000..584ad8c472
--- /dev/null
+++ b/be/src/pipeline/exec/broker_scan_operator.h
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "operator.h"
+#include "vec/exec/vbroker_scan_node.h"
+
+namespace doris::pipeline {
+
+// Builder that produces the pipeline source operator wrapping a vectorized::VBrokerScanNode.
+class BrokerScanOperatorBuilder : public OperatorBuilder<vectorized::VBrokerScanNode> {
+public:
+    BrokerScanOperatorBuilder(int32_t id, ExecNode* node)
+            : OperatorBuilder<vectorized::VBrokerScanNode>(id, "BrokerScanOperator", node) {}
+
+    // Creates the operator instance for this builder; the definition follows the
+    // BrokerScanOperator class further down in this header.
+    OperatorPtr build_operator() override;
+
+    // Always reports itself as a source operator.
+    bool is_source() const override { return true; }
+};
+
+// Pipeline source operator that delegates to the scan node held in _node.
+class BrokerScanOperator : public SourceOperator<BrokerScanOperatorBuilder> {
+public:
+    BrokerScanOperator(OperatorBuilderBase* operator_builder, ExecNode* scan_node)
+            : SourceOperator(operator_builder, scan_node) {}
+
+    // Forwards to the scan node's can_read().
+    bool can_read() override { return _node->can_read(); }
+
+    // The operator stays pending for as long as the scan node reports that it cannot
+    // finish yet.
+    bool is_pending_finish() const override { return !_node->can_finish(); }
+
+    // Opens the operator base first and then the scan node. The whole call is measured
+    // against the runtime profile's total-time counter.
+    Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        RETURN_IF_ERROR(SourceOperator::open(state));
+        return _node->open(state);
+    }
+
+    // Closes the operator base and then the scan node. The node's Status is returned to
+    // the caller rather than discarded, so a failed node close is not reported as OK.
+    Status close(RuntimeState* state) override {
+        RETURN_IF_ERROR(SourceOperator::close(state));
+        return _node->close(state);
+    }
+};
+
+// Defined out of line, after BrokerScanOperator, because the operator type must be complete
+// for std::make_shared. Marked inline because this definition lives in a header: without it,
+// every translation unit including the header would emit its own external definition,
+// which violates the one-definition rule and fails at link time.
+inline OperatorPtr BrokerScanOperatorBuilder::build_operator() {
+    return std::make_shared<BrokerScanOperator>(this, _node);
+}
+
+} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 68870fc57b..46c79c8470 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -20,6 +20,7 @@
#include <utility>
#include "common/status.h"
+#include "exec/data_sink.h"
#include "exec/exec_node.h"
#include "runtime/runtime_state.h"
#include "vec/core/block.h"
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index 0addd4464b..595f16ba63 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -18,11 +18,11 @@
#pragma once
#include <atomic>
+#include <memory>
#include <vector>
#include "common/status.h"
#include "exec/operator.h"
-#include "vec/core/block.h"
namespace doris::pipeline {
@@ -31,16 +31,13 @@ using PipelinePtr = std::shared_ptr<Pipeline>;
using Pipelines = std::vector<PipelinePtr>;
using PipelineId = uint32_t;
-class PipelineTask;
-class PipelineFragmentContext;
-
class Pipeline : public std::enable_shared_from_this<Pipeline> {
friend class PipelineTask;
public:
Pipeline() = delete;
- explicit Pipeline(PipelineId pipeline_id, std::shared_ptr<PipelineFragmentContext> context)
- : _complete_dependency(0), _pipeline_id(pipeline_id), _context(std::move(context)) {}
+ explicit Pipeline(PipelineId pipeline_id, std::weak_ptr<PipelineFragmentContext> context)
+ : _complete_dependency(0), _pipeline_id(pipeline_id), _context(context) {}
Status prepare(RuntimeState* state);
@@ -82,7 +79,7 @@ private:
std::vector<std::shared_ptr<Pipeline>> _dependencies;
PipelineId _pipeline_id;
- std::shared_ptr<PipelineFragmentContext> _context;
+ std::weak_ptr<PipelineFragmentContext> _context;
std::unique_ptr<RuntimeProfile> _pipeline_profile;
};
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index a034b2a9a0..ecb79bb2d0 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -17,6 +17,7 @@
#include "pipeline_fragment_context.h"
+#include <gen_cpp/DataSinks_types.h>
#include <thrift/protocol/TDebugProtocol.h>
#include "exec/agg_context.h"
@@ -49,6 +50,7 @@
#include "gen_cpp/FrontendService.h"
#include "gen_cpp/HeartbeatService_types.h"
#include "pipeline/exec/assert_num_rows_operator.h"
+#include "pipeline/exec/broker_scan_operator.h"
#include "pipeline/exec/const_value_operator.h"
#include "pipeline/exec/nested_loop_join_build_operator.h"
#include "pipeline/exec/nested_loop_join_probe_operator.h"
@@ -81,22 +83,21 @@ using apache::thrift::TException;
namespace doris::pipeline {
-PipelineFragmentContext::PipelineFragmentContext(const TUniqueId& query_id,
- const TUniqueId& instance_id, int backend_num,
- std::shared_ptr<QueryFragmentsCtx> query_ctx,
- ExecEnv* exec_env)
+PipelineFragmentContext::PipelineFragmentContext(
+ const TUniqueId& query_id, const TUniqueId& instance_id, int backend_num,
+ std::shared_ptr<QueryFragmentsCtx> query_ctx, ExecEnv* exec_env,
+ std::function<void(RuntimeState*, Status*)> call_back)
: _query_id(query_id),
- _fragment_instance_id(instance_id),
+ _fragment_id(instance_id),
_backend_num(backend_num),
_exec_env(exec_env),
_cancel_reason(PPlanFragmentCancelReason::INTERNAL_ERROR),
_closed_pipeline_cnt(0),
- _query_ctx(std::move(query_ctx)) {
+ _query_ctx(std::move(query_ctx)),
+ _call_back(call_back) {
_fragment_watcher.start();
}
-PipelineFragmentContext::~PipelineFragmentContext() = default;
-
void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
const std::string& msg) {
if (!_runtime_state->is_cancelled()) {
@@ -114,7 +115,7 @@ void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
_query_ctx->set_ready_to_execute(true);
// must close stream_mgr to avoid dead lock in Exchange Node
- _exec_env->vstream_mgr()->cancel(_fragment_instance_id);
+ _exec_env->vstream_mgr()->cancel(_fragment_id);
// Cancel the result queue manager used by spark doris connector
// TODO pipeline incomp
// _exec_env->result_queue_mgr()->update_queue_status(id, Status::Aborted(msg));
@@ -124,7 +125,7 @@ void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
PipelinePtr PipelineFragmentContext::add_pipeline() {
// Pipelines should not be added once the context is _prepared, _submitted or _canceled.
PipelineId id = _next_pipeline_id++;
- auto pipeline = std::make_shared<Pipeline>(id, shared_from_this());
+ auto pipeline = std::make_shared<Pipeline>(id, weak_from_this());
_pipelines.emplace_back(pipeline);
return pipeline;
}
@@ -301,9 +302,17 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
auto node_type = node->type();
switch (node_type) {
// for source
+ case TPlanNodeType::BROKER_SCAN_NODE: {
+ OperatorBuilderPtr operator_t = std::make_shared<BrokerScanOperatorBuilder>(
+ fragment_context->next_operator_builder_id(), node);
+ RETURN_IF_ERROR(cur_pipe->add_operator(operator_t));
+ break;
+ }
case TPlanNodeType::OLAP_SCAN_NODE:
case TPlanNodeType::JDBC_SCAN_NODE:
- case TPlanNodeType::ODBC_SCAN_NODE: {
+ case TPlanNodeType::ODBC_SCAN_NODE:
+ case TPlanNodeType::FILE_SCAN_NODE:
+ case TPlanNodeType::ES_SCAN_NODE: {
OperatorBuilderPtr operator_t = std::make_shared<ScanOperatorBuilder>(
fragment_context->next_operator_builder_id(), node);
RETURN_IF_ERROR(cur_pipe->add_operator(operator_t));
@@ -600,7 +609,7 @@ void PipelineFragmentContext::send_report(bool done) {
params.protocol_version = FrontendServiceVersion::V1;
params.__set_query_id(_query_id);
params.__set_backend_num(_backend_num);
- params.__set_fragment_instance_id(_fragment_instance_id);
+ params.__set_fragment_instance_id(_fragment_id);
exec_status.set_t_status(&params);
params.__set_done(true);
@@ -685,8 +694,8 @@ void PipelineFragmentContext::send_report(bool done) {
coord->reportExecStatus(res, params);
} catch (TTransportException& e) {
LOG(WARNING) << "Retrying ReportExecStatus. query id: " << print_id(_query_id)
- << ", instance id: " << print_id(_fragment_instance_id) << " to "
- << coord_addr << ", err: " << e.what();
+ << ", instance id: " << print_id(_fragment_id) << " to " << coord_addr
+ << ", err: " << e.what();
rpc_status = coord.reopen();
if (!rpc_status.ok()) {
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index fd1d50ebff..5302cc83b5 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -17,7 +17,8 @@
#pragma once
-#include "pipeline.h"
+#include "pipeline/pipeline.h"
+#include "pipeline/pipeline_task.h"
#include "runtime/runtime_state.h"
namespace doris {
@@ -37,9 +38,10 @@ class PipelineFragmentContext : public std::enable_shared_from_this<PipelineFrag
public:
PipelineFragmentContext(const TUniqueId& query_id, const TUniqueId& instance_id,
int backend_num, std::shared_ptr<QueryFragmentsCtx> query_ctx,
- ExecEnv* exec_env);
+ ExecEnv* exec_env,
+ std::function<void(RuntimeState*, Status*)> call_back);
- virtual ~PipelineFragmentContext();
+    // Invokes the completion callback with the runtime state pointer and &_exec_status while
+    // this context is being destroyed. An empty std::function is skipped: calling it would
+    // throw std::bad_function_call out of the destructor and terminate the process.
+    // NOTE(review): _runtime_state.get() may be null if the context is destroyed before the
+    // runtime state has been created - confirm that every callback tolerates a null state.
+    ~PipelineFragmentContext() {
+        if (_call_back) {
+            _call_back(_runtime_state.get(), &_exec_status);
+        }
+    }
PipelinePtr add_pipeline();
@@ -80,13 +82,11 @@ public:
private:
// Id of this query
TUniqueId _query_id;
- // Id of this instance
- TUniqueId _fragment_instance_id;
+ TUniqueId _fragment_id;
int _backend_num;
ExecEnv* _exec_env;
- TUniqueId _fragment_id;
bool _prepared = false;
bool _submitted = false;
@@ -129,6 +129,7 @@ private:
template <bool is_intersect>
Status _build_operators_for_set_operation_node(ExecNode*, PipelinePtr);
+ std::function<void(RuntimeState*, Status*)> _call_back;
};
} // namespace pipeline
} // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 6e867d5959..250dfc2bab 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -75,9 +75,8 @@ bool PipelineTask::has_dependency() {
if (_pipeline->has_dependency()) {
return true;
}
- // FE do not call execute
- if (!_state->get_query_fragments_ctx()
- ->is_ready_to_execute()) { // TODO pipeline config::s_ready_to_execute
+
+ if (!query_fragments_context()->is_ready_to_execute()) {
return true;
}
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index a7f0dffec0..34cce073e6 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -167,7 +167,6 @@ private:
void _init_profile();
void _init_state();
-private:
uint32_t _index;
PipelinePtr _pipeline;
bool _dependency_finish = false;
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 1ed44838a4..f1354140ac 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -74,7 +74,6 @@ std::string to_load_error_http_path(const std::string& file_name) {
}
using apache::thrift::TException;
-using apache::thrift::TProcessor;
using apache::thrift::transport::TTransportException;
class RuntimeProfile;
@@ -547,8 +546,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params) {
set_pipe(params.params.fragment_instance_id, pipe);
return Status::OK();
} else {
- return exec_plan_fragment(params, std::bind<void>(&empty_function, std::placeholders::_1,
- std::placeholders::_2));
+ return exec_plan_fragment(params, empty_function);
}
}
@@ -750,11 +748,10 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi
if (!params.__isset.need_wait_execution_trigger || !params.need_wait_execution_trigger) {
fragments_ctx->set_ready_to_execute_only();
}
-
std::shared_ptr<pipeline::PipelineFragmentContext> context =
std::make_shared<pipeline::PipelineFragmentContext>(
fragments_ctx->query_id, fragment_instance_id, params.backend_num,
- fragments_ctx, _exec_env);
+ fragments_ctx, _exec_env, cb);
{
SCOPED_RAW_TIMER(&duration_ns);
RETURN_IF_ERROR(context->prepare(params));
@@ -767,7 +764,6 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi
_cv.notify_all();
}
auto st = context->submit();
- cb(context->get_runtime_state(), &st);
if (!st.ok()) {
context->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, "submit context fail");
remove_pipeline_context(context);
diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp
index 34336ed427..61244bbd9f 100644
--- a/be/src/runtime/stream_load/stream_load_executor.cpp
+++ b/be/src/runtime/stream_load/stream_load_executor.cpp
@@ -50,8 +50,6 @@ Status StreamLoadExecutor::execute_plan_fragment(StreamLoadContext* ctx) {
ctx->put_result.params, [ctx, this](RuntimeState* state, Status* status) {
ctx->commit_infos = std::move(state->tablet_commit_infos());
if (status->ok()) {
- LOG(WARNING) << "MYTEST " << int64_t(state) << " "
- << state->num_rows_load_total();
ctx->number_total_rows = state->num_rows_load_total();
ctx->number_loaded_rows = state->num_rows_load_success();
ctx->number_filtered_rows = state->num_rows_load_filtered();
diff --git a/be/src/vec/exec/scan/new_file_scan_node.cpp b/be/src/vec/exec/scan/new_file_scan_node.cpp
index 49319f1e6b..8da34c5b65 100644
--- a/be/src/vec/exec/scan/new_file_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_file_scan_node.cpp
@@ -17,10 +17,7 @@
#include "vec/exec/scan/new_file_scan_node.h"
-#include "vec/columns/column_const.h"
-#include "vec/exec/scan/new_olap_scanner.h"
#include "vec/exec/scan/vfile_scanner.h"
-#include "vec/functions/in.h"
namespace doris::vectorized {
diff --git a/be/src/vec/exec/scan/new_file_scan_node.h b/be/src/vec/exec/scan/new_file_scan_node.h
index 53b11e408d..12148da925 100644
--- a/be/src/vec/exec/scan/new_file_scan_node.h
+++ b/be/src/vec/exec/scan/new_file_scan_node.h
@@ -39,7 +39,6 @@ protected:
private:
VScanner* _create_scanner(const TFileScanRange& scan_range);
-private:
std::vector<TScanRangeParams> _scan_ranges;
};
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/vbroker_scan_node.h b/be/src/vec/exec/vbroker_scan_node.h
index 98b4afc677..452415014f 100644
--- a/be/src/vec/exec/vbroker_scan_node.h
+++ b/be/src/vec/exec/vbroker_scan_node.h
@@ -24,6 +24,7 @@
#include "exec/scan_node.h"
#include "gen_cpp/PaloInternalService_types.h"
#include "runtime/descriptors.h"
+
namespace doris {
class RuntimeState;
@@ -45,7 +46,7 @@ public:
Status open(RuntimeState* state) override;
// Fill the next row batch by calling next() on the scanner,
- virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) override {
+ Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) override {
return Status::NotSupported("Not Implemented VBrokerScanNode::get_next.");
}
@@ -57,6 +58,9 @@ public:
// No use
Status set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override;
+    // Readiness probe used by BrokerScanOperator::can_read(); unconditionally true.
+    // Const-qualified to match can_finish(), since it reads no node state.
+    bool can_read() const { return true; }
+    // True when _num_running_scanners is zero. BrokerScanOperator::is_pending_finish()
+    // returns the negation of this value.
+    bool can_finish() const { return _num_running_scanners == 0; }
+
private:
// Write debug string of this into out.
void debug_string(int indentation_level, std::stringstream* out) const override;
diff --git a/be/src/vec/exec/vbroker_scanner.cpp b/be/src/vec/exec/vbroker_scanner.cpp
index eaddb6ff10..df9f7e79ce 100644
--- a/be/src/vec/exec/vbroker_scanner.cpp
+++ b/be/src/vec/exec/vbroker_scanner.cpp
@@ -19,14 +19,9 @@
#include <fmt/format.h>
-#include <iostream>
-
-#include "exec/exec_node.h"
-#include "exec/plain_text_line_reader.h"
+#include "exec/line_reader.h"
#include "exec/text_converter.h"
#include "exec/text_converter.hpp"
-#include "exprs/expr_context.h"
-#include "util/utf8_check.h"
namespace doris::vectorized {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org