You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/09/07 04:00:15 UTC
[incubator-doris] branch master updated: [Optimize] Remove some unused code to reduce lock contention (#6566)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 74ddea8 [Optimize] Remove some unused code to reduce lock contention (#6566)
74ddea8 is described below
commit 74ddea8d8352cc3dcad38cdbc66117c7fe848f85
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Tue Sep 7 11:56:12 2021 +0800
[Optimize] Remove some unused code to reduce lock contention (#6566)
1. Remove global runtime profile counter
2. Remove unused thread token register
---
be/src/exec/blocking_join_node.cpp | 19 +-
be/src/exec/hash_join_node.cpp | 13 +-
be/src/exec/olap_scan_node.cpp | 2 -
be/src/runtime/fragment_mgr.cpp | 2 +-
be/src/runtime/fragment_mgr.h | 2 -
be/src/runtime/plan_fragment_executor.cpp | 26 ---
be/src/runtime/plan_fragment_executor.h | 15 --
be/src/runtime/runtime_state.cpp | 9 -
be/src/util/runtime_profile.cpp | 201 +--------------------
be/src/util/runtime_profile.h | 73 --------
.../load/routineload/RoutineLoadTaskScheduler.java | 4 +-
11 files changed, 10 insertions(+), 356 deletions(-)
diff --git a/be/src/exec/blocking_join_node.cpp b/be/src/exec/blocking_join_node.cpp
index b4b2f05..a82e5f7 100644
--- a/be/src/exec/blocking_join_node.cpp
+++ b/be/src/exec/blocking_join_node.cpp
@@ -82,10 +82,6 @@ Status BlockingJoinNode::close(RuntimeState* state) {
void BlockingJoinNode::build_side_thread(RuntimeState* state, boost::promise<Status>* status) {
status->set_value(construct_build_side(state));
- // Release the thread token as soon as possible (before the main thread joins
- // on it). This way, if we had a chain of 10 joins using 1 additional thread,
- // we'd keep the additional thread busy the whole time.
- state->resource_pool()->release_thread_token(false);
}
Status BlockingJoinNode::open(RuntimeState* state) {
@@ -105,19 +101,8 @@ Status BlockingJoinNode::open(RuntimeState* state) {
// main thread
boost::promise<Status> build_side_status;
- if (state->resource_pool()->try_acquire_thread_token()) {
- add_runtime_exec_option("Join Build-Side Prepared Asynchronously");
- // Thread build_thread(_node_name, "build thread",
- // bind(&BlockingJoinNode::BuildSideThread, this, state, &build_side_status));
- // if (!state->cgroup().empty()) {
- // RETURN_IF_ERROR(
- // state->exec_env()->cgroups_mgr()->assign_thread_to_cgroup(
- // build_thread, state->cgroup()));
- // }
- boost::thread(bind(&BlockingJoinNode::build_side_thread, this, state, &build_side_status));
- } else {
- build_side_status.set_value(construct_build_side(state));
- }
+ add_runtime_exec_option("Join Build-Side Prepared Asynchronously");
+ boost::thread(bind(&BlockingJoinNode::build_side_thread, this, state, &build_side_status));
// Open the left child so that it may perform any initialisation in parallel.
// Don't exit even if we see an error, we still need to wait for the build thread
diff --git a/be/src/exec/hash_join_node.cpp b/be/src/exec/hash_join_node.cpp
index fff59ef..3fc0dfe 100644
--- a/be/src/exec/hash_join_node.cpp
+++ b/be/src/exec/hash_join_node.cpp
@@ -181,10 +181,6 @@ Status HashJoinNode::close(RuntimeState* state) {
void HashJoinNode::build_side_thread(RuntimeState* state, boost::promise<Status>* status) {
status->set_value(construct_hash_table(state));
- // Release the thread token as soon as possible (before the main thread joins
- // on it). This way, if we had a chain of 10 joins using 1 additional thread,
- // we'd keep the additional thread busy the whole time.
- state->resource_pool()->release_thread_token(false);
}
Status HashJoinNode::construct_hash_table(RuntimeState* state) {
@@ -238,13 +234,8 @@ Status HashJoinNode::open(RuntimeState* state) {
// Only do this if we can get a thread token. Otherwise, do this in the
// main thread
boost::promise<Status> thread_status;
-
- if (state->resource_pool()->try_acquire_thread_token()) {
- add_runtime_exec_option("Hash Table Built Asynchronously");
- boost::thread(bind(&HashJoinNode::build_side_thread, this, state, &thread_status));
- } else {
- thread_status.set_value(construct_hash_table(state));
- }
+ add_runtime_exec_option("Hash Table Built Asynchronously");
+ boost::thread(bind(&HashJoinNode::build_side_thread, this, state, &thread_status));
if (!_runtime_filter_descs.empty()) {
RuntimeFilterSlots runtime_filter_slots(_probe_expr_ctxs, _build_expr_ctxs,
diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp
index 45341df..6584100 100644
--- a/be/src/exec/olap_scan_node.cpp
+++ b/be/src/exec/olap_scan_node.cpp
@@ -1327,7 +1327,6 @@ Status OlapScanNode::normalize_bloom_filter_predicate(SlotDescriptor* slot) {
void OlapScanNode::transfer_thread(RuntimeState* state) {
// scanner open pushdown to scanThread
- state->resource_pool()->acquire_thread_token();
Status status = Status::OK();
for (auto scanner : _olap_scanners) {
status = Expr::clone_if_not_exists(_conjunct_ctxs, state, scanner->conjunct_ctxs());
@@ -1483,7 +1482,6 @@ void OlapScanNode::transfer_thread(RuntimeState* state) {
}
} // end of transfer while
- state->resource_pool()->release_thread_token(true);
VLOG_CRITICAL << "TransferThread finish.";
{
std::unique_lock<std::mutex> l(_row_batches_lock);
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 038e9f0..834529c 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -187,8 +187,8 @@ FragmentExecState::FragmentExecState(const TUniqueId& query_id,
_executor(exec_env, std::bind<void>(std::mem_fn(&FragmentExecState::coordinator_callback),
this, std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3)),
- _timeout_second(-1),
_set_rsc_info(false),
+ _timeout_second(-1),
_fragments_ctx(std::move(fragments_ctx)) {
_start_time = DateTimeValue::local_time();
_coord_addr = _fragments_ctx->coord_addr;
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 72c2f60..70233e1 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -83,8 +83,6 @@ public:
const TUniqueId& fragment_instance_id,
std::vector<TScanColumnDesc>* selected_columns);
- RuntimeFilterMergeController& runtimefilter_controller() { return _runtimefilter_controller; }
-
Status apply_filter(const PPublishFilterRequest* request, const char* attach_data);
Status merge_filter(const PMergeFilterRequest* request, const char* attach_data);
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index 37d162e..d0f2cb2 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -53,7 +53,6 @@ PlanFragmentExecutor::PlanFragmentExecutor(ExecEnv* exec_env,
_done(false),
_prepared(false),
_closed(false),
- _has_thread_token(false),
_is_report_success(true),
_is_report_on_cancel(true),
_collect_query_statistics_with_every_batch(false) {}
@@ -100,20 +99,6 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
_is_report_success = request.query_options.is_report_success;
}
- // Reserve one main thread from the pool
- _runtime_state->resource_pool()->acquire_thread_token();
- _has_thread_token = true;
-
- _average_thread_tokens = profile()->add_sampling_counter(
- "AverageThreadTokens",
- std::bind<int64_t>(std::mem_fn(&ThreadResourceMgr::ResourcePool::num_threads),
- _runtime_state->resource_pool()));
-
- // if (_exec_env->process_mem_tracker() != NULL) {
- // // we have a global limit
- // _runtime_state->mem_trackers()->push_back(_exec_env->process_mem_tracker());
- // }
-
int64_t bytes_limit = request.query_options.mem_limit;
if (bytes_limit <= 0) {
// sometimes the request does not set the query mem limit, we use default one.
@@ -346,8 +331,6 @@ Status PlanFragmentExecutor::open_internal() {
_sink.reset(NULL);
_done = true;
- release_thread_token();
-
stop_report_thread();
send_report(true);
@@ -465,7 +448,6 @@ Status PlanFragmentExecutor::get_next(RowBatch** batch) {
LOG(INFO) << "Finished executing fragment query_id=" << print_id(_query_id)
<< " instance_id=" << print_id(_runtime_state->fragment_instance_id());
// Query is done, return the thread token
- release_thread_token();
stop_report_thread();
send_report(true);
}
@@ -542,14 +524,6 @@ RuntimeProfile* PlanFragmentExecutor::profile() {
return _runtime_state->runtime_profile();
}
-void PlanFragmentExecutor::release_thread_token() {
- if (_has_thread_token) {
- _has_thread_token = false;
- _runtime_state->resource_pool()->release_thread_token(true);
- profile()->stop_sampling_counters_updates(_average_thread_tokens);
- }
-}
-
void PlanFragmentExecutor::close() {
if (_closed) {
return;
diff --git a/be/src/runtime/plan_fragment_executor.h b/be/src/runtime/plan_fragment_executor.h
index d3fb2d5..f2d1f2a 100644
--- a/be/src/runtime/plan_fragment_executor.h
+++ b/be/src/runtime/plan_fragment_executor.h
@@ -130,9 +130,6 @@ public:
// Initiate cancellation. Must not be called until after prepare() returned.
void cancel();
- // Releases the thread token for this fragment executor.
- void release_thread_token();
-
// call these only after prepare()
RuntimeState* runtime_state() { return _runtime_state.get(); }
const RowDescriptor& row_desc();
@@ -175,9 +172,6 @@ private:
// true if close() has been called
bool _closed;
- // true if this fragment has not returned the thread token to the thread resource mgr
- bool _has_thread_token;
-
bool _is_report_success;
// If this is set to false, and '_is_report_success' is false as well,
@@ -208,15 +202,6 @@ private:
RuntimeProfile::Counter* _fragment_cpu_timer;
- // Average number of thread tokens for the duration of the plan fragment execution.
- // Fragments that do a lot of cpu work (non-coordinator fragment) will have at
- // least 1 token. Fragments that contain a hdfs scan node will have 1+ tokens
- // depending on system load. Other nodes (e.g. hash join node) can also reserve
- // additional tokens.
- // This is a measure of how much CPU resources this fragment used during the course
- // of the execution.
- RuntimeProfile::Counter* _average_thread_tokens;
-
// It is shared with BufferControlBlock and will be called in two different
// threads. But their calls are all at different time, there is no problem of
// multithreaded access.
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 441046e..ceedbc8 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -155,10 +155,6 @@ RuntimeState::~RuntimeState() {
if (_buffer_reservation != nullptr) {
_buffer_reservation->Close();
}
-
- if (_exec_env != nullptr && _exec_env->thread_mgr() != nullptr) {
- _exec_env->thread_mgr()->unregister_pool(_resource_pool);
- }
}
Status RuntimeState::init(const TUniqueId& fragment_instance_id, const TQueryOptions& query_options,
@@ -194,11 +190,6 @@ Status RuntimeState::init(const TUniqueId& fragment_instance_id, const TQueryOpt
_query_options.batch_size = DEFAULT_BATCH_SIZE;
}
- // Register with the thread mgr
- if (exec_env != NULL) {
- _resource_pool = exec_env->thread_mgr()->register_pool();
- DCHECK(_resource_pool != NULL);
- }
_db_name = "insert_stmt";
_import_label = print_id(fragment_instance_id);
diff --git a/be/src/util/runtime_profile.cpp b/be/src/util/runtime_profile.cpp
index 5cf9c20..34f1864 100644
--- a/be/src/util/runtime_profile.cpp
+++ b/be/src/util/runtime_profile.cpp
@@ -43,8 +43,6 @@ static const std::string THREAD_INVOLUNTARY_CONTEXT_SWITCHES = "InvoluntaryConte
// The root counter name for all top level counters.
static const std::string ROOT_COUNTER = "";
-RuntimeProfile::PeriodicCounterUpdateState RuntimeProfile::_s_periodic_counter_update_state;
-
RuntimeProfile::RuntimeProfile(const std::string& name, bool is_averaged_profile)
: _pool(new ObjectPool()),
_own_pool(false),
@@ -57,22 +55,6 @@ RuntimeProfile::RuntimeProfile(const std::string& name, bool is_averaged_profile
}
RuntimeProfile::~RuntimeProfile() {
- for (auto iter = _rate_counters.begin(); iter != _rate_counters.end(); ++iter) {
- stop_rate_counters_updates(*iter);
- }
-
- for (auto iter = _sampling_counters.begin(); iter != _sampling_counters.end(); ++iter) {
- stop_sampling_counters_updates(*iter);
- }
-
- std::set<std::vector<Counter*>*>::const_iterator buckets_iter;
-
- for (buckets_iter = _bucketing_counters.begin(); buckets_iter != _bucketing_counters.end();
- ++buckets_iter) {
- // This is just a clean up. No need to perform conversion. Also, the underlying
- // counters might be gone already.
- stop_bucketing_counters_updates(*buckets_iter, false);
- }
}
void RuntimeProfile::merge(RuntimeProfile* other) {
@@ -654,61 +636,23 @@ RuntimeProfile::Counter* RuntimeProfile::add_rate_counter(const std::string& nam
}
Counter* dst_counter = add_counter(name, dst_type);
- _rate_counters.push_back(dst_counter);
- register_periodic_counter(src_counter, NULL, dst_counter, RATE_COUNTER);
return dst_counter;
}
RuntimeProfile::Counter* RuntimeProfile::add_rate_counter(const std::string& name, SampleFn fn,
TUnit::type dst_type) {
- Counter* dst_counter = add_counter(name, dst_type);
- register_periodic_counter(NULL, fn, dst_counter, RATE_COUNTER);
- return dst_counter;
+ return add_counter(name, dst_type);
}
RuntimeProfile::Counter* RuntimeProfile::add_sampling_counter(const std::string& name,
Counter* src_counter) {
DCHECK(src_counter->type() == TUnit::UNIT);
- Counter* dst_counter = add_counter(name, TUnit::DOUBLE_VALUE);
- register_periodic_counter(src_counter, NULL, dst_counter, SAMPLING_COUNTER);
- return dst_counter;
+ return add_counter(name, TUnit::DOUBLE_VALUE);
}
RuntimeProfile::Counter* RuntimeProfile::add_sampling_counter(const std::string& name,
SampleFn sample_fn) {
- Counter* dst_counter = add_counter(name, TUnit::DOUBLE_VALUE);
- _sampling_counters.push_back(dst_counter);
- register_periodic_counter(NULL, sample_fn, dst_counter, SAMPLING_COUNTER);
- return dst_counter;
-}
-
-void RuntimeProfile::add_bucketing_counters(const std::string& name,
- const std::string& parent_counter_name,
- Counter* src_counter, int num_buckets,
- std::vector<Counter*>* buckets) {
- {
- std::lock_guard<std::mutex> l(_counter_map_lock);
- _bucketing_counters.insert(buckets);
- }
-
- for (int i = 0; i < num_buckets; ++i) {
- std::stringstream counter_name;
- counter_name << name << "=" << i;
- buckets->push_back(
- add_counter(counter_name.str(), TUnit::DOUBLE_VALUE, parent_counter_name));
- }
-
- std::lock_guard<std::mutex> l(_s_periodic_counter_update_state.lock);
-
- if (_s_periodic_counter_update_state.update_thread.get() == NULL) {
- _s_periodic_counter_update_state.update_thread.reset(
- new boost::thread(&RuntimeProfile::periodic_counter_update_loop));
- }
-
- BucketCountersInfo info;
- info.src_counter = src_counter;
- info.num_sampled = 0;
- _s_periodic_counter_update_state.bucketing_counters[buckets] = info;
+ return add_counter(name, TUnit::DOUBLE_VALUE);
}
RuntimeProfile::EventSequence* RuntimeProfile::add_event_sequence(const std::string& name) {
@@ -724,145 +668,6 @@ RuntimeProfile::EventSequence* RuntimeProfile::add_event_sequence(const std::str
return timer;
}
-void RuntimeProfile::register_periodic_counter(Counter* src_counter, SampleFn sample_fn,
- Counter* dst_counter, PeriodicCounterType type) {
- DCHECK(src_counter == NULL || sample_fn == NULL);
-
- std::lock_guard<std::mutex> l(_s_periodic_counter_update_state.lock);
-
- if (_s_periodic_counter_update_state.update_thread.get() == NULL) {
- _s_periodic_counter_update_state.update_thread.reset(
- new boost::thread(&RuntimeProfile::periodic_counter_update_loop));
- }
-
- switch (type) {
- case RATE_COUNTER: {
- RateCounterInfo counter;
- counter.src_counter = src_counter;
- counter.sample_fn = sample_fn;
- counter.elapsed_ms = 0;
- _s_periodic_counter_update_state.rate_counters[dst_counter] = counter;
- break;
- }
-
- case SAMPLING_COUNTER: {
- SamplingCounterInfo counter;
- counter.src_counter = src_counter;
- counter.sample_fn = sample_fn;
- counter.num_sampled = 0;
- counter.total_sampled_value = 0;
- _s_periodic_counter_update_state.sampling_counters[dst_counter] = counter;
- break;
- }
-
- default:
- DCHECK(false) << "Unsupported PeriodicCounterType:" << type;
- }
-}
-
-void RuntimeProfile::stop_rate_counters_updates(Counter* rate_counter) {
- std::lock_guard<std::mutex> l(_s_periodic_counter_update_state.lock);
- _s_periodic_counter_update_state.rate_counters.erase(rate_counter);
-}
-
-void RuntimeProfile::stop_sampling_counters_updates(Counter* sampling_counter) {
- std::lock_guard<std::mutex> l(_s_periodic_counter_update_state.lock);
- _s_periodic_counter_update_state.sampling_counters.erase(sampling_counter);
-}
-
-void RuntimeProfile::stop_bucketing_counters_updates(std::vector<Counter*>* buckets, bool convert) {
- int64_t num_sampled = 0;
- {
- std::lock_guard<std::mutex> l(_s_periodic_counter_update_state.lock);
- PeriodicCounterUpdateState::BucketCountersMap::const_iterator itr =
- _s_periodic_counter_update_state.bucketing_counters.find(buckets);
-
- if (itr != _s_periodic_counter_update_state.bucketing_counters.end()) {
- num_sampled = itr->second.num_sampled;
- _s_periodic_counter_update_state.bucketing_counters.erase(buckets);
- }
- }
-
- if (convert && num_sampled > 0) {
- for (Counter* counter : *buckets) {
- double perc = 100 * counter->value() / (double)num_sampled;
- counter->set(perc);
- }
- }
-}
-
-RuntimeProfile::PeriodicCounterUpdateState::PeriodicCounterUpdateState() : _done(false) {}
-
-RuntimeProfile::PeriodicCounterUpdateState::~PeriodicCounterUpdateState() {
- if (_s_periodic_counter_update_state.update_thread.get() != NULL) {
- {
- // Lock to ensure the update thread will see the update to _done
- std::lock_guard<std::mutex> l(_s_periodic_counter_update_state.lock);
- _done = true;
- }
- _s_periodic_counter_update_state.update_thread->join();
- }
-}
-
-void RuntimeProfile::periodic_counter_update_loop() {
- while (!_s_periodic_counter_update_state._done) {
- boost::system_time before_time = boost::get_system_time();
- SleepFor(MonoDelta::FromMilliseconds(config::periodic_counter_update_period_ms));
- boost::posix_time::time_duration elapsed = boost::get_system_time() - before_time;
- int elapsed_ms = elapsed.total_milliseconds();
-
- std::lock_guard<std::mutex> l(_s_periodic_counter_update_state.lock);
-
- for (PeriodicCounterUpdateState::RateCounterMap::iterator it =
- _s_periodic_counter_update_state.rate_counters.begin();
- it != _s_periodic_counter_update_state.rate_counters.end(); ++it) {
- it->second.elapsed_ms += elapsed_ms;
- int64_t value;
-
- if (it->second.src_counter != NULL) {
- value = it->second.src_counter->value();
- } else {
- DCHECK(it->second.sample_fn != NULL);
- value = it->second.sample_fn();
- }
-
- int64_t rate = value * 1000 / (it->second.elapsed_ms);
- it->first->set(rate);
- }
-
- for (PeriodicCounterUpdateState::SamplingCounterMap::iterator it =
- _s_periodic_counter_update_state.sampling_counters.begin();
- it != _s_periodic_counter_update_state.sampling_counters.end(); ++it) {
- ++it->second.num_sampled;
- int64_t value;
-
- if (it->second.src_counter != NULL) {
- value = it->second.src_counter->value();
- } else {
- DCHECK(it->second.sample_fn != NULL);
- value = it->second.sample_fn();
- }
-
- it->second.total_sampled_value += value;
- double average =
- static_cast<double>(it->second.total_sampled_value) / it->second.num_sampled;
- it->first->set(average);
- }
-
- for (PeriodicCounterUpdateState::BucketCountersMap::iterator it =
- _s_periodic_counter_update_state.bucketing_counters.begin();
- it != _s_periodic_counter_update_state.bucketing_counters.end(); ++it) {
- int64_t val = it->second.src_counter->value();
-
- if (val >= it->first->size()) {
- val = it->first->size() - 1;
- }
-
- it->first->at(val)->update(1);
- ++it->second.num_sampled;
- }
- }
-}
void RuntimeProfile::print_child_counters(const std::string& prefix,
const std::string& counter_name,
diff --git a/be/src/util/runtime_profile.h b/be/src/util/runtime_profile.h
index a0b01d0..f5edf61 100644
--- a/be/src/util/runtime_profile.h
+++ b/be/src/util/runtime_profile.h
@@ -414,12 +414,6 @@ public:
// Same as 'add_sampling_counter' above except the samples are taken by calling fn.
Counter* add_sampling_counter(const std::string& name, SampleFn fn);
- // Add a bucket of counters to store the sampled value of src_counter.
- // The src_counter is sampled periodically and the buckets are updated.
- void add_bucketing_counters(const std::string& name, const std::string& parent_counter_name,
- Counter* src_counter, int max_buckets,
- std::vector<Counter*>* buckets);
-
/// Adds a high water mark counter to the runtime profile. Otherwise, same behavior
/// as AddCounter().
HighWaterMarkCounter* AddHighWaterMarkCounter(const std::string& name, TUnit::type unit,
@@ -429,22 +423,6 @@ public:
std::shared_ptr<HighWaterMarkCounter> AddSharedHighWaterMarkCounter(
const std::string& name, TUnit::type unit, const std::string& parent_counter_name = "");
- // stops updating the value of 'rate_counter'. Rate counters are updated
- // periodically so should be removed as soon as the underlying counter is
- // no longer going to change.
- void stop_rate_counters_updates(Counter* rate_counter);
-
- // stops updating the value of 'sampling_counter'. Sampling counters are updated
- // periodically so should be removed as soon as the underlying counter is
- // no longer going to change.
- void stop_sampling_counters_updates(Counter* sampling_counter);
-
- // stops updating the bucket counter.
- // If convert is true, convert the buckets from count to percentage.
- // Sampling counters are updated periodically so should be removed as soon as the
- // underlying counter is no longer going to change.
- void stop_bucketing_counters_updates(std::vector<Counter*>* buckets, bool convert);
-
// Recursively compute the fraction of the 'total_time' spent in this profile and
// its children.
// This function updates _local_time_percent for each profile.
@@ -516,10 +494,6 @@ private:
// of the total time in the entire profile tree.
double _local_time_percent;
- std::vector<Counter*> _rate_counters;
-
- std::vector<Counter*> _sampling_counters;
-
enum PeriodicCounterType {
RATE_COUNTER = 0,
SAMPLING_COUNTER,
@@ -544,41 +518,6 @@ private:
// TODO: customize bucketing
};
- // This is a static singleton object that is used to update all rate counters and
- // sampling counters.
- struct PeriodicCounterUpdateState {
- PeriodicCounterUpdateState();
-
- // Tears down the update thread.
- ~PeriodicCounterUpdateState();
-
- // Lock protecting state below
- std::mutex lock;
-
- // If true, tear down the update thread.
- volatile bool _done;
-
- // Thread performing asynchronous updates.
- boost::scoped_ptr<boost::thread> update_thread;
-
- // A map of the dst (rate) counter to the src counter and elapsed time.
- typedef std::map<Counter*, RateCounterInfo> RateCounterMap;
- RateCounterMap rate_counters;
-
- // A map of the dst (averages over samples) counter to the src counter (to be sampled)
- // and number of samples taken.
- typedef std::map<Counter*, SamplingCounterInfo> SamplingCounterMap;
- SamplingCounterMap sampling_counters;
-
- // Map from a bucket of counters to the src counter
- typedef std::map<std::vector<Counter*>*, BucketCountersInfo> BucketCountersMap;
- BucketCountersMap bucketing_counters;
- };
-
- // Singleton object that keeps track of all rate counters and the thread
- // for updating them.
- static PeriodicCounterUpdateState _s_periodic_counter_update_state;
-
// update a subtree of profiles from nodes, rooted at *idx.
// On return, *idx points to the node immediately following this subtree.
void update(const std::vector<TRuntimeProfileNode>& nodes, int* idx);
@@ -588,18 +527,6 @@ private:
// Called recusively.
void compute_time_in_profile(int64_t total_time);
- // Registers a periodic counter to be updated by the update thread.
- // Either sample_fn or dst_counter must be non-NULL. When the periodic counter
- // is updated, it either gets the value from the dst_counter or calls the sample
- // function to get the value.
- // dst_counter/sample fn is assumed to be compatible types with src_counter.
- static void register_periodic_counter(Counter* src_counter, SampleFn sample_fn,
- Counter* dst_counter, PeriodicCounterType type);
-
- // Loop for periodic counter update thread. This thread wakes up once in a while
- // and updates all the added rate counters and sampling counters.
- static void periodic_counter_update_loop();
-
// Print the child counters of the given counter name
static void print_child_counters(const std::string& prefix, const std::string& counter_name,
const CounterMap& counter_map,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
index 44cd71c..e204155 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
@@ -209,9 +209,9 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
// And this task will then be aborted because of a timeout.
// In this way, we can prevent the entire job from being paused due to submit errors,
// and we can also relieve the pressure on BE by waiting for the timeout period.
- LOG.warn("failed to submit routine load task {} to BE: {}",
+ LOG.warn("failed to submit routine load task {} to BE: {}, error: {}",
DebugUtil.printId(routineLoadTaskInfo.getId()),
- routineLoadTaskInfo.getBeId());
+ routineLoadTaskInfo.getBeId(), e.getMessage());
routineLoadManager.getJob(routineLoadTaskInfo.getJobId()).setOtherMsg(e.getMessage());
// fall through to set ExecuteStartTime
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org