You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/07/09 07:50:47 UTC

[doris] branch master updated: [feature](tracing) Support query tracing to improve doris observability by introducing OpenTelemetry. (#10533)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d5ea677282 [feature](tracing) Support query tracing to improve doris observability by introducing OpenTelemetry. (#10533)
d5ea677282 is described below

commit d5ea6772829a6cfa8ee46c9c49103e8d036df7b9
Author: luozenglin <37...@users.noreply.github.com>
AuthorDate: Sat Jul 9 15:50:40 2022 +0800

    [feature](tracing) Support query tracing to improve doris observability by introducing OpenTelemetry. (#10533)
    
    Query trace collection is implemented in both FE and BE, and the collected spans are exported to Zipkin.
    DSIP: https://cwiki.apache.org/confluence/display/DORIS/DSIP-012%3A+Introduce+opentelemetry
---
 be/CMakeLists.txt                                  |  28 +++++
 be/src/common/config.h                             |  15 +++
 be/src/common/status.h                             |  13 +++
 be/src/exec/data_sink.h                            |   9 ++
 be/src/exec/exec_node.cpp                          |   1 +
 be/src/exec/exec_node.h                            |  11 ++
 be/src/exec/olap_scanner.cpp                       |   2 +
 be/src/exec/table_function_node.cpp                |   2 +
 be/src/runtime/fragment_mgr.cpp                    |  23 +++-
 be/src/runtime/plan_fragment_executor.cpp          |  53 +++++----
 be/src/runtime/runtime_state.h                     |   7 ++
 be/src/service/doris_main.cpp                      |   3 +
 be/src/service/internal_service.cpp                |   8 ++
 be/src/util/CMakeLists.txt                         |   2 +
 be/src/util/runtime_profile.cpp                    |   2 +-
 be/src/util/runtime_profile.h                      |   8 +-
 .../telemetry/brpc_carrier.cpp}                    |  22 ++--
 be/src/util/telemetry/brpc_carrier.h               |  64 +++++++++++
 be/src/util/telemetry/telemetry.cpp                |  70 ++++++++++++
 be/src/util/telemetry/telemetry.h                  | 100 +++++++++++++++++
 be/src/vec/exec/join/vhash_join_node.cpp           |  17 ++-
 be/src/vec/exec/vaggregation_node.cpp              |  10 +-
 be/src/vec/exec/vanalytic_eval_node.cpp            |   7 +-
 be/src/vec/exec/vassert_num_rows_node.cpp          |   6 +-
 be/src/vec/exec/vblocking_join_node.cpp            |   5 +-
 be/src/vec/exec/vbroker_scan_node.cpp              |  10 +-
 be/src/vec/exec/vcross_join_node.cpp               |   9 +-
 be/src/vec/exec/vempty_set_node.cpp                |   1 +
 be/src/vec/exec/ves_http_scan_node.cpp             |  11 +-
 be/src/vec/exec/vexcept_node.cpp                   |   3 +
 be/src/vec/exec/vexchange_node.cpp                 |   3 +
 be/src/vec/exec/vintersect_node.cpp                |   3 +
 be/src/vec/exec/vmysql_scan_node.cpp               |   3 +
 be/src/vec/exec/vodbc_scan_node.cpp                |   3 +
 be/src/vec/exec/volap_scan_node.cpp                |  23 +++-
 be/src/vec/exec/vrepeat_node.cpp                   |   7 +-
 be/src/vec/exec/vschema_scan_node.cpp              |   5 +
 be/src/vec/exec/vselect_node.cpp                   |   6 +-
 be/src/vec/exec/vset_operation_node.cpp            |   8 +-
 be/src/vec/exec/vsort_node.cpp                     |   6 +-
 be/src/vec/exec/vtable_function_node.cpp           |   6 +-
 be/src/vec/exec/vunion_node.cpp                    |  10 +-
 be/src/vec/sink/vdata_stream_sender.cpp            |   3 +
 be/src/vec/sink/vmysql_table_sink.cpp              |   3 +
 be/src/vec/sink/vresult_file_sink.cpp              |   3 +
 be/src/vec/sink/vresult_sink.cpp                   |   3 +
 be/src/vec/sink/vtablet_sink.cpp                   |   3 +
 docs/.vuepress/sidebar/en/docs.js                  |   1 +
 docs/.vuepress/sidebar/zh-CN/docs.js               |   1 +
 docs/en/docs/admin-manual/tracing.md               |  80 +++++++++++++
 docs/zh-CN/docs/admin-manual/tracing.md            |  82 ++++++++++++++
 fe/fe-core/pom.xml                                 |  26 +++++
 .../src/main/java/org/apache/doris/PaloFe.java     |   3 +
 .../main/java/org/apache/doris/common/Config.java  |   6 +
 .../apache/doris/common/telemetry/ScopedSpan.java  |  38 +++++--
 .../apache/doris/common/telemetry/Telemetry.java   | 101 +++++++++++++++++
 .../java/org/apache/doris/qe/ConnectContext.java   |  12 ++
 .../java/org/apache/doris/qe/ConnectProcessor.java |  13 ++-
 .../main/java/org/apache/doris/qe/Coordinator.java |  69 +++++++++---
 .../java/org/apache/doris/qe/StmtExecutor.java     | 125 ++++++++++++++-------
 .../org/apache/doris/rpc/BackendServiceClient.java |  32 +++++-
 61 files changed, 1079 insertions(+), 130 deletions(-)

diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt
index aa894e19a5..f638046a55 100644
--- a/be/CMakeLists.txt
+++ b/be/CMakeLists.txt
@@ -328,6 +328,27 @@ set_target_properties(minizip PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib
 add_library(idn STATIC IMPORTED)
 set_target_properties(idn PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib64/libidn.a)
 
+add_library(opentelemetry_common STATIC IMPORTED)
+set_target_properties(opentelemetry_common PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib64/libopentelemetry_common.a)
+
+add_library(opentelemetry_exporter_zipkin_trace STATIC IMPORTED)
+set_target_properties(opentelemetry_exporter_zipkin_trace PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib64/libopentelemetry_exporter_zipkin_trace.a)
+
+add_library(opentelemetry_resources STATIC IMPORTED)
+set_target_properties(opentelemetry_resources PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib64/libopentelemetry_resources.a)
+
+add_library(opentelemetry_version STATIC IMPORTED)
+set_target_properties(opentelemetry_version PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib64/libopentelemetry_version.a)
+
+add_library(opentelemetry_exporter_ostream_span STATIC IMPORTED)
+set_target_properties(opentelemetry_exporter_ostream_span PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib64/libopentelemetry_exporter_ostream_span.a)
+
+add_library(opentelemetry_trace STATIC IMPORTED)
+set_target_properties(opentelemetry_trace PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib64/libopentelemetry_trace.a)
+
+add_library(opentelemetry_http_client_curl STATIC IMPORTED)
+set_target_properties(opentelemetry_http_client_curl PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib64/libopentelemetry_http_client_curl.a)
+
 add_library(xml2 STATIC IMPORTED)
 set_target_properties(xml2 PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib64/libxml2.a)
 
@@ -633,6 +654,13 @@ set(COMMON_THIRDPARTY
     odbc
     cctz
     minizip
+    opentelemetry_common
+    opentelemetry_exporter_zipkin_trace
+    opentelemetry_resources
+    opentelemetry_version
+    opentelemetry_exporter_ostream_span
+    opentelemetry_trace
+    opentelemetry_http_client_curl
     ${AWS_LIBS}
     # put this after lz4 to avoid using lz4 lib in librdkafka
     librdkafka_cpp
diff --git a/be/src/common/config.h b/be/src/common/config.h
index ca0c66654d..d212617867 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -739,6 +739,21 @@ CONF_String(function_service_protocol, "h2:grpc");
 // use which load balancer to select server to connect
 CONF_String(rpc_load_balancer, "rr");
 
+CONF_Bool(enable_tracing, "false");
+
+// The endpoint to export spans to.
+CONF_String(trace_export_url, "http://127.0.0.1:9411/api/v2/spans");
+
+// The maximum buffer/queue size to collect span. After the size is reached, spans are dropped.
+// An export will be triggered when the number of spans in the queue reaches half of the maximum.
+CONF_Int32(max_span_queue_size, "2048");
+
+// The maximum batch size of every span export. It must be less than or equal to max_span_queue_size.
+CONF_Int32(max_span_export_batch_size, "512");
+
+// The interval, in milliseconds, between two consecutive span exports.
+CONF_Int32(export_span_schedule_delay_millis, "500");
+
 // a soft limit of string type length, the hard limit is 2GB - 4, but if too long will cause very low performance,
 // so we set a soft limit, default is 1MB
 CONF_mInt32(string_type_length_soft_limit_bytes, "1048576");
diff --git a/be/src/common/status.h b/be/src/common/status.h
index 68824df80a..830d417a7c 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -594,6 +594,19 @@ inline std::ostream& operator<<(std::ostream& ostr, const Status& param) {
         }                                \
     } while (false)
 
+// Evaluate stmt; end the span from get_next_span once done is true or stmt failed, then return any error.
+#define RETURN_IF_ERROR_AND_CHECK_SPAN(stmt, get_next_span, done) \
+    do {                                                          \
+        const auto& _status_ = (stmt);                            \
+        auto _span = (get_next_span);                             \
+        if (UNLIKELY(_span && (!_status_.ok() || (done)))) {      \
+            _span->End();                                         \
+        }                                                         \
+        if (UNLIKELY(!_status_.ok())) {                           \
+            return _status_;                                      \
+        }                                                         \
+    } while (false)
+
 #define RETURN_IF_STATUS_ERROR(status, stmt) \
     do {                                     \
         status = (stmt);                     \
diff --git a/be/src/exec/data_sink.h b/be/src/exec/data_sink.h
index 608c240d7c..e079877c2e 100644
--- a/be/src/exec/data_sink.h
+++ b/be/src/exec/data_sink.h
@@ -28,6 +28,7 @@
 #include "runtime/descriptors.h"
 #include "runtime/mem_tracker.h"
 #include "runtime/query_statistics.h"
+#include "util/telemetry/telemetry.h"
 
 namespace doris {
 
@@ -90,6 +91,12 @@ public:
         _query_statistics = statistics;
     }
 
+    void end_send_span() {
+        if (_send_span) {
+            _send_span->End();
+        }
+    }
+
 protected:
     // Set to true after close() has been called. subclasses should check and set this in
     // close().
@@ -99,6 +106,8 @@ protected:
 
     // Maybe this will be transferred to BufferControlBlock.
     std::shared_ptr<QueryStatistics> _query_statistics;
+
+    OpentelemetrySpan _send_span {};
 };
 
 } // namespace doris
diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index 5da036505a..7f6bdf886a 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -149,6 +149,7 @@ ExecNode::ExecNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl
           _rows_returned_counter(nullptr),
           _rows_returned_rate(nullptr),
           _memory_used_counter(nullptr),
+          _get_next_span(),
           _is_closed(false) {}
 
 ExecNode::~ExecNode() {}
diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h
index fe3ab8b8e3..1f386ac1d4 100644
--- a/be/src/exec/exec_node.h
+++ b/be/src/exec/exec_node.h
@@ -32,6 +32,7 @@
 #include "service/backend_options.h"
 #include "util/blocking_queue.hpp"
 #include "util/runtime_profile.h"
+#include "util/telemetry/telemetry.h"
 #include "vec/exprs/vexpr_context.h"
 
 namespace doris {
@@ -191,6 +192,8 @@ public:
 
     std::shared_ptr<MemTracker> expr_mem_tracker() const { return _expr_mem_tracker; }
 
+    OpentelemetrySpan get_next_span() { return _get_next_span; }
+
     // Extract node id from p->name().
     static int get_node_id_from_profile(RuntimeProfile* p);
 
@@ -297,6 +300,14 @@ protected:
     // Account for peak memory used by this node
     RuntimeProfile::Counter* _memory_used_counter;
 
+    /// Since get_next is a frequent operation, it is not necessary to generate a span for each call
+    /// to the get_next method. Therefore, the call of the get_next method in the ExecNode is
+    /// merged into this _get_next_span. The _get_next_span is initialized by
+    /// INIT_AND_SCOPE_GET_NEXT_SPAN when the get_next method is called for the first time
+    /// (recording the start timestamp), and is ended by RETURN_IF_ERROR_AND_CHECK_SPAN after the
+    /// last call to the get_next method (recording the end timestamp).
+    OpentelemetrySpan _get_next_span;
+
     // Execution options that are determined at runtime.  This is added to the
     // runtime profile at close().  Examples for options logged here would be
     // "Codegen Enabled"
diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp
index e0e6a68549..29436ecf05 100644
--- a/be/src/exec/olap_scanner.cpp
+++ b/be/src/exec/olap_scanner.cpp
@@ -122,6 +122,8 @@ Status OlapScanner::prepare(
 }
 
 Status OlapScanner::open() {
+    auto span = _runtime_state->get_tracer()->StartSpan("OlapScanner::open");
+    auto scope = opentelemetry::trace::Scope {span};
     SCOPED_TIMER(_parent->_reader_init_timer);
     SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
 
diff --git a/be/src/exec/table_function_node.cpp b/be/src/exec/table_function_node.cpp
index e95914bd22..984ebb0266 100644
--- a/be/src/exec/table_function_node.cpp
+++ b/be/src/exec/table_function_node.cpp
@@ -102,6 +102,7 @@ Status TableFunctionNode::prepare(RuntimeState* state) {
 }
 
 Status TableFunctionNode::open(RuntimeState* state) {
+    START_AND_SCOPE_SPAN(state->get_tracer(), span, "TableFunctionNode::open");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
     RETURN_IF_CANCELLED(state);
@@ -374,6 +375,7 @@ Status TableFunctionNode::close(RuntimeState* state) {
     if (is_closed()) {
         return Status::OK();
     }
+    START_AND_SCOPE_SPAN(state->get_tracer(), span, "TableFunctionNode::close");
     Expr::close(_fn_ctxs, state);
     vectorized::VExpr::close(_vfn_ctxs, state);
 
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 29a5087635..f7faec86a9 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -34,6 +34,7 @@
 #include "gen_cpp/QueryPlanExtra_types.h"
 #include "gen_cpp/Types_types.h"
 #include "gutil/strings/substitute.h"
+#include "opentelemetry/trace/scope.h"
 #include "runtime/client_cache.h"
 #include "runtime/datetime_value.h"
 #include "runtime/descriptors.h"
@@ -48,6 +49,7 @@
 #include "util/debug_util.h"
 #include "util/doris_metrics.h"
 #include "util/stopwatch.hpp"
+#include "util/telemetry/telemetry.h"
 #include "util/threadpool.h"
 #include "util/thrift_util.h"
 #include "util/uid_util.h"
@@ -234,6 +236,7 @@ Status FragmentExecState::execute() {
         // if _need_wait_execution_trigger is true, which means this instance
         // is prepared but need to wait for the signal to do the rest execution.
         _fragments_ctx->wait_for_start();
+        opentelemetry::trace::Tracer::GetCurrentSpan()->AddEvent("start executing Fragment");
     }
     int64_t duration_ns = 0;
     {
@@ -471,8 +474,18 @@ FragmentMgr::~FragmentMgr() {
 static void empty_function(PlanFragmentExecutor* exec) {}
 
 void FragmentMgr::_exec_actual(std::shared_ptr<FragmentExecState> exec_state, FinishCallback cb) {
+    std::string func_name {"PlanFragmentExecutor::_exec_actual"};
+#ifndef BE_TEST
+    auto span = exec_state->executor()->runtime_state()->get_tracer()->StartSpan(func_name);
+#else
+    auto span = telemetry::get_noop_tracer()->StartSpan(func_name);
+#endif
+    auto scope = opentelemetry::trace::Scope {span};
+    span->SetAttribute("query_id", print_id(exec_state->query_id()));
+    span->SetAttribute("instance_id", print_id(exec_state->fragment_instance_id()));
+
     TAG(LOG(INFO))
-            .log("PlanFragmentExecutor::_exec_actual")
+            .log(std::move(func_name))
             .query_id(exec_state->query_id())
             .instance_id(exec_state->fragment_instance_id())
             .tag("pthread_id", std::to_string((uintptr_t)pthread_self()));
@@ -577,6 +590,9 @@ std::shared_ptr<StreamLoadPipe> FragmentMgr::get_pipe(const TUniqueId& fragment_
 }
 
 Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, FinishCallback cb) {
+    auto tracer = telemetry::is_current_span_valid() ? telemetry::get_tracer("tracer")
+                                                     : telemetry::get_noop_tracer();
+    START_AND_SCOPE_SPAN(tracer, span, "FragmentMgr::exec_plan_fragment");
     const TUniqueId& fragment_instance_id = params.params.fragment_instance_id;
     {
         std::lock_guard<std::mutex> lock(_lock);
@@ -658,7 +674,10 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi
     }
 
     auto st = _thread_pool->submit_func(
-            std::bind<void>(&FragmentMgr::_exec_actual, this, exec_state, cb));
+            [this, exec_state, cb, parent_span = opentelemetry::trace::Tracer::GetCurrentSpan()] {
+                OpentelemetryScope scope {parent_span};
+                _exec_actual(exec_state, cb);
+            });
     if (!st.ok()) {
         {
             // Remove the exec state added
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index 908dba4057..cdfe8e0a2a 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -38,10 +38,12 @@
 #include "runtime/row_batch.h"
 #include "runtime/thread_context.h"
 #include "util/container_util.hpp"
+#include "util/defer_op.h"
 #include "util/logging.h"
 #include "util/mem_info.h"
 #include "util/parse_util.h"
 #include "util/pretty_printer.h"
+#include "util/telemetry/telemetry.h"
 #include "util/uid_util.h"
 #include "vec/core/block.h"
 #include "vec/exec/vexchange_node.h"
@@ -72,6 +74,12 @@ PlanFragmentExecutor::~PlanFragmentExecutor() {
 
 Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
                                      QueryFragmentsCtx* fragments_ctx) {
+    OpentelemetryTracer tracer = telemetry::get_noop_tracer();
+    if (opentelemetry::trace::Tracer::GetCurrentSpan()->GetContext().IsValid()) {
+        tracer = telemetry::get_tracer("tracer");
+    }
+    START_AND_SCOPE_SPAN(tracer, span, "PlanFragmentExecutor::prepare");
+
     const TPlanFragmentExecParams& params = request.params;
     _query_id = params.query_id;
 
@@ -87,6 +95,7 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
             fragments_ctx == nullptr ? request.query_globals : fragments_ctx->query_globals;
     _runtime_state.reset(new RuntimeState(params, request.query_options, query_globals, _exec_env));
     _runtime_state->set_query_fragments_ctx(fragments_ctx);
+    _runtime_state->set_tracer(std::move(tracer));
 
     RETURN_IF_ERROR(_runtime_state->init_mem_trackers(_query_id));
     SCOPED_ATTACH_TASK_THREAD(_runtime_state.get(), _runtime_state->instance_mem_tracker());
@@ -262,30 +271,33 @@ Status PlanFragmentExecutor::open_vectorized_internal() {
         RETURN_IF_ERROR(_sink->open(runtime_state()));
     }
 
-    while (true) {
-        doris::vectorized::Block* block;
+    {
+        auto sink_send_span_guard = Defer {[this]() { this->_sink->end_send_span(); }};
+        while (true) {
+            doris::vectorized::Block* block;
 
-        {
-            SCOPED_CPU_TIMER(_fragment_cpu_timer);
-            RETURN_IF_ERROR(get_vectorized_internal(&block));
-        }
+            {
+                SCOPED_CPU_TIMER(_fragment_cpu_timer);
+                RETURN_IF_ERROR(get_vectorized_internal(&block));
+            }
 
-        if (block == NULL) {
-            break;
-        }
+            if (block == NULL) {
+                break;
+            }
 
-        SCOPED_TIMER(profile()->total_time_counter());
-        SCOPED_CPU_TIMER(_fragment_cpu_timer);
-        // Collect this plan and sub plan statistics, and send to parent plan.
-        if (_collect_query_statistics_with_every_batch) {
-            _collect_query_statistics();
-        }
+            SCOPED_TIMER(profile()->total_time_counter());
+            SCOPED_CPU_TIMER(_fragment_cpu_timer);
+            // Collect this plan and sub plan statistics, and send to parent plan.
+            if (_collect_query_statistics_with_every_batch) {
+                _collect_query_statistics();
+            }
 
-        auto st = _sink->send(runtime_state(), block);
-        if (st.is_end_of_file()) {
-            break;
+            auto st = _sink->send(runtime_state(), block);
+            if (st.is_end_of_file()) {
+                break;
+            }
+            RETURN_IF_ERROR(st);
         }
-        RETURN_IF_ERROR(st);
     }
 
     {
@@ -318,7 +330,8 @@ Status PlanFragmentExecutor::get_vectorized_internal(::doris::vectorized::Block*
     while (!_done) {
         _block->clear_column_data(_plan->row_desc().num_materialized_slots());
         SCOPED_TIMER(profile()->total_time_counter());
-        RETURN_IF_ERROR(_plan->get_next(_runtime_state.get(), _block.get(), &_done));
+        RETURN_IF_ERROR_AND_CHECK_SPAN(_plan->get_next(_runtime_state.get(), _block.get(), &_done),
+                                       _plan->get_next_span(), _done);
 
         if (_block->rows() > 0) {
             COUNTER_UPDATE(_rows_produced_counter, _block->rows());
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index ba07ecbf38..719139e994 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -38,6 +38,7 @@
 #include "runtime/thread_resource_mgr.h"
 #include "util/logging.h"
 #include "util/runtime_profile.h"
+#include "util/telemetry/telemetry.h"
 
 namespace doris {
 
@@ -373,6 +374,10 @@ public:
 
     QueryFragmentsCtx* get_query_fragments_ctx() { return _query_ctx; }
 
+    OpentelemetryTracer get_tracer() { return _tracer; }
+
+    void set_tracer(OpentelemetryTracer&& tracer) { _tracer = std::move(tracer); }
+
 private:
     // Use a custom block manager for the query for testing purposes.
     void set_block_mgr2(const std::shared_ptr<BufferedBlockMgr2>& block_mgr) {
@@ -521,6 +526,8 @@ private:
     // true if max_filter_ratio is 0
     bool _load_zero_tolerance = false;
 
+    OpentelemetryTracer _tracer = telemetry::get_noop_tracer();
+
     // prohibit copies
     RuntimeState(const RuntimeState&);
 };
diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp
index 00d0900af8..57743b2731 100644
--- a/be/src/service/doris_main.cpp
+++ b/be/src/service/doris_main.cpp
@@ -58,6 +58,7 @@
 #include "util/debug_util.h"
 #include "util/doris_metrics.h"
 #include "util/logging.h"
+#include "util/telemetry/telemetry.h"
 #include "util/thrift_rpc_helper.h"
 #include "util/thrift_server.h"
 #include "util/uid_util.h"
@@ -403,6 +404,8 @@ int main(int argc, char** argv) {
     // SHOULD be called after exec env is initialized.
     EXIT_IF_ERROR(engine->start_bg_threads());
 
+    doris::telemetry::initTracer();
+
     // begin to start services
     doris::ThriftRpcHelper::setup(exec_env);
     // 1. thrift server with be_port
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 47e156a0bf..38885c68e2 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -35,6 +35,8 @@
 #include "util/md5.h"
 #include "util/proto_util.h"
 #include "util/string_util.h"
+#include "util/telemetry/brpc_carrier.h"
+#include "util/telemetry/telemetry.h"
 #include "util/thrift_util.h"
 #include "util/uid_util.h"
 #include "vec/runtime/vdata_stream_mgr.h"
@@ -172,6 +174,8 @@ void PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController* c
                                               const PExecPlanFragmentRequest* request,
                                               PExecPlanFragmentResult* response,
                                               google::protobuf::Closure* done) {
+    auto span = telemetry::start_rpc_server_span("exec_plan_fragment", cntl_base);
+    auto scope = OpentelemetryScope {span};
     SCOPED_SWITCH_BTHREAD();
     brpc::ClosureGuard closure_guard(done);
     auto st = Status::OK();
@@ -196,6 +200,8 @@ void PInternalServiceImpl::exec_plan_fragment_start(google::protobuf::RpcControl
                                                     const PExecPlanFragmentStartRequest* request,
                                                     PExecPlanFragmentResult* result,
                                                     google::protobuf::Closure* done) {
+    auto span = telemetry::start_rpc_server_span("exec_plan_fragment_start", controller);
+    auto scope = OpentelemetryScope {span};
     SCOPED_SWITCH_BTHREAD();
     brpc::ClosureGuard closure_guard(done);
     auto st = _exec_env->fragment_mgr()->start_query_execution(request);
@@ -368,6 +374,8 @@ void PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController*
                                                 const PCancelPlanFragmentRequest* request,
                                                 PCancelPlanFragmentResult* result,
                                                 google::protobuf::Closure* done) {
+    auto span = telemetry::start_rpc_server_span("cancel_plan_fragment", cntl_base);
+    auto scope = OpentelemetryScope {span};
     SCOPED_SWITCH_BTHREAD();
     brpc::ClosureGuard closure_guard(done);
     TUniqueId tid;
diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt
index 2389bd6ec6..0178939d8c 100644
--- a/be/src/util/CMakeLists.txt
+++ b/be/src/util/CMakeLists.txt
@@ -107,6 +107,8 @@ set(UTIL_FILES
   hdfs_util.cpp
   topn_counter.cpp
   tuple_row_zorder_compare.cpp
+  telemetry/telemetry.cpp
+  telemetry/brpc_carrier.cpp
   quantile_state.cpp
   jni-util.cpp
 )
diff --git a/be/src/util/runtime_profile.cpp b/be/src/util/runtime_profile.cpp
index bc3baa3509..4babd77269 100644
--- a/be/src/util/runtime_profile.cpp
+++ b/be/src/util/runtime_profile.cpp
@@ -398,7 +398,7 @@ RuntimeProfile::Counter* RuntimeProfile::add_counter(const std::string& name, TU
 
     DCHECK(parent_counter_name == ROOT_COUNTER ||
            _counter_map.find(parent_counter_name) != _counter_map.end());
-    Counter* counter = _pool->add(new Counter(type, 0));
+    Counter* counter = _pool->add(new Counter(type, 0, name));
     _counter_map[name] = counter;
     std::set<std::string>* child_counters =
             find_or_insert(&_child_counter_map, parent_counter_name, std::set<std::string>());
diff --git a/be/src/util/runtime_profile.h b/be/src/util/runtime_profile.h
index 5c90960196..6a84def821 100644
--- a/be/src/util/runtime_profile.h
+++ b/be/src/util/runtime_profile.h
@@ -71,7 +71,8 @@ class RuntimeProfile {
 public:
     class Counter {
     public:
-        Counter(TUnit::type type, int64_t value = 0) : _value(value), _type(type) {}
+        Counter(TUnit::type type, int64_t value = 0, std::string name = "")
+                : _value(value), _type(type), _name(std::move(name)) {}
         virtual ~Counter() = default;
 
         virtual void update(int64_t delta) { _value.fetch_add(delta, std::memory_order_relaxed); }
@@ -93,11 +94,16 @@ public:
 
         TUnit::type type() const { return _type; }
 
+        std::string name() const { return _name; }
+
+        void set_name(std::string name) { _name = std::move(name); }
+
     private:
         friend class RuntimeProfile;
 
         std::atomic<int64_t> _value;
         TUnit::type _type;
+        std::string _name;
     };
 
     class DerivedCounter;
diff --git a/be/src/vec/exec/vempty_set_node.cpp b/be/src/util/telemetry/brpc_carrier.cpp
similarity index 62%
copy from be/src/vec/exec/vempty_set_node.cpp
copy to be/src/util/telemetry/brpc_carrier.cpp
index 620ea0e960..5d49b40fff 100644
--- a/be/src/vec/exec/vempty_set_node.cpp
+++ b/be/src/util/telemetry/brpc_carrier.cpp
@@ -15,16 +15,16 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "vec/exec/vempty_set_node.h"
+#include "brpc_carrier.h"
 
-namespace doris {
-namespace vectorized {
-VEmptySetNode::VEmptySetNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
-        : ExecNode(pool, tnode, descs) {}
-
-Status VEmptySetNode::get_next(RuntimeState* state, Block* block, bool* eos) {
-    *eos = true;
-    return Status::OK();
+opentelemetry::nostd::string_view doris::telemetry::RpcServerCarrier::Get(
+        opentelemetry::nostd::string_view key) const noexcept {
+    auto value = cntl_ ? cntl_->http_request().GetHeader(std::string(key.data(), key.size())) : nullptr;
+    if (value != nullptr) {
+        return {value->data(), value->size()};
+    }
+    return "";
 }
-} // namespace vectorized
-} // namespace doris
+
+void doris::telemetry::RpcServerCarrier::Set(opentelemetry::nostd::string_view key,
+                                             opentelemetry::nostd::string_view value) noexcept {}
diff --git a/be/src/util/telemetry/brpc_carrier.h b/be/src/util/telemetry/brpc_carrier.h
new file mode 100644
index 0000000000..60a7abf784
--- /dev/null
+++ b/be/src/util/telemetry/brpc_carrier.h
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <brpc/controller.h>
+
+#include "opentelemetry/context/propagation/global_propagator.h"
+#include "opentelemetry/context/propagation/text_map_propagator.h"
+#include "opentelemetry/context/runtime_context.h"
+#include "opentelemetry/trace/context.h"
+#include "opentelemetry/trace/span_metadata.h"
+#include "opentelemetry/trace/span_startoptions.h"
+#include "util/telemetry/telemetry.h"
+
+namespace doris::telemetry {
+
+class RpcServerCarrier : public opentelemetry::context::propagation::TextMapCarrier {
+public:
+    explicit RpcServerCarrier(const brpc::Controller* cntl) : cntl_(cntl) {}
+
+    RpcServerCarrier() = default;
+
+    opentelemetry::nostd::string_view Get(
+            opentelemetry::nostd::string_view key) const noexcept override;
+
+    void Set(opentelemetry::nostd::string_view key,
+             opentelemetry::nostd::string_view value) noexcept override;
+
+private:
+    const brpc::Controller* cntl_ {};
+};
+
+inline OpentelemetrySpan start_rpc_server_span(std::string span_name,
+                                               google::protobuf::RpcController* cntl_base) {
+    RpcServerCarrier carrier(static_cast<brpc::Controller*>(cntl_base));
+    auto current_ctx = opentelemetry::context::RuntimeContext::GetCurrent();
+    auto prop = opentelemetry::context::propagation::GlobalTextMapPropagator::GetGlobalPropagator();
+    auto parent_context = prop->Extract(carrier, current_ctx);
+
+    if (opentelemetry::trace::GetSpan(parent_context)->GetContext().IsValid()) {
+        opentelemetry::trace::StartSpanOptions options;
+        options.kind = opentelemetry::trace::SpanKind::kServer;
+        options.parent = parent_context;
+        return telemetry::get_tracer("tracer")->StartSpan(std::move(span_name), options);
+    } else {
+        return telemetry::get_noop_tracer()->StartSpan("");
+    }
+}
+} // namespace doris::telemetry
diff --git a/be/src/util/telemetry/telemetry.cpp b/be/src/util/telemetry/telemetry.cpp
new file mode 100644
index 0000000000..bd3d90e48a
--- /dev/null
+++ b/be/src/util/telemetry/telemetry.cpp
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "telemetry.h"
+
+#include "common/config.h"
+#include "opentelemetry/context/propagation/global_propagator.h"
+#include "opentelemetry/context/propagation/text_map_propagator.h"
+#include "opentelemetry/exporters/zipkin/zipkin_exporter.h"
+#include "opentelemetry/nostd/shared_ptr.h"
+#include "opentelemetry/sdk/trace/batch_span_processor.h"
+#include "opentelemetry/trace/noop.h"
+#include "opentelemetry/trace/propagation/http_trace_context.h"
+#include "opentelemetry/trace/provider.h"
+#include "service/backend_options.h"
+
+namespace trace = opentelemetry::trace;
+namespace nostd = opentelemetry::nostd;
+namespace trace_sdk = opentelemetry::sdk::trace;
+namespace zipkin = opentelemetry::exporter::zipkin;
+namespace resource = opentelemetry::sdk::resource;
+namespace propagation = opentelemetry::context::propagation;
+// Installs the global tracer provider and propagator when config::enable_tracing is true.
+void doris::telemetry::initTracer() {
+    if (!doris::config::enable_tracing) {
+        return;
+    }
+
+    // ZipkinExporter converts spans to Zipkin's format and sends them to trace_export_url.
+    zipkin::ZipkinExporterOptions opts;
+    opts.endpoint = doris::config::trace_export_url;
+    auto exporter = std::unique_ptr<trace_sdk::SpanExporter>(new zipkin::ZipkinExporter(opts));
+
+    // BatchSpanProcessor queues finished spans and hands them to the exporter in batches.
+    trace_sdk::BatchSpanProcessorOptions batchOptions;
+    batchOptions.schedule_delay_millis =
+            std::chrono::milliseconds(doris::config::export_span_schedule_delay_millis);
+    batchOptions.max_queue_size = doris::config::max_span_queue_size;
+    batchOptions.max_export_batch_size = doris::config::max_span_export_batch_size;
+    auto processor = std::unique_ptr<trace_sdk::SpanProcessor>(
+            new trace_sdk::BatchSpanProcessor(std::move(exporter), batchOptions));
+    // Every exported span is tagged with this backend's identity as its service name.
+    std::string service_name = "BACKEND:" + BackendOptions::get_localhost();
+    resource::ResourceAttributes attributes = {{"service.name", service_name}};
+    auto resource = resource::Resource::Create(attributes);
+
+    auto provider = nostd::shared_ptr<trace::TracerProvider>(
+            new trace_sdk::TracerProvider(std::move(processor), resource));
+    // Make this the global provider returned by trace::Provider::GetTracerProvider().
+    trace::Provider::SetTracerProvider(std::move(provider));
+
+    // Propagate span context between processes (HTTP/RPC) using the W3C Trace Context format.
+    propagation::GlobalTextMapPropagator::SetGlobalPropagator(
+            nostd::shared_ptr<propagation::TextMapPropagator>(
+                    new opentelemetry::trace::propagation::HttpTraceContext()));
+}
diff --git a/be/src/util/telemetry/telemetry.h b/be/src/util/telemetry/telemetry.h
new file mode 100644
index 0000000000..622cd22272
--- /dev/null
+++ b/be/src/util/telemetry/telemetry.h
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "opentelemetry/context/context.h"
+#include "opentelemetry/sdk/trace/tracer_provider.h"
+#include "opentelemetry/trace/provider.h"
+#include "util/pretty_printer.h"
+#include "util/runtime_profile.h"
+
+/// A trace describes how a single request executes across the system. A span is one logical
+/// unit of work with a start time and a duration; a trace is made up of multiple spans
+/// linked by parent/child relationships.
+namespace doris {
+
+using OpentelemetryTracer = opentelemetry::nostd::shared_ptr<opentelemetry::trace::Tracer>;
+using OpentelemetrySpan = opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span>;
+using OpentelemetryScope = opentelemetry::trace::Scope;
+
+/// Starts get_next_span once (only while it is null) and activates it via a local `scope`.
+#define INIT_AND_SCOPE_GET_NEXT_SPAN(tracer, get_next_span, name) \
+    do {                                                          \
+        if (UNLIKELY(!get_next_span)) {                           \
+            get_next_span = tracer->StartSpan(name);              \
+        }                                                         \
+    } while (false);                                              \
+    OpentelemetryScope scope {get_next_span};
+/// Same as INIT_AND_SCOPE_GET_NEXT_SPAN, for a lazily started send span.
+#define INIT_AND_SCOPE_SEND_SPAN(tracer, send_span, name) \
+    INIT_AND_SCOPE_GET_NEXT_SPAN(tracer, send_span, name)
+
+/// Starts a span named `name` with `tracer`, stores it in the local variable `span`, and opens
+/// a Scope named `scope` for it, so use at most one of these macros per block scope.
+///
+/// A span records the execution time of one unit of work. It starts when it is created and
+/// ends when the last OpentelemetrySpan (shared_ptr) referring to it is released; End() can
+/// also be called to finish it explicitly, e.g. "span->End();".
+///
+/// Events and attributes can be attached to a span: an event records a timestamp plus content,
+/// and an attribute records a key-value pair.
+/// For example:
+/// span->AddEvent("event content");
+/// span->SetAttribute("key", "value");
+///
+/// A Scope pushes the span onto a thread-local stack when constructed and pops it when
+/// destroyed. A newly started span uses the top-of-stack span as its parent by default, and
+/// the top-of-stack span can be fetched with opentelemetry::trace::Tracer::GetCurrentSpan().
+/// The stack is per thread, so work moved to another thread must be handed its parent span.
+#define START_AND_SCOPE_SPAN(tracer, span, name) \
+    auto span = tracer->StartSpan(name);         \
+    OpentelemetryScope scope {span};
+
+namespace telemetry {
+/// Installs the global tracer provider and propagator when config::enable_tracing is true.
+void initTracer();
+
+/// Returns a shared NoopTracer. Spans it creates are NoopSpans, and every method of NoopTracer
+/// and NoopSpan does nothing, so instrumented code can run without recording anything.
+inline OpentelemetryTracer& get_noop_tracer() {
+    static OpentelemetryTracer noop_tracer =
+            opentelemetry::nostd::shared_ptr<opentelemetry::trace::NoopTracer>(
+                    new opentelemetry::trace::NoopTracer);
+    return noop_tracer;
+}
+/// Returns the tracer `tracer_name` from the global TracerProvider.
+inline OpentelemetryTracer get_tracer(const std::string& tracer_name) {
+    return opentelemetry::trace::Provider::GetTracerProvider()->GetTracer(tracer_name);
+}
+
+/// Returns true if the currently active span has a valid context, i.e. a real span is active.
+inline bool is_current_span_valid() {
+    return opentelemetry::trace::Tracer::GetCurrentSpan()->GetContext().IsValid();
+}
+/// Records `counter` on `span` as an attribute: counter name -> pretty-printed counter value.
+inline void set_span_attribute(OpentelemetrySpan& span, RuntimeProfile::Counter* const counter) {
+    span->SetAttribute(counter->name(), PrettyPrinter::print(counter->value(), counter->type()));
+}
+/// Like set_span_attribute(), but writes to the calling thread's currently active span.
+inline void set_current_span_attribute(RuntimeProfile::Counter* const counter) {
+    opentelemetry::trace::Tracer::GetCurrentSpan()->SetAttribute(
+            counter->name(), PrettyPrinter::print(counter->value(), counter->type()));
+}
+
+} // namespace telemetry
+} // namespace doris
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp
index 9c04fc9046..7a6b04e63c 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -836,6 +836,7 @@ Status HashJoinNode::close(RuntimeState* state) {
         return Status::OK();
     }
 
+    START_AND_SCOPE_SPAN(state->get_tracer(), span, "HashJoinNode::close");
     VExpr::close(_build_expr_ctxs, state);
     VExpr::close(_probe_expr_ctxs, state);
     if (_vother_join_conjunct_ptr) {
@@ -852,6 +853,7 @@ Status HashJoinNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eo
 }
 
 Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eos) {
+    INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "HashJoinNode::get_next");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     SCOPED_TIMER(_probe_timer);
 
@@ -872,7 +874,8 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eo
 
         do {
             SCOPED_TIMER(_probe_next_timer);
-            RETURN_IF_ERROR(child(0)->get_next(state, &_probe_block, &_probe_eos));
+            RETURN_IF_ERROR_AND_CHECK_SPAN(child(0)->get_next(state, &_probe_block, &_probe_eos),
+                                           child(0)->get_next_span(), _probe_eos);
         } while (_probe_block.rows() == 0 && !_probe_eos);
 
         probe_rows = _probe_block.rows();
@@ -983,6 +986,7 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eo
 }
 
 Status HashJoinNode::open(RuntimeState* state) {
+    START_AND_SCOPE_SPAN(state->get_tracer(), span, "HashJoinNode::open");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
     RETURN_IF_ERROR(ExecNode::open(state));
@@ -995,8 +999,11 @@ Status HashJoinNode::open(RuntimeState* state) {
     }
 
     std::promise<Status> thread_status;
-    std::thread(bind(&HashJoinNode::_hash_table_build_thread, this, state, &thread_status))
-            .detach();
+    std::thread([this, state, thread_status_p = &thread_status,
+                 parent_span = opentelemetry::trace::Tracer::GetCurrentSpan()] {
+        OpentelemetryScope scope {parent_span};
+        this->_hash_table_build_thread(state, thread_status_p);
+    }).detach();
 
     // Open the probe-side child so that it may perform any initialisation in parallel.
     // Don't exit even if we see an error, we still need to wait for the build thread
@@ -1010,6 +1017,7 @@ Status HashJoinNode::open(RuntimeState* state) {
 }
 
 void HashJoinNode::_hash_table_build_thread(RuntimeState* state, std::promise<Status>* status) {
+    START_AND_SCOPE_SPAN(state->get_tracer(), span, "HashJoinNode::_hash_table_build_thread");
     SCOPED_ATTACH_TASK_THREAD(state, mem_tracker());
     status->set_value(_hash_table_build(state));
 }
@@ -1032,7 +1040,8 @@ Status HashJoinNode::_hash_table_build(RuntimeState* state) {
         block.clear_column_data();
         RETURN_IF_CANCELLED(state);
 
-        RETURN_IF_ERROR(child(1)->get_next(state, &block, &eos));
+        RETURN_IF_ERROR_AND_CHECK_SPAN(child(1)->get_next(state, &block, &eos),
+                                       child(1)->get_next_span(), eos);
         _hash_table_mem_tracker->consume(block.allocated_bytes());
         _mem_used += block.allocated_bytes();
 
diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp
index 8725297e75..c1de52de25 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -332,6 +332,7 @@ Status AggregationNode::prepare(RuntimeState* state) {
 }
 
 Status AggregationNode::open(RuntimeState* state) {
+    START_AND_SCOPE_SPAN(state->get_tracer(), span, "AggregationNode::open");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
     SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB("aggregator, while execute open.");
@@ -358,7 +359,8 @@ Status AggregationNode::open(RuntimeState* state) {
     while (!eos) {
         RETURN_IF_CANCELLED(state);
         release_block_memory(block);
-        RETURN_IF_ERROR(_children[0]->get_next(state, &block, &eos));
+        RETURN_IF_ERROR_AND_CHECK_SPAN(_children[0]->get_next(state, &block, &eos),
+                                       _children[0]->get_next_span(), eos);
         if (block.rows() == 0) {
             continue;
         }
@@ -374,6 +376,7 @@ Status AggregationNode::get_next(RuntimeState* state, RowBatch* row_batch, bool*
 }
 
 Status AggregationNode::get_next(RuntimeState* state, Block* block, bool* eos) {
+    INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "AggregationNode::get_next");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
     SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB("aggregator, while execute get_next.");
@@ -384,7 +387,9 @@ Status AggregationNode::get_next(RuntimeState* state, Block* block, bool* eos) {
         RETURN_IF_CANCELLED(state);
         do {
             release_block_memory(_preagg_block);
-            RETURN_IF_ERROR(_children[0]->get_next(state, &_preagg_block, &child_eos));
+            RETURN_IF_ERROR_AND_CHECK_SPAN(
+                    _children[0]->get_next(state, &_preagg_block, &child_eos),
+                    _children[0]->get_next_span(), child_eos);
         } while (_preagg_block.rows() == 0 && !child_eos);
 
         if (_preagg_block.rows() != 0) {
@@ -410,6 +415,7 @@ Status AggregationNode::get_next(RuntimeState* state, Block* block, bool* eos) {
 
 Status AggregationNode::close(RuntimeState* state) {
     if (is_closed()) return Status::OK();
+    START_AND_SCOPE_SPAN(state->get_tracer(), span, "AggregationNode::close");
 
     for (auto* aggregate_evaluator : _aggregate_evaluators) aggregate_evaluator->close(state);
     VExpr::close(_probe_expr_ctxs, state);
diff --git a/be/src/vec/exec/vanalytic_eval_node.cpp b/be/src/vec/exec/vanalytic_eval_node.cpp
index 23e1274dd4..7a4a3dc684 100644
--- a/be/src/vec/exec/vanalytic_eval_node.cpp
+++ b/be/src/vec/exec/vanalytic_eval_node.cpp
@@ -213,6 +213,7 @@ Status VAnalyticEvalNode::prepare(RuntimeState* state) {
 }
 
 Status VAnalyticEvalNode::open(RuntimeState* state) {
+    START_AND_SCOPE_SPAN(state->get_tracer(), span, "VAnalyticEvalNode::open");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
     RETURN_IF_ERROR(ExecNode::open(state));
@@ -233,6 +234,7 @@ Status VAnalyticEvalNode::close(RuntimeState* state) {
     if (is_closed()) {
         return Status::OK();
     }
+    START_AND_SCOPE_SPAN(state->get_tracer(), span, "VAnalyticEvalNode::close");
 
     VExpr::close(_partition_by_eq_expr_ctxs, state);
     VExpr::close(_order_by_eq_expr_ctxs, state);
@@ -248,6 +250,8 @@ Status VAnalyticEvalNode::get_next(RuntimeState* state, RowBatch* row_batch, boo
 }
 
 Status VAnalyticEvalNode::get_next(RuntimeState* state, vectorized::Block* block, bool* eos) {
+    INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span,
+                                 "VAnalyticEvalNode::get_next");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
     RETURN_IF_CANCELLED(state);
@@ -466,7 +470,8 @@ Status VAnalyticEvalNode::_fetch_next_block_data(RuntimeState* state) {
     Block block;
     RETURN_IF_CANCELLED(state);
     do {
-        RETURN_IF_ERROR(_children[0]->get_next(state, &block, &_input_eos));
+        RETURN_IF_ERROR_AND_CHECK_SPAN(_children[0]->get_next(state, &block, &_input_eos),
+                                       _children[0]->get_next_span(), _input_eos);
     } while (!_input_eos && block.rows() == 0);
 
     if (_input_eos && block.rows() == 0) {
diff --git a/be/src/vec/exec/vassert_num_rows_node.cpp b/be/src/vec/exec/vassert_num_rows_node.cpp
index 7a246df17e..28e1389bb7 100644
--- a/be/src/vec/exec/vassert_num_rows_node.cpp
+++ b/be/src/vec/exec/vassert_num_rows_node.cpp
@@ -39,6 +39,7 @@ VAssertNumRowsNode::VAssertNumRowsNode(ObjectPool* pool, const TPlanNode& tnode,
 }
 
 Status VAssertNumRowsNode::open(RuntimeState* state) {
+    START_AND_SCOPE_SPAN(state->get_tracer(), span, "VAssertNumRowsNode::open");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_ERROR(ExecNode::open(state));
     // ISSUE-3435
@@ -47,8 +48,11 @@ Status VAssertNumRowsNode::open(RuntimeState* state) {
 }
 
 Status VAssertNumRowsNode::get_next(RuntimeState* state, Block* block, bool* eos) {
+    INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span,
+                                 "VAssertNumRowsNode::get_next");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
-    RETURN_IF_ERROR(child(0)->get_next(state, block, eos));
+    RETURN_IF_ERROR_AND_CHECK_SPAN(child(0)->get_next(state, block, eos), child(0)->get_next_span(),
+                                   *eos);
     _num_rows_returned += block->rows();
     bool assert_res = false;
     switch (_assertion) {
diff --git a/be/src/vec/exec/vblocking_join_node.cpp b/be/src/vec/exec/vblocking_join_node.cpp
index 05140888ae..8600c333ca 100644
--- a/be/src/vec/exec/vblocking_join_node.cpp
+++ b/be/src/vec/exec/vblocking_join_node.cpp
@@ -65,6 +65,7 @@ Status VBlockingJoinNode::prepare(RuntimeState* state) {
 
 Status VBlockingJoinNode::close(RuntimeState* state) {
     if (is_closed()) return Status::OK();
+    START_AND_SCOPE_SPAN(state->get_tracer(), span, "VBlockingJoinNode::close");
     ExecNode::close(state);
     return Status::OK();
 }
@@ -78,6 +79,7 @@ void VBlockingJoinNode::build_side_thread(RuntimeState* state, std::promise<Stat
 }
 
 Status VBlockingJoinNode::open(RuntimeState* state) {
+    START_AND_SCOPE_SPAN(state->get_tracer(), span, "VBlockingJoinNode::open");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
     RETURN_IF_ERROR(ExecNode::open(state));
@@ -113,7 +115,8 @@ Status VBlockingJoinNode::open(RuntimeState* state) {
     // Seed left child in preparation for get_next().
     while (true) {
         release_block_memory(_left_block);
-        RETURN_IF_ERROR(child(0)->get_next(state, &_left_block, &_left_side_eos));
+        RETURN_IF_ERROR_AND_CHECK_SPAN(child(0)->get_next(state, &_left_block, &_left_side_eos),
+                                       child(0)->get_next_span(), _left_side_eos);
         COUNTER_UPDATE(_left_child_row_counter, _left_block.rows());
         _left_block_pos = 0;
 
diff --git a/be/src/vec/exec/vbroker_scan_node.cpp b/be/src/vec/exec/vbroker_scan_node.cpp
index dc738c5a09..b765df56b3 100644
--- a/be/src/vec/exec/vbroker_scan_node.cpp
+++ b/be/src/vec/exec/vbroker_scan_node.cpp
@@ -82,6 +82,7 @@ Status VBrokerScanNode::prepare(RuntimeState* state) {
 }
 
 Status VBrokerScanNode::open(RuntimeState* state) {
+    START_AND_SCOPE_SPAN(state->get_tracer(), span, "VBrokerScanNode::open");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
     RETURN_IF_ERROR(ExecNode::open(state));
@@ -97,11 +98,16 @@ Status VBrokerScanNode::start_scanners() {
         std::unique_lock<std::mutex> l(_batch_queue_lock);
         _num_running_scanners = 1;
     }
-    _scanner_threads.emplace_back(&VBrokerScanNode::scanner_worker, this, 0, _scan_ranges.size());
+    _scanner_threads.emplace_back([this, start_idx = 0, length = _scan_ranges.size(),
+                                   parent_span = opentelemetry::trace::Tracer::GetCurrentSpan()] {
+        OpentelemetryScope scope {parent_span}; // re-activate the caller's span in this thread
+        this->scanner_worker(start_idx, length);
+    });
     return Status::OK();
 }
 
 Status VBrokerScanNode::get_next(RuntimeState* state, vectorized::Block* block, bool* eos) {
+    INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VBrokerScanNode::get_next");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     // check if CANCELLED.
     if (state->is_cancelled()) {
@@ -195,6 +201,7 @@ Status VBrokerScanNode::close(RuntimeState* state) {
     if (is_closed()) {
         return Status::OK();
     }
+    START_AND_SCOPE_SPAN(state->get_tracer(), span, "VBrokerScanNode::close");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     _scan_finished.store(true);
     _queue_writer_cond.notify_all();
@@ -264,6 +271,7 @@ Status VBrokerScanNode::scanner_scan(const TBrokerScanRange& scan_range, Scanner
 }
 
 void VBrokerScanNode::scanner_worker(int start_idx, int length) {
+    START_AND_SCOPE_SPAN(_runtime_state->get_tracer(), span, "VBrokerScanNode::scanner_worker");
     Thread::set_self_name("vbroker_scanner");
     Status status = Status::OK();
     ScannerCounter counter;
diff --git a/be/src/vec/exec/vcross_join_node.cpp b/be/src/vec/exec/vcross_join_node.cpp
index 27740d9f8e..8654160513 100644
--- a/be/src/vec/exec/vcross_join_node.cpp
+++ b/be/src/vec/exec/vcross_join_node.cpp
@@ -47,6 +47,7 @@ Status VCrossJoinNode::close(RuntimeState* state) {
     if (is_closed()) {
         return Status::OK();
     }
+    START_AND_SCOPE_SPAN(state->get_tracer(), span, "VCrossJoinNode::close");
     _block_mem_tracker->release(_total_mem_usage);
     VBlockingJoinNode::close(state);
     return Status::OK();
@@ -64,7 +65,8 @@ Status VCrossJoinNode::construct_build_side(RuntimeState* state) {
         RETURN_IF_CANCELLED(state);
 
         Block block;
-        RETURN_IF_ERROR(child(1)->get_next(state, &block, &eos));
+        RETURN_IF_ERROR_AND_CHECK_SPAN(child(1)->get_next(state, &block, &eos),
+                                       child(1)->get_next_span(), eos);
         auto rows = block.rows();
         auto mem_usage = block.allocated_bytes();
 
@@ -91,6 +93,7 @@ void VCrossJoinNode::init_get_next(int left_batch_row) {
 }
 
 Status VCrossJoinNode::get_next(RuntimeState* state, Block* block, bool* eos) {
+    INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VCrossJoinNode::get_next");
     RETURN_IF_CANCELLED(state);
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
@@ -119,7 +122,9 @@ Status VCrossJoinNode::get_next(RuntimeState* state, Block* block, bool* eos) {
                     do {
                         release_block_memory(_left_block);
                         timer.stop();
-                        RETURN_IF_ERROR(child(0)->get_next(state, &_left_block, &_left_side_eos));
+                        RETURN_IF_ERROR_AND_CHECK_SPAN(
+                                child(0)->get_next(state, &_left_block, &_left_side_eos),
+                                child(0)->get_next_span(), _left_side_eos);
                         timer.start();
                     } while (_left_block.rows() == 0 && !_left_side_eos);
                     COUNTER_UPDATE(_left_child_row_counter, _left_block.rows());
diff --git a/be/src/vec/exec/vempty_set_node.cpp b/be/src/vec/exec/vempty_set_node.cpp
index 620ea0e960..fb4970f5bb 100644
--- a/be/src/vec/exec/vempty_set_node.cpp
+++ b/be/src/vec/exec/vempty_set_node.cpp
@@ -23,6 +23,7 @@ VEmptySetNode::VEmptySetNode(ObjectPool* pool, const TPlanNode& tnode, const Des
         : ExecNode(pool, tnode, descs) {}
 
 Status VEmptySetNode::get_next(RuntimeState* state, Block* block, bool* eos) {
+    INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VEmptySetNode::get_next");
     *eos = true;
     return Status::OK();
 }
diff --git a/be/src/vec/exec/ves_http_scan_node.cpp b/be/src/vec/exec/ves_http_scan_node.cpp
index b81511408a..34dc7d19bd 100644
--- a/be/src/vec/exec/ves_http_scan_node.cpp
+++ b/be/src/vec/exec/ves_http_scan_node.cpp
@@ -118,6 +118,7 @@ Status VEsHttpScanNode::build_conjuncts_list() {
 }
 
 Status VEsHttpScanNode::open(RuntimeState* state) {
+    START_AND_SCOPE_SPAN(state->get_tracer(), span, "VEsHttpScanNode::open");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
     RETURN_IF_ERROR(ExecNode::open(state));
@@ -191,13 +192,18 @@ Status VEsHttpScanNode::start_scanners() {
 
     _scanners_status.resize(_scan_ranges.size());
     for (int i = 0; i < _scan_ranges.size(); i++) {
-        _scanner_threads.emplace_back(&VEsHttpScanNode::scanner_worker, this, i,
-                                      _scan_ranges.size(), std::ref(_scanners_status[i]));
+        _scanner_threads.emplace_back(
+                [this, i, length = _scan_ranges.size(), &p_status = _scanners_status[i],
+                 parent_span = opentelemetry::trace::Tracer::GetCurrentSpan()] {
+                    OpentelemetryScope scope {parent_span};
+                    this->scanner_worker(i, length, p_status);
+                });
     }
     return Status::OK();
 }
 
 Status VEsHttpScanNode::get_next(RuntimeState* state, vectorized::Block* block, bool* eos) {
+    INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VEsHttpScanNode::get_next");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     if (state->is_cancelled()) {
         std::unique_lock<std::mutex> l(_block_queue_lock);
@@ -377,6 +383,7 @@ void VEsHttpScanNode::debug_string(int ident_level, std::stringstream* out) cons
 }
 
 void VEsHttpScanNode::scanner_worker(int start_idx, int length, std::promise<Status>& p_status) {
+    START_AND_SCOPE_SPAN(_runtime_state->get_tracer(), span, "VEsHttpScanNode::scanner_worker");
     SCOPED_ATTACH_TASK_THREAD(_runtime_state, mem_tracker());
     // Clone expr context
     std::vector<ExprContext*> scanner_expr_ctxs;
diff --git a/be/src/vec/exec/vexcept_node.cpp b/be/src/vec/exec/vexcept_node.cpp
index d6c133af83..1a97359921 100644
--- a/be/src/vec/exec/vexcept_node.cpp
+++ b/be/src/vec/exec/vexcept_node.cpp
@@ -42,6 +42,7 @@ Status VExceptNode::prepare(RuntimeState* state) {
 }
 
 Status VExceptNode::open(RuntimeState* state) {
+    START_AND_SCOPE_SPAN(state->get_tracer(), span, "VExceptNode::open");
     RETURN_IF_ERROR(VSetOperationNode::open(state));
     bool eos = false;
     Status st;
@@ -78,6 +79,7 @@ Status VExceptNode::open(RuntimeState* state) {
 }
 
 Status VExceptNode::get_next(RuntimeState* state, Block* output_block, bool* eos) {
+    INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VExceptNode::get_next");
     SCOPED_TIMER(_probe_timer);
     Status st;
     create_mutable_cols(output_block);
@@ -104,6 +106,7 @@ Status VExceptNode::get_next(RuntimeState* state, Block* output_block, bool* eos
 }
 
 Status VExceptNode::close(RuntimeState* state) {
+    START_AND_SCOPE_SPAN(state->get_tracer(), span, "VExceptNode::close");
     return VSetOperationNode::close(state);
 }
 
diff --git a/be/src/vec/exec/vexchange_node.cpp b/be/src/vec/exec/vexchange_node.cpp
index 004999ed4b..33909fb9ff 100644
--- a/be/src/vec/exec/vexchange_node.cpp
+++ b/be/src/vec/exec/vexchange_node.cpp
@@ -64,6 +64,7 @@ Status VExchangeNode::prepare(RuntimeState* state) {
     return Status::OK();
 }
 Status VExchangeNode::open(RuntimeState* state) {
+    START_AND_SCOPE_SPAN(state->get_tracer(), span, "VExchangeNode::open");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
     ADD_THREAD_LOCAL_MEM_TRACKER(_stream_recvr->mem_tracker());
@@ -83,6 +84,7 @@ Status VExchangeNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* e
 }
 
 Status VExchangeNode::get_next(RuntimeState* state, Block* block, bool* eos) {
+    INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VExchangeNode::get_next");
     SCOPED_TIMER(runtime_profile()->total_time_counter());
     SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
     auto status = _stream_recvr->get_next(block, eos);
@@ -103,6 +105,7 @@ Status VExchangeNode::close(RuntimeState* state) {
     if (is_closed()) {
         return Status::OK();
     }
+    START_AND_SCOPE_SPAN(state->get_tracer(), span, "VExchangeNode::close");
 
     if (_stream_recvr != nullptr) {
         _stream_recvr->close();
diff --git a/be/src/vec/exec/vintersect_node.cpp b/be/src/vec/exec/vintersect_node.cpp
index 7b8263ca9e..f8f083ced1 100644
--- a/be/src/vec/exec/vintersect_node.cpp
+++ b/be/src/vec/exec/vintersect_node.cpp
@@ -42,6 +42,7 @@ Status VIntersectNode::prepare(RuntimeState* state) {
 }
 
 Status VIntersectNode::open(RuntimeState* state) {
+    START_AND_SCOPE_SPAN(state->get_tracer(), span, "VIntersectNode::open");
     RETURN_IF_ERROR(VSetOperationNode::open(state));
     bool eos = false;
     Status st;
@@ -79,6 +80,7 @@ Status VIntersectNode::open(RuntimeState* state) {
 }
 
 Status VIntersectNode::get_next(RuntimeState* state, Block* output_block, bool* eos) {
+    INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VIntersectNode::get_next");
     SCOPED_TIMER(_probe_timer);
     create_mutable_cols(output_block);
     Status st;
@@ -106,6 +108,7 @@ Status VIntersectNode::get_next(RuntimeState* state, Block* output_block, bool*
 }
 
 Status VIntersectNode::close(RuntimeState* state) {
+    START_AND_SCOPE_SPAN(state->get_tracer(), span, "VIntersectNode::close");
     return VSetOperationNode::close(state);
 }
 } // namespace vectorized
diff --git a/be/src/vec/exec/vmysql_scan_node.cpp b/be/src/vec/exec/vmysql_scan_node.cpp
index 954a28bd72..595a195fe5 100644
--- a/be/src/vec/exec/vmysql_scan_node.cpp
+++ b/be/src/vec/exec/vmysql_scan_node.cpp
@@ -98,6 +98,7 @@ Status VMysqlScanNode::prepare(RuntimeState* state) {
 }
 
 Status VMysqlScanNode::open(RuntimeState* state) {
+    START_AND_SCOPE_SPAN(state->get_tracer(), span, "VMysqlScanNode::open");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
     RETURN_IF_ERROR(ExecNode::open(state));
@@ -145,6 +146,7 @@ Status VMysqlScanNode::write_text_slot(char* value, int value_length, SlotDescri
 }
 
 Status VMysqlScanNode::get_next(RuntimeState* state, vectorized::Block* block, bool* eos) {
+    INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VMysqlScanNode::get_next");
     VLOG_CRITICAL << "VMysqlScanNode::GetNext";
     if (state == NULL || block == NULL || eos == NULL)
         return Status::InternalError("input is NULL pointer");
@@ -236,6 +238,7 @@ Status VMysqlScanNode::close(RuntimeState* state) {
     if (is_closed()) {
         return Status::OK();
     }
+    START_AND_SCOPE_SPAN(state->get_tracer(), span, "VMysqlScanNode::close");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
 
     _tuple_pool.reset();
diff --git a/be/src/vec/exec/vodbc_scan_node.cpp b/be/src/vec/exec/vodbc_scan_node.cpp
index f536df22bd..a292fad2a1 100644
--- a/be/src/vec/exec/vodbc_scan_node.cpp
+++ b/be/src/vec/exec/vodbc_scan_node.cpp
@@ -85,6 +85,7 @@ Status VOdbcScanNode::prepare(RuntimeState* state) {
 }
 
 Status VOdbcScanNode::open(RuntimeState* state) {
+    START_AND_SCOPE_SPAN(state->get_tracer(), span, "VOdbcScanNode::open");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
     RETURN_IF_ERROR(ExecNode::open(state));
@@ -120,6 +121,7 @@ Status VOdbcScanNode::write_text_slot(char* value, int value_length, SlotDescrip
 }
 
 Status VOdbcScanNode::get_next(RuntimeState* state, Block* block, bool* eos) {
+    INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VOdbcScanNode::get_next");
     VLOG_CRITICAL << get_scan_node_type() << "::GetNext";
 
     if (nullptr == state || nullptr == block || nullptr == eos) {
@@ -220,6 +222,7 @@ Status VOdbcScanNode::close(RuntimeState* state) {
     if (is_closed()) {
         return Status::OK();
     }
+    START_AND_SCOPE_SPAN(state->get_tracer(), span, "VOdbcScanNode::close");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
 
     _tuple_pool.reset();
diff --git a/be/src/vec/exec/volap_scan_node.cpp b/be/src/vec/exec/volap_scan_node.cpp
index 1fdba643ae..fc3cb0f59f 100644
--- a/be/src/vec/exec/volap_scan_node.cpp
+++ b/be/src/vec/exec/volap_scan_node.cpp
@@ -234,6 +234,7 @@ Status VOlapScanNode::prepare(RuntimeState* state) {
 }
 
 Status VOlapScanNode::open(RuntimeState* state) {
+    START_AND_SCOPE_SPAN(state->get_tracer(), span, "VOlapScanNode::open");
     VLOG_CRITICAL << "VOlapScanNode::Open";
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
@@ -279,6 +280,7 @@ Status VOlapScanNode::open(RuntimeState* state) {
 
 void VOlapScanNode::transfer_thread(RuntimeState* state) {
     // scanner open pushdown to scanThread
+    START_AND_SCOPE_SPAN(state->get_tracer(), span, "VOlapScanNode::transfer_thread");
     SCOPED_ATTACH_TASK_THREAD(state, mem_tracker());
     Status status = Status::OK();
 
@@ -371,6 +373,7 @@ void VOlapScanNode::transfer_thread(RuntimeState* state) {
             _add_blocks(blocks);
         }
     }
+    telemetry::set_span_attribute(span, _scanner_sched_counter);
 
     VLOG_CRITICAL << "TransferThread finish.";
     _transfer_done = true;
@@ -383,6 +386,8 @@ void VOlapScanNode::transfer_thread(RuntimeState* state) {
 }
 
 void VOlapScanNode::scanner_thread(VOlapScanner* scanner) {
+    START_AND_SCOPE_SPAN(scanner->runtime_state()->get_tracer(), span,
+                         "VOlapScanNode::scanner_thread");
     SCOPED_ATTACH_TASK_THREAD(_runtime_state, mem_tracker());
     ADD_THREAD_LOCAL_MEM_TRACKER(scanner->mem_tracker());
     Thread::set_self_name("volap_scanner");
@@ -1429,6 +1434,7 @@ Status VOlapScanNode::start_scan_thread(RuntimeState* state) {
         _transfer_done = true;
         return Status::OK();
     }
+    auto span = opentelemetry::trace::Tracer::GetCurrentSpan();
     _block_mem_tracker = MemTracker::create_virtual_tracker(-1, "VOlapScanNode:Block");
 
     // ranges constructed from scan keys
@@ -1504,13 +1510,19 @@ Status VOlapScanNode::start_scan_thread(RuntimeState* state) {
     }
     COUNTER_SET(_num_disks_accessed_counter, static_cast<int64_t>(disk_set.size()));
     COUNTER_SET(_num_scanners, static_cast<int64_t>(_volap_scanners.size()));
+    telemetry::set_span_attribute(span, _num_disks_accessed_counter);
+    telemetry::set_span_attribute(span, _num_scanners);
 
     // init progress
     std::stringstream ss;
     ss << "ScanThread complete (node=" << id() << "):";
     _progress = ProgressUpdater(ss.str(), _volap_scanners.size(), 1);
 
-    _transfer_thread.reset(new std::thread(&VOlapScanNode::transfer_thread, this, state));
+    _transfer_thread.reset(new std::thread(
+            [this, state, parent_span = opentelemetry::trace::Tracer::GetCurrentSpan()] {
+                opentelemetry::trace::Scope scope {parent_span};
+                transfer_thread(state);
+            }));
 
     return Status::OK();
 }
@@ -1519,6 +1531,7 @@ Status VOlapScanNode::close(RuntimeState* state) {
     if (is_closed()) {
         return Status::OK();
     }
+    START_AND_SCOPE_SPAN(state->get_tracer(), span, "VOlapScanNode::close");
     // change done status
     {
         std::unique_lock<std::mutex> l(_blocks_lock);
@@ -1567,6 +1580,7 @@ Status VOlapScanNode::close(RuntimeState* state) {
 }
 
 Status VOlapScanNode::get_next(RuntimeState* state, Block* block, bool* eos) {
+    INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VOlapScanNode::get_next");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
 
@@ -1739,10 +1753,14 @@ int VOlapScanNode::_start_scanner_thread_task(RuntimeState* state, int block_per
 
     // post volap scanners to thread-pool
     PriorityThreadPool* thread_pool = state->exec_env()->scan_thread_pool();
+    auto cur_span = opentelemetry::trace::Tracer::GetCurrentSpan();
     auto iter = olap_scanners.begin();
     while (iter != olap_scanners.end()) {
         PriorityThreadPool::Task task;
-        task.work_function = std::bind(&VOlapScanNode::scanner_thread, this, *iter);
+        task.work_function = [this, scanner = *iter, parent_span = cur_span] {
+            opentelemetry::trace::Scope scope {parent_span};
+            this->scanner_thread(scanner);
+        };
         task.priority = _nice;
         task.queue_id = state->exec_env()->store_path_to_index((*iter)->scan_disk());
         (*iter)->start_wait_worker_timer();
@@ -1777,6 +1795,7 @@ Status VOlapScanNode::set_scan_ranges(const std::vector<TScanRangeParams>& scan_
         _scan_ranges.emplace_back(new TPaloScanRange(scan_range.scan_range.palo_scan_range));
         COUNTER_UPDATE(_tablet_counter, 1);
     }
+    telemetry::set_current_span_attribute(_tablet_counter);
 
     return Status::OK();
 }
diff --git a/be/src/vec/exec/vrepeat_node.cpp b/be/src/vec/exec/vrepeat_node.cpp
index 1595aef71c..4ccbd60750 100644
--- a/be/src/vec/exec/vrepeat_node.cpp
+++ b/be/src/vec/exec/vrepeat_node.cpp
@@ -69,6 +69,7 @@ Status VRepeatNode::prepare(RuntimeState* state) {
 }
 
 Status VRepeatNode::open(RuntimeState* state) {
+    START_AND_SCOPE_SPAN(state->get_tracer(), span, "VRepeatNode::open");
     VLOG_CRITICAL << "VRepeatNode::open";
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_ERROR(RepeatNode::open(state));
@@ -173,6 +174,7 @@ Status VRepeatNode::get_repeated_block(Block* child_block, int repeat_id_idx, Bl
 }
 
 Status VRepeatNode::get_next(RuntimeState* state, Block* block, bool* eos) {
+    INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VRepeatNode::get_next");
     VLOG_CRITICAL << "VRepeatNode::get_next";
     SCOPED_TIMER(_runtime_profile->total_time_counter());
 
@@ -189,7 +191,9 @@ Status VRepeatNode::get_next(RuntimeState* state, Block* block, bool* eos) {
     // current child block has finished its repeat, get child's next block
     if (_child_block->rows() == 0) {
         while (_child_block->rows() == 0 && !_child_eos) {
-            RETURN_IF_ERROR(child(0)->get_next(state, _child_block.get(), &_child_eos));
+            RETURN_IF_ERROR_AND_CHECK_SPAN(
+                    child(0)->get_next(state, _child_block.get(), &_child_eos),
+                    child(0)->get_next_span(), _child_eos);
         }
 
         if (_child_eos and _child_block->rows() == 0) {
@@ -219,6 +223,7 @@ Status VRepeatNode::close(RuntimeState* state) {
     if (is_closed()) {
         return Status::OK();
     }
+    START_AND_SCOPE_SPAN(state->get_tracer(), span, "VRepeatNode::close");
     release_block_memory(*_child_block.get());
     RETURN_IF_ERROR(child(0)->close(state));
     return ExecNode::close(state);
diff --git a/be/src/vec/exec/vschema_scan_node.cpp b/be/src/vec/exec/vschema_scan_node.cpp
index f2e29eb7e9..65f260adec 100644
--- a/be/src/vec/exec/vschema_scan_node.cpp
+++ b/be/src/vec/exec/vschema_scan_node.cpp
@@ -95,11 +95,14 @@ Status VSchemaScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
 }
 
 Status VSchemaScanNode::open(RuntimeState* state) {
+    START_AND_SCOPE_SPAN(state->get_tracer(), span, "AggregationNode::close");
     if (!_is_init) {
+        span->SetStatus(opentelemetry::trace::StatusCode::kError, "Open before Init.");
         return Status::InternalError("Open before Init.");
     }
 
     if (nullptr == state) {
+        span->SetStatus(opentelemetry::trace::StatusCode::kError, "input pointer is nullptr.");
         return Status::InternalError("input pointer is nullptr.");
     }
 
@@ -231,6 +234,7 @@ Status VSchemaScanNode::prepare(RuntimeState* state) {
 }
 
 Status VSchemaScanNode::get_next(RuntimeState* state, vectorized::Block* block, bool* eos) {
+    INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VSchemaScanNode::get_next");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
 
     VLOG_CRITICAL << "VSchemaScanNode::GetNext";
@@ -456,6 +460,7 @@ Status VSchemaScanNode::close(RuntimeState* state) {
     if (is_closed()) {
         return Status::OK();
     }
+    START_AND_SCOPE_SPAN(state->get_tracer(), span, "VSchemaScanNode::close");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
 
     _tuple_pool.reset();
diff --git a/be/src/vec/exec/vselect_node.cpp b/be/src/vec/exec/vselect_node.cpp
index b81b19c1ad..c69f997150 100644
--- a/be/src/vec/exec/vselect_node.cpp
+++ b/be/src/vec/exec/vselect_node.cpp
@@ -32,6 +32,7 @@ Status VSelectNode::prepare(RuntimeState* state) {
 }
 
 Status VSelectNode::open(RuntimeState* state) {
+    START_AND_SCOPE_SPAN(state->get_tracer(), span, "VSelectNode::open");
     RETURN_IF_ERROR(ExecNode::open(state));
     RETURN_IF_ERROR(child(0)->open(state));
     return Status::OK();
@@ -42,11 +43,13 @@ Status VSelectNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos
 }
 
 Status VSelectNode::get_next(RuntimeState* state, vectorized::Block* block, bool* eos) {
+    INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VSelectNode::get_next");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_CANCELLED(state);
     do {
         RETURN_IF_CANCELLED(state);
-        RETURN_IF_ERROR(_children[0]->get_next(state, block, &_child_eos));
+        RETURN_IF_ERROR_AND_CHECK_SPAN(_children[0]->get_next(state, block, &_child_eos),
+                                       _children[0]->get_next_span(), _child_eos);
         if (_child_eos) {
             *eos = true;
             break;
@@ -63,6 +66,7 @@ Status VSelectNode::close(RuntimeState* state) {
     if (is_closed()) {
         return Status::OK();
     }
+    START_AND_SCOPE_SPAN(state->get_tracer(), span, "VSelectNode::close");
     return ExecNode::close(state);
 }
 
diff --git a/be/src/vec/exec/vset_operation_node.cpp b/be/src/vec/exec/vset_operation_node.cpp
index e3a2659b73..9dc620896e 100644
--- a/be/src/vec/exec/vset_operation_node.cpp
+++ b/be/src/vec/exec/vset_operation_node.cpp
@@ -82,6 +82,7 @@ Status VSetOperationNode::close(RuntimeState* state) {
     if (is_closed()) {
         return Status::OK();
     }
+    START_AND_SCOPE_SPAN(state->get_tracer(), span, "VSetOperationNode::close");
     for (auto& exprs : _child_expr_lists) {
         VExpr::close(exprs, state);
     }
@@ -113,6 +114,7 @@ Status VSetOperationNode::init(const TPlanNode& tnode, RuntimeState* state) {
 }
 
 Status VSetOperationNode::open(RuntimeState* state) {
+    START_AND_SCOPE_SPAN(state->get_tracer(), span, "VSetOperationNode::open");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     RETURN_IF_ERROR(ExecNode::open(state));
@@ -243,7 +245,8 @@ Status VSetOperationNode::hash_table_build(RuntimeState* state) {
         block.clear_column_data();
         SCOPED_TIMER(_build_timer);
         RETURN_IF_CANCELLED(state);
-        RETURN_IF_ERROR(child(0)->get_next(state, &block, &eos));
+        RETURN_IF_ERROR_AND_CHECK_SPAN(child(0)->get_next(state, &block, &eos),
+                                       child(0)->get_next_span(), eos);
 
         size_t allocated_bytes = block.allocated_bytes();
         _hash_table_mem_tracker->consume(allocated_bytes);
@@ -310,7 +313,8 @@ Status VSetOperationNode::process_probe_block(RuntimeState* state, int child_id,
     _probe_rows = 0;
 
     RETURN_IF_CANCELLED(state);
-    RETURN_IF_ERROR(child(child_id)->get_next(state, &_probe_block, eos));
+    RETURN_IF_ERROR_AND_CHECK_SPAN(child(child_id)->get_next(state, &_probe_block, eos),
+                                   child(child_id)->get_next_span(), *eos);
     _probe_rows = _probe_block.rows();
     RETURN_IF_ERROR(extract_probe_column(_probe_block, _probe_columns, child_id));
     return Status::OK();
diff --git a/be/src/vec/exec/vsort_node.cpp b/be/src/vec/exec/vsort_node.cpp
index 0a3aa18b2f..56b93bc675 100644
--- a/be/src/vec/exec/vsort_node.cpp
+++ b/be/src/vec/exec/vsort_node.cpp
@@ -50,6 +50,7 @@ Status VSortNode::prepare(RuntimeState* state) {
 }
 
 Status VSortNode::open(RuntimeState* state) {
+    START_AND_SCOPE_SPAN(state->get_tracer(), span, "VSortNode::open");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     RETURN_IF_ERROR(ExecNode::open(state));
@@ -76,6 +77,7 @@ Status VSortNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos)
 }
 
 Status VSortNode::get_next(RuntimeState* state, Block* block, bool* eos) {
+    INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VSortNode::get_next");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(_mem_tracker);
 
@@ -105,6 +107,7 @@ Status VSortNode::close(RuntimeState* state) {
     if (is_closed()) {
         return Status::OK();
     }
+    START_AND_SCOPE_SPAN(state->get_tracer(), span, "VSortNode::close");
     _block_mem_tracker->release(_total_mem_usage);
     _vsort_exec_exprs.close(state);
     return ExecNode::close(state);
@@ -125,7 +128,8 @@ Status VSortNode::sort_input(RuntimeState* state) {
     bool eos = false;
     do {
         Block block;
-        RETURN_IF_ERROR(child(0)->get_next(state, &block, &eos));
+        RETURN_IF_ERROR_AND_CHECK_SPAN(child(0)->get_next(state, &block, &eos),
+                                       child(0)->get_next_span(), eos);
         auto rows = block.rows();
 
         if (rows != 0) {
diff --git a/be/src/vec/exec/vtable_function_node.cpp b/be/src/vec/exec/vtable_function_node.cpp
index d21b252f35..0ac4795b3d 100644
--- a/be/src/vec/exec/vtable_function_node.cpp
+++ b/be/src/vec/exec/vtable_function_node.cpp
@@ -79,6 +79,8 @@ Status VTableFunctionNode::prepare(RuntimeState* state) {
 }
 
 Status VTableFunctionNode::get_next(RuntimeState* state, Block* block, bool* eos) {
+    INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span,
+                                 "VTableFunctionNode::get_next");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
 
     RETURN_IF_CANCELLED(state);
@@ -114,7 +116,9 @@ Status VTableFunctionNode::get_expanded_block(RuntimeState* state, Block* output
         // if child_block is empty, get data from child.
         if (_child_block->rows() == 0) {
             while (_child_block->rows() == 0 && !_child_eos) {
-                RETURN_IF_ERROR(child(0)->get_next(state, _child_block.get(), &_child_eos));
+                RETURN_IF_ERROR_AND_CHECK_SPAN(
+                        child(0)->get_next(state, _child_block.get(), &_child_eos),
+                        child(0)->get_next_span(), _child_eos);
             }
             if (_child_eos && _child_block->rows() == 0) {
                 *eos = true;
diff --git a/be/src/vec/exec/vunion_node.cpp b/be/src/vec/exec/vunion_node.cpp
index b713d4f1a1..4123b2d9cc 100644
--- a/be/src/vec/exec/vunion_node.cpp
+++ b/be/src/vec/exec/vunion_node.cpp
@@ -79,6 +79,7 @@ Status VUnionNode::prepare(RuntimeState* state) {
 }
 
 Status VUnionNode::open(RuntimeState* state) {
+    START_AND_SCOPE_SPAN(state->get_tracer(), span, "VUnionNode::open");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_ERROR(ExecNode::open(state));
     // open const expr lists.
@@ -107,7 +108,8 @@ Status VUnionNode::get_next_pass_through(RuntimeState* state, Block* block) {
         _child_eos = false;
     }
     DCHECK_EQ(block->rows(), 0);
-    RETURN_IF_ERROR(child(_child_idx)->get_next(state, block, &_child_eos));
+    RETURN_IF_ERROR_AND_CHECK_SPAN(child(_child_idx)->get_next(state, block, &_child_eos),
+                                   child(_child_idx)->get_next_span(), _child_eos);
     if (_child_eos) {
         // Even though the child is at eos, it's not OK to close() it here. Once we close
         // the child, the row batches that it produced are invalid. Marking the batch as
@@ -147,7 +149,9 @@ Status VUnionNode::get_next_materialized(RuntimeState* state, Block* block) {
         // Here need materialize block of child block, so here so not mem_reuse
         child_block.clear();
         // The first batch from each child is always fetched here.
-        RETURN_IF_ERROR(child(_child_idx)->get_next(state, &child_block, &_child_eos));
+        RETURN_IF_ERROR_AND_CHECK_SPAN(
+                child(_child_idx)->get_next(state, &child_block, &_child_eos),
+                child(_child_idx)->get_next_span(), _child_eos);
         SCOPED_TIMER(_materialize_exprs_evaluate_timer);
         if (child_block.rows() > 0) {
             mblock.merge(materialize_block(&child_block));
@@ -213,6 +217,7 @@ Status VUnionNode::get_next_const(RuntimeState* state, Block* block) {
 }
 
 Status VUnionNode::get_next(RuntimeState* state, Block* block, bool* eos) {
+    INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VUnionNode::get_next");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_CANCELLED(state);
     // RETURN_IF_ERROR(QueryMaintenance(state));
@@ -247,6 +252,7 @@ Status VUnionNode::close(RuntimeState* state) {
     if (is_closed()) {
         return Status::OK();
     }
+    START_AND_SCOPE_SPAN(state->get_tracer(), span, "VUnionNode::close");
     for (auto& exprs : _const_expr_lists) {
         VExpr::close(exprs, state);
     }
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index e7d84c1b48..f759664895 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -432,6 +432,7 @@ Status VDataStreamSender::prepare(RuntimeState* state) {
 }
 
 Status VDataStreamSender::open(RuntimeState* state) {
+    START_AND_SCOPE_SPAN(state->get_tracer(), span, "VDataStreamSender::open");
     DCHECK(state != nullptr);
     SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     RETURN_IF_ERROR(VExpr::open(_partition_expr_ctxs, state));
@@ -446,6 +447,7 @@ Status VDataStreamSender::send(RuntimeState* state, RowBatch* batch) {
 }
 
 Status VDataStreamSender::send(RuntimeState* state, Block* block) {
+    INIT_AND_SCOPE_SEND_SPAN(state->get_tracer(), _send_span, "VDataStreamSender::send")
     SCOPED_TIMER(_profile->total_time_counter());
     SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(_mem_tracker);
     if (_part_type == TPartitionType::UNPARTITIONED || _channels.size() == 1) {
@@ -560,6 +562,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block) {
 Status VDataStreamSender::close(RuntimeState* state, Status exec_status) {
     if (_closed) return Status::OK();
 
+    START_AND_SCOPE_SPAN(state->get_tracer(), span, "VDataStreamSender::close");
     Status final_st = Status::OK();
     for (int i = 0; i < _channels.size(); ++i) {
         Status st = _channels[i]->close(state);
diff --git a/be/src/vec/sink/vmysql_table_sink.cpp b/be/src/vec/sink/vmysql_table_sink.cpp
index cf112bb815..863c127120 100644
--- a/be/src/vec/sink/vmysql_table_sink.cpp
+++ b/be/src/vec/sink/vmysql_table_sink.cpp
@@ -67,6 +67,7 @@ Status VMysqlTableSink::prepare(RuntimeState* state) {
 }
 
 Status VMysqlTableSink::open(RuntimeState* state) {
+    START_AND_SCOPE_SPAN(state->get_tracer(), span, "VMysqlTableSink::open");
     // Prepare the exprs to run.
     RETURN_IF_ERROR(VExpr::open(_output_expr_ctxs, state));
     // create writer
@@ -81,10 +82,12 @@ Status VMysqlTableSink::send(RuntimeState* state, RowBatch* batch) {
 }
 
 Status VMysqlTableSink::send(RuntimeState* state, Block* block) {
+    INIT_AND_SCOPE_SEND_SPAN(state->get_tracer(), _send_span, "VMysqlTableSink::send");
     return _writer->append(block);
 }
 
 Status VMysqlTableSink::close(RuntimeState* state, Status exec_status) {
+    START_AND_SCOPE_SPAN(state->get_tracer(), span, "VMysqlTableSink::close");
     VExpr::close(_output_expr_ctxs, state);
     return Status::OK();
 }
diff --git a/be/src/vec/sink/vresult_file_sink.cpp b/be/src/vec/sink/vresult_file_sink.cpp
index bc1b987d45..4ee8ab0293 100644
--- a/be/src/vec/sink/vresult_file_sink.cpp
+++ b/be/src/vec/sink/vresult_file_sink.cpp
@@ -135,6 +135,7 @@ Status VResultFileSink::prepare(RuntimeState* state) {
 }
 
 Status VResultFileSink::open(RuntimeState* state) {
+    START_AND_SCOPE_SPAN(state->get_tracer(), span, "VResultFileSink::open");
     return VExpr::open(_output_vexpr_ctxs, state);
 }
 
@@ -143,6 +144,7 @@ Status VResultFileSink::send(RuntimeState* state, RowBatch* batch) {
 }
 
 Status VResultFileSink::send(RuntimeState* state, Block* block) {
+    INIT_AND_SCOPE_SEND_SPAN(state->get_tracer(), _send_span, "VResultFileSink::send");
     RETURN_IF_ERROR(_writer->append_block(*block));
     return Status::OK();
 }
@@ -152,6 +154,7 @@ Status VResultFileSink::close(RuntimeState* state, Status exec_status) {
         return Status::OK();
     }
 
+    START_AND_SCOPE_SPAN(state->get_tracer(), span, "VResultFileSink::close");
     Status final_status = exec_status;
     // close the writer
     if (_writer) {
diff --git a/be/src/vec/sink/vresult_sink.cpp b/be/src/vec/sink/vresult_sink.cpp
index a428eda66c..e8d6572509 100644
--- a/be/src/vec/sink/vresult_sink.cpp
+++ b/be/src/vec/sink/vresult_sink.cpp
@@ -79,6 +79,7 @@ Status VResultSink::prepare(RuntimeState* state) {
 }
 
 Status VResultSink::open(RuntimeState* state) {
+    START_AND_SCOPE_SPAN(state->get_tracer(), span, "VResultSink::open");
     return VExpr::open(_output_vexpr_ctxs, state);
 }
 
@@ -87,6 +88,7 @@ Status VResultSink::send(RuntimeState* state, RowBatch* batch) {
 }
 
 Status VResultSink::send(RuntimeState* state, Block* block) {
+    INIT_AND_SCOPE_SEND_SPAN(state->get_tracer(), _send_span, "VResultSink::send");
     // The memory consumption in the process of sending the results is not check query memory limit.
     // Avoid the query being cancelled when the memory limit is reached after the query result comes out.
     STOP_CHECK_LIMIT_THREAD_LOCAL_MEM_TRACKER();
@@ -98,6 +100,7 @@ Status VResultSink::close(RuntimeState* state, Status exec_status) {
         return Status::OK();
     }
 
+    START_AND_SCOPE_SPAN(state->get_tracer(), span, "VResultSink::close");
     Status final_status = exec_status;
 
     if (_writer) {
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index 99fc7213ac..a7a20fb36f 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -369,6 +369,7 @@ Status VOlapTableSink::prepare(RuntimeState* state) {
 }
 
 Status VOlapTableSink::open(RuntimeState* state) {
+    START_AND_SCOPE_SPAN(state->get_tracer(), span, "VOlapTableSink::open");
     // Prepare the exprs to run.
     RETURN_IF_ERROR(vectorized::VExpr::open(_output_vexpr_ctxs, state));
     return OlapTableSink::open(state);
@@ -382,6 +383,7 @@ size_t VOlapTableSink::get_pending_bytes() const {
     return mem_consumption;
 }
 Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block) {
+    INIT_AND_SCOPE_SEND_SPAN(state->get_tracer(), _send_span, "VOlapTableSink::send");
     SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     Status status = Status::OK();
 
@@ -490,6 +492,7 @@ Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block)
 
 Status VOlapTableSink::close(RuntimeState* state, Status exec_status) {
     if (_closed) return _close_status;
+    START_AND_SCOPE_SPAN(state->get_tracer(), span, "VOlapTableSink::close");
     vectorized::VExpr::close(_output_vexpr_ctxs, state);
     return OlapTableSink::close(state, exec_status);
 }
diff --git a/docs/.vuepress/sidebar/en/docs.js b/docs/.vuepress/sidebar/en/docs.js
index 96613579e1..8901be9be2 100644
--- a/docs/.vuepress/sidebar/en/docs.js
+++ b/docs/.vuepress/sidebar/en/docs.js
@@ -867,6 +867,7 @@ module.exports = [
       },
       "sql-interception",
       "query-profile",
+      "tracing",
       "optimization",
       {
         title: "Maintenance and Monitor",
diff --git a/docs/.vuepress/sidebar/zh-CN/docs.js b/docs/.vuepress/sidebar/zh-CN/docs.js
index 8b44b4565b..428e9e65bd 100644
--- a/docs/.vuepress/sidebar/zh-CN/docs.js
+++ b/docs/.vuepress/sidebar/zh-CN/docs.js
@@ -867,6 +867,7 @@ module.exports = [
       },
       "sql-interception",
       "query-profile",
+      "tracing",
       "optimization",
       {
         title: "运维监控",
diff --git a/docs/en/docs/admin-manual/tracing.md b/docs/en/docs/admin-manual/tracing.md
new file mode 100644
index 0000000000..b4cdf048a6
--- /dev/null
+++ b/docs/en/docs/admin-manual/tracing.md
@@ -0,0 +1,80 @@
+---
+{
+    "title": "tracing",
+    "language": "en"
+}
+---
+
+<!-- 
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# tracing
+
+Tracing records the life cycle of a request's execution in the system, including the request and its sub-procedure call chain, execution time, and statistics. It can be used to locate slow queries, analyze performance bottlenecks, etc.
+
+# Usage
+
+## Deploy distributed tracing system
+
+Currently, only [Zipkin](https://zipkin.io/) is supported. More distributed tracing systems will be supported in the future.
+
+```
+curl -sSL https://zipkin.io/quickstart.sh | bash -s
+java -jar zipkin.jar
+```
+
+## Configuring and starting Doris
+
+### Add configuration to fe.conf
+
+```
+enable_tracing = true
+
+# Configure traces to export to zipkin
+trace_export_url = http://127.0.0.1:9411/api/v2/spans
+```
+
+### Add configuration to be.conf
+```
+enable_tracing = true
+
+# Configure traces to export to zipkin
+trace_export_url = http://127.0.0.1:9411/api/v2/spans
+
+# Queue size for caching spans. A span export is triggered when the number of queued spans reaches half of the queue capacity. Spans that arrive while the queue is full are discarded.
+max_span_queue_size=2048
+
+# The maximum number of spans to export in a single pass.
+max_span_export_batch_size=512
+
+# Maximum interval between span exports, in milliseconds.
+export_span_schedule_delay_millis=500
+```
+
+### Start fe and be
+```
+sh fe/bin/start_fe.sh --daemon
+sh be/bin/start_be.sh --daemon
+```
+
+### Execute a query
+
+### View the Zipkin UI
+
+Open `http://127.0.0.1:9411/zipkin/` in a browser to view query traces.
diff --git a/docs/zh-CN/docs/admin-manual/tracing.md b/docs/zh-CN/docs/admin-manual/tracing.md
new file mode 100644
index 0000000000..35940589c4
--- /dev/null
+++ b/docs/zh-CN/docs/admin-manual/tracing.md
@@ -0,0 +1,82 @@
+---
+{
+    "title": "链路追踪",
+    "language": "zh-CN"
+}
+---
+
+<!-- 
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# 链路追踪
+
+链路追踪(tracing)记录了一次请求在系统中执行的生命周期,包括请求及其子过程调用链路、执行时间及统计信息,可用于慢查询定位、性能瓶颈分析等。
+
+# 操作流程
+
+## 部署链路分析系统
+
+目前支持 [Zipkin](https://zipkin.io/),未来会支持更多链路分析系统。
+
+```
+curl -sSL https://zipkin.io/quickstart.sh | bash -s
+java -jar zipkin.jar
+```
+
+## 配置及启动Doris
+
+### 添加配置到fe.conf
+
+```
+# 开启链路追踪
+enable_tracing = true
+
+# 配置traces导出到zipkin
+trace_export_url = http://127.0.0.1:9411/api/v2/spans
+```
+
+### 添加配置到be.conf
+```
+# 开启链路追踪
+enable_tracing = true
+
+# 配置traces导出到zipkin
+trace_export_url = http://127.0.0.1:9411/api/v2/spans
+
+# 缓存span的队列大小。span数量达到队列容量一半时将触发一次span导出,队列满后到达队列的span将被丢弃。
+max_span_queue_size=2048
+
+# 单次导出span的最大数量。
+max_span_export_batch_size=512
+
+# 导出span的最大间隔时间
+export_span_schedule_delay_millis=500
+```
+
+### 启动fe和be
+```
+sh fe/bin/start_fe.sh --daemon
+sh be/bin/start_be.sh --daemon
+```
+
+## 执行查询
+
+## 查看zipkin UI
+
+在浏览器中打开 `http://127.0.0.1:9411/zipkin/` 即可查看查询链路。
diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml
index db3cfa2e0b..d6dbe82bcd 100644
--- a/fe/fe-core/pom.xml
+++ b/fe/fe-core/pom.xml
@@ -600,6 +600,32 @@ under the License.
             <scope>provided</scope>
         </dependency>
 
+        <!-- https://mvnrepository.com/artifact/io.opentelemetry/opentelemetry-api -->
+        <dependency>
+            <groupId>io.opentelemetry</groupId>
+            <artifactId>opentelemetry-api</artifactId>
+            <version>1.14.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>io.opentelemetry</groupId>
+            <artifactId>opentelemetry-sdk</artifactId>
+            <version>1.14.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>io.opentelemetry</groupId>
+            <artifactId>opentelemetry-exporter-otlp-trace</artifactId>
+            <version>1.14.0</version>
+        </dependency>
+
+        <!-- https://mvnrepository.com/artifact/io.opentelemetry/opentelemetry-exporter-zipkin -->
+        <dependency>
+            <groupId>io.opentelemetry</groupId>
+            <artifactId>opentelemetry-exporter-zipkin</artifactId>
+            <version>1.14.0</version>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.iceberg</groupId>
             <artifactId>iceberg-core</artifactId>
diff --git a/fe/fe-core/src/main/java/org/apache/doris/PaloFe.java b/fe/fe-core/src/main/java/org/apache/doris/PaloFe.java
index 8004ee15ab..a692789cdf 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/PaloFe.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/PaloFe.java
@@ -24,6 +24,7 @@ import org.apache.doris.common.LdapConfig;
 import org.apache.doris.common.Log4jConfig;
 import org.apache.doris.common.ThreadPoolManager;
 import org.apache.doris.common.Version;
+import org.apache.doris.common.telemetry.Telemetry;
 import org.apache.doris.common.util.JdkUtils;
 import org.apache.doris.common.util.NetUtils;
 import org.apache.doris.httpv2.HttpServer;
@@ -128,6 +129,8 @@ public class PaloFe {
             Catalog.getCurrentCatalog().initialize(args);
             Catalog.getCurrentCatalog().waitForReady();
 
+            Telemetry.initOpenTelemetry();
+
             // init and start:
             // 1. HttpServer for HTTP Server
             // 2. FeServer for Thrift Server
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index 305fb9a627..f411669a8c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -1631,6 +1631,21 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, masterOnly = true)
     public static int cbo_default_sample_percentage = 10;
 
+    /**
+     * Whether FE query tracing with OpenTelemetry is enabled.
+     * Checked once when the FE starts (Telemetry.initOpenTelemetry()); while false the no-op OpenTelemetry
+     * instance is used and no spans are exported. Not mutable: changing it requires an FE restart.
+     */
+    @ConfField(mutable = false, masterOnly = false)
+    public static boolean enable_tracing = false;
+
+    /**
+     * Endpoint that spans are exported to when enable_tracing is true; it is handed to the Zipkin span
+     * exporter, so it should point at a Zipkin-compatible collector. Not mutable: requires an FE restart.
+     */
+    @ConfField(mutable = false, masterOnly = false)
+    public static String trace_export_url = "http://127.0.0.1:9411/api/v2/spans";
+
     /**
      * If set to TRUE, the compaction slower replica will be skipped when select get queryable replicas
      * Default is true.
diff --git a/be/src/vec/exec/vempty_set_node.cpp b/fe/fe-core/src/main/java/org/apache/doris/common/telemetry/ScopedSpan.java
similarity index 56%
copy from be/src/vec/exec/vempty_set_node.cpp
copy to fe/fe-core/src/main/java/org/apache/doris/common/telemetry/ScopedSpan.java
index 620ea0e960..1bb5584541 100644
--- a/be/src/vec/exec/vempty_set_node.cpp
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/telemetry/ScopedSpan.java
@@ -15,16 +15,53 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "vec/exec/vempty_set_node.h"
+package org.apache.doris.common.telemetry;
 
-namespace doris {
-namespace vectorized {
-VEmptySetNode::VEmptySetNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
-        : ExecNode(pool, tnode, descs) {}
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.context.Scope;
 
-Status VEmptySetNode::get_next(RuntimeState* state, Block* block, bool* eos) {
-    *eos = true;
-    return Status::OK();
+/**
+ * Encapsulates a {@link Span} together with the {@link Scope} obtained by making it current.
+ *
+ * <p>{@link #ScopedSpan(Span)} makes the span current on the constructing thread and {@link #endSpan()}
+ * closes that scope before ending the span. Closing a scope restores the context that was current when it
+ * was opened, so instances should be ended on the thread that created them, in reverse order of creation.
+ */
+public class ScopedSpan {
+    private final Span span;
+    private final Scope scope;
+
+    /**
+     * Creates a placeholder holding a no-op span without making anything current.
+     *
+     * <p>A placeholder may be replaced without {@link #endSpan()} ever being called, so it must not attach
+     * a context to the thread: that context would never be detached again.
+     */
+    public ScopedSpan() {
+        this.span = Telemetry.getNoopSpan();
+        this.scope = Scope.noop();
+    }
+
+    /**
+     * Wraps {@code span} and makes it current on the calling thread until {@link #endSpan()} is called.
+     */
+    public ScopedSpan(Span span) {
+        this.span = span;
+        this.scope = span.makeCurrent();
+    }
+
+    /**
+     * Returns the wrapped span.
+     */
+    public Span getSpan() {
+        return span;
+    }
+
+    /**
+     * Closes the scope, restoring the previously current context, and then ends the span.
+     */
+    public void endSpan() {
+        scope.close();
+        span.end();
+    }
 }
-} // namespace vectorized
-} // namespace doris
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/telemetry/Telemetry.java b/fe/fe-core/src/main/java/org/apache/doris/common/telemetry/Telemetry.java
new file mode 100644
index 0000000000..0f7cd44d15
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/telemetry/Telemetry.java
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.telemetry;
+
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.Config;
+
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
+import io.opentelemetry.context.propagation.ContextPropagators;
+import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter;
+import io.opentelemetry.exporter.zipkin.ZipkinSpanExporter;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.resources.Resource;
+import io.opentelemetry.sdk.trace.SdkTracerProvider;
+import io.opentelemetry.sdk.trace.export.BatchSpanProcessor;
+import io.opentelemetry.sdk.trace.export.SpanExporter;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Holds the process-wide {@link OpenTelemetry} instance used for FE query tracing.
+ *
+ * <p>Until {@link #initOpenTelemetry()} installs the SDK (only when {@code Config.enable_tracing} is true),
+ * {@link #getOpenTelemetry()} returns {@link OpenTelemetry#noop()}, so callers can create spans without
+ * checking whether tracing is enabled.
+ */
+public class Telemetry {
+    private static final Logger LOG = LogManager.getLogger(Telemetry.class);
+
+    // Upper bound on how long JVM shutdown waits for buffered spans to be exported.
+    private static final long SHUTDOWN_TIMEOUT_SECONDS = 10;
+
+    private static OpenTelemetry openTelemetry = OpenTelemetry.noop();
+
+    private Telemetry() {
+    }
+
+    /**
+     * Initialize {@link OpenTelemetry} with {@link SdkTracerProvider}, {@link BatchSpanProcessor},
+     * {@link ZipkinSpanExporter} and {@link W3CTraceContextPropagator}.
+     *
+     * <p>Does nothing when {@code Config.enable_tracing} is false. Spans are exported to
+     * {@code Config.trace_export_url}, and a JVM shutdown hook shuts the tracer provider down, waiting up to
+     * {@code SHUTDOWN_TIMEOUT_SECONDS} seconds for pending spans to be exported.
+     */
+    public static void initOpenTelemetry() {
+        if (!Config.enable_tracing) {
+            return;
+        }
+
+        // todo: It may be possible to use the OTLP exporter to export telemetry data to an otel collector,
+        //  which in turn processes and sends telemetry data to multiple back-ends (e.g. zipkin, Prometheus,
+        //  Fluent Bit, etc.) to improve scalability.
+        String httpUrl = Config.trace_export_url;
+        SpanExporter spanExporter = zipkinExporter(httpUrl);
+
+        String serviceName = "FRONTEND:" + Catalog.getCurrentCatalog().getSelfNode().first;
+        Resource serviceNameResource =
+                Resource.create(Attributes.of(AttributeKey.stringKey("service.name"), serviceName));
+        // Send a batch of spans if ScheduleDelay time or MaxExportBatchSize is reached
+        BatchSpanProcessor spanProcessor =
+                BatchSpanProcessor.builder(spanExporter).setScheduleDelay(100, TimeUnit.MILLISECONDS)
+                        .setMaxExportBatchSize(1000).build();
+
+        SdkTracerProvider tracerProvider = SdkTracerProvider.builder().addSpanProcessor(spanProcessor)
+                .setResource(Resource.getDefault().merge(serviceNameResource)).build();
+        openTelemetry = OpenTelemetrySdk.builder().setTracerProvider(tracerProvider)
+                .setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance())).build();
+
+        // shutdown() does not wait for the final export of buffered spans; block on its result (bounded) so
+        // the JVM does not exit before the spans still queued in the BatchSpanProcessor are sent.
+        Runtime.getRuntime().addShutdownHook(new Thread(
+                () -> tracerProvider.shutdown().join(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)));
+        LOG.info("OpenTelemetry tracing is enabled, exporting spans to {}", httpUrl);
+    }
+
+    private static SpanExporter zipkinExporter(String httpUrl) {
+        return ZipkinSpanExporter.builder().setEndpoint(httpUrl).build();
+    }
+
+    // Not wired up yet: OTLP/gRPC exporter for sending spans to an otel collector (see the todo in
+    // initOpenTelemetry).
+    private static SpanExporter otlpExporter(String endpoint) {
+        return OtlpGrpcSpanExporter.builder().setEndpoint(endpoint).build();
+    }
+
+    /**
+     * Returns the process-wide instance: the SDK after {@link #initOpenTelemetry()} has enabled tracing,
+     * otherwise {@link OpenTelemetry#noop()}.
+     */
+    public static OpenTelemetry getOpenTelemetry() {
+        return openTelemetry;
+    }
+
+    /**
+     * Returns a tracer of {@link OpenTelemetry#noop()}, regardless of {@code Config.enable_tracing}.
+     */
+    public static Tracer getNoopTracer() {
+        return OpenTelemetry.noop().getTracer("noop");
+    }
+
+    /**
+     * Starts a span from {@link #getNoopTracer()}; such a span is never recorded or exported.
+     */
+    public static Span getNoopSpan() {
+        return getNoopTracer().spanBuilder("noopSpan").startSpan();
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index 306ed5d87e..6ce06fefca 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -22,6 +22,7 @@ import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.telemetry.Telemetry;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.datasource.DataSourceIf;
 import org.apache.doris.datasource.InternalDataSource;
@@ -40,6 +41,7 @@ import org.apache.doris.transaction.TransactionStatus;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import io.opentelemetry.api.trace.Tracer;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -108,6 +110,8 @@ public class ConnectContext {
     // Cache thread info for this connection.
     protected volatile ThreadInfo threadInfo;
 
+    protected volatile Tracer tracer = Telemetry.getNoopTracer();
+
     // Catalog: put catalog here is convenient for unit test,
     // because catalog is singleton, hard to mock
     protected Catalog catalog;
@@ -490,6 +494,22 @@ public class ConnectContext {
         this.sqlHash = sqlHash;
     }
 
+    /**
+     * Returns the tracer used to create spans for this connection; it is a no-op tracer until
+     * {@link #initTracer(String)} is called.
+     */
+    public Tracer getTracer() {
+        return this.tracer;
+    }
+
+    /**
+     * Replaces this connection's tracer with the tracer named {@code name} from {@link Telemetry#getOpenTelemetry()}.
+     */
+    public void initTracer(String name) {
+        Tracer namedTracer = Telemetry.getOpenTelemetry().getTracer(name);
+        this.tracer = namedTracer;
+    }
+
     // kill operation with no protect.
     public void kill(boolean killConnection) {
         LOG.warn("kill timeout query, {}, kill connection: {}",
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
index 4c0c871066..86b367e39e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
@@ -55,6 +55,8 @@ import org.apache.doris.thrift.TUniqueId;
 
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.context.Scope;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -352,7 +354,16 @@ public class ConnectProcessor {
                 handleQuit();
                 break;
             case COM_QUERY:
-                handleQuery();
+                ctx.initTracer("trace");
+                Span rootSpan = ctx.getTracer().spanBuilder("handleQuery").startSpan();
+                try (Scope scope = rootSpan.makeCurrent()) {
+                    handleQuery();
+                } catch (Exception e) {
+                    rootSpan.recordException(e);
+                    throw e;
+                } finally {
+                    rootSpan.end();
+                }
                 break;
             case COM_FIELD_LIST:
                 handleFieldList();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index e0ce3fe524..e9bf9ae943 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -28,6 +28,8 @@ import org.apache.doris.common.Pair;
 import org.apache.doris.common.Reference;
 import org.apache.doris.common.Status;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.telemetry.ScopedSpan;
+import org.apache.doris.common.telemetry.Telemetry;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.common.util.ListUtil;
 import org.apache.doris.common.util.ProfileWriter;
@@ -102,6 +104,10 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multiset;
 import com.google.common.collect.Sets;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.SpanKind;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.Scope;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.thrift.TException;
@@ -645,9 +651,16 @@ public class Coordinator {
             } // end for fragments
 
             // 4. send and wait fragments rpc
-            List<Pair<BackendExecStates, Future<InternalService.PExecPlanFragmentResult>>> futures
-                    = Lists.newArrayList();
+            List<Pair<BackendExecStates, Future<InternalService.PExecPlanFragmentResult>>> futures =
+                    Lists.newArrayList();
+            Context parentSpanContext = Context.current();
             for (BackendExecStates states : beToExecStates.values()) {
+                Span span = Telemetry.getNoopSpan();
+                if (ConnectContext.get() != null) {
+                    span = ConnectContext.get().getTracer().spanBuilder("execRemoteFragmentsAsync")
+                            .setParent(parentSpanContext).setSpanKind(SpanKind.CLIENT).startSpan();
+                }
+                states.scopedSpan = new ScopedSpan(span);
                 states.unsetFields();
                 futures.add(Pair.create(states, states.execRemoteFragmentsAsync()));
             }
@@ -657,6 +670,12 @@ public class Coordinator {
                 // 5. send and wait execution start rpc
                 futures.clear();
                 for (BackendExecStates states : beToExecStates.values()) {
+                    Span span = Telemetry.getNoopSpan();
+                    if (ConnectContext.get() != null) {
+                        span = ConnectContext.get().getTracer().spanBuilder("execPlanFragmentStartAsync")
+                                .setParent(parentSpanContext).setSpanKind(SpanKind.CLIENT).startSpan();
+                    }
+                    states.scopedSpan = new ScopedSpan(span);
                     futures.add(Pair.create(states, states.execPlanFragmentStartAsync()));
                 }
                 waitRpc(futures, this.timeoutDeadline - System.currentTimeMillis(), "send execution start");
@@ -679,6 +698,7 @@ public class Coordinator {
             TStatusCode code;
             String errMsg = null;
             Exception exception = null;
+            Span span = pair.first.scopedSpan.getSpan();
             try {
                 PExecPlanFragmentResult result = pair.second.get(timeoutMs, TimeUnit.MILLISECONDS);
                 code = TStatusCode.findByValue(result.getStatus().getStatusCode());
@@ -703,21 +723,28 @@ public class Coordinator {
                 code = TStatusCode.TIMEOUT;
             }
 
-            if (code != TStatusCode.OK) {
-                if (exception != null && errMsg == null) {
-                    errMsg = operation + " failed. " + exception.getMessage();
-                }
-                queryStatus.setStatus(errMsg);
-                cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR);
-                switch (code) {
-                    case TIMEOUT:
-                        throw new RpcException(pair.first.brpcAddr.hostname, errMsg, exception);
-                    case THRIFT_RPC_ERROR:
-                        SimpleScheduler.addToBlacklist(pair.first.beId, errMsg);
-                        throw new RpcException(pair.first.brpcAddr.hostname, errMsg, exception);
-                    default:
-                        throw new UserException(errMsg, exception);
+            try {
+                if (code != TStatusCode.OK) {
+                    if (exception != null && errMsg == null) {
+                        errMsg = operation + " failed. " + exception.getMessage();
+                    }
+                    queryStatus.setStatus(errMsg);
+                    cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR);
+                    switch (code) {
+                        case TIMEOUT:
+                            throw new RpcException(pair.first.brpcAddr.hostname, errMsg, exception);
+                        case THRIFT_RPC_ERROR:
+                            SimpleScheduler.addToBlacklist(pair.first.beId, errMsg);
+                            throw new RpcException(pair.first.brpcAddr.hostname, errMsg, exception);
+                        default:
+                            throw new UserException(errMsg, exception);
+                    }
                 }
+            } catch (Exception e) {
+                span.recordException(e);
+                throw e;
+            } finally {
+                pair.first.scopedSpan.endSpan();
             }
 
             // succeed to send the plan fragment, update the "alreadySentBackendIds"
@@ -2103,13 +2130,20 @@ public class Coordinator {
                     return false;
                 }
 
-                try {
+                Span span = ConnectContext.get() != null
+                        ? ConnectContext.get().getTracer().spanBuilder("cancelPlanFragmentAsync")
+                        .setParent(Context.current()).setSpanKind(SpanKind.CLIENT).startSpan()
+                        : Telemetry.getNoopSpan();
+                try (Scope scope = span.makeCurrent()) {
                     BackendServiceProxy.getInstance().cancelPlanFragmentAsync(brpcAddress,
                             fragmentInstanceId(), cancelReason);
                 } catch (RpcException e) {
+                    span.recordException(e);
                     LOG.warn("cancel plan fragment get a exception, address={}:{}", brpcAddress.getHostname(),
                             brpcAddress.getPort());
                     SimpleScheduler.addToBlacklist(addressToBackendID.get(brpcAddress), e.getMessage());
+                } finally {
+                    span.end();
                 }
 
                 this.hasCanceled = true;
@@ -2156,6 +2190,7 @@ public class Coordinator {
         TNetworkAddress brpcAddr;
         List<BackendExecState> states = Lists.newArrayList();
         boolean twoPhaseExecution = false;
+        ScopedSpan scopedSpan = new ScopedSpan();
 
         public BackendExecStates(long beId, TNetworkAddress brpcAddr, boolean twoPhaseExecution) {
             this.beId = beId;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 6478551678..57d133fa60 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -128,6 +128,9 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.protobuf.ByteString;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.Scope;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.thrift.TException;
@@ -324,7 +327,20 @@ public class StmtExecutor implements ProfileWriter {
     public void execute() throws Exception {
         UUID uuid = UUID.randomUUID();
         TUniqueId queryId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
-        execute(queryId);
+        Span executeSpan = context.getTracer().spanBuilder("execute").setParent(Context.current()).startSpan();
+        executeSpan.setAttribute("queryId", DebugUtil.printId(queryId));
+        if (originStmt != null) {
+            executeSpan.setAttribute("sql", originStmt.originStmt);
+        }
+        try (Scope scope = executeSpan.makeCurrent()) {
+            execute(queryId);
+        } catch (Exception e) {
+            // Record the failure on this span too, like the other query-phase spans do.
+            executeSpan.recordException(e);
+            throw e;
+        } finally {
+            executeSpan.end();
+        }
     }
 
     // Execute one statement with queryId
@@ -334,6 +346,7 @@ public class StmtExecutor implements ProfileWriter {
     // Exception:
     // IOException: talk with client failed.
     public void execute(TUniqueId queryId) throws Exception {
+        Span span = Span.fromContext(Context.current());
         context.setStartTime();
 
         plannerProfile.setQueryBeginTime();
@@ -350,8 +363,17 @@ public class StmtExecutor implements ProfileWriter {
             analyzeVariablesInStmt();
 
             if (!context.isTxnModel()) {
-                // analyze this query
-                analyze(context.getSessionVariable().toThrift());
+                Span queryAnalysisSpan =
+                        context.getTracer().spanBuilder("query analysis").setParent(Context.current()).startSpan();
+                try (Scope scope = queryAnalysisSpan.makeCurrent()) {
+                    // analyze this query
+                    analyze(context.getSessionVariable().toThrift());
+                } catch (Exception e) {
+                    queryAnalysisSpan.recordException(e);
+                    throw e;
+                } finally {
+                    queryAnalysisSpan.end();
+                }
                 if (isForwardToMaster()) {
                     if (isProxy) {
                         // This is already a stmt forwarded from other FE.
@@ -412,6 +434,7 @@ public class StmtExecutor implements ProfileWriter {
                             AuditLog.getQueryAudit().log("Query {} {} times with new query id: {}",
                                     DebugUtil.printId(queryId), i, DebugUtil.printId(newQueryId));
                             context.setQueryId(newQueryId);
+                            span.setAttribute("queryId", DebugUtil.printId(newQueryId));
                         }
                         handleQueryStmt();
                         // explain query stmt do not have profile
@@ -1005,55 +1028,73 @@ public class StmtExecutor implements ProfileWriter {
         QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),
                 new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord));
         coord.setProfileWriter(this);
-        coord.exec();
+        Span queryScheduleSpan =
+                context.getTracer().spanBuilder("query schedule").setParent(Context.current()).startSpan();
+        try (Scope scope = queryScheduleSpan.makeCurrent()) {
+            coord.exec();
+        } catch (Exception e) {
+            queryScheduleSpan.recordException(e);
+            throw e;
+        } finally {
+            queryScheduleSpan.end();
+        }
         plannerProfile.setQueryScheduleFinishTime();
         writeProfile(false);
-        while (true) {
-            batch = coord.getNext();
-            // for outfile query, there will be only one empty batch send back with eos flag
-            if (batch.getBatch() != null) {
-                if (cacheAnalyzer != null) {
-                    cacheAnalyzer.copyRowBatch(batch);
-                }
-                // For some language driver, getting error packet after fields packet
-                // will be recognized as a success result
-                // so We need to send fields after first batch arrived
-                if (!isSendFields) {
-                    if (!isOutfileQuery) {
-                        sendFields(queryStmt.getColLabels(), exprToType(queryStmt.getResultExprs()));
-                    } else {
-                        sendFields(OutFileClause.RESULT_COL_NAMES, OutFileClause.RESULT_COL_TYPES);
+        Span fetchResultSpan = context.getTracer().spanBuilder("fetch result").setParent(Context.current()).startSpan();
+        try (Scope scope = fetchResultSpan.makeCurrent()) {
+            while (true) {
+                batch = coord.getNext();
+                // for outfile query, there will be only one empty batch send back with eos flag
+                if (batch.getBatch() != null) {
+                    if (cacheAnalyzer != null) {
+                        cacheAnalyzer.copyRowBatch(batch);
+                    }
+                    // For some language driver, getting error packet after fields packet
+                    // will be recognized as a success result
+                    // so We need to send fields after first batch arrived
+                    if (!isSendFields) {
+                        if (!isOutfileQuery) {
+                            sendFields(queryStmt.getColLabels(), exprToType(queryStmt.getResultExprs()));
+                        } else {
+                            sendFields(OutFileClause.RESULT_COL_NAMES, OutFileClause.RESULT_COL_TYPES);
+                        }
+                        isSendFields = true;
                     }
-                    isSendFields = true;
+                    for (ByteBuffer row : batch.getBatch().getRows()) {
+                        channel.sendOnePacket(row);
+                    }
+                    context.updateReturnRows(batch.getBatch().getRows().size());
                 }
-                for (ByteBuffer row : batch.getBatch().getRows()) {
-                    channel.sendOnePacket(row);
+                if (batch.isEos()) {
+                    break;
                 }
-                context.updateReturnRows(batch.getBatch().getRows().size());
             }
-            if (batch.isEos()) {
-                break;
+            if (cacheAnalyzer != null) {
+                if (cacheResult != null && cacheAnalyzer.getHitRange() == Cache.HitRange.Right) {
+                    isSendFields =
+                            sendCachedValues(channel, cacheResult.getValuesList(), (SelectStmt) queryStmt, isSendFields,
+                                    false);
+                }
+
+                cacheAnalyzer.updateCache();
             }
-        }
-        if (cacheAnalyzer != null) {
-            if (cacheResult != null && cacheAnalyzer.getHitRange() == Cache.HitRange.Right) {
-                isSendFields = sendCachedValues(channel, cacheResult.getValuesList(), (SelectStmt) queryStmt,
-                        isSendFields, false);
+            if (!isSendFields) {
+                if (!isOutfileQuery) {
+                    sendFields(queryStmt.getColLabels(), exprToType(queryStmt.getResultExprs()));
+                } else {
+                    sendFields(OutFileClause.RESULT_COL_NAMES, OutFileClause.RESULT_COL_TYPES);
+                }
             }
 
-            cacheAnalyzer.updateCache();
-        }
-        if (!isSendFields) {
-            if (!isOutfileQuery) {
-                sendFields(queryStmt.getColLabels(), exprToType(queryStmt.getResultExprs()));
-            } else {
-                sendFields(OutFileClause.RESULT_COL_NAMES, OutFileClause.RESULT_COL_TYPES);
-            }
+            statisticsForAuditLog = batch.getQueryStatistics() == null ? null : batch.getQueryStatistics().toBuilder();
+            context.getState().setEof();
+            plannerProfile.setQueryFetchResultFinishTime();
+        } catch (Exception e) {
+            fetchResultSpan.recordException(e);
+            throw  e;
+        } finally {
+            fetchResultSpan.end();
         }
-
-        statisticsForAuditLog = batch.getQueryStatistics() == null ? null : batch.getQueryStatistics().toBuilder();
-        context.getState().setEof();
-        plannerProfile.setQueryFetchResultFinishTime();
     }
 
     private TWaitingTxnStatusResult getWaitingTxnStatus(TWaitingTxnStatusRequest request) throws Exception {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
index 77101e76f0..fec5c1727e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
@@ -18,12 +18,21 @@
 package org.apache.doris.rpc;
 
 import org.apache.doris.common.Config;
+import org.apache.doris.common.telemetry.Telemetry;
 import org.apache.doris.proto.InternalService;
 import org.apache.doris.proto.PBackendServiceGrpc;
 import org.apache.doris.thrift.TNetworkAddress;
 
+import io.grpc.CallOptions;
+import io.grpc.Channel;
+import io.grpc.ClientCall;
+import io.grpc.ClientInterceptor;
+import io.grpc.ForwardingClientCall;
 import io.grpc.ManagedChannel;
+import io.grpc.Metadata;
+import io.grpc.MethodDescriptor;
 import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
+import io.opentelemetry.context.Context;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -44,7 +53,7 @@ public class BackendServiceClient {
         channel = NettyChannelBuilder.forAddress(address.getHostname(), address.getPort())
                 .flowControlWindow(Config.grpc_max_message_size_bytes)
                 .maxInboundMessageSize(Config.grpc_max_message_size_bytes).enableRetry().maxRetryAttempts(MAX_RETRY_NUM)
-                .usePlaintext().build();
+                .intercept(new OpenTelemetryClientInterceptor()).usePlaintext().build();
         stub = PBackendServiceGrpc.newFutureStub(channel);
         blockingStub = PBackendServiceGrpc.newBlockingStub(channel);
     }
@@ -134,4 +143,35 @@ public class BackendServiceClient {
 
         LOG.warn("shut down backend service client: {}", address);
     }
+
+    /**
+     * gRPC client interceptor that propagates the OpenTelemetry context of the calling thread to the backend.
+     *
+     * <p>When a call starts, the current {@link Context} is injected into the outgoing request {@link Metadata}
+     * using the text-map propagator of {@link Telemetry#getOpenTelemetry()}; while tracing is disabled that
+     * instance is the no-op OpenTelemetry, whose propagator writes nothing.
+     */
+    public static class OpenTelemetryClientInterceptor implements ClientInterceptor {
+        @Override
+        public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> methodDescriptor,
+                CallOptions callOptions, Channel channel) {
+            ClientCall<ReqT, RespT> delegate = channel.newCall(methodDescriptor, callOptions);
+            return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(delegate) {
+                @Override
+                public void start(Listener<RespT> responseListener, Metadata headers) {
+                    injectCurrentContext(headers);
+                    super.start(responseListener, headers);
+                }
+            };
+        }
+
+        /**
+         * Writes the current context's propagation fields into {@code headers} as ASCII metadata entries.
+         */
+        private static void injectCurrentContext(Metadata headers) {
+            Telemetry.getOpenTelemetry().getPropagators().getTextMapPropagator()
+                    .inject(Context.current(), headers, (carrier, key, value) -> carrier.put(
+                            Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER), value));
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org