You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/09/07 04:00:15 UTC

[incubator-doris] branch master updated: [Optimize] Remove some unused code to reduce lock contention (#6566)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 74ddea8  [Optimize] Remove some unused code to reduce lock contention (#6566)
74ddea8 is described below

commit 74ddea8d8352cc3dcad38cdbc66117c7fe848f85
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Tue Sep 7 11:56:12 2021 +0800

    [Optimize] Remove some unused code to reduce lock contention (#6566)
    
    1. Remove global runtime profile counter
    2. Remove unused thread token register
---
 be/src/exec/blocking_join_node.cpp                 |  19 +-
 be/src/exec/hash_join_node.cpp                     |  13 +-
 be/src/exec/olap_scan_node.cpp                     |   2 -
 be/src/runtime/fragment_mgr.cpp                    |   2 +-
 be/src/runtime/fragment_mgr.h                      |   2 -
 be/src/runtime/plan_fragment_executor.cpp          |  26 ---
 be/src/runtime/plan_fragment_executor.h            |  15 --
 be/src/runtime/runtime_state.cpp                   |   9 -
 be/src/util/runtime_profile.cpp                    | 201 +--------------------
 be/src/util/runtime_profile.h                      |  73 --------
 .../load/routineload/RoutineLoadTaskScheduler.java |   4 +-
 11 files changed, 10 insertions(+), 356 deletions(-)

diff --git a/be/src/exec/blocking_join_node.cpp b/be/src/exec/blocking_join_node.cpp
index b4b2f05..a82e5f7 100644
--- a/be/src/exec/blocking_join_node.cpp
+++ b/be/src/exec/blocking_join_node.cpp
@@ -82,10 +82,6 @@ Status BlockingJoinNode::close(RuntimeState* state) {
 
 void BlockingJoinNode::build_side_thread(RuntimeState* state, boost::promise<Status>* status) {
     status->set_value(construct_build_side(state));
-    // Release the thread token as soon as possible (before the main thread joins
-    // on it).  This way, if we had a chain of 10 joins using 1 additional thread,
-    // we'd keep the additional thread busy the whole time.
-    state->resource_pool()->release_thread_token(false);
 }
 
 Status BlockingJoinNode::open(RuntimeState* state) {
@@ -105,19 +101,8 @@ Status BlockingJoinNode::open(RuntimeState* state) {
     // main thread
     boost::promise<Status> build_side_status;
 
-    if (state->resource_pool()->try_acquire_thread_token()) {
-        add_runtime_exec_option("Join Build-Side Prepared Asynchronously");
-        // Thread build_thread(_node_name, "build thread",
-        //     bind(&BlockingJoinNode::BuildSideThread, this, state, &build_side_status));
-        // if (!state->cgroup().empty()) {
-        //   RETURN_IF_ERROR(
-        //       state->exec_env()->cgroups_mgr()->assign_thread_to_cgroup(
-        //           build_thread, state->cgroup()));
-        // }
-        boost::thread(bind(&BlockingJoinNode::build_side_thread, this, state, &build_side_status));
-    } else {
-        build_side_status.set_value(construct_build_side(state));
-    }
+    add_runtime_exec_option("Join Build-Side Prepared Asynchronously");
+    boost::thread(bind(&BlockingJoinNode::build_side_thread, this, state, &build_side_status));
 
     // Open the left child so that it may perform any initialisation in parallel.
     // Don't exit even if we see an error, we still need to wait for the build thread
diff --git a/be/src/exec/hash_join_node.cpp b/be/src/exec/hash_join_node.cpp
index fff59ef..3fc0dfe 100644
--- a/be/src/exec/hash_join_node.cpp
+++ b/be/src/exec/hash_join_node.cpp
@@ -181,10 +181,6 @@ Status HashJoinNode::close(RuntimeState* state) {
 
 void HashJoinNode::build_side_thread(RuntimeState* state, boost::promise<Status>* status) {
     status->set_value(construct_hash_table(state));
-    // Release the thread token as soon as possible (before the main thread joins
-    // on it).  This way, if we had a chain of 10 joins using 1 additional thread,
-    // we'd keep the additional thread busy the whole time.
-    state->resource_pool()->release_thread_token(false);
 }
 
 Status HashJoinNode::construct_hash_table(RuntimeState* state) {
@@ -238,13 +234,8 @@ Status HashJoinNode::open(RuntimeState* state) {
     // Only do this if we can get a thread token.  Otherwise, do this in the
     // main thread
     boost::promise<Status> thread_status;
-
-    if (state->resource_pool()->try_acquire_thread_token()) {
-        add_runtime_exec_option("Hash Table Built Asynchronously");
-        boost::thread(bind(&HashJoinNode::build_side_thread, this, state, &thread_status));
-    } else {
-        thread_status.set_value(construct_hash_table(state));
-    }
+    add_runtime_exec_option("Hash Table Built Asynchronously");
+    boost::thread(bind(&HashJoinNode::build_side_thread, this, state, &thread_status));
 
     if (!_runtime_filter_descs.empty()) {
         RuntimeFilterSlots runtime_filter_slots(_probe_expr_ctxs, _build_expr_ctxs,
diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp
index 45341df..6584100 100644
--- a/be/src/exec/olap_scan_node.cpp
+++ b/be/src/exec/olap_scan_node.cpp
@@ -1327,7 +1327,6 @@ Status OlapScanNode::normalize_bloom_filter_predicate(SlotDescriptor* slot) {
 
 void OlapScanNode::transfer_thread(RuntimeState* state) {
     // scanner open pushdown to scanThread
-    state->resource_pool()->acquire_thread_token();
     Status status = Status::OK();
     for (auto scanner : _olap_scanners) {
         status = Expr::clone_if_not_exists(_conjunct_ctxs, state, scanner->conjunct_ctxs());
@@ -1483,7 +1482,6 @@ void OlapScanNode::transfer_thread(RuntimeState* state) {
         }
     } // end of transfer while
 
-    state->resource_pool()->release_thread_token(true);
     VLOG_CRITICAL << "TransferThread finish.";
     {
         std::unique_lock<std::mutex> l(_row_batches_lock);
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 038e9f0..834529c 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -187,8 +187,8 @@ FragmentExecState::FragmentExecState(const TUniqueId& query_id,
           _executor(exec_env, std::bind<void>(std::mem_fn(&FragmentExecState::coordinator_callback),
                                               this, std::placeholders::_1, std::placeholders::_2,
                                               std::placeholders::_3)),
-          _timeout_second(-1),
           _set_rsc_info(false),
+          _timeout_second(-1),
           _fragments_ctx(std::move(fragments_ctx)) {
     _start_time = DateTimeValue::local_time();
     _coord_addr = _fragments_ctx->coord_addr;
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 72c2f60..70233e1 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -83,8 +83,6 @@ public:
                                        const TUniqueId& fragment_instance_id,
                                        std::vector<TScanColumnDesc>* selected_columns);
 
-    RuntimeFilterMergeController& runtimefilter_controller() { return _runtimefilter_controller; }
-
     Status apply_filter(const PPublishFilterRequest* request, const char* attach_data);
 
     Status merge_filter(const PMergeFilterRequest* request, const char* attach_data);
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index 37d162e..d0f2cb2 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -53,7 +53,6 @@ PlanFragmentExecutor::PlanFragmentExecutor(ExecEnv* exec_env,
           _done(false),
           _prepared(false),
           _closed(false),
-          _has_thread_token(false),
           _is_report_success(true),
           _is_report_on_cancel(true),
           _collect_query_statistics_with_every_batch(false) {}
@@ -100,20 +99,6 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
         _is_report_success = request.query_options.is_report_success;
     }
 
-    // Reserve one main thread from the pool
-    _runtime_state->resource_pool()->acquire_thread_token();
-    _has_thread_token = true;
-
-    _average_thread_tokens = profile()->add_sampling_counter(
-            "AverageThreadTokens",
-            std::bind<int64_t>(std::mem_fn(&ThreadResourceMgr::ResourcePool::num_threads),
-                               _runtime_state->resource_pool()));
-
-    // if (_exec_env->process_mem_tracker() != NULL) {
-    //     // we have a global limit
-    //     _runtime_state->mem_trackers()->push_back(_exec_env->process_mem_tracker());
-    // }
-
     int64_t bytes_limit = request.query_options.mem_limit;
     if (bytes_limit <= 0) {
         // sometimes the request does not set the query mem limit, we use default one.
@@ -346,8 +331,6 @@ Status PlanFragmentExecutor::open_internal() {
     _sink.reset(NULL);
     _done = true;
 
-    release_thread_token();
-
     stop_report_thread();
     send_report(true);
 
@@ -465,7 +448,6 @@ Status PlanFragmentExecutor::get_next(RowBatch** batch) {
         LOG(INFO) << "Finished executing fragment query_id=" << print_id(_query_id)
                   << " instance_id=" << print_id(_runtime_state->fragment_instance_id());
         // Query is done, return the thread token
-        release_thread_token();
         stop_report_thread();
         send_report(true);
     }
@@ -542,14 +524,6 @@ RuntimeProfile* PlanFragmentExecutor::profile() {
     return _runtime_state->runtime_profile();
 }
 
-void PlanFragmentExecutor::release_thread_token() {
-    if (_has_thread_token) {
-        _has_thread_token = false;
-        _runtime_state->resource_pool()->release_thread_token(true);
-        profile()->stop_sampling_counters_updates(_average_thread_tokens);
-    }
-}
-
 void PlanFragmentExecutor::close() {
     if (_closed) {
         return;
diff --git a/be/src/runtime/plan_fragment_executor.h b/be/src/runtime/plan_fragment_executor.h
index d3fb2d5..f2d1f2a 100644
--- a/be/src/runtime/plan_fragment_executor.h
+++ b/be/src/runtime/plan_fragment_executor.h
@@ -130,9 +130,6 @@ public:
     // Initiate cancellation. Must not be called until after prepare() returned.
     void cancel();
 
-    // Releases the thread token for this fragment executor.
-    void release_thread_token();
-
     // call these only after prepare()
     RuntimeState* runtime_state() { return _runtime_state.get(); }
     const RowDescriptor& row_desc();
@@ -175,9 +172,6 @@ private:
     // true if close() has been called
     bool _closed;
 
-    // true if this fragment has not returned the thread token to the thread resource mgr
-    bool _has_thread_token;
-
     bool _is_report_success;
 
     // If this is set to false, and '_is_report_success' is false as well,
@@ -208,15 +202,6 @@ private:
 
     RuntimeProfile::Counter* _fragment_cpu_timer;
 
-    // Average number of thread tokens for the duration of the plan fragment execution.
-    // Fragments that do a lot of cpu work (non-coordinator fragment) will have at
-    // least 1 token.  Fragments that contain a hdfs scan node will have 1+ tokens
-    // depending on system load.  Other nodes (e.g. hash join node) can also reserve
-    // additional tokens.
-    // This is a measure of how much CPU resources this fragment used during the course
-    // of the execution.
-    RuntimeProfile::Counter* _average_thread_tokens;
-
     // It is shared with BufferControlBlock and will be called in two different
     // threads. But their calls are all at different time, there is no problem of
     // multithreaded access.
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 441046e..ceedbc8 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -155,10 +155,6 @@ RuntimeState::~RuntimeState() {
     if (_buffer_reservation != nullptr) {
         _buffer_reservation->Close();
     }
-
-    if (_exec_env != nullptr && _exec_env->thread_mgr() != nullptr) {
-        _exec_env->thread_mgr()->unregister_pool(_resource_pool);
-    }
 }
 
 Status RuntimeState::init(const TUniqueId& fragment_instance_id, const TQueryOptions& query_options,
@@ -194,11 +190,6 @@ Status RuntimeState::init(const TUniqueId& fragment_instance_id, const TQueryOpt
         _query_options.batch_size = DEFAULT_BATCH_SIZE;
     }
 
-    // Register with the thread mgr
-    if (exec_env != NULL) {
-        _resource_pool = exec_env->thread_mgr()->register_pool();
-        DCHECK(_resource_pool != NULL);
-    }
     _db_name = "insert_stmt";
     _import_label = print_id(fragment_instance_id);
 
diff --git a/be/src/util/runtime_profile.cpp b/be/src/util/runtime_profile.cpp
index 5cf9c20..34f1864 100644
--- a/be/src/util/runtime_profile.cpp
+++ b/be/src/util/runtime_profile.cpp
@@ -43,8 +43,6 @@ static const std::string THREAD_INVOLUNTARY_CONTEXT_SWITCHES = "InvoluntaryConte
 // The root counter name for all top level counters.
 static const std::string ROOT_COUNTER = "";
 
-RuntimeProfile::PeriodicCounterUpdateState RuntimeProfile::_s_periodic_counter_update_state;
-
 RuntimeProfile::RuntimeProfile(const std::string& name, bool is_averaged_profile)
         : _pool(new ObjectPool()),
           _own_pool(false),
@@ -57,22 +55,6 @@ RuntimeProfile::RuntimeProfile(const std::string& name, bool is_averaged_profile
 }
 
 RuntimeProfile::~RuntimeProfile() {
-    for (auto iter = _rate_counters.begin(); iter != _rate_counters.end(); ++iter) {
-        stop_rate_counters_updates(*iter);
-    }
-
-    for (auto iter = _sampling_counters.begin(); iter != _sampling_counters.end(); ++iter) {
-        stop_sampling_counters_updates(*iter);
-    }
-
-    std::set<std::vector<Counter*>*>::const_iterator buckets_iter;
-
-    for (buckets_iter = _bucketing_counters.begin(); buckets_iter != _bucketing_counters.end();
-         ++buckets_iter) {
-        // This is just a clean up. No need to perform conversion. Also, the underlying
-        // counters might be gone already.
-        stop_bucketing_counters_updates(*buckets_iter, false);
-    }
 }
 
 void RuntimeProfile::merge(RuntimeProfile* other) {
@@ -654,61 +636,23 @@ RuntimeProfile::Counter* RuntimeProfile::add_rate_counter(const std::string& nam
     }
 
     Counter* dst_counter = add_counter(name, dst_type);
-    _rate_counters.push_back(dst_counter);
-    register_periodic_counter(src_counter, NULL, dst_counter, RATE_COUNTER);
     return dst_counter;
 }
 
 RuntimeProfile::Counter* RuntimeProfile::add_rate_counter(const std::string& name, SampleFn fn,
                                                           TUnit::type dst_type) {
-    Counter* dst_counter = add_counter(name, dst_type);
-    register_periodic_counter(NULL, fn, dst_counter, RATE_COUNTER);
-    return dst_counter;
+    return add_counter(name, dst_type);
 }
 
 RuntimeProfile::Counter* RuntimeProfile::add_sampling_counter(const std::string& name,
                                                               Counter* src_counter) {
     DCHECK(src_counter->type() == TUnit::UNIT);
-    Counter* dst_counter = add_counter(name, TUnit::DOUBLE_VALUE);
-    register_periodic_counter(src_counter, NULL, dst_counter, SAMPLING_COUNTER);
-    return dst_counter;
+    return add_counter(name, TUnit::DOUBLE_VALUE);
 }
 
 RuntimeProfile::Counter* RuntimeProfile::add_sampling_counter(const std::string& name,
                                                               SampleFn sample_fn) {
-    Counter* dst_counter = add_counter(name, TUnit::DOUBLE_VALUE);
-    _sampling_counters.push_back(dst_counter);
-    register_periodic_counter(NULL, sample_fn, dst_counter, SAMPLING_COUNTER);
-    return dst_counter;
-}
-
-void RuntimeProfile::add_bucketing_counters(const std::string& name,
-                                            const std::string& parent_counter_name,
-                                            Counter* src_counter, int num_buckets,
-                                            std::vector<Counter*>* buckets) {
-    {
-        std::lock_guard<std::mutex> l(_counter_map_lock);
-        _bucketing_counters.insert(buckets);
-    }
-
-    for (int i = 0; i < num_buckets; ++i) {
-        std::stringstream counter_name;
-        counter_name << name << "=" << i;
-        buckets->push_back(
-                add_counter(counter_name.str(), TUnit::DOUBLE_VALUE, parent_counter_name));
-    }
-
-    std::lock_guard<std::mutex> l(_s_periodic_counter_update_state.lock);
-
-    if (_s_periodic_counter_update_state.update_thread.get() == NULL) {
-        _s_periodic_counter_update_state.update_thread.reset(
-                new boost::thread(&RuntimeProfile::periodic_counter_update_loop));
-    }
-
-    BucketCountersInfo info;
-    info.src_counter = src_counter;
-    info.num_sampled = 0;
-    _s_periodic_counter_update_state.bucketing_counters[buckets] = info;
+    return add_counter(name, TUnit::DOUBLE_VALUE);
 }
 
 RuntimeProfile::EventSequence* RuntimeProfile::add_event_sequence(const std::string& name) {
@@ -724,145 +668,6 @@ RuntimeProfile::EventSequence* RuntimeProfile::add_event_sequence(const std::str
     return timer;
 }
 
-void RuntimeProfile::register_periodic_counter(Counter* src_counter, SampleFn sample_fn,
-                                               Counter* dst_counter, PeriodicCounterType type) {
-    DCHECK(src_counter == NULL || sample_fn == NULL);
-
-    std::lock_guard<std::mutex> l(_s_periodic_counter_update_state.lock);
-
-    if (_s_periodic_counter_update_state.update_thread.get() == NULL) {
-        _s_periodic_counter_update_state.update_thread.reset(
-                new boost::thread(&RuntimeProfile::periodic_counter_update_loop));
-    }
-
-    switch (type) {
-    case RATE_COUNTER: {
-        RateCounterInfo counter;
-        counter.src_counter = src_counter;
-        counter.sample_fn = sample_fn;
-        counter.elapsed_ms = 0;
-        _s_periodic_counter_update_state.rate_counters[dst_counter] = counter;
-        break;
-    }
-
-    case SAMPLING_COUNTER: {
-        SamplingCounterInfo counter;
-        counter.src_counter = src_counter;
-        counter.sample_fn = sample_fn;
-        counter.num_sampled = 0;
-        counter.total_sampled_value = 0;
-        _s_periodic_counter_update_state.sampling_counters[dst_counter] = counter;
-        break;
-    }
-
-    default:
-        DCHECK(false) << "Unsupported PeriodicCounterType:" << type;
-    }
-}
-
-void RuntimeProfile::stop_rate_counters_updates(Counter* rate_counter) {
-    std::lock_guard<std::mutex> l(_s_periodic_counter_update_state.lock);
-    _s_periodic_counter_update_state.rate_counters.erase(rate_counter);
-}
-
-void RuntimeProfile::stop_sampling_counters_updates(Counter* sampling_counter) {
-    std::lock_guard<std::mutex> l(_s_periodic_counter_update_state.lock);
-    _s_periodic_counter_update_state.sampling_counters.erase(sampling_counter);
-}
-
-void RuntimeProfile::stop_bucketing_counters_updates(std::vector<Counter*>* buckets, bool convert) {
-    int64_t num_sampled = 0;
-    {
-        std::lock_guard<std::mutex> l(_s_periodic_counter_update_state.lock);
-        PeriodicCounterUpdateState::BucketCountersMap::const_iterator itr =
-                _s_periodic_counter_update_state.bucketing_counters.find(buckets);
-
-        if (itr != _s_periodic_counter_update_state.bucketing_counters.end()) {
-            num_sampled = itr->second.num_sampled;
-            _s_periodic_counter_update_state.bucketing_counters.erase(buckets);
-        }
-    }
-
-    if (convert && num_sampled > 0) {
-        for (Counter* counter : *buckets) {
-            double perc = 100 * counter->value() / (double)num_sampled;
-            counter->set(perc);
-        }
-    }
-}
-
-RuntimeProfile::PeriodicCounterUpdateState::PeriodicCounterUpdateState() : _done(false) {}
-
-RuntimeProfile::PeriodicCounterUpdateState::~PeriodicCounterUpdateState() {
-    if (_s_periodic_counter_update_state.update_thread.get() != NULL) {
-        {
-            // Lock to ensure the update thread will see the update to _done
-            std::lock_guard<std::mutex> l(_s_periodic_counter_update_state.lock);
-            _done = true;
-        }
-        _s_periodic_counter_update_state.update_thread->join();
-    }
-}
-
-void RuntimeProfile::periodic_counter_update_loop() {
-    while (!_s_periodic_counter_update_state._done) {
-        boost::system_time before_time = boost::get_system_time();
-        SleepFor(MonoDelta::FromMilliseconds(config::periodic_counter_update_period_ms));
-        boost::posix_time::time_duration elapsed = boost::get_system_time() - before_time;
-        int elapsed_ms = elapsed.total_milliseconds();
-
-        std::lock_guard<std::mutex> l(_s_periodic_counter_update_state.lock);
-
-        for (PeriodicCounterUpdateState::RateCounterMap::iterator it =
-                     _s_periodic_counter_update_state.rate_counters.begin();
-             it != _s_periodic_counter_update_state.rate_counters.end(); ++it) {
-            it->second.elapsed_ms += elapsed_ms;
-            int64_t value;
-
-            if (it->second.src_counter != NULL) {
-                value = it->second.src_counter->value();
-            } else {
-                DCHECK(it->second.sample_fn != NULL);
-                value = it->second.sample_fn();
-            }
-
-            int64_t rate = value * 1000 / (it->second.elapsed_ms);
-            it->first->set(rate);
-        }
-
-        for (PeriodicCounterUpdateState::SamplingCounterMap::iterator it =
-                     _s_periodic_counter_update_state.sampling_counters.begin();
-             it != _s_periodic_counter_update_state.sampling_counters.end(); ++it) {
-            ++it->second.num_sampled;
-            int64_t value;
-
-            if (it->second.src_counter != NULL) {
-                value = it->second.src_counter->value();
-            } else {
-                DCHECK(it->second.sample_fn != NULL);
-                value = it->second.sample_fn();
-            }
-
-            it->second.total_sampled_value += value;
-            double average =
-                    static_cast<double>(it->second.total_sampled_value) / it->second.num_sampled;
-            it->first->set(average);
-        }
-
-        for (PeriodicCounterUpdateState::BucketCountersMap::iterator it =
-                     _s_periodic_counter_update_state.bucketing_counters.begin();
-             it != _s_periodic_counter_update_state.bucketing_counters.end(); ++it) {
-            int64_t val = it->second.src_counter->value();
-
-            if (val >= it->first->size()) {
-                val = it->first->size() - 1;
-            }
-
-            it->first->at(val)->update(1);
-            ++it->second.num_sampled;
-        }
-    }
-}
 
 void RuntimeProfile::print_child_counters(const std::string& prefix,
                                           const std::string& counter_name,
diff --git a/be/src/util/runtime_profile.h b/be/src/util/runtime_profile.h
index a0b01d0..f5edf61 100644
--- a/be/src/util/runtime_profile.h
+++ b/be/src/util/runtime_profile.h
@@ -414,12 +414,6 @@ public:
     // Same as 'add_sampling_counter' above except the samples are taken by calling fn.
     Counter* add_sampling_counter(const std::string& name, SampleFn fn);
 
-    // Add a bucket of counters to store the sampled value of src_counter.
-    // The src_counter is sampled periodically and the buckets are updated.
-    void add_bucketing_counters(const std::string& name, const std::string& parent_counter_name,
-                                Counter* src_counter, int max_buckets,
-                                std::vector<Counter*>* buckets);
-
     /// Adds a high water mark counter to the runtime profile. Otherwise, same behavior
     /// as AddCounter().
     HighWaterMarkCounter* AddHighWaterMarkCounter(const std::string& name, TUnit::type unit,
@@ -429,22 +423,6 @@ public:
     std::shared_ptr<HighWaterMarkCounter> AddSharedHighWaterMarkCounter(
             const std::string& name, TUnit::type unit, const std::string& parent_counter_name = "");
 
-    // stops updating the value of 'rate_counter'. Rate counters are updated
-    // periodically so should be removed as soon as the underlying counter is
-    // no longer going to change.
-    void stop_rate_counters_updates(Counter* rate_counter);
-
-    // stops updating the value of 'sampling_counter'. Sampling counters are updated
-    // periodically so should be removed as soon as the underlying counter is
-    // no longer going to change.
-    void stop_sampling_counters_updates(Counter* sampling_counter);
-
-    // stops updating the bucket counter.
-    // If convert is true, convert the buckets from count to percentage.
-    // Sampling counters are updated periodically so should be removed as soon as the
-    // underlying counter is no longer going to change.
-    void stop_bucketing_counters_updates(std::vector<Counter*>* buckets, bool convert);
-
     // Recursively compute the fraction of the 'total_time' spent in this profile and
     // its children.
     // This function updates _local_time_percent for each profile.
@@ -516,10 +494,6 @@ private:
     // of the total time in the entire profile tree.
     double _local_time_percent;
 
-    std::vector<Counter*> _rate_counters;
-
-    std::vector<Counter*> _sampling_counters;
-
     enum PeriodicCounterType {
         RATE_COUNTER = 0,
         SAMPLING_COUNTER,
@@ -544,41 +518,6 @@ private:
         // TODO: customize bucketing
     };
 
-    // This is a static singleton object that is used to update all rate counters and
-    // sampling counters.
-    struct PeriodicCounterUpdateState {
-        PeriodicCounterUpdateState();
-
-        // Tears down the update thread.
-        ~PeriodicCounterUpdateState();
-
-        // Lock protecting state below
-        std::mutex lock;
-
-        // If true, tear down the update thread.
-        volatile bool _done;
-
-        // Thread performing asynchronous updates.
-        boost::scoped_ptr<boost::thread> update_thread;
-
-        // A map of the dst (rate) counter to the src counter and elapsed time.
-        typedef std::map<Counter*, RateCounterInfo> RateCounterMap;
-        RateCounterMap rate_counters;
-
-        // A map of the dst (averages over samples) counter to the src counter (to be sampled)
-        // and number of samples taken.
-        typedef std::map<Counter*, SamplingCounterInfo> SamplingCounterMap;
-        SamplingCounterMap sampling_counters;
-
-        // Map from a bucket of counters to the src counter
-        typedef std::map<std::vector<Counter*>*, BucketCountersInfo> BucketCountersMap;
-        BucketCountersMap bucketing_counters;
-    };
-
-    // Singleton object that keeps track of all rate counters and the thread
-    // for updating them.
-    static PeriodicCounterUpdateState _s_periodic_counter_update_state;
-
     // update a subtree of profiles from nodes, rooted at *idx.
     // On return, *idx points to the node immediately following this subtree.
     void update(const std::vector<TRuntimeProfileNode>& nodes, int* idx);
@@ -588,18 +527,6 @@ private:
     // Called recusively.
     void compute_time_in_profile(int64_t total_time);
 
-    // Registers a periodic counter to be updated by the update thread.
-    // Either sample_fn or dst_counter must be non-NULL.  When the periodic counter
-    // is updated, it either gets the value from the dst_counter or calls the sample
-    // function to get the value.
-    // dst_counter/sample fn is assumed to be compatible types with src_counter.
-    static void register_periodic_counter(Counter* src_counter, SampleFn sample_fn,
-                                          Counter* dst_counter, PeriodicCounterType type);
-
-    // Loop for periodic counter update thread.  This thread wakes up once in a while
-    // and updates all the added rate counters and sampling counters.
-    static void periodic_counter_update_loop();
-
     // Print the child counters of the given counter name
     static void print_child_counters(const std::string& prefix, const std::string& counter_name,
                                      const CounterMap& counter_map,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
index 44cd71c..e204155 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
@@ -209,9 +209,9 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
             // And this task will then be aborted because of a timeout.
             // In this way, we can prevent the entire job from being paused due to submit errors,
             // and we can also relieve the pressure on BE by waiting for the timeout period.
-            LOG.warn("failed to submit routine load task {} to BE: {}",
+            LOG.warn("failed to submit routine load task {} to BE: {}, error: {}",
                     DebugUtil.printId(routineLoadTaskInfo.getId()),
-                    routineLoadTaskInfo.getBeId());
+                    routineLoadTaskInfo.getBeId(), e.getMessage());
             routineLoadManager.getJob(routineLoadTaskInfo.getJobId()).setOtherMsg(e.getMessage());
             // fall through to set ExecuteStartTime
         }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org