You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2023/04/23 14:54:04 UTC
[doris] branch master updated: [refactor](query context) rename query fragments context to query context and make query context safe (#18950)
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 3736530585 [refactor](query context) rename query fragments context to query context and make query context safe (#18950)
3736530585 is described below
commit 37365305859af37fe7f87fb3b16f2db6cbf8e4fd
Author: yiguolei <67...@qq.com>
AuthorDate: Sun Apr 23 22:53:56 2023 +0800
[refactor](query context) rename query fragments context to query context and make query context safe (#18950)
* [refactor](query context) rename query fragments context to query context and make query context safe
---------
Co-authored-by: yiguolei <yi...@gmail.com>
---
be/src/olap/reader.cpp | 4 +-
be/src/olap/rowset/segment_v2/segment.cpp | 4 +-
be/src/olap/rowset/segment_v2/segment_iterator.cpp | 9 +-
be/src/pipeline/pipeline_fragment_context.cpp | 4 +-
be/src/pipeline/pipeline_fragment_context.h | 8 +-
be/src/pipeline/pipeline_task.cpp | 6 +-
be/src/pipeline/pipeline_task.h | 4 +-
be/src/pipeline/task_scheduler.cpp | 7 +-
be/src/runtime/fragment_mgr.cpp | 168 ++++++++++-----------
be/src/runtime/fragment_mgr.h | 16 +-
be/src/runtime/plan_fragment_executor.cpp | 19 ++-
be/src/runtime/plan_fragment_executor.h | 7 +-
.../{query_fragments_ctx.h => query_context.h} | 19 ++-
be/src/runtime/runtime_state.h | 8 +-
be/src/vec/exec/join/vhash_join_node.cpp | 4 +-
be/src/vec/exec/scan/scanner_context.cpp | 4 +-
be/src/vec/exec/scan/vscan_node.cpp | 3 +-
be/src/vec/exec/scan/vscan_node.h | 4 +-
be/src/vec/exec/vsort_node.cpp | 6 +-
be/src/vec/olap/vcollect_iterator.cpp | 4 +-
be/test/runtime/fragment_mgr_test.cpp | 2 +-
21 files changed, 152 insertions(+), 158 deletions(-)
diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp
index 9886db76d6..53d06a2f32 100644
--- a/be/src/olap/reader.cpp
+++ b/be/src/olap/reader.cpp
@@ -48,7 +48,7 @@
#include "olap/schema.h"
#include "olap/tablet.h"
#include "olap/tablet_meta.h"
-#include "runtime/query_fragments_ctx.h"
+#include "runtime/query_context.h"
#include "runtime/runtime_predicate.h"
#include "runtime/runtime_state.h"
#include "vec/common/arena.h"
@@ -553,7 +553,7 @@ void TabletReader::_init_conditions_param_except_leafnode_of_andnode(
if (read_params.use_topn_opt) {
auto& runtime_predicate =
- read_params.runtime_state->get_query_fragments_ctx()->get_runtime_predicate();
+ read_params.runtime_state->get_query_ctx()->get_runtime_predicate();
runtime_predicate.set_tablet_schema(_tablet_schema);
}
}
diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp
index cf952c2856..b0589636af 100644
--- a/be/src/olap/rowset/segment_v2/segment.cpp
+++ b/be/src/olap/rowset/segment_v2/segment.cpp
@@ -44,7 +44,7 @@
#include "olap/types.h"
#include "olap/utils.h"
#include "runtime/memory/mem_tracker.h"
-#include "runtime/query_fragments_ctx.h"
+#include "runtime/query_context.h"
#include "runtime/runtime_predicate.h"
#include "runtime/runtime_state.h"
#include "util/coding.h"
@@ -134,7 +134,7 @@ Status Segment::new_iterator(const Schema& schema, const StorageReadOptions& rea
}
if (read_options.use_topn_opt) {
- auto query_ctx = read_options.runtime_state->get_query_fragments_ctx();
+ auto query_ctx = read_options.runtime_state->get_query_ctx();
auto runtime_predicate = query_ctx->get_runtime_predicate().get_predictate();
if (runtime_predicate) {
int32_t uid =
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
index a1c55855bf..76e443e78e 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
@@ -50,7 +50,7 @@
#include "olap/tablet_schema.h"
#include "olap/types.h"
#include "olap/utils.h"
-#include "runtime/query_fragments_ctx.h"
+#include "runtime/query_context.h"
#include "runtime/runtime_predicate.h"
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"
@@ -386,7 +386,7 @@ Status SegmentIterator::_get_row_ranges_by_column_conditions() {
std::shared_ptr<doris::ColumnPredicate> runtime_predicate = nullptr;
if (_opts.use_topn_opt) {
- auto query_ctx = _opts.runtime_state->get_query_fragments_ctx();
+ auto query_ctx = _opts.runtime_state->get_query_ctx();
runtime_predicate = query_ctx->get_runtime_predicate().get_predictate();
}
@@ -446,7 +446,7 @@ Status SegmentIterator::_get_row_ranges_from_conditions(RowRanges* condition_row
std::shared_ptr<doris::ColumnPredicate> runtime_predicate = nullptr;
if (_opts.use_topn_opt) {
- auto query_ctx = _opts.runtime_state->get_query_fragments_ctx();
+ auto query_ctx = _opts.runtime_state->get_query_ctx();
runtime_predicate = query_ctx->get_runtime_predicate().get_predictate();
if (runtime_predicate) {
int32_t cid = _opts.tablet_schema->column(runtime_predicate->column_id()).unique_id();
@@ -1210,8 +1210,7 @@ Status SegmentIterator::_vec_init_lazy_materialization() {
// all rows should be read, so runtime predicate will reduce rows for topn node
if (_opts.use_topn_opt &&
!(_opts.read_orderby_key_columns != nullptr && !_opts.read_orderby_key_columns->empty())) {
- auto& runtime_predicate =
- _opts.runtime_state->get_query_fragments_ctx()->get_runtime_predicate();
+ auto& runtime_predicate = _opts.runtime_state->get_query_ctx()->get_runtime_predicate();
_runtime_predicate = runtime_predicate.get_predictate();
if (_runtime_predicate) {
_col_predicates.push_back(_runtime_predicate.get());
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 9a5c23fb6c..5b4868e885 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -120,7 +120,7 @@ namespace doris::pipeline {
PipelineFragmentContext::PipelineFragmentContext(
const TUniqueId& query_id, const TUniqueId& instance_id, const int fragment_id,
- int backend_num, std::shared_ptr<QueryFragmentsCtx> query_ctx, ExecEnv* exec_env,
+ int backend_num, std::shared_ptr<QueryContext> query_ctx, ExecEnv* exec_env,
const std::function<void(RuntimeState*, Status*)>& call_back,
const report_status_callback& report_status_cb)
: _query_id(query_id),
@@ -210,7 +210,7 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
_runtime_state =
std::make_unique<RuntimeState>(local_params, request.query_id, request.query_options,
_query_ctx->query_globals, _exec_env);
- _runtime_state->set_query_fragments_ctx(_query_ctx.get());
+ _runtime_state->set_query_ctx(_query_ctx.get());
_runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker);
_runtime_state->set_tracer(std::move(tracer));
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index 5e23ab0e2b..87c075b85f 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -34,7 +34,7 @@
#include "common/status.h"
#include "pipeline/pipeline.h"
#include "pipeline/pipeline_task.h"
-#include "runtime/query_fragments_ctx.h"
+#include "runtime/query_context.h"
#include "runtime/runtime_state.h"
#include "util/runtime_profile.h"
#include "util/stopwatch.hpp"
@@ -65,7 +65,7 @@ public:
using report_status_callback = std::function<void(const ReportStatusRequest)>;
PipelineFragmentContext(const TUniqueId& query_id, const TUniqueId& instance_id,
const int fragment_id, int backend_num,
- std::shared_ptr<QueryFragmentsCtx> query_ctx, ExecEnv* exec_env,
+ std::shared_ptr<QueryContext> query_ctx, ExecEnv* exec_env,
const std::function<void(RuntimeState*, Status*)>& call_back,
const report_status_callback& report_status_cb);
@@ -95,7 +95,7 @@ public:
// TODO: Support pipeline runtime filter
- QueryFragmentsCtx* get_query_context() { return _query_ctx.get(); }
+ QueryContext* get_query_context() { return _query_ctx.get(); }
TUniqueId get_query_id() const { return _query_id; }
@@ -169,7 +169,7 @@ private:
ExecNode* _root_plan = nullptr; // lives in _runtime_state->obj_pool()
std::unique_ptr<DataSink> _sink;
- std::shared_ptr<QueryFragmentsCtx> _query_ctx;
+ std::shared_ptr<QueryContext> _query_ctx;
std::shared_ptr<RuntimeFilterMergeControllerEntity> _merge_controller_handler;
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 0ef3574a70..b5a0e5b602 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -28,7 +28,7 @@
#include "pipeline/pipeline.h"
#include "pipeline_fragment_context.h"
#include "runtime/descriptors.h"
-#include "runtime/query_fragments_ctx.h"
+#include "runtime/query_context.h"
#include "runtime/thread_context.h"
#include "task_queue.h"
#include "util/defer_op.h"
@@ -118,7 +118,7 @@ bool PipelineTask::has_dependency() {
return true;
}
- if (!query_fragments_context()->is_ready_to_execute()) {
+ if (!query_context()->is_ready_to_execute()) {
return true;
}
@@ -266,7 +266,7 @@ Status PipelineTask::close() {
return s;
}
-QueryFragmentsCtx* PipelineTask::query_fragments_context() {
+QueryContext* PipelineTask::query_context() {
return _fragment_context->get_query_context();
}
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 02eb8be992..d17d13dda7 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -31,7 +31,7 @@
#include "vec/core/block.h"
namespace doris {
-class QueryFragmentsCtx;
+class QueryContext;
class RuntimeState;
namespace pipeline {
class PipelineFragmentContext;
@@ -171,7 +171,7 @@ public:
PipelineFragmentContext* fragment_context() { return _fragment_context; }
- QueryFragmentsCtx* query_fragments_context();
+ QueryContext* query_context();
int get_previous_core_id() const {
return _previous_schedule_id != -1 ? _previous_schedule_id
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index 7362dd38b3..cd219a25e0 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -35,7 +35,7 @@
#include "pipeline/pipeline_task.h"
#include "pipeline/task_queue.h"
#include "pipeline_fragment_context.h"
-#include "runtime/query_fragments_ctx.h"
+#include "runtime/query_context.h"
#include "util/sse_util.hpp"
#include "util/thread.h"
#include "util/threadpool.h"
@@ -124,9 +124,8 @@ void BlockedTaskScheduler::_schedule() {
} else {
_make_task_run(local_blocked_tasks, iter, ready_tasks);
}
- } else if (task->query_fragments_context()->is_timeout(now)) {
- LOG(WARNING) << "Timeout, query_id="
- << print_id(task->query_fragments_context()->query_id)
+ } else if (task->query_context()->is_timeout(now)) {
+ LOG(WARNING) << "Timeout, query_id=" << print_id(task->query_context()->query_id)
<< ", instance_id="
<< print_id(task->fragment_context()->get_fragment_instance_id());
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index a113fa3792..6c905afa9a 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -62,7 +62,7 @@
#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/plan_fragment_executor.h"
#include "runtime/primitive_type.h"
-#include "runtime/query_fragments_ctx.h"
+#include "runtime/query_context.h"
#include "runtime/runtime_filter_mgr.h"
#include "runtime/runtime_state.h"
#include "runtime/stream_load/new_load_stream_mgr.h"
@@ -112,9 +112,9 @@ using apache::thrift::transport::TTransportException;
class FragmentExecState {
public:
using report_status_callback_impl = std::function<void(const ReportStatusRequest)>;
- // Constructor by using QueryFragmentsCtx
+ // Constructor by using QueryContext
FragmentExecState(const TUniqueId& query_id, const TUniqueId& instance_id, int backend_num,
- ExecEnv* exec_env, std::shared_ptr<QueryFragmentsCtx> fragments_ctx,
+ ExecEnv* exec_env, std::shared_ptr<QueryContext> query_ctx,
const report_status_callback_impl& report_status_cb_impl);
Status prepare(const TExecPlanFragmentParams& params);
@@ -167,7 +167,7 @@ public:
int get_timeout_second() const { return _timeout_second; }
- std::shared_ptr<QueryFragmentsCtx> get_fragments_ctx() { return _fragments_ctx; }
+ std::shared_ptr<QueryContext> get_query_ctx() { return _query_ctx; }
void set_need_wait_execution_trigger() { _need_wait_execution_trigger = true; }
@@ -196,7 +196,7 @@ private:
std::atomic<bool> _cancelled {false};
// This context is shared by all fragments of this host in a query
- std::shared_ptr<QueryFragmentsCtx> _fragments_ctx;
+ std::shared_ptr<QueryContext> _query_ctx;
std::shared_ptr<RuntimeFilterMergeControllerEntity> _merge_controller_handler;
@@ -207,8 +207,7 @@ private:
FragmentExecState::FragmentExecState(const TUniqueId& query_id,
const TUniqueId& fragment_instance_id, int backend_num,
- ExecEnv* exec_env,
- std::shared_ptr<QueryFragmentsCtx> fragments_ctx,
+ ExecEnv* exec_env, std::shared_ptr<QueryContext> query_ctx,
const report_status_callback_impl& report_status_cb_impl)
: _query_id(query_id),
_fragment_instance_id(fragment_instance_id),
@@ -218,10 +217,10 @@ FragmentExecState::FragmentExecState(const TUniqueId& query_id,
std::placeholders::_3)),
_set_rsc_info(false),
_timeout_second(-1),
- _fragments_ctx(std::move(fragments_ctx)),
+ _query_ctx(std::move(query_ctx)),
_report_status_cb_impl(report_status_cb_impl) {
_start_time = vectorized::VecDateTimeValue::local_time();
- _coord_addr = _fragments_ctx->coord_addr;
+ _coord_addr = _query_ctx->coord_addr;
}
Status FragmentExecState::prepare(const TExecPlanFragmentParams& params) {
@@ -229,16 +228,16 @@ Status FragmentExecState::prepare(const TExecPlanFragmentParams& params) {
_timeout_second = params.query_options.execution_timeout;
}
- if (_fragments_ctx == nullptr) {
+ if (_query_ctx == nullptr) {
if (params.__isset.resource_info) {
set_group(params.resource_info);
}
}
- if (_fragments_ctx == nullptr) {
+ if (_query_ctx == nullptr) {
return _executor.prepare(params);
} else {
- return _executor.prepare(params, _fragments_ctx.get());
+ return _executor.prepare(params, _query_ctx.get());
}
}
@@ -246,7 +245,7 @@ Status FragmentExecState::execute() {
if (_need_wait_execution_trigger) {
// if _need_wait_execution_trigger is true, which means this instance
// is prepared but need to wait for the signal to do the rest execution.
- if (!_fragments_ctx->wait_for_start()) {
+ if (!_query_ctx->wait_for_start()) {
return cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, "wait fragment start timeout");
}
}
@@ -283,7 +282,7 @@ Status FragmentExecState::cancel(const PPlanFragmentCancelReason& reason, const
#ifndef BE_TEST
// Get pipe from new load stream manager and send cancel to it or the fragment may hang to wait read from pipe
// For stream load the fragment's query_id == load id, it is set in FE.
- auto stream_load_ctx = _fragments_ctx->exec_env()->new_load_stream_mgr()->get(_query_id);
+ auto stream_load_ctx = _query_ctx->exec_env()->new_load_stream_mgr()->get(_query_id);
if (stream_load_ctx != nullptr) {
stream_load_ctx->pipe->cancel(PPlanFragmentCancelReason_Name(reason));
}
@@ -347,7 +346,7 @@ FragmentMgr::~FragmentMgr() {
{
std::lock_guard<std::mutex> lock(_lock);
_fragment_map.clear();
- _fragments_ctx_map.clear();
+ _query_ctx_map.clear();
}
}
@@ -529,19 +528,19 @@ void FragmentMgr::_exec_actual(std::shared_ptr<FragmentExecState> exec_state,
exec_state->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, "exec_state execute failed");
}
- std::shared_ptr<QueryFragmentsCtx> fragments_ctx = exec_state->get_fragments_ctx();
+ std::shared_ptr<QueryContext> query_ctx = exec_state->get_query_ctx();
bool all_done = false;
- if (fragments_ctx != nullptr) {
+ if (query_ctx != nullptr) {
// decrease the number of unfinished fragments
- all_done = fragments_ctx->countdown();
+ all_done = query_ctx->countdown();
}
// remove exec state after this fragment finished
{
std::lock_guard<std::mutex> lock(_lock);
_fragment_map.erase(exec_state->fragment_instance_id());
- if (all_done && fragments_ctx) {
- _fragments_ctx_map.erase(fragments_ctx->query_id);
+ if (all_done && query_ctx) {
+ _query_ctx_map.erase(query_ctx->query_id);
}
}
@@ -596,8 +595,8 @@ Status FragmentMgr::start_query_execution(const PExecPlanFragmentStartRequest* r
TUniqueId query_id;
query_id.__set_hi(request->query_id().hi());
query_id.__set_lo(request->query_id().lo());
- auto search = _fragments_ctx_map.find(query_id);
- if (search == _fragments_ctx_map.end()) {
+ auto search = _query_ctx_map.find(query_id);
+ if (search == _query_ctx_map.end()) {
return Status::InternalError(
"Failed to get query fragments context. Query may be "
"timeout or be cancelled. host: {}",
@@ -615,47 +614,46 @@ void FragmentMgr::remove_pipeline_context(
bool all_done = q_context->countdown();
_pipeline_map.erase(f_context->get_fragment_instance_id());
if (all_done) {
- _fragments_ctx_map.erase(query_id);
+ _query_ctx_map.erase(query_id);
}
}
template <typename Params>
Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, bool pipeline,
- std::shared_ptr<QueryFragmentsCtx>& fragments_ctx) {
+ std::shared_ptr<QueryContext>& query_ctx) {
if (params.is_simplified_param) {
- // Get common components from _fragments_ctx_map
+ // Get common components from _query_ctx_map
std::lock_guard<std::mutex> lock(_lock);
- auto search = _fragments_ctx_map.find(query_id);
- if (search == _fragments_ctx_map.end()) {
+ auto search = _query_ctx_map.find(query_id);
+ if (search == _query_ctx_map.end()) {
return Status::InternalError(
"Failed to get query fragments context. Query may be "
"timeout or be cancelled. host: {}",
BackendOptions::get_localhost());
}
- fragments_ctx = search->second;
+ query_ctx = search->second;
} else {
// This may be a first fragment request of the query.
// Create the query fragments context.
- fragments_ctx.reset(new QueryFragmentsCtx(params.fragment_num_on_host, _exec_env));
- fragments_ctx->query_id = query_id;
- RETURN_IF_ERROR(DescriptorTbl::create(&(fragments_ctx->obj_pool), params.desc_tbl,
- &(fragments_ctx->desc_tbl)));
- fragments_ctx->coord_addr = params.coord;
- LOG(INFO) << "query_id: "
- << UniqueId(fragments_ctx->query_id.hi, fragments_ctx->query_id.lo)
- << " coord_addr " << fragments_ctx->coord_addr
+ query_ctx = QueryContext::create_shared(params.fragment_num_on_host, _exec_env);
+ query_ctx->query_id = query_id;
+ RETURN_IF_ERROR(DescriptorTbl::create(&(query_ctx->obj_pool), params.desc_tbl,
+ &(query_ctx->desc_tbl)));
+ query_ctx->coord_addr = params.coord;
+ LOG(INFO) << "query_id: " << UniqueId(query_ctx->query_id.hi, query_ctx->query_id.lo)
+ << " coord_addr " << query_ctx->coord_addr
<< " total fragment num on current host: " << params.fragment_num_on_host;
- fragments_ctx->query_globals = params.query_globals;
+ query_ctx->query_globals = params.query_globals;
if (params.__isset.resource_info) {
- fragments_ctx->user = params.resource_info.user;
- fragments_ctx->group = params.resource_info.group;
- fragments_ctx->set_rsc_info = true;
+ query_ctx->user = params.resource_info.user;
+ query_ctx->group = params.resource_info.group;
+ query_ctx->set_rsc_info = true;
}
- fragments_ctx->get_shared_hash_table_controller()->set_pipeline_engine_enabled(pipeline);
- fragments_ctx->timeout_second = params.query_options.execution_timeout;
- _set_scan_concurrency(params, fragments_ctx.get());
+ query_ctx->get_shared_hash_table_controller()->set_pipeline_engine_enabled(pipeline);
+ query_ctx->timeout_second = params.query_options.execution_timeout;
+ _set_scan_concurrency(params, query_ctx.get());
bool has_query_mem_tracker =
params.query_options.__isset.mem_limit && (params.query_options.mem_limit > 0);
@@ -668,36 +666,36 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
bytes_limit = MemInfo::mem_limit();
}
if (params.query_options.query_type == TQueryType::SELECT) {
- fragments_ctx->query_mem_tracker = std::make_shared<MemTrackerLimiter>(
+ query_ctx->query_mem_tracker = std::make_shared<MemTrackerLimiter>(
MemTrackerLimiter::Type::QUERY,
- fmt::format("Query#Id={}", print_id(fragments_ctx->query_id)), bytes_limit);
+ fmt::format("Query#Id={}", print_id(query_ctx->query_id)), bytes_limit);
} else if (params.query_options.query_type == TQueryType::LOAD) {
- fragments_ctx->query_mem_tracker = std::make_shared<MemTrackerLimiter>(
+ query_ctx->query_mem_tracker = std::make_shared<MemTrackerLimiter>(
MemTrackerLimiter::Type::LOAD,
- fmt::format("Load#Id={}", print_id(fragments_ctx->query_id)), bytes_limit);
+ fmt::format("Load#Id={}", print_id(query_ctx->query_id)), bytes_limit);
} else { // EXTERNAL
- fragments_ctx->query_mem_tracker = std::make_shared<MemTrackerLimiter>(
+ query_ctx->query_mem_tracker = std::make_shared<MemTrackerLimiter>(
MemTrackerLimiter::Type::LOAD,
- fmt::format("External#Id={}", print_id(fragments_ctx->query_id)), bytes_limit);
+ fmt::format("External#Id={}", print_id(query_ctx->query_id)), bytes_limit);
}
if (params.query_options.__isset.is_report_success &&
params.query_options.is_report_success) {
- fragments_ctx->query_mem_tracker->enable_print_log_usage();
+ query_ctx->query_mem_tracker->enable_print_log_usage();
}
{
- // Find _fragments_ctx_map again, in case some other request has already
+ // Find _query_ctx_map again, in case some other request has already
// create the query fragments context.
std::lock_guard<std::mutex> lock(_lock);
- auto search = _fragments_ctx_map.find(query_id);
- if (search == _fragments_ctx_map.end()) {
- _fragments_ctx_map.insert(std::make_pair(fragments_ctx->query_id, fragments_ctx));
+ auto search = _query_ctx_map.find(query_id);
+ if (search == _query_ctx_map.end()) {
+ _query_ctx_map.insert(std::make_pair(query_ctx->query_id, query_ctx));
LOG(INFO) << "Register query/load memory tracker, query/load id: "
- << print_id(fragments_ctx->query_id)
+ << print_id(query_ctx->query_id)
<< " limit: " << PrettyPrinter::print(bytes_limit, TUnit::BYTES);
} else {
// Already has a query fragments context, use it
- fragments_ctx = search->second;
+ query_ctx = search->second;
}
}
}
@@ -726,16 +724,16 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
}
std::shared_ptr<FragmentExecState> exec_state;
- std::shared_ptr<QueryFragmentsCtx> fragments_ctx;
+ std::shared_ptr<QueryContext> query_ctx;
bool pipeline_engine_enabled = params.query_options.__isset.enable_pipeline_engine &&
params.query_options.enable_pipeline_engine;
RETURN_IF_ERROR(
- _get_query_ctx(params, params.params.query_id, pipeline_engine_enabled, fragments_ctx));
- fragments_ctx->fragment_ids.push_back(fragment_instance_id);
+ _get_query_ctx(params, params.params.query_id, pipeline_engine_enabled, query_ctx));
+ query_ctx->fragment_ids.push_back(fragment_instance_id);
exec_state.reset(
- new FragmentExecState(fragments_ctx->query_id, params.params.fragment_instance_id,
- params.backend_num, _exec_env, fragments_ctx,
+ new FragmentExecState(query_ctx->query_id, params.params.fragment_instance_id,
+ params.backend_num, _exec_env, query_ctx,
std::bind<void>(std::mem_fn(&FragmentMgr::coordinator_callback),
this, std::placeholders::_1)));
if (params.__isset.need_wait_execution_trigger && params.need_wait_execution_trigger) {
@@ -794,8 +792,8 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
START_AND_SCOPE_SPAN(tracer, span, "FragmentMgr::exec_plan_fragment");
std::shared_ptr<FragmentExecState> exec_state;
- std::shared_ptr<QueryFragmentsCtx> fragments_ctx;
- RETURN_IF_ERROR(_get_query_ctx(params, params.query_id, true, fragments_ctx));
+ std::shared_ptr<QueryContext> query_ctx;
+ RETURN_IF_ERROR(_get_query_ctx(params, params.query_id, true, query_ctx));
if (params.__isset.resource_groups && !params.resource_groups.empty()) {
taskgroup::TaskGroupInfo task_group_info;
@@ -805,12 +803,12 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
auto tg = taskgroup::TaskGroupManager::instance()->get_or_create_task_group(
task_group_info);
_exec_env->pipeline_task_group_scheduler()->try_update_task_group(task_group_info, tg);
- fragments_ctx->set_task_group(tg);
- LOG(INFO) << "Query/load id: " << print_id(fragments_ctx->query_id)
+ query_ctx->set_task_group(tg);
+ LOG(INFO) << "Query/load id: " << print_id(query_ctx->query_id)
<< " use task group: " << tg->debug_string();
}
} else {
- VLOG_DEBUG << "Query/load id: " << print_id(fragments_ctx->query_id)
+ VLOG_DEBUG << "Query/load id: " << print_id(query_ctx->query_id)
<< " does not use task group.";
}
@@ -827,11 +825,11 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
}
}
- fragments_ctx->fragment_ids.push_back(fragment_instance_id);
+ query_ctx->fragment_ids.push_back(fragment_instance_id);
exec_state.reset(new FragmentExecState(
- fragments_ctx->query_id, fragment_instance_id, local_params.backend_num, _exec_env,
- fragments_ctx,
+ query_ctx->query_id, fragment_instance_id, local_params.backend_num, _exec_env,
+ query_ctx,
std::bind<void>(std::mem_fn(&FragmentMgr::coordinator_callback), this,
std::placeholders::_1)));
if (params.__isset.need_wait_execution_trigger && params.need_wait_execution_trigger) {
@@ -842,14 +840,14 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
int64_t duration_ns = 0;
if (!params.__isset.need_wait_execution_trigger || !params.need_wait_execution_trigger) {
- fragments_ctx->set_ready_to_execute_only();
+ query_ctx->set_ready_to_execute_only();
}
_setup_shared_hashtable_for_broadcast_join(
- params, local_params, exec_state->executor()->runtime_state(), fragments_ctx.get());
+ params, local_params, exec_state->executor()->runtime_state(), query_ctx.get());
std::shared_ptr<pipeline::PipelineFragmentContext> context =
std::make_shared<pipeline::PipelineFragmentContext>(
- fragments_ctx->query_id, fragment_instance_id, params.fragment_id,
- local_params.backend_num, fragments_ctx, _exec_env, cb,
+ query_ctx->query_id, fragment_instance_id, params.fragment_id,
+ local_params.backend_num, query_ctx, _exec_env, cb,
std::bind<void>(std::mem_fn(&FragmentMgr::coordinator_callback), this,
std::placeholders::_1));
{
@@ -879,13 +877,13 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
}
template <typename Param>
-void FragmentMgr::_set_scan_concurrency(const Param& params, QueryFragmentsCtx* fragments_ctx) {
+void FragmentMgr::_set_scan_concurrency(const Param& params, QueryContext* query_ctx) {
#ifndef BE_TEST
// If the token is set, the scan task will use limited_scan_pool in scanner scheduler.
// Otherwise, the scan task will use local/remote scan pool in scanner scheduler
if (params.query_options.__isset.resource_limit &&
params.query_options.resource_limit.__isset.cpu_limit) {
- fragments_ctx->set_thread_token(params.query_options.resource_limit.cpu_limit, false);
+ query_ctx->set_thread_token(params.query_options.resource_limit.cpu_limit, false);
}
#endif
}
@@ -923,8 +921,8 @@ void FragmentMgr::cancel_query(const TUniqueId& query_id, const PPlanFragmentCan
std::vector<TUniqueId> cancel_fragment_ids;
{
std::lock_guard<std::mutex> lock(_lock);
- auto ctx = _fragments_ctx_map.find(query_id);
- if (ctx != _fragments_ctx_map.end()) {
+ auto ctx = _query_ctx_map.find(query_id);
+ if (ctx != _query_ctx_map.end()) {
cancel_fragment_ids = ctx->second->fragment_ids;
}
}
@@ -935,8 +933,8 @@ void FragmentMgr::cancel_query(const TUniqueId& query_id, const PPlanFragmentCan
bool FragmentMgr::query_is_canceled(const TUniqueId& query_id) {
std::lock_guard<std::mutex> lock(_lock);
- auto ctx = _fragments_ctx_map.find(query_id);
- if (ctx != _fragments_ctx_map.end()) {
+ auto ctx = _query_ctx_map.find(query_id);
+ if (ctx != _query_ctx_map.end()) {
for (auto it : ctx->second->fragment_ids) {
auto exec_state_iter = _fragment_map.find(it);
if (exec_state_iter != _fragment_map.end() && exec_state_iter->second) {
@@ -965,9 +963,9 @@ void FragmentMgr::cancel_worker() {
to_cancel.push_back(it.second->fragment_instance_id());
}
}
- for (auto it = _fragments_ctx_map.begin(); it != _fragments_ctx_map.end();) {
+ for (auto it = _query_ctx_map.begin(); it != _query_ctx_map.end();) {
if (it->second->is_timeout(now)) {
- it = _fragments_ctx_map.erase(it);
+ it = _query_ctx_map.erase(it);
} else {
++it;
}
@@ -1192,7 +1190,7 @@ Status FragmentMgr::merge_filter(const PMergeFilterRequest* request,
void FragmentMgr::_setup_shared_hashtable_for_broadcast_join(const TExecPlanFragmentParams& params,
RuntimeState* state,
- QueryFragmentsCtx* fragments_ctx) {
+ QueryContext* query_ctx) {
if (!params.query_options.__isset.enable_share_hash_table_for_broadcast_join ||
!params.query_options.enable_share_hash_table_for_broadcast_join) {
return;
@@ -1210,7 +1208,7 @@ void FragmentMgr::_setup_shared_hashtable_for_broadcast_join(const TExecPlanFrag
}
if (params.build_hash_table_for_broadcast_join) {
- fragments_ctx->get_shared_hash_table_controller()->set_builder_and_consumers(
+ query_ctx->get_shared_hash_table_controller()->set_builder_and_consumers(
params.params.fragment_instance_id, params.instances_sharing_hash_table,
node.node_id);
}
@@ -1219,7 +1217,7 @@ void FragmentMgr::_setup_shared_hashtable_for_broadcast_join(const TExecPlanFrag
void FragmentMgr::_setup_shared_hashtable_for_broadcast_join(
const TPipelineFragmentParams& params, const TPipelineInstanceParams& local_params,
- RuntimeState* state, QueryFragmentsCtx* fragments_ctx) {
+ RuntimeState* state, QueryContext* query_ctx) {
if (!params.query_options.__isset.enable_share_hash_table_for_broadcast_join ||
!params.query_options.enable_share_hash_table_for_broadcast_join) {
return;
@@ -1237,7 +1235,7 @@ void FragmentMgr::_setup_shared_hashtable_for_broadcast_join(
}
if (local_params.build_hash_table_for_broadcast_join) {
- fragments_ctx->get_shared_hash_table_controller()->set_builder_and_consumers(
+ query_ctx->get_shared_hash_table_controller()->set_builder_and_consumers(
local_params.fragment_instance_id, params.instances_sharing_hash_table,
node.node_id);
}
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index d5a724787e..2cc7fcc251 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -47,7 +47,7 @@ namespace doris {
namespace pipeline {
class PipelineFragmentContext;
}
-class QueryFragmentsCtx;
+class QueryContext;
class ExecEnv;
class FragmentExecState;
class ThreadPool;
@@ -139,20 +139,18 @@ private:
void _exec_actual(std::shared_ptr<FragmentExecState> exec_state, const FinishCallback& cb);
template <typename Param>
- void _set_scan_concurrency(const Param& params, QueryFragmentsCtx* fragments_ctx);
+ void _set_scan_concurrency(const Param& params, QueryContext* query_ctx);
void _setup_shared_hashtable_for_broadcast_join(const TExecPlanFragmentParams& params,
- RuntimeState* state,
- QueryFragmentsCtx* fragments_ctx);
+ RuntimeState* state, QueryContext* query_ctx);
void _setup_shared_hashtable_for_broadcast_join(const TPipelineFragmentParams& params,
const TPipelineInstanceParams& local_params,
- RuntimeState* state,
- QueryFragmentsCtx* fragments_ctx);
+ RuntimeState* state, QueryContext* query_ctx);
template <typename Params>
Status _get_query_ctx(const Params& params, TUniqueId query_id, bool pipeline,
- std::shared_ptr<QueryFragmentsCtx>& fragments_ctx);
+ std::shared_ptr<QueryContext>& query_ctx);
// This is input params
ExecEnv* _exec_env;
@@ -166,8 +164,8 @@ private:
std::unordered_map<TUniqueId, std::shared_ptr<pipeline::PipelineFragmentContext>> _pipeline_map;
- // query id -> QueryFragmentsCtx
- std::unordered_map<TUniqueId, std::shared_ptr<QueryFragmentsCtx>> _fragments_ctx_map;
+ // query id -> QueryContext
+ std::unordered_map<TUniqueId, std::shared_ptr<QueryContext>> _query_ctx_map;
std::unordered_map<TUniqueId, std::unordered_map<int, int64_t>> _bf_size_map;
CountDownLatch _stop_background_threads_latch;
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index 10747ea7d3..ba1db4b69d 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -45,7 +45,7 @@
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
#include "runtime/memory/mem_tracker_limiter.h"
-#include "runtime/query_fragments_ctx.h"
+#include "runtime/query_context.h"
#include "runtime/query_statistics.h"
#include "runtime/result_queue_mgr.h"
#include "runtime/runtime_filter_mgr.h"
@@ -94,7 +94,7 @@ PlanFragmentExecutor::~PlanFragmentExecutor() {
}
Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
- QueryFragmentsCtx* fragments_ctx) {
+ QueryContext* query_ctx) {
OpentelemetryTracer tracer = telemetry::get_noop_tracer();
if (opentelemetry::trace::Tracer::GetCurrentSpan()->GetContext().IsValid()) {
tracer = telemetry::get_tracer(print_id(_query_id));
@@ -112,12 +112,11 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
// VLOG_CRITICAL << "request:\n" << apache::thrift::ThriftDebugString(request);
const TQueryGlobals& query_globals =
- fragments_ctx == nullptr ? request.query_globals : fragments_ctx->query_globals;
+ query_ctx == nullptr ? request.query_globals : query_ctx->query_globals;
_runtime_state.reset(new RuntimeState(params, request.query_options, query_globals, _exec_env));
- _runtime_state->set_query_fragments_ctx(fragments_ctx);
- _runtime_state->set_query_mem_tracker(fragments_ctx == nullptr
- ? _exec_env->orphan_mem_tracker()
- : fragments_ctx->query_mem_tracker);
+ _runtime_state->set_query_ctx(query_ctx);
+ _runtime_state->set_query_mem_tracker(query_ctx == nullptr ? _exec_env->orphan_mem_tracker()
+ : query_ctx->query_mem_tracker);
_runtime_state->set_tracer(std::move(tracer));
SCOPED_ATTACH_TASK(_runtime_state.get());
@@ -142,8 +141,8 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
// set up desc tbl
DescriptorTbl* desc_tbl = nullptr;
- if (fragments_ctx != nullptr) {
- desc_tbl = fragments_ctx->desc_tbl;
+ if (query_ctx != nullptr) {
+ desc_tbl = query_ctx->desc_tbl;
} else {
DCHECK(request.__isset.desc_tbl);
RETURN_IF_ERROR(DescriptorTbl::create(obj_pool(), request.desc_tbl, &desc_tbl));
@@ -487,7 +486,7 @@ void PlanFragmentExecutor::cancel(const PPlanFragmentCancelReason& reason, const
_cancel_msg = msg;
_runtime_state->set_is_cancelled(true);
// To notify wait_for_start()
- _runtime_state->get_query_fragments_ctx()->set_ready_to_execute(true);
+ _runtime_state->get_query_ctx()->set_ready_to_execute(true);
// must close stream_mgr to avoid dead lock in Exchange Node
auto env = _runtime_state->exec_env();
diff --git a/be/src/runtime/plan_fragment_executor.h b/be/src/runtime/plan_fragment_executor.h
index 61f03b624d..bc58a861d1 100644
--- a/be/src/runtime/plan_fragment_executor.h
+++ b/be/src/runtime/plan_fragment_executor.h
@@ -39,7 +39,7 @@
namespace doris {
-class QueryFragmentsCtx;
+class QueryContext;
class ExecNode;
class RowDescriptor;
class DataSink;
@@ -98,9 +98,8 @@ public:
// If request.query_options.mem_limit > 0, it is used as an approximate limit on the
// number of bytes this query can consume at runtime.
// The query will be aborted (MEM_LIMIT_EXCEEDED) if it goes over that limit.
- // If fragments_ctx is not null, some components will be got from fragments_ctx.
- Status prepare(const TExecPlanFragmentParams& request,
- QueryFragmentsCtx* fragments_ctx = nullptr);
+ // If query_ctx is not null, some components will be got from query_ctx.
+ Status prepare(const TExecPlanFragmentParams& request, QueryContext* query_ctx = nullptr);
// Start execution. Call this prior to get_next().
// If this fragment has a sink, open() will send all rows produced
diff --git a/be/src/runtime/query_fragments_ctx.h b/be/src/runtime/query_context.h
similarity index 92%
rename from be/src/runtime/query_fragments_ctx.h
rename to be/src/runtime/query_context.h
index a5ff251dce..b941f0fe86 100644
--- a/be/src/runtime/query_fragments_ctx.h
+++ b/be/src/runtime/query_context.h
@@ -25,6 +25,7 @@
#include <string>
#include "common/config.h"
+#include "common/factory_creator.h"
#include "common/object_pool.h"
#include "runtime/datetime_value.h"
#include "runtime/exec_env.h"
@@ -43,18 +44,20 @@ namespace doris {
// Some components like DescriptorTbl may be very large
// that will slow down each execution of fragments when DeSer them every time.
class DescriptorTbl;
-class QueryFragmentsCtx {
+class QueryContext {
+ ENABLE_FACTORY_CREATOR(QueryContext);
+
public:
- QueryFragmentsCtx(int total_fragment_num, ExecEnv* exec_env)
+ QueryContext(int total_fragment_num, ExecEnv* exec_env)
: fragment_num(total_fragment_num), timeout_second(-1), _exec_env(exec_env) {
_start_time = vectorized::VecDateTimeValue::local_time();
_shared_hash_table_controller.reset(new vectorized::SharedHashTableController());
_shared_scanner_controller.reset(new vectorized::SharedScannerController());
}
- ~QueryFragmentsCtx() {
- // query mem tracker consumption is equal to 0, it means that after QueryFragmentsCtx is created,
- // it is found that query already exists in _fragments_ctx_map, and query mem tracker is not used.
+ ~QueryContext() {
+ // query mem tracker consumption is equal to 0, it means that after QueryContext is created,
+ // it is found that query already exists in _query_ctx_map, and query mem tracker is not used.
// query mem tracker consumption is not equal to 0 after use, because there is memory consumed
// on query mem tracker, released on other trackers.
if (query_mem_tracker->consumption() != 0) {
@@ -146,12 +149,12 @@ public:
TQueryGlobals query_globals;
/// In the current implementation, for multiple fragments executed by a query on the same BE node,
- /// we store some common components in QueryFragmentsCtx, and save QueryFragmentsCtx in FragmentMgr.
- /// When all Fragments are executed, QueryFragmentsCtx needs to be deleted from FragmentMgr.
+ /// we store some common components in QueryContext, and save QueryContext in FragmentMgr.
+ /// When all Fragments are executed, QueryContext needs to be deleted from FragmentMgr.
/// Here we use a counter to store the number of Fragments that have not yet been completed,
/// and after each Fragment is completed, this value will be reduced by one.
/// When the last Fragment is completed, the counter is cleared, and the worker thread of the last Fragment
- /// will clean up QueryFragmentsCtx.
+ /// will clean up QueryContext.
std::atomic<int> fragment_num;
int timeout_second;
ObjectPool obj_pool;
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index bb29113529..82f7263ae2 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -48,7 +48,7 @@ class ObjectPool;
class ExecEnv;
class RuntimeFilterMgr;
class MemTrackerLimiter;
-class QueryFragmentsCtx;
+class QueryContext;
// A collection of items that are part of the global state of a
// query and shared across all execution nodes of that query.
@@ -340,9 +340,9 @@ public:
RuntimeFilterMgr* runtime_filter_mgr() { return _runtime_filter_mgr.get(); }
- void set_query_fragments_ctx(QueryFragmentsCtx* ctx) { _query_ctx = ctx; }
+ void set_query_ctx(QueryContext* ctx) { _query_ctx = ctx; }
- QueryFragmentsCtx* get_query_fragments_ctx() { return _query_ctx; }
+ QueryContext* get_query_ctx() { return _query_ctx; }
void set_query_mem_tracker(const std::shared_ptr<MemTrackerLimiter>& tracker) {
_query_mem_tracker = tracker;
@@ -476,7 +476,7 @@ private:
std::vector<TTabletCommitInfo> _tablet_commit_infos;
std::vector<TErrorTabletInfo> _error_tablet_infos;
- QueryFragmentsCtx* _query_ctx;
+ QueryContext* _query_ctx;
// true if max_filter_ratio is 0
bool _load_zero_tolerance = false;
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp
index f6bfe66525..f1cba5c0c7 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -44,7 +44,7 @@
#include "gutil/strings/substitute.h"
#include "runtime/define_primitive_type.h"
#include "runtime/descriptors.h"
-#include "runtime/query_fragments_ctx.h"
+#include "runtime/query_context.h"
#include "runtime/runtime_filter_mgr.h"
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"
@@ -452,7 +452,7 @@ Status HashJoinNode::prepare(RuntimeState* state) {
if (state->enable_share_hash_table_for_broadcast_join()) {
runtime_profile()->add_info_string("ShareHashTableEnabled", "true");
_shared_hashtable_controller =
- state->get_query_fragments_ctx()->get_shared_hash_table_controller();
+ state->get_query_ctx()->get_shared_hash_table_controller();
_shared_hash_table_context = _shared_hashtable_controller->get_context(id());
_should_build_hash_table = _shared_hashtable_controller->should_build_hash_table(
state->fragment_instance_id(), id());
diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp
index aabd95f1e6..9ed5084b41 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -30,7 +30,7 @@
#include "common/config.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
-#include "runtime/query_fragments_ctx.h"
+#include "runtime/query_context.h"
#include "runtime/runtime_state.h"
#include "util/pretty_printer.h"
#include "util/uid_util.h"
@@ -112,7 +112,7 @@ Status ScannerContext::init() {
#ifndef BE_TEST
// 3. get thread token
- thread_token = _state->get_query_fragments_ctx()->get_token();
+ thread_token = _state->get_query_ctx()->get_token();
#endif
// 4. This ctx will be submitted to the scanner scheduler right after init.
diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp
index 590f584e92..fb2ade6485 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -125,8 +125,7 @@ Status VScanNode::prepare(RuntimeState* state) {
if (_is_pipeline_scan) {
if (_shared_scan_opt) {
- _shared_scanner_controller =
- state->get_query_fragments_ctx()->get_shared_scanner_controller();
+ _shared_scanner_controller = state->get_query_ctx()->get_shared_scanner_controller();
auto [should_create_scanner, queue_id] =
_shared_scanner_controller->should_build_scanner_and_queue_id(id());
_should_create_scanner = should_create_scanner;
diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h
index 294ccf6437..231ce9fb19 100644
--- a/be/src/vec/exec/scan/vscan_node.h
+++ b/be/src/vec/exec/scan/vscan_node.h
@@ -39,7 +39,7 @@
#include "exec/olap_common.h"
#include "exprs/function_filter.h"
#include "runtime/define_primitive_type.h"
-#include "runtime/query_fragments_ctx.h"
+#include "runtime/query_context.h"
#include "runtime/runtime_state.h"
#include "util/lock.h"
#include "util/runtime_profile.h"
@@ -119,7 +119,7 @@ public:
if (_is_pipeline_scan) {
if (_shared_scan_opt) {
_shared_scanner_controller =
- state->get_query_fragments_ctx()->get_shared_scanner_controller();
+ state->get_query_ctx()->get_shared_scanner_controller();
auto [should_create_scanner, queue_id] =
_shared_scanner_controller->should_build_scanner_and_queue_id(id());
_should_create_scanner = should_create_scanner;
diff --git a/be/src/vec/exec/vsort_node.cpp b/be/src/vec/exec/vsort_node.cpp
index 4901414d95..6fb7f36b2a 100644
--- a/be/src/vec/exec/vsort_node.cpp
+++ b/be/src/vec/exec/vsort_node.cpp
@@ -31,7 +31,7 @@
#include "common/status.h"
#include "runtime/descriptors.h"
#include "runtime/memory/mem_tracker.h"
-#include "runtime/query_fragments_ctx.h"
+#include "runtime/query_context.h"
#include "runtime/runtime_predicate.h"
#include "runtime/runtime_state.h"
#include "runtime/types.h"
@@ -82,7 +82,7 @@ Status VSortNode::init(const TPlanNode& tnode, RuntimeState* state) {
// init runtime predicate
_use_topn_opt = tnode.sort_node.use_topn_opt;
if (_use_topn_opt) {
- auto query_ctx = state->get_query_fragments_ctx();
+ auto query_ctx = state->get_query_ctx();
auto first_sort_expr_node = tnode.sort_node.sort_info.ordering_exprs[0].nodes[0];
if (first_sort_expr_node.node_type == TExprNodeType::SLOT_REF) {
auto first_sort_slot = first_sort_expr_node.slot_ref;
@@ -143,7 +143,7 @@ Status VSortNode::sink(RuntimeState* state, vectorized::Block* input_block, bool
auto& sort_description = _sorter->get_sort_description();
auto col = input_block->get_by_position(sort_description[0].column_number);
bool is_reverse = sort_description[0].direction < 0;
- auto query_ctx = state->get_query_fragments_ctx();
+ auto query_ctx = state->get_query_ctx();
RETURN_IF_ERROR(
query_ctx->get_runtime_predicate().update(new_top, col.name, is_reverse));
old_top = std::move(new_top);
diff --git a/be/src/vec/olap/vcollect_iterator.cpp b/be/src/vec/olap/vcollect_iterator.cpp
index 82bd608810..1045621c9c 100644
--- a/be/src/vec/olap/vcollect_iterator.cpp
+++ b/be/src/vec/olap/vcollect_iterator.cpp
@@ -36,7 +36,7 @@
#include "olap/rowset/rowset_meta.h"
#include "olap/tablet.h"
#include "olap/tablet_schema.h"
-#include "runtime/query_fragments_ctx.h"
+#include "runtime/query_context.h"
#include "runtime/runtime_predicate.h"
#include "runtime/runtime_state.h"
#include "vec/columns/column.h"
@@ -391,7 +391,7 @@ Status VCollectIterator::_topn_next(Block* block) {
col_ptr->get(last_sorted_row, new_top);
// update orderby_extrems in query global context
- auto query_ctx = _reader->_reader_context.runtime_state->get_query_fragments_ctx();
+ auto query_ctx = _reader->_reader_context.runtime_state->get_query_ctx();
RETURN_IF_ERROR(
query_ctx->get_runtime_predicate().update(new_top, col_name, _is_reverse));
}
diff --git a/be/test/runtime/fragment_mgr_test.cpp b/be/test/runtime/fragment_mgr_test.cpp
index 323628c44d..1d6e09eccc 100644
--- a/be/test/runtime/fragment_mgr_test.cpp
+++ b/be/test/runtime/fragment_mgr_test.cpp
@@ -36,7 +36,7 @@ PlanFragmentExecutor::PlanFragmentExecutor(ExecEnv* exec_env,
PlanFragmentExecutor::~PlanFragmentExecutor() {}
Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
- QueryFragmentsCtx* batch_ctx) {
+ QueryContext* batch_ctx) {
return s_prepare_status;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org