You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2023/04/23 14:54:04 UTC

[doris] branch master updated: [refactor](query context) rename query fragments context to query context and make query context safe (#18950)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 3736530585 [refactor](query context) rename query fragments context to query context and make query context safe (#18950)
3736530585 is described below

commit 37365305859af37fe7f87fb3b16f2db6cbf8e4fd
Author: yiguolei <67...@qq.com>
AuthorDate: Sun Apr 23 22:53:56 2023 +0800

    [refactor](query context) rename query fragments context to query context and make query context safe (#18950)
    
    * [refactor](query context) rename query fragments context to query context and make query context safe
    
    ---------
    
    Co-authored-by: yiguolei <yi...@gmail.com>
---
 be/src/olap/reader.cpp                             |   4 +-
 be/src/olap/rowset/segment_v2/segment.cpp          |   4 +-
 be/src/olap/rowset/segment_v2/segment_iterator.cpp |   9 +-
 be/src/pipeline/pipeline_fragment_context.cpp      |   4 +-
 be/src/pipeline/pipeline_fragment_context.h        |   8 +-
 be/src/pipeline/pipeline_task.cpp                  |   6 +-
 be/src/pipeline/pipeline_task.h                    |   4 +-
 be/src/pipeline/task_scheduler.cpp                 |   7 +-
 be/src/runtime/fragment_mgr.cpp                    | 168 ++++++++++-----------
 be/src/runtime/fragment_mgr.h                      |  16 +-
 be/src/runtime/plan_fragment_executor.cpp          |  19 ++-
 be/src/runtime/plan_fragment_executor.h            |   7 +-
 .../{query_fragments_ctx.h => query_context.h}     |  19 ++-
 be/src/runtime/runtime_state.h                     |   8 +-
 be/src/vec/exec/join/vhash_join_node.cpp           |   4 +-
 be/src/vec/exec/scan/scanner_context.cpp           |   4 +-
 be/src/vec/exec/scan/vscan_node.cpp                |   3 +-
 be/src/vec/exec/scan/vscan_node.h                  |   4 +-
 be/src/vec/exec/vsort_node.cpp                     |   6 +-
 be/src/vec/olap/vcollect_iterator.cpp              |   4 +-
 be/test/runtime/fragment_mgr_test.cpp              |   2 +-
 21 files changed, 152 insertions(+), 158 deletions(-)

diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp
index 9886db76d6..53d06a2f32 100644
--- a/be/src/olap/reader.cpp
+++ b/be/src/olap/reader.cpp
@@ -48,7 +48,7 @@
 #include "olap/schema.h"
 #include "olap/tablet.h"
 #include "olap/tablet_meta.h"
-#include "runtime/query_fragments_ctx.h"
+#include "runtime/query_context.h"
 #include "runtime/runtime_predicate.h"
 #include "runtime/runtime_state.h"
 #include "vec/common/arena.h"
@@ -553,7 +553,7 @@ void TabletReader::_init_conditions_param_except_leafnode_of_andnode(
 
     if (read_params.use_topn_opt) {
         auto& runtime_predicate =
-                read_params.runtime_state->get_query_fragments_ctx()->get_runtime_predicate();
+                read_params.runtime_state->get_query_ctx()->get_runtime_predicate();
         runtime_predicate.set_tablet_schema(_tablet_schema);
     }
 }
diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp
index cf952c2856..b0589636af 100644
--- a/be/src/olap/rowset/segment_v2/segment.cpp
+++ b/be/src/olap/rowset/segment_v2/segment.cpp
@@ -44,7 +44,7 @@
 #include "olap/types.h"
 #include "olap/utils.h"
 #include "runtime/memory/mem_tracker.h"
-#include "runtime/query_fragments_ctx.h"
+#include "runtime/query_context.h"
 #include "runtime/runtime_predicate.h"
 #include "runtime/runtime_state.h"
 #include "util/coding.h"
@@ -134,7 +134,7 @@ Status Segment::new_iterator(const Schema& schema, const StorageReadOptions& rea
     }
 
     if (read_options.use_topn_opt) {
-        auto query_ctx = read_options.runtime_state->get_query_fragments_ctx();
+        auto query_ctx = read_options.runtime_state->get_query_ctx();
         auto runtime_predicate = query_ctx->get_runtime_predicate().get_predictate();
         if (runtime_predicate) {
             int32_t uid =
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
index a1c55855bf..76e443e78e 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
@@ -50,7 +50,7 @@
 #include "olap/tablet_schema.h"
 #include "olap/types.h"
 #include "olap/utils.h"
-#include "runtime/query_fragments_ctx.h"
+#include "runtime/query_context.h"
 #include "runtime/runtime_predicate.h"
 #include "runtime/runtime_state.h"
 #include "runtime/thread_context.h"
@@ -386,7 +386,7 @@ Status SegmentIterator::_get_row_ranges_by_column_conditions() {
 
     std::shared_ptr<doris::ColumnPredicate> runtime_predicate = nullptr;
     if (_opts.use_topn_opt) {
-        auto query_ctx = _opts.runtime_state->get_query_fragments_ctx();
+        auto query_ctx = _opts.runtime_state->get_query_ctx();
         runtime_predicate = query_ctx->get_runtime_predicate().get_predictate();
     }
 
@@ -446,7 +446,7 @@ Status SegmentIterator::_get_row_ranges_from_conditions(RowRanges* condition_row
 
     std::shared_ptr<doris::ColumnPredicate> runtime_predicate = nullptr;
     if (_opts.use_topn_opt) {
-        auto query_ctx = _opts.runtime_state->get_query_fragments_ctx();
+        auto query_ctx = _opts.runtime_state->get_query_ctx();
         runtime_predicate = query_ctx->get_runtime_predicate().get_predictate();
         if (runtime_predicate) {
             int32_t cid = _opts.tablet_schema->column(runtime_predicate->column_id()).unique_id();
@@ -1210,8 +1210,7 @@ Status SegmentIterator::_vec_init_lazy_materialization() {
     //  all rows should be read, so runtime predicate will reduce rows for topn node
     if (_opts.use_topn_opt &&
         !(_opts.read_orderby_key_columns != nullptr && !_opts.read_orderby_key_columns->empty())) {
-        auto& runtime_predicate =
-                _opts.runtime_state->get_query_fragments_ctx()->get_runtime_predicate();
+        auto& runtime_predicate = _opts.runtime_state->get_query_ctx()->get_runtime_predicate();
         _runtime_predicate = runtime_predicate.get_predictate();
         if (_runtime_predicate) {
             _col_predicates.push_back(_runtime_predicate.get());
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 9a5c23fb6c..5b4868e885 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -120,7 +120,7 @@ namespace doris::pipeline {
 
 PipelineFragmentContext::PipelineFragmentContext(
         const TUniqueId& query_id, const TUniqueId& instance_id, const int fragment_id,
-        int backend_num, std::shared_ptr<QueryFragmentsCtx> query_ctx, ExecEnv* exec_env,
+        int backend_num, std::shared_ptr<QueryContext> query_ctx, ExecEnv* exec_env,
         const std::function<void(RuntimeState*, Status*)>& call_back,
         const report_status_callback& report_status_cb)
         : _query_id(query_id),
@@ -210,7 +210,7 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
     _runtime_state =
             std::make_unique<RuntimeState>(local_params, request.query_id, request.query_options,
                                            _query_ctx->query_globals, _exec_env);
-    _runtime_state->set_query_fragments_ctx(_query_ctx.get());
+    _runtime_state->set_query_ctx(_query_ctx.get());
     _runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker);
     _runtime_state->set_tracer(std::move(tracer));
 
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index 5e23ab0e2b..87c075b85f 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -34,7 +34,7 @@
 #include "common/status.h"
 #include "pipeline/pipeline.h"
 #include "pipeline/pipeline_task.h"
-#include "runtime/query_fragments_ctx.h"
+#include "runtime/query_context.h"
 #include "runtime/runtime_state.h"
 #include "util/runtime_profile.h"
 #include "util/stopwatch.hpp"
@@ -65,7 +65,7 @@ public:
     using report_status_callback = std::function<void(const ReportStatusRequest)>;
     PipelineFragmentContext(const TUniqueId& query_id, const TUniqueId& instance_id,
                             const int fragment_id, int backend_num,
-                            std::shared_ptr<QueryFragmentsCtx> query_ctx, ExecEnv* exec_env,
+                            std::shared_ptr<QueryContext> query_ctx, ExecEnv* exec_env,
                             const std::function<void(RuntimeState*, Status*)>& call_back,
                             const report_status_callback& report_status_cb);
 
@@ -95,7 +95,7 @@ public:
 
     // TODO: Support pipeline runtime filter
 
-    QueryFragmentsCtx* get_query_context() { return _query_ctx.get(); }
+    QueryContext* get_query_context() { return _query_ctx.get(); }
 
     TUniqueId get_query_id() const { return _query_id; }
 
@@ -169,7 +169,7 @@ private:
     ExecNode* _root_plan = nullptr; // lives in _runtime_state->obj_pool()
     std::unique_ptr<DataSink> _sink;
 
-    std::shared_ptr<QueryFragmentsCtx> _query_ctx;
+    std::shared_ptr<QueryContext> _query_ctx;
 
     std::shared_ptr<RuntimeFilterMergeControllerEntity> _merge_controller_handler;
 
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 0ef3574a70..b5a0e5b602 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -28,7 +28,7 @@
 #include "pipeline/pipeline.h"
 #include "pipeline_fragment_context.h"
 #include "runtime/descriptors.h"
-#include "runtime/query_fragments_ctx.h"
+#include "runtime/query_context.h"
 #include "runtime/thread_context.h"
 #include "task_queue.h"
 #include "util/defer_op.h"
@@ -118,7 +118,7 @@ bool PipelineTask::has_dependency() {
         return true;
     }
 
-    if (!query_fragments_context()->is_ready_to_execute()) {
+    if (!query_context()->is_ready_to_execute()) {
         return true;
     }
 
@@ -266,7 +266,7 @@ Status PipelineTask::close() {
     return s;
 }
 
-QueryFragmentsCtx* PipelineTask::query_fragments_context() {
+QueryContext* PipelineTask::query_context() {
     return _fragment_context->get_query_context();
 }
 
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 02eb8be992..d17d13dda7 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -31,7 +31,7 @@
 #include "vec/core/block.h"
 
 namespace doris {
-class QueryFragmentsCtx;
+class QueryContext;
 class RuntimeState;
 namespace pipeline {
 class PipelineFragmentContext;
@@ -171,7 +171,7 @@ public:
 
     PipelineFragmentContext* fragment_context() { return _fragment_context; }
 
-    QueryFragmentsCtx* query_fragments_context();
+    QueryContext* query_context();
 
     int get_previous_core_id() const {
         return _previous_schedule_id != -1 ? _previous_schedule_id
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index 7362dd38b3..cd219a25e0 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -35,7 +35,7 @@
 #include "pipeline/pipeline_task.h"
 #include "pipeline/task_queue.h"
 #include "pipeline_fragment_context.h"
-#include "runtime/query_fragments_ctx.h"
+#include "runtime/query_context.h"
 #include "util/sse_util.hpp"
 #include "util/thread.h"
 #include "util/threadpool.h"
@@ -124,9 +124,8 @@ void BlockedTaskScheduler::_schedule() {
                 } else {
                     _make_task_run(local_blocked_tasks, iter, ready_tasks);
                 }
-            } else if (task->query_fragments_context()->is_timeout(now)) {
-                LOG(WARNING) << "Timeout, query_id="
-                             << print_id(task->query_fragments_context()->query_id)
+            } else if (task->query_context()->is_timeout(now)) {
+                LOG(WARNING) << "Timeout, query_id=" << print_id(task->query_context()->query_id)
                              << ", instance_id="
                              << print_id(task->fragment_context()->get_fragment_instance_id());
 
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index a113fa3792..6c905afa9a 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -62,7 +62,7 @@
 #include "runtime/memory/mem_tracker_limiter.h"
 #include "runtime/plan_fragment_executor.h"
 #include "runtime/primitive_type.h"
-#include "runtime/query_fragments_ctx.h"
+#include "runtime/query_context.h"
 #include "runtime/runtime_filter_mgr.h"
 #include "runtime/runtime_state.h"
 #include "runtime/stream_load/new_load_stream_mgr.h"
@@ -112,9 +112,9 @@ using apache::thrift::transport::TTransportException;
 class FragmentExecState {
 public:
     using report_status_callback_impl = std::function<void(const ReportStatusRequest)>;
-    // Constructor by using QueryFragmentsCtx
+    // Constructor by using QueryContext
     FragmentExecState(const TUniqueId& query_id, const TUniqueId& instance_id, int backend_num,
-                      ExecEnv* exec_env, std::shared_ptr<QueryFragmentsCtx> fragments_ctx,
+                      ExecEnv* exec_env, std::shared_ptr<QueryContext> query_ctx,
                       const report_status_callback_impl& report_status_cb_impl);
 
     Status prepare(const TExecPlanFragmentParams& params);
@@ -167,7 +167,7 @@ public:
 
     int get_timeout_second() const { return _timeout_second; }
 
-    std::shared_ptr<QueryFragmentsCtx> get_fragments_ctx() { return _fragments_ctx; }
+    std::shared_ptr<QueryContext> get_query_ctx() { return _query_ctx; }
 
     void set_need_wait_execution_trigger() { _need_wait_execution_trigger = true; }
 
@@ -196,7 +196,7 @@ private:
     std::atomic<bool> _cancelled {false};
 
     // This context is shared by all fragments of this host in a query
-    std::shared_ptr<QueryFragmentsCtx> _fragments_ctx;
+    std::shared_ptr<QueryContext> _query_ctx;
 
     std::shared_ptr<RuntimeFilterMergeControllerEntity> _merge_controller_handler;
 
@@ -207,8 +207,7 @@ private:
 
 FragmentExecState::FragmentExecState(const TUniqueId& query_id,
                                      const TUniqueId& fragment_instance_id, int backend_num,
-                                     ExecEnv* exec_env,
-                                     std::shared_ptr<QueryFragmentsCtx> fragments_ctx,
+                                     ExecEnv* exec_env, std::shared_ptr<QueryContext> query_ctx,
                                      const report_status_callback_impl& report_status_cb_impl)
         : _query_id(query_id),
           _fragment_instance_id(fragment_instance_id),
@@ -218,10 +217,10 @@ FragmentExecState::FragmentExecState(const TUniqueId& query_id,
                                               std::placeholders::_3)),
           _set_rsc_info(false),
           _timeout_second(-1),
-          _fragments_ctx(std::move(fragments_ctx)),
+          _query_ctx(std::move(query_ctx)),
           _report_status_cb_impl(report_status_cb_impl) {
     _start_time = vectorized::VecDateTimeValue::local_time();
-    _coord_addr = _fragments_ctx->coord_addr;
+    _coord_addr = _query_ctx->coord_addr;
 }
 
 Status FragmentExecState::prepare(const TExecPlanFragmentParams& params) {
@@ -229,16 +228,16 @@ Status FragmentExecState::prepare(const TExecPlanFragmentParams& params) {
         _timeout_second = params.query_options.execution_timeout;
     }
 
-    if (_fragments_ctx == nullptr) {
+    if (_query_ctx == nullptr) {
         if (params.__isset.resource_info) {
             set_group(params.resource_info);
         }
     }
 
-    if (_fragments_ctx == nullptr) {
+    if (_query_ctx == nullptr) {
         return _executor.prepare(params);
     } else {
-        return _executor.prepare(params, _fragments_ctx.get());
+        return _executor.prepare(params, _query_ctx.get());
     }
 }
 
@@ -246,7 +245,7 @@ Status FragmentExecState::execute() {
     if (_need_wait_execution_trigger) {
         // if _need_wait_execution_trigger is true, which means this instance
         // is prepared but need to wait for the signal to do the rest execution.
-        if (!_fragments_ctx->wait_for_start()) {
+        if (!_query_ctx->wait_for_start()) {
             return cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, "wait fragment start timeout");
         }
     }
@@ -283,7 +282,7 @@ Status FragmentExecState::cancel(const PPlanFragmentCancelReason& reason, const
 #ifndef BE_TEST
         // Get pipe from new load stream manager and send cancel to it or the fragment may hang to wait read from pipe
         // For stream load the fragment's query_id == load id, it is set in FE.
-        auto stream_load_ctx = _fragments_ctx->exec_env()->new_load_stream_mgr()->get(_query_id);
+        auto stream_load_ctx = _query_ctx->exec_env()->new_load_stream_mgr()->get(_query_id);
         if (stream_load_ctx != nullptr) {
             stream_load_ctx->pipe->cancel(PPlanFragmentCancelReason_Name(reason));
         }
@@ -347,7 +346,7 @@ FragmentMgr::~FragmentMgr() {
     {
         std::lock_guard<std::mutex> lock(_lock);
         _fragment_map.clear();
-        _fragments_ctx_map.clear();
+        _query_ctx_map.clear();
     }
 }
 
@@ -529,19 +528,19 @@ void FragmentMgr::_exec_actual(std::shared_ptr<FragmentExecState> exec_state,
         exec_state->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, "exec_state execute failed");
     }
 
-    std::shared_ptr<QueryFragmentsCtx> fragments_ctx = exec_state->get_fragments_ctx();
+    std::shared_ptr<QueryContext> query_ctx = exec_state->get_query_ctx();
     bool all_done = false;
-    if (fragments_ctx != nullptr) {
+    if (query_ctx != nullptr) {
         // decrease the number of unfinished fragments
-        all_done = fragments_ctx->countdown();
+        all_done = query_ctx->countdown();
     }
 
     // remove exec state after this fragment finished
     {
         std::lock_guard<std::mutex> lock(_lock);
         _fragment_map.erase(exec_state->fragment_instance_id());
-        if (all_done && fragments_ctx) {
-            _fragments_ctx_map.erase(fragments_ctx->query_id);
+        if (all_done && query_ctx) {
+            _query_ctx_map.erase(query_ctx->query_id);
         }
     }
 
@@ -596,8 +595,8 @@ Status FragmentMgr::start_query_execution(const PExecPlanFragmentStartRequest* r
     TUniqueId query_id;
     query_id.__set_hi(request->query_id().hi());
     query_id.__set_lo(request->query_id().lo());
-    auto search = _fragments_ctx_map.find(query_id);
-    if (search == _fragments_ctx_map.end()) {
+    auto search = _query_ctx_map.find(query_id);
+    if (search == _query_ctx_map.end()) {
         return Status::InternalError(
                 "Failed to get query fragments context. Query may be "
                 "timeout or be cancelled. host: {}",
@@ -615,47 +614,46 @@ void FragmentMgr::remove_pipeline_context(
     bool all_done = q_context->countdown();
     _pipeline_map.erase(f_context->get_fragment_instance_id());
     if (all_done) {
-        _fragments_ctx_map.erase(query_id);
+        _query_ctx_map.erase(query_id);
     }
 }
 
 template <typename Params>
 Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, bool pipeline,
-                                   std::shared_ptr<QueryFragmentsCtx>& fragments_ctx) {
+                                   std::shared_ptr<QueryContext>& query_ctx) {
     if (params.is_simplified_param) {
-        // Get common components from _fragments_ctx_map
+        // Get common components from _query_ctx_map
         std::lock_guard<std::mutex> lock(_lock);
-        auto search = _fragments_ctx_map.find(query_id);
-        if (search == _fragments_ctx_map.end()) {
+        auto search = _query_ctx_map.find(query_id);
+        if (search == _query_ctx_map.end()) {
             return Status::InternalError(
                     "Failed to get query fragments context. Query may be "
                     "timeout or be cancelled. host: {}",
                     BackendOptions::get_localhost());
         }
-        fragments_ctx = search->second;
+        query_ctx = search->second;
     } else {
         // This may be a first fragment request of the query.
         // Create the query fragments context.
-        fragments_ctx.reset(new QueryFragmentsCtx(params.fragment_num_on_host, _exec_env));
-        fragments_ctx->query_id = query_id;
-        RETURN_IF_ERROR(DescriptorTbl::create(&(fragments_ctx->obj_pool), params.desc_tbl,
-                                              &(fragments_ctx->desc_tbl)));
-        fragments_ctx->coord_addr = params.coord;
-        LOG(INFO) << "query_id: "
-                  << UniqueId(fragments_ctx->query_id.hi, fragments_ctx->query_id.lo)
-                  << " coord_addr " << fragments_ctx->coord_addr
+        query_ctx = QueryContext::create_shared(params.fragment_num_on_host, _exec_env);
+        query_ctx->query_id = query_id;
+        RETURN_IF_ERROR(DescriptorTbl::create(&(query_ctx->obj_pool), params.desc_tbl,
+                                              &(query_ctx->desc_tbl)));
+        query_ctx->coord_addr = params.coord;
+        LOG(INFO) << "query_id: " << UniqueId(query_ctx->query_id.hi, query_ctx->query_id.lo)
+                  << " coord_addr " << query_ctx->coord_addr
                   << " total fragment num on current host: " << params.fragment_num_on_host;
-        fragments_ctx->query_globals = params.query_globals;
+        query_ctx->query_globals = params.query_globals;
 
         if (params.__isset.resource_info) {
-            fragments_ctx->user = params.resource_info.user;
-            fragments_ctx->group = params.resource_info.group;
-            fragments_ctx->set_rsc_info = true;
+            query_ctx->user = params.resource_info.user;
+            query_ctx->group = params.resource_info.group;
+            query_ctx->set_rsc_info = true;
         }
 
-        fragments_ctx->get_shared_hash_table_controller()->set_pipeline_engine_enabled(pipeline);
-        fragments_ctx->timeout_second = params.query_options.execution_timeout;
-        _set_scan_concurrency(params, fragments_ctx.get());
+        query_ctx->get_shared_hash_table_controller()->set_pipeline_engine_enabled(pipeline);
+        query_ctx->timeout_second = params.query_options.execution_timeout;
+        _set_scan_concurrency(params, query_ctx.get());
 
         bool has_query_mem_tracker =
                 params.query_options.__isset.mem_limit && (params.query_options.mem_limit > 0);
@@ -668,36 +666,36 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
             bytes_limit = MemInfo::mem_limit();
         }
         if (params.query_options.query_type == TQueryType::SELECT) {
-            fragments_ctx->query_mem_tracker = std::make_shared<MemTrackerLimiter>(
+            query_ctx->query_mem_tracker = std::make_shared<MemTrackerLimiter>(
                     MemTrackerLimiter::Type::QUERY,
-                    fmt::format("Query#Id={}", print_id(fragments_ctx->query_id)), bytes_limit);
+                    fmt::format("Query#Id={}", print_id(query_ctx->query_id)), bytes_limit);
         } else if (params.query_options.query_type == TQueryType::LOAD) {
-            fragments_ctx->query_mem_tracker = std::make_shared<MemTrackerLimiter>(
+            query_ctx->query_mem_tracker = std::make_shared<MemTrackerLimiter>(
                     MemTrackerLimiter::Type::LOAD,
-                    fmt::format("Load#Id={}", print_id(fragments_ctx->query_id)), bytes_limit);
+                    fmt::format("Load#Id={}", print_id(query_ctx->query_id)), bytes_limit);
         } else { // EXTERNAL
-            fragments_ctx->query_mem_tracker = std::make_shared<MemTrackerLimiter>(
+            query_ctx->query_mem_tracker = std::make_shared<MemTrackerLimiter>(
                     MemTrackerLimiter::Type::LOAD,
-                    fmt::format("External#Id={}", print_id(fragments_ctx->query_id)), bytes_limit);
+                    fmt::format("External#Id={}", print_id(query_ctx->query_id)), bytes_limit);
         }
         if (params.query_options.__isset.is_report_success &&
             params.query_options.is_report_success) {
-            fragments_ctx->query_mem_tracker->enable_print_log_usage();
+            query_ctx->query_mem_tracker->enable_print_log_usage();
         }
 
         {
-            // Find _fragments_ctx_map again, in case some other request has already
+            // Find _query_ctx_map again, in case some other request has already
             // create the query fragments context.
             std::lock_guard<std::mutex> lock(_lock);
-            auto search = _fragments_ctx_map.find(query_id);
-            if (search == _fragments_ctx_map.end()) {
-                _fragments_ctx_map.insert(std::make_pair(fragments_ctx->query_id, fragments_ctx));
+            auto search = _query_ctx_map.find(query_id);
+            if (search == _query_ctx_map.end()) {
+                _query_ctx_map.insert(std::make_pair(query_ctx->query_id, query_ctx));
                 LOG(INFO) << "Register query/load memory tracker, query/load id: "
-                          << print_id(fragments_ctx->query_id)
+                          << print_id(query_ctx->query_id)
                           << " limit: " << PrettyPrinter::print(bytes_limit, TUnit::BYTES);
             } else {
                 // Already has a query fragments context, use it
-                fragments_ctx = search->second;
+                query_ctx = search->second;
             }
         }
     }
@@ -726,16 +724,16 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
     }
 
     std::shared_ptr<FragmentExecState> exec_state;
-    std::shared_ptr<QueryFragmentsCtx> fragments_ctx;
+    std::shared_ptr<QueryContext> query_ctx;
     bool pipeline_engine_enabled = params.query_options.__isset.enable_pipeline_engine &&
                                    params.query_options.enable_pipeline_engine;
     RETURN_IF_ERROR(
-            _get_query_ctx(params, params.params.query_id, pipeline_engine_enabled, fragments_ctx));
-    fragments_ctx->fragment_ids.push_back(fragment_instance_id);
+            _get_query_ctx(params, params.params.query_id, pipeline_engine_enabled, query_ctx));
+    query_ctx->fragment_ids.push_back(fragment_instance_id);
 
     exec_state.reset(
-            new FragmentExecState(fragments_ctx->query_id, params.params.fragment_instance_id,
-                                  params.backend_num, _exec_env, fragments_ctx,
+            new FragmentExecState(query_ctx->query_id, params.params.fragment_instance_id,
+                                  params.backend_num, _exec_env, query_ctx,
                                   std::bind<void>(std::mem_fn(&FragmentMgr::coordinator_callback),
                                                   this, std::placeholders::_1)));
     if (params.__isset.need_wait_execution_trigger && params.need_wait_execution_trigger) {
@@ -794,8 +792,8 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
     START_AND_SCOPE_SPAN(tracer, span, "FragmentMgr::exec_plan_fragment");
 
     std::shared_ptr<FragmentExecState> exec_state;
-    std::shared_ptr<QueryFragmentsCtx> fragments_ctx;
-    RETURN_IF_ERROR(_get_query_ctx(params, params.query_id, true, fragments_ctx));
+    std::shared_ptr<QueryContext> query_ctx;
+    RETURN_IF_ERROR(_get_query_ctx(params, params.query_id, true, query_ctx));
 
     if (params.__isset.resource_groups && !params.resource_groups.empty()) {
         taskgroup::TaskGroupInfo task_group_info;
@@ -805,12 +803,12 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
             auto tg = taskgroup::TaskGroupManager::instance()->get_or_create_task_group(
                     task_group_info);
             _exec_env->pipeline_task_group_scheduler()->try_update_task_group(task_group_info, tg);
-            fragments_ctx->set_task_group(tg);
-            LOG(INFO) << "Query/load id: " << print_id(fragments_ctx->query_id)
+            query_ctx->set_task_group(tg);
+            LOG(INFO) << "Query/load id: " << print_id(query_ctx->query_id)
                       << " use task group: " << tg->debug_string();
         }
     } else {
-        VLOG_DEBUG << "Query/load id: " << print_id(fragments_ctx->query_id)
+        VLOG_DEBUG << "Query/load id: " << print_id(query_ctx->query_id)
                    << " does not use task group.";
     }
 
@@ -827,11 +825,11 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
             }
         }
 
-        fragments_ctx->fragment_ids.push_back(fragment_instance_id);
+        query_ctx->fragment_ids.push_back(fragment_instance_id);
 
         exec_state.reset(new FragmentExecState(
-                fragments_ctx->query_id, fragment_instance_id, local_params.backend_num, _exec_env,
-                fragments_ctx,
+                query_ctx->query_id, fragment_instance_id, local_params.backend_num, _exec_env,
+                query_ctx,
                 std::bind<void>(std::mem_fn(&FragmentMgr::coordinator_callback), this,
                                 std::placeholders::_1)));
         if (params.__isset.need_wait_execution_trigger && params.need_wait_execution_trigger) {
@@ -842,14 +840,14 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
 
         int64_t duration_ns = 0;
         if (!params.__isset.need_wait_execution_trigger || !params.need_wait_execution_trigger) {
-            fragments_ctx->set_ready_to_execute_only();
+            query_ctx->set_ready_to_execute_only();
         }
         _setup_shared_hashtable_for_broadcast_join(
-                params, local_params, exec_state->executor()->runtime_state(), fragments_ctx.get());
+                params, local_params, exec_state->executor()->runtime_state(), query_ctx.get());
         std::shared_ptr<pipeline::PipelineFragmentContext> context =
                 std::make_shared<pipeline::PipelineFragmentContext>(
-                        fragments_ctx->query_id, fragment_instance_id, params.fragment_id,
-                        local_params.backend_num, fragments_ctx, _exec_env, cb,
+                        query_ctx->query_id, fragment_instance_id, params.fragment_id,
+                        local_params.backend_num, query_ctx, _exec_env, cb,
                         std::bind<void>(std::mem_fn(&FragmentMgr::coordinator_callback), this,
                                         std::placeholders::_1));
         {
@@ -879,13 +877,13 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
 }
 
 template <typename Param>
-void FragmentMgr::_set_scan_concurrency(const Param& params, QueryFragmentsCtx* fragments_ctx) {
+void FragmentMgr::_set_scan_concurrency(const Param& params, QueryContext* query_ctx) {
 #ifndef BE_TEST
     // If the token is set, the scan task will use limited_scan_pool in scanner scheduler.
     // Otherwise, the scan task will use local/remote scan pool in scanner scheduler
     if (params.query_options.__isset.resource_limit &&
         params.query_options.resource_limit.__isset.cpu_limit) {
-        fragments_ctx->set_thread_token(params.query_options.resource_limit.cpu_limit, false);
+        query_ctx->set_thread_token(params.query_options.resource_limit.cpu_limit, false);
     }
 #endif
 }
@@ -923,8 +921,8 @@ void FragmentMgr::cancel_query(const TUniqueId& query_id, const PPlanFragmentCan
     std::vector<TUniqueId> cancel_fragment_ids;
     {
         std::lock_guard<std::mutex> lock(_lock);
-        auto ctx = _fragments_ctx_map.find(query_id);
-        if (ctx != _fragments_ctx_map.end()) {
+        auto ctx = _query_ctx_map.find(query_id);
+        if (ctx != _query_ctx_map.end()) {
             cancel_fragment_ids = ctx->second->fragment_ids;
         }
     }
@@ -935,8 +933,8 @@ void FragmentMgr::cancel_query(const TUniqueId& query_id, const PPlanFragmentCan
 
 bool FragmentMgr::query_is_canceled(const TUniqueId& query_id) {
     std::lock_guard<std::mutex> lock(_lock);
-    auto ctx = _fragments_ctx_map.find(query_id);
-    if (ctx != _fragments_ctx_map.end()) {
+    auto ctx = _query_ctx_map.find(query_id);
+    if (ctx != _query_ctx_map.end()) {
         for (auto it : ctx->second->fragment_ids) {
             auto exec_state_iter = _fragment_map.find(it);
             if (exec_state_iter != _fragment_map.end() && exec_state_iter->second) {
@@ -965,9 +963,9 @@ void FragmentMgr::cancel_worker() {
                     to_cancel.push_back(it.second->fragment_instance_id());
                 }
             }
-            for (auto it = _fragments_ctx_map.begin(); it != _fragments_ctx_map.end();) {
+            for (auto it = _query_ctx_map.begin(); it != _query_ctx_map.end();) {
                 if (it->second->is_timeout(now)) {
-                    it = _fragments_ctx_map.erase(it);
+                    it = _query_ctx_map.erase(it);
                 } else {
                     ++it;
                 }
@@ -1192,7 +1190,7 @@ Status FragmentMgr::merge_filter(const PMergeFilterRequest* request,
 
 void FragmentMgr::_setup_shared_hashtable_for_broadcast_join(const TExecPlanFragmentParams& params,
                                                              RuntimeState* state,
-                                                             QueryFragmentsCtx* fragments_ctx) {
+                                                             QueryContext* query_ctx) {
     if (!params.query_options.__isset.enable_share_hash_table_for_broadcast_join ||
         !params.query_options.enable_share_hash_table_for_broadcast_join) {
         return;
@@ -1210,7 +1208,7 @@ void FragmentMgr::_setup_shared_hashtable_for_broadcast_join(const TExecPlanFrag
         }
 
         if (params.build_hash_table_for_broadcast_join) {
-            fragments_ctx->get_shared_hash_table_controller()->set_builder_and_consumers(
+            query_ctx->get_shared_hash_table_controller()->set_builder_and_consumers(
                     params.params.fragment_instance_id, params.instances_sharing_hash_table,
                     node.node_id);
         }
@@ -1219,7 +1217,7 @@ void FragmentMgr::_setup_shared_hashtable_for_broadcast_join(const TExecPlanFrag
 
 void FragmentMgr::_setup_shared_hashtable_for_broadcast_join(
         const TPipelineFragmentParams& params, const TPipelineInstanceParams& local_params,
-        RuntimeState* state, QueryFragmentsCtx* fragments_ctx) {
+        RuntimeState* state, QueryContext* query_ctx) {
     if (!params.query_options.__isset.enable_share_hash_table_for_broadcast_join ||
         !params.query_options.enable_share_hash_table_for_broadcast_join) {
         return;
@@ -1237,7 +1235,7 @@ void FragmentMgr::_setup_shared_hashtable_for_broadcast_join(
         }
 
         if (local_params.build_hash_table_for_broadcast_join) {
-            fragments_ctx->get_shared_hash_table_controller()->set_builder_and_consumers(
+            query_ctx->get_shared_hash_table_controller()->set_builder_and_consumers(
                     local_params.fragment_instance_id, params.instances_sharing_hash_table,
                     node.node_id);
         }
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index d5a724787e..2cc7fcc251 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -47,7 +47,7 @@ namespace doris {
 namespace pipeline {
 class PipelineFragmentContext;
 }
-class QueryFragmentsCtx;
+class QueryContext;
 class ExecEnv;
 class FragmentExecState;
 class ThreadPool;
@@ -139,20 +139,18 @@ private:
     void _exec_actual(std::shared_ptr<FragmentExecState> exec_state, const FinishCallback& cb);
 
     template <typename Param>
-    void _set_scan_concurrency(const Param& params, QueryFragmentsCtx* fragments_ctx);
+    void _set_scan_concurrency(const Param& params, QueryContext* query_ctx);
 
     void _setup_shared_hashtable_for_broadcast_join(const TExecPlanFragmentParams& params,
-                                                    RuntimeState* state,
-                                                    QueryFragmentsCtx* fragments_ctx);
+                                                    RuntimeState* state, QueryContext* query_ctx);
 
     void _setup_shared_hashtable_for_broadcast_join(const TPipelineFragmentParams& params,
                                                     const TPipelineInstanceParams& local_params,
-                                                    RuntimeState* state,
-                                                    QueryFragmentsCtx* fragments_ctx);
+                                                    RuntimeState* state, QueryContext* query_ctx);
 
     template <typename Params>
     Status _get_query_ctx(const Params& params, TUniqueId query_id, bool pipeline,
-                          std::shared_ptr<QueryFragmentsCtx>& fragments_ctx);
+                          std::shared_ptr<QueryContext>& query_ctx);
 
     // This is input params
     ExecEnv* _exec_env;
@@ -166,8 +164,8 @@ private:
 
     std::unordered_map<TUniqueId, std::shared_ptr<pipeline::PipelineFragmentContext>> _pipeline_map;
 
-    // query id -> QueryFragmentsCtx
-    std::unordered_map<TUniqueId, std::shared_ptr<QueryFragmentsCtx>> _fragments_ctx_map;
+    // query id -> QueryContext
+    std::unordered_map<TUniqueId, std::shared_ptr<QueryContext>> _query_ctx_map;
     std::unordered_map<TUniqueId, std::unordered_map<int, int64_t>> _bf_size_map;
 
     CountDownLatch _stop_background_threads_latch;
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index 10747ea7d3..ba1db4b69d 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -45,7 +45,7 @@
 #include "runtime/descriptors.h"
 #include "runtime/exec_env.h"
 #include "runtime/memory/mem_tracker_limiter.h"
-#include "runtime/query_fragments_ctx.h"
+#include "runtime/query_context.h"
 #include "runtime/query_statistics.h"
 #include "runtime/result_queue_mgr.h"
 #include "runtime/runtime_filter_mgr.h"
@@ -94,7 +94,7 @@ PlanFragmentExecutor::~PlanFragmentExecutor() {
 }
 
 Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
-                                     QueryFragmentsCtx* fragments_ctx) {
+                                     QueryContext* query_ctx) {
     OpentelemetryTracer tracer = telemetry::get_noop_tracer();
     if (opentelemetry::trace::Tracer::GetCurrentSpan()->GetContext().IsValid()) {
         tracer = telemetry::get_tracer(print_id(_query_id));
@@ -112,12 +112,11 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
     // VLOG_CRITICAL << "request:\n" << apache::thrift::ThriftDebugString(request);
 
     const TQueryGlobals& query_globals =
-            fragments_ctx == nullptr ? request.query_globals : fragments_ctx->query_globals;
+            query_ctx == nullptr ? request.query_globals : query_ctx->query_globals;
     _runtime_state.reset(new RuntimeState(params, request.query_options, query_globals, _exec_env));
-    _runtime_state->set_query_fragments_ctx(fragments_ctx);
-    _runtime_state->set_query_mem_tracker(fragments_ctx == nullptr
-                                                  ? _exec_env->orphan_mem_tracker()
-                                                  : fragments_ctx->query_mem_tracker);
+    _runtime_state->set_query_ctx(query_ctx);
+    _runtime_state->set_query_mem_tracker(query_ctx == nullptr ? _exec_env->orphan_mem_tracker()
+                                                               : query_ctx->query_mem_tracker);
     _runtime_state->set_tracer(std::move(tracer));
 
     SCOPED_ATTACH_TASK(_runtime_state.get());
@@ -142,8 +141,8 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
 
     // set up desc tbl
     DescriptorTbl* desc_tbl = nullptr;
-    if (fragments_ctx != nullptr) {
-        desc_tbl = fragments_ctx->desc_tbl;
+    if (query_ctx != nullptr) {
+        desc_tbl = query_ctx->desc_tbl;
     } else {
         DCHECK(request.__isset.desc_tbl);
         RETURN_IF_ERROR(DescriptorTbl::create(obj_pool(), request.desc_tbl, &desc_tbl));
@@ -487,7 +486,7 @@ void PlanFragmentExecutor::cancel(const PPlanFragmentCancelReason& reason, const
     _cancel_msg = msg;
     _runtime_state->set_is_cancelled(true);
     // To notify wait_for_start()
-    _runtime_state->get_query_fragments_ctx()->set_ready_to_execute(true);
+    _runtime_state->get_query_ctx()->set_ready_to_execute(true);
 
     // must close stream_mgr to avoid dead lock in Exchange Node
     auto env = _runtime_state->exec_env();
diff --git a/be/src/runtime/plan_fragment_executor.h b/be/src/runtime/plan_fragment_executor.h
index 61f03b624d..bc58a861d1 100644
--- a/be/src/runtime/plan_fragment_executor.h
+++ b/be/src/runtime/plan_fragment_executor.h
@@ -39,7 +39,7 @@
 
 namespace doris {
 
-class QueryFragmentsCtx;
+class QueryContext;
 class ExecNode;
 class RowDescriptor;
 class DataSink;
@@ -98,9 +98,8 @@ public:
     // If request.query_options.mem_limit > 0, it is used as an approximate limit on the
     // number of bytes this query can consume at runtime.
     // The query will be aborted (MEM_LIMIT_EXCEEDED) if it goes over that limit.
-    // If fragments_ctx is not null, some components will be got from fragments_ctx.
-    Status prepare(const TExecPlanFragmentParams& request,
-                   QueryFragmentsCtx* fragments_ctx = nullptr);
+    // If query_ctx is not null, some components will be got from query_ctx.
+    Status prepare(const TExecPlanFragmentParams& request, QueryContext* query_ctx = nullptr);
 
     // Start execution. Call this prior to get_next().
     // If this fragment has a sink, open() will send all rows produced
diff --git a/be/src/runtime/query_fragments_ctx.h b/be/src/runtime/query_context.h
similarity index 92%
rename from be/src/runtime/query_fragments_ctx.h
rename to be/src/runtime/query_context.h
index a5ff251dce..b941f0fe86 100644
--- a/be/src/runtime/query_fragments_ctx.h
+++ b/be/src/runtime/query_context.h
@@ -25,6 +25,7 @@
 #include <string>
 
 #include "common/config.h"
+#include "common/factory_creator.h"
 #include "common/object_pool.h"
 #include "runtime/datetime_value.h"
 #include "runtime/exec_env.h"
@@ -43,18 +44,20 @@ namespace doris {
 // Some components like DescriptorTbl may be very large
 // that will slow down each execution of fragments when DeSer them every time.
 class DescriptorTbl;
-class QueryFragmentsCtx {
+class QueryContext {
+    ENABLE_FACTORY_CREATOR(QueryContext);
+
 public:
-    QueryFragmentsCtx(int total_fragment_num, ExecEnv* exec_env)
+    QueryContext(int total_fragment_num, ExecEnv* exec_env)
             : fragment_num(total_fragment_num), timeout_second(-1), _exec_env(exec_env) {
         _start_time = vectorized::VecDateTimeValue::local_time();
         _shared_hash_table_controller.reset(new vectorized::SharedHashTableController());
         _shared_scanner_controller.reset(new vectorized::SharedScannerController());
     }
 
-    ~QueryFragmentsCtx() {
-        // query mem tracker consumption is equal to 0, it means that after QueryFragmentsCtx is created,
-        // it is found that query already exists in _fragments_ctx_map, and query mem tracker is not used.
+    ~QueryContext() {
+        // query mem tracker consumption is equal to 0, it means that after QueryContext is created,
+        // it is found that query already exists in _query_ctx_map, and query mem tracker is not used.
         // query mem tracker consumption is not equal to 0 after use, because there is memory consumed
         // on query mem tracker, released on other trackers.
         if (query_mem_tracker->consumption() != 0) {
@@ -146,12 +149,12 @@ public:
     TQueryGlobals query_globals;
 
     /// In the current implementation, for multiple fragments executed by a query on the same BE node,
-    /// we store some common components in QueryFragmentsCtx, and save QueryFragmentsCtx in FragmentMgr.
-    /// When all Fragments are executed, QueryFragmentsCtx needs to be deleted from FragmentMgr.
+    /// we store some common components in QueryContext, and save QueryContext in FragmentMgr.
+    /// When all Fragments are executed, QueryContext needs to be deleted from FragmentMgr.
     /// Here we use a counter to store the number of Fragments that have not yet been completed,
     /// and after each Fragment is completed, this value will be reduced by one.
     /// When the last Fragment is completed, the counter is cleared, and the worker thread of the last Fragment
-    /// will clean up QueryFragmentsCtx.
+    /// will clean up QueryContext.
     std::atomic<int> fragment_num;
     int timeout_second;
     ObjectPool obj_pool;
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index bb29113529..82f7263ae2 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -48,7 +48,7 @@ class ObjectPool;
 class ExecEnv;
 class RuntimeFilterMgr;
 class MemTrackerLimiter;
-class QueryFragmentsCtx;
+class QueryContext;
 
 // A collection of items that are part of the global state of a
 // query and shared across all execution nodes of that query.
@@ -340,9 +340,9 @@ public:
 
     RuntimeFilterMgr* runtime_filter_mgr() { return _runtime_filter_mgr.get(); }
 
-    void set_query_fragments_ctx(QueryFragmentsCtx* ctx) { _query_ctx = ctx; }
+    void set_query_ctx(QueryContext* ctx) { _query_ctx = ctx; }
 
-    QueryFragmentsCtx* get_query_fragments_ctx() { return _query_ctx; }
+    QueryContext* get_query_ctx() { return _query_ctx; }
 
     void set_query_mem_tracker(const std::shared_ptr<MemTrackerLimiter>& tracker) {
         _query_mem_tracker = tracker;
@@ -476,7 +476,7 @@ private:
     std::vector<TTabletCommitInfo> _tablet_commit_infos;
     std::vector<TErrorTabletInfo> _error_tablet_infos;
 
-    QueryFragmentsCtx* _query_ctx;
+    QueryContext* _query_ctx;
 
     // true if max_filter_ratio is 0
     bool _load_zero_tolerance = false;
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp
index f6bfe66525..f1cba5c0c7 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -44,7 +44,7 @@
 #include "gutil/strings/substitute.h"
 #include "runtime/define_primitive_type.h"
 #include "runtime/descriptors.h"
-#include "runtime/query_fragments_ctx.h"
+#include "runtime/query_context.h"
 #include "runtime/runtime_filter_mgr.h"
 #include "runtime/runtime_state.h"
 #include "runtime/thread_context.h"
@@ -452,7 +452,7 @@ Status HashJoinNode::prepare(RuntimeState* state) {
         if (state->enable_share_hash_table_for_broadcast_join()) {
             runtime_profile()->add_info_string("ShareHashTableEnabled", "true");
             _shared_hashtable_controller =
-                    state->get_query_fragments_ctx()->get_shared_hash_table_controller();
+                    state->get_query_ctx()->get_shared_hash_table_controller();
             _shared_hash_table_context = _shared_hashtable_controller->get_context(id());
             _should_build_hash_table = _shared_hashtable_controller->should_build_hash_table(
                     state->fragment_instance_id(), id());
diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp
index aabd95f1e6..9ed5084b41 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -30,7 +30,7 @@
 #include "common/config.h"
 #include "runtime/descriptors.h"
 #include "runtime/exec_env.h"
-#include "runtime/query_fragments_ctx.h"
+#include "runtime/query_context.h"
 #include "runtime/runtime_state.h"
 #include "util/pretty_printer.h"
 #include "util/uid_util.h"
@@ -112,7 +112,7 @@ Status ScannerContext::init() {
 
 #ifndef BE_TEST
     // 3. get thread token
-    thread_token = _state->get_query_fragments_ctx()->get_token();
+    thread_token = _state->get_query_ctx()->get_token();
 #endif
 
     // 4. This ctx will be submitted to the scanner scheduler right after init.
diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp
index 590f584e92..fb2ade6485 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -125,8 +125,7 @@ Status VScanNode::prepare(RuntimeState* state) {
 
     if (_is_pipeline_scan) {
         if (_shared_scan_opt) {
-            _shared_scanner_controller =
-                    state->get_query_fragments_ctx()->get_shared_scanner_controller();
+            _shared_scanner_controller = state->get_query_ctx()->get_shared_scanner_controller();
             auto [should_create_scanner, queue_id] =
                     _shared_scanner_controller->should_build_scanner_and_queue_id(id());
             _should_create_scanner = should_create_scanner;
diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h
index 294ccf6437..231ce9fb19 100644
--- a/be/src/vec/exec/scan/vscan_node.h
+++ b/be/src/vec/exec/scan/vscan_node.h
@@ -39,7 +39,7 @@
 #include "exec/olap_common.h"
 #include "exprs/function_filter.h"
 #include "runtime/define_primitive_type.h"
-#include "runtime/query_fragments_ctx.h"
+#include "runtime/query_context.h"
 #include "runtime/runtime_state.h"
 #include "util/lock.h"
 #include "util/runtime_profile.h"
@@ -119,7 +119,7 @@ public:
         if (_is_pipeline_scan) {
             if (_shared_scan_opt) {
                 _shared_scanner_controller =
-                        state->get_query_fragments_ctx()->get_shared_scanner_controller();
+                        state->get_query_ctx()->get_shared_scanner_controller();
                 auto [should_create_scanner, queue_id] =
                         _shared_scanner_controller->should_build_scanner_and_queue_id(id());
                 _should_create_scanner = should_create_scanner;
diff --git a/be/src/vec/exec/vsort_node.cpp b/be/src/vec/exec/vsort_node.cpp
index 4901414d95..6fb7f36b2a 100644
--- a/be/src/vec/exec/vsort_node.cpp
+++ b/be/src/vec/exec/vsort_node.cpp
@@ -31,7 +31,7 @@
 #include "common/status.h"
 #include "runtime/descriptors.h"
 #include "runtime/memory/mem_tracker.h"
-#include "runtime/query_fragments_ctx.h"
+#include "runtime/query_context.h"
 #include "runtime/runtime_predicate.h"
 #include "runtime/runtime_state.h"
 #include "runtime/types.h"
@@ -82,7 +82,7 @@ Status VSortNode::init(const TPlanNode& tnode, RuntimeState* state) {
     // init runtime predicate
     _use_topn_opt = tnode.sort_node.use_topn_opt;
     if (_use_topn_opt) {
-        auto query_ctx = state->get_query_fragments_ctx();
+        auto query_ctx = state->get_query_ctx();
         auto first_sort_expr_node = tnode.sort_node.sort_info.ordering_exprs[0].nodes[0];
         if (first_sort_expr_node.node_type == TExprNodeType::SLOT_REF) {
             auto first_sort_slot = first_sort_expr_node.slot_ref;
@@ -143,7 +143,7 @@ Status VSortNode::sink(RuntimeState* state, vectorized::Block* input_block, bool
                 auto& sort_description = _sorter->get_sort_description();
                 auto col = input_block->get_by_position(sort_description[0].column_number);
                 bool is_reverse = sort_description[0].direction < 0;
-                auto query_ctx = state->get_query_fragments_ctx();
+                auto query_ctx = state->get_query_ctx();
                 RETURN_IF_ERROR(
                         query_ctx->get_runtime_predicate().update(new_top, col.name, is_reverse));
                 old_top = std::move(new_top);
diff --git a/be/src/vec/olap/vcollect_iterator.cpp b/be/src/vec/olap/vcollect_iterator.cpp
index 82bd608810..1045621c9c 100644
--- a/be/src/vec/olap/vcollect_iterator.cpp
+++ b/be/src/vec/olap/vcollect_iterator.cpp
@@ -36,7 +36,7 @@
 #include "olap/rowset/rowset_meta.h"
 #include "olap/tablet.h"
 #include "olap/tablet_schema.h"
-#include "runtime/query_fragments_ctx.h"
+#include "runtime/query_context.h"
 #include "runtime/runtime_predicate.h"
 #include "runtime/runtime_state.h"
 #include "vec/columns/column.h"
@@ -391,7 +391,7 @@ Status VCollectIterator::_topn_next(Block* block) {
                 col_ptr->get(last_sorted_row, new_top);
 
                 // update orderby_extrems in query global context
-                auto query_ctx = _reader->_reader_context.runtime_state->get_query_fragments_ctx();
+                auto query_ctx = _reader->_reader_context.runtime_state->get_query_ctx();
                 RETURN_IF_ERROR(
                         query_ctx->get_runtime_predicate().update(new_top, col_name, _is_reverse));
             }
diff --git a/be/test/runtime/fragment_mgr_test.cpp b/be/test/runtime/fragment_mgr_test.cpp
index 323628c44d..1d6e09eccc 100644
--- a/be/test/runtime/fragment_mgr_test.cpp
+++ b/be/test/runtime/fragment_mgr_test.cpp
@@ -36,7 +36,7 @@ PlanFragmentExecutor::PlanFragmentExecutor(ExecEnv* exec_env,
 PlanFragmentExecutor::~PlanFragmentExecutor() {}
 
 Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
-                                     QueryFragmentsCtx* batch_ctx) {
+                                     QueryContext* batch_ctx) {
     return s_prepare_status;
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org