You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by bo...@apache.org on 2020/10/07 14:07:05 UTC
[impala] 03/07: IMPALA-9180 (part 2): Refactor executor_list_ map
of ExecutorBlacklist
This is an automated email from the ASF dual-hosted git repository.
boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit c9fcbe53c8a4822e1d39de848c7ff45ca2ff2c67
Author: wzhou-code <wz...@cloudera.com>
AuthorDate: Thu Sep 24 18:03:55 2020 -0700
IMPALA-9180 (part 2): Refactor executor_list_ map of ExecutorBlacklist
In the current ExecutorBlacklist class, the maps are keyed on the
TNetworkAddress of a backend. To simplify the logic of the class,
this change keys them off the UniqueIdPB backend-id instead, e.g. it
refactors 'executor_list_' to no longer be a map<address -> list<backend>>
and instead makes it a map<backend_id, backend>.
Also fixes a minor bug with the calculation of elapsed time when
a backend that was on probation is re-blacklisted.
Testing:
- Passed test_blacklist.py and test_query_retries.py.
- Passed exhaustive tests.
Change-Id: Ib1ae082d0e080088756af91b5b770752ca8b3aa1
Reviewed-on: http://gerrit.cloudera.org:8080/16506
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
be/src/scheduling/executor-blacklist.cc | 156 +++++++++++++-------------------
be/src/scheduling/executor-blacklist.h | 16 ++--
2 files changed, 69 insertions(+), 103 deletions(-)
diff --git a/be/src/scheduling/executor-blacklist.cc b/be/src/scheduling/executor-blacklist.cc
index fae1b20..2a561ca 100644
--- a/be/src/scheduling/executor-blacklist.cc
+++ b/be/src/scheduling/executor-blacklist.cc
@@ -39,34 +39,35 @@ bool ExecutorBlacklist::BlacklistingEnabled() {
void ExecutorBlacklist::Blacklist(
const BackendDescriptorPB& be_desc, const Status& cause) {
DCHECK(BlacklistingEnabled());
- DCHECK(!be_desc.ip_address().empty());
- vector<Entry>& be_descs = executor_list_[be_desc.ip_address()];
- auto it = find_if(be_descs.begin(), be_descs.end(),
- std::bind(eqBePort, be_desc, std::placeholders::_1));
- if (it != be_descs.end()) {
+ DCHECK(be_desc.has_backend_id());
+ auto entry_it = executor_list_.find(be_desc.backend_id());
+ if (entry_it != executor_list_.end()) {
// If this executor was already blacklisted, it must be that two different queries
// tried to blacklist it at about the same time, so just leave it as is.
- if (it->state == State::ON_PROBATION) {
+ Entry& entry = entry_it->second;
+ if (entry.state == State::ON_PROBATION) {
// This executor was on probation, so re-blacklist it.
- it->state = State::BLACKLISTED;
- it->blacklist_time_ms = MonotonicMillis();
- it->cause = cause;
-
int64_t probation_timeout = GetBlacklistTimeoutMs() * PROBATION_TIMEOUT_MULTIPLIER;
- int64_t elapsed = MonotonicMillis() - it->blacklist_time_ms;
+ int64_t current_time_ms = MonotonicMillis();
+ int64_t elapsed = current_time_ms - entry.blacklist_time_ms;
+ entry.state = State::BLACKLISTED;
+ entry.blacklist_time_ms = current_time_ms;
+ entry.cause = cause;
+
// Since NeedsMaintenance() doesn't consider executors that can be removed from
// probation, executors may stay on probation much longer than the timeout, so check
// the timeout here.
- if (elapsed > probation_timeout * it->num_consecutive_blacklistings) {
+ if (elapsed > probation_timeout * entry.num_consecutive_blacklistings) {
// This executor should have already been taken off probation, so act like it was.
- it->num_consecutive_blacklistings = 1;
+ entry.num_consecutive_blacklistings = 1;
} else {
- ++it->num_consecutive_blacklistings;
+ ++entry.num_consecutive_blacklistings;
}
}
} else {
// This executor was not already on the list, create a new Entry for it.
- be_descs.emplace_back(be_desc, MonotonicMillis(), cause);
+ executor_list_.insert(
+ make_pair(be_desc.backend_id(), Entry(be_desc, MonotonicMillis(), cause)));
}
VLOG(2) << "Blacklisted " << be_desc.address() << ", current blacklist: "
<< DebugString();
@@ -74,37 +75,26 @@ void ExecutorBlacklist::Blacklist(
ExecutorBlacklist::State ExecutorBlacklist::FindAndRemove(
const BackendDescriptorPB& be_desc) {
- auto be_descs_it = executor_list_.find(be_desc.ip_address());
- if (be_descs_it == executor_list_.end()) {
- // Executor wasn't on the blacklist.
- return NOT_BLACKLISTED;
- }
- vector<Entry>& be_descs = be_descs_it->second;
- auto remove_it = find_if(be_descs.begin(), be_descs.end(),
- std::bind(eqBePort, be_desc, std::placeholders::_1));
- if (remove_it == be_descs.end()) {
+ auto remove_it = executor_list_.find(be_desc.backend_id());
+ if (remove_it == executor_list_.end()) {
// Executor wasn't on the blacklist.
return NOT_BLACKLISTED;
}
- State removed_state = remove_it->state;
- be_descs.erase(remove_it);
- if (be_descs.empty()) {
- executor_list_.erase(be_descs_it);
- }
+ State removed_state = remove_it->second.state;
+ executor_list_.erase(remove_it);
return removed_state;
}
bool ExecutorBlacklist::NeedsMaintenance() const {
int64_t blacklist_timeout = GetBlacklistTimeoutMs();
int64_t now = MonotonicMillis();
- for (auto executor_it : executor_list_) {
- for (auto entry_it : executor_it.second) {
- if (entry_it.state == State::BLACKLISTED) {
- int64_t elapsed = now - entry_it.blacklist_time_ms;
- if (elapsed > blacklist_timeout * entry_it.num_consecutive_blacklistings) {
- // This backend has passed the timeout and can be put on probation.
- return true;
- }
+ for (auto entry_it : executor_list_) {
+ const Entry& entry = entry_it.second;
+ if (entry.state == State::BLACKLISTED) {
+ int64_t elapsed = now - entry.blacklist_time_ms;
+ if (elapsed > blacklist_timeout * entry.num_consecutive_blacklistings) {
+ // This backend has passed the timeout and can be put on probation.
+ return true;
}
}
}
@@ -115,33 +105,26 @@ void ExecutorBlacklist::Maintenance(std::list<BackendDescriptorPB>* probation_li
int64_t blacklist_timeout = GetBlacklistTimeoutMs();
int64_t probation_timeout = blacklist_timeout * PROBATION_TIMEOUT_MULTIPLIER;
int64_t now = MonotonicMillis();
- auto executor_it = executor_list_.begin();
- while (executor_it != executor_list_.end()) {
- auto entry_it = executor_it->second.begin();
- while (entry_it != executor_it->second.end()) {
- int64_t elapsed = now - entry_it->blacklist_time_ms;
- if (entry_it->state == State::BLACKLISTED) {
- // Check if we can take it off the blacklist and put it on probation.
- if (elapsed > blacklist_timeout * entry_it->num_consecutive_blacklistings) {
- LOG(INFO) << "Executor " << entry_it->be_desc.address()
- << " passed the timeout and will be taken off the blacklist.";
- probation_list->push_back(entry_it->be_desc);
- entry_it->state = State::ON_PROBATION;
- }
- ++entry_it;
- } else {
- // Check if we can take it off probation.
- if (elapsed > probation_timeout * entry_it->num_consecutive_blacklistings) {
- entry_it = executor_it->second.erase(entry_it);
- } else {
- ++entry_it;
- }
+ auto entry_it = executor_list_.begin();
+ while (entry_it != executor_list_.end()) {
+ Entry& entry = entry_it->second;
+ int64_t elapsed = now - entry.blacklist_time_ms;
+ if (entry.state == State::BLACKLISTED) {
+ // Check if we can take it off the blacklist and put it on probation.
+ if (elapsed > blacklist_timeout * entry.num_consecutive_blacklistings) {
+ LOG(INFO) << "Executor " << entry.be_desc.address()
+ << " passed the timeout and will be taken off the blacklist.";
+ probation_list->push_back(entry.be_desc);
+ entry.state = State::ON_PROBATION;
}
- }
- if (executor_it->second.empty()) {
- executor_it = executor_list_.erase(executor_it);
+ ++entry_it;
} else {
- ++executor_it;
+ // Check if we can take it off probation.
+ if (elapsed > probation_timeout * entry.num_consecutive_blacklistings) {
+ entry_it = executor_list_.erase(entry_it);
+ } else {
+ ++entry_it;
+ }
}
}
VLOG(2) << "Completed blacklist maintenance. Current blacklist: " << DebugString();
@@ -149,19 +132,16 @@ void ExecutorBlacklist::Maintenance(std::list<BackendDescriptorPB>* probation_li
bool ExecutorBlacklist::IsBlacklisted(
const BackendDescriptorPB& be_desc, Status* cause, int64_t* time_remaining_ms) const {
- for (auto executor_it : executor_list_) {
- if (executor_it.first == be_desc.ip_address()) {
- for (auto entry_it : executor_it.second) {
- if (entry_it.be_desc.address().port() == be_desc.address().port()
- && entry_it.state == State::BLACKLISTED) {
- if (cause != nullptr) *cause = entry_it.cause;
- int64_t elapsed_ms = MonotonicMillis() - entry_it.blacklist_time_ms;
- int64_t total_timeout_ms =
- GetBlacklistTimeoutMs() * entry_it.num_consecutive_blacklistings;
- *time_remaining_ms = total_timeout_ms - elapsed_ms;
- return true;
- }
- }
+ auto entry_it = executor_list_.find(be_desc.backend_id());
+ if (entry_it != executor_list_.end()) {
+ const Entry& entry = entry_it->second;
+ if (entry.state == State::BLACKLISTED) {
+ if (cause != nullptr) *cause = entry.cause;
+ int64_t elapsed_ms = MonotonicMillis() - entry.blacklist_time_ms;
+ int64_t total_timeout_ms =
+ GetBlacklistTimeoutMs() * entry.num_consecutive_blacklistings;
+ *time_remaining_ms = total_timeout_ms - elapsed_ms;
+ return true;
}
}
return false;
@@ -169,12 +149,9 @@ bool ExecutorBlacklist::IsBlacklisted(
std::string ExecutorBlacklist::BlacklistToString() const {
std::stringstream ss;
- for (auto executor_it : executor_list_) {
- for (auto entry_it : executor_it.second) {
- if (entry_it.state == State::BLACKLISTED) {
- ss << entry_it.be_desc.address() << " ";
- }
- }
+ for (auto entry_it : executor_list_) {
+ const Entry& entry = entry_it.second;
+ if (entry.state == State::BLACKLISTED) ss << entry.be_desc.address() << " ";
}
return ss.str();
}
@@ -182,11 +159,11 @@ std::string ExecutorBlacklist::BlacklistToString() const {
std::string ExecutorBlacklist::DebugString() const {
std::stringstream ss;
ss << "ExecutorBlacklist[";
- for (auto executor_it : executor_list_) {
- for (auto entry_it : executor_it.second) {
- ss << entry_it.be_desc.address() << " ("
- << (entry_it.state == BLACKLISTED ? "blacklisted" : "on probation") << ") ";
- }
+ for (auto entry_it : executor_list_) {
+ const Entry& entry = entry_it.second;
+ DCHECK(entry.state == BLACKLISTED || entry.state == ON_PROBATION);
+ ss << entry.be_desc.address() << " ("
+ << (entry.state == BLACKLISTED ? "blacklisted" : "on probation") << ") ";
}
ss << "]";
return ss.str();
@@ -200,11 +177,4 @@ int64_t ExecutorBlacklist::GetBlacklistTimeoutMs() const {
return BLACKLIST_TIMEOUT_PADDING * Statestore::FailedExecutorDetectionTimeMs();
}
-bool ExecutorBlacklist::eqBePort(
- const BackendDescriptorPB& be_desc, const Entry& existing) {
- // The IP addresses must already match, so it is sufficient to check the port.
- DCHECK_EQ(existing.be_desc.ip_address(), be_desc.ip_address());
- return existing.be_desc.address().port() == be_desc.address().port();
-}
-
} // namespace impala
diff --git a/be/src/scheduling/executor-blacklist.h b/be/src/scheduling/executor-blacklist.h
index 5731bb5..3fce211 100644
--- a/be/src/scheduling/executor-blacklist.h
+++ b/be/src/scheduling/executor-blacklist.h
@@ -21,7 +21,9 @@
#include <vector>
#include "gen-cpp/statestore_service.pb.h"
+#include "util/container-util.h"
#include "util/network-util.h"
+#include "util/unique-id-hash.h"
namespace impala {
@@ -131,16 +133,10 @@ class ExecutorBlacklist {
/// passed the timeout.
int64_t GetBlacklistTimeoutMs() const;
- /// Predicate that returns true if the port number in 'be_desc' matches that in
- /// 'existing.be_desc'. Assumes that they have the same 'ip_address'. Used to do find()
- /// on the values of 'executor_list_'.
- static bool eqBePort(const BackendDescriptorPB& be_desc, const Entry& exiting);
-
- /// Map from node ip address to a list of executors for that node that have been
- /// blacklisted. Note that in normal operation there will be a single impalad per node
- /// all executor lists will be length one. Contains both executors that are blacklisted
- /// ones that are on probation.
- std::unordered_map<IpAddr, std::vector<Entry>> executor_list_;
+ /// Map from executor backend_id to executor entry for those nodes which have been
+ /// blacklisted. Note that the map contains executors that are either blacklisted or
+ /// on probation.
+ std::unordered_map<UniqueIdPB, Entry> executor_list_;
/// The amount to multiply the blacklist timeout by for the probation timeout.
static const int32_t PROBATION_TIMEOUT_MULTIPLIER;