You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by bo...@apache.org on 2020/10/07 14:07:05 UTC

[impala] 03/07: IMPALA-9180 (part 2): Refactor executor_list_ map of ExecutorBlacklist

This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit c9fcbe53c8a4822e1d39de848c7ff45ca2ff2c67
Author: wzhou-code <wz...@cloudera.com>
AuthorDate: Thu Sep 24 18:03:55 2020 -0700

    IMPALA-9180 (part 2): Refactor executor_list_ map of ExecutorBlacklist
    
    The current ExecutorBlacklist class keys its maps on the
    TNetworkAddress of a backend. To simplify the logic of the class,
    change it to key off of the UniqueIdPB backend-id, i.e. refactor
    'executor_list_' so that it is no longer a map<address -> list<backend>>
    and is instead a map<backend_id, backend>.
    Also fixes a minor bug with the calculation of elapsed time when
    a backend that was on probation is re-blacklisted.
    
    Testing:
     - Passed test_blacklist.py and test_query_retries.py.
     - Passed exhaustive tests.
    
    Change-Id: Ib1ae082d0e080088756af91b5b770752ca8b3aa1
    Reviewed-on: http://gerrit.cloudera.org:8080/16506
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/scheduling/executor-blacklist.cc | 156 +++++++++++++-------------------
 be/src/scheduling/executor-blacklist.h  |  16 ++--
 2 files changed, 69 insertions(+), 103 deletions(-)

diff --git a/be/src/scheduling/executor-blacklist.cc b/be/src/scheduling/executor-blacklist.cc
index fae1b20..2a561ca 100644
--- a/be/src/scheduling/executor-blacklist.cc
+++ b/be/src/scheduling/executor-blacklist.cc
@@ -39,34 +39,35 @@ bool ExecutorBlacklist::BlacklistingEnabled() {
 void ExecutorBlacklist::Blacklist(
     const BackendDescriptorPB& be_desc, const Status& cause) {
   DCHECK(BlacklistingEnabled());
-  DCHECK(!be_desc.ip_address().empty());
-  vector<Entry>& be_descs = executor_list_[be_desc.ip_address()];
-  auto it = find_if(be_descs.begin(), be_descs.end(),
-      std::bind(eqBePort, be_desc, std::placeholders::_1));
-  if (it != be_descs.end()) {
+  DCHECK(be_desc.has_backend_id());
+  auto entry_it = executor_list_.find(be_desc.backend_id());
+  if (entry_it != executor_list_.end()) {
     // If this executor was already blacklisted, it must be that two different queries
     // tried to blacklist it at about the same time, so just leave it as is.
-    if (it->state == State::ON_PROBATION) {
+    Entry& entry = entry_it->second;
+    if (entry.state == State::ON_PROBATION) {
       // This executor was on probation, so re-blacklist it.
-      it->state = State::BLACKLISTED;
-      it->blacklist_time_ms = MonotonicMillis();
-      it->cause = cause;
-
       int64_t probation_timeout = GetBlacklistTimeoutMs() * PROBATION_TIMEOUT_MULTIPLIER;
-      int64_t elapsed = MonotonicMillis() - it->blacklist_time_ms;
+      int64_t current_time_ms = MonotonicMillis();
+      int64_t elapsed = current_time_ms - entry.blacklist_time_ms;
+      entry.state = State::BLACKLISTED;
+      entry.blacklist_time_ms = current_time_ms;
+      entry.cause = cause;
+
       // Since NeedsMaintenance() doesn't consider executors that can be removed from
       // probation, executors may stay on probation much longer than the timeout, so check
       // the timeout here.
-      if (elapsed > probation_timeout * it->num_consecutive_blacklistings) {
+      if (elapsed > probation_timeout * entry.num_consecutive_blacklistings) {
         // This executor should have already been taken off probation, so act like it was.
-        it->num_consecutive_blacklistings = 1;
+        entry.num_consecutive_blacklistings = 1;
       } else {
-        ++it->num_consecutive_blacklistings;
+        ++entry.num_consecutive_blacklistings;
       }
     }
   } else {
     // This executor was not already on the list, create a new Entry for it.
-    be_descs.emplace_back(be_desc, MonotonicMillis(), cause);
+    executor_list_.insert(
+        make_pair(be_desc.backend_id(), Entry(be_desc, MonotonicMillis(), cause)));
   }
   VLOG(2) << "Blacklisted " << be_desc.address() << ", current blacklist: "
           << DebugString();
@@ -74,37 +75,26 @@ void ExecutorBlacklist::Blacklist(
 
 ExecutorBlacklist::State ExecutorBlacklist::FindAndRemove(
     const BackendDescriptorPB& be_desc) {
-  auto be_descs_it = executor_list_.find(be_desc.ip_address());
-  if (be_descs_it == executor_list_.end()) {
-    // Executor wasn't on the blacklist.
-    return NOT_BLACKLISTED;
-  }
-  vector<Entry>& be_descs = be_descs_it->second;
-  auto remove_it = find_if(be_descs.begin(), be_descs.end(),
-      std::bind(eqBePort, be_desc, std::placeholders::_1));
-  if (remove_it == be_descs.end()) {
+  auto remove_it = executor_list_.find(be_desc.backend_id());
+  if (remove_it == executor_list_.end()) {
     // Executor wasn't on the blacklist.
     return NOT_BLACKLISTED;
   }
-  State removed_state = remove_it->state;
-  be_descs.erase(remove_it);
-  if (be_descs.empty()) {
-    executor_list_.erase(be_descs_it);
-  }
+  State removed_state = remove_it->second.state;
+  executor_list_.erase(remove_it);
   return removed_state;
 }
 
 bool ExecutorBlacklist::NeedsMaintenance() const {
   int64_t blacklist_timeout = GetBlacklistTimeoutMs();
   int64_t now = MonotonicMillis();
-  for (auto executor_it : executor_list_) {
-    for (auto entry_it : executor_it.second) {
-      if (entry_it.state == State::BLACKLISTED) {
-        int64_t elapsed = now - entry_it.blacklist_time_ms;
-        if (elapsed > blacklist_timeout * entry_it.num_consecutive_blacklistings) {
-          // This backend has passed the timeout and can be put on probation.
-          return true;
-        }
+  for (auto entry_it : executor_list_) {
+    const Entry& entry = entry_it.second;
+    if (entry.state == State::BLACKLISTED) {
+      int64_t elapsed = now - entry.blacklist_time_ms;
+      if (elapsed > blacklist_timeout * entry.num_consecutive_blacklistings) {
+        // This backend has passed the timeout and can be put on probation.
+        return true;
       }
     }
   }
@@ -115,33 +105,26 @@ void ExecutorBlacklist::Maintenance(std::list<BackendDescriptorPB>* probation_li
   int64_t blacklist_timeout = GetBlacklistTimeoutMs();
   int64_t probation_timeout = blacklist_timeout * PROBATION_TIMEOUT_MULTIPLIER;
   int64_t now = MonotonicMillis();
-  auto executor_it = executor_list_.begin();
-  while (executor_it != executor_list_.end()) {
-    auto entry_it = executor_it->second.begin();
-    while (entry_it != executor_it->second.end()) {
-      int64_t elapsed = now - entry_it->blacklist_time_ms;
-      if (entry_it->state == State::BLACKLISTED) {
-        // Check if we can take it off the blacklist and put it on probation.
-        if (elapsed > blacklist_timeout * entry_it->num_consecutive_blacklistings) {
-          LOG(INFO) << "Executor " << entry_it->be_desc.address()
-                    << " passed the timeout and will be taken off the blacklist.";
-          probation_list->push_back(entry_it->be_desc);
-          entry_it->state = State::ON_PROBATION;
-        }
-        ++entry_it;
-      } else {
-        // Check if we can take it off probation.
-        if (elapsed > probation_timeout * entry_it->num_consecutive_blacklistings) {
-          entry_it = executor_it->second.erase(entry_it);
-        } else {
-          ++entry_it;
-        }
+  auto entry_it = executor_list_.begin();
+  while (entry_it != executor_list_.end()) {
+    Entry& entry = entry_it->second;
+    int64_t elapsed = now - entry.blacklist_time_ms;
+    if (entry.state == State::BLACKLISTED) {
+      // Check if we can take it off the blacklist and put it on probation.
+      if (elapsed > blacklist_timeout * entry.num_consecutive_blacklistings) {
+        LOG(INFO) << "Executor " << entry.be_desc.address()
+                  << " passed the timeout and will be taken off the blacklist.";
+        probation_list->push_back(entry.be_desc);
+        entry.state = State::ON_PROBATION;
       }
-    }
-    if (executor_it->second.empty()) {
-      executor_it = executor_list_.erase(executor_it);
+      ++entry_it;
     } else {
-      ++executor_it;
+      // Check if we can take it off probation.
+      if (elapsed > probation_timeout * entry.num_consecutive_blacklistings) {
+        entry_it = executor_list_.erase(entry_it);
+      } else {
+        ++entry_it;
+      }
     }
   }
   VLOG(2) << "Completed blacklist maintenance. Current blacklist: " << DebugString();
@@ -149,19 +132,16 @@ void ExecutorBlacklist::Maintenance(std::list<BackendDescriptorPB>* probation_li
 
 bool ExecutorBlacklist::IsBlacklisted(
     const BackendDescriptorPB& be_desc, Status* cause, int64_t* time_remaining_ms) const {
-  for (auto executor_it : executor_list_) {
-    if (executor_it.first == be_desc.ip_address()) {
-      for (auto entry_it : executor_it.second) {
-        if (entry_it.be_desc.address().port() == be_desc.address().port()
-            && entry_it.state == State::BLACKLISTED) {
-          if (cause != nullptr) *cause = entry_it.cause;
-          int64_t elapsed_ms = MonotonicMillis() - entry_it.blacklist_time_ms;
-          int64_t total_timeout_ms =
-              GetBlacklistTimeoutMs() * entry_it.num_consecutive_blacklistings;
-          *time_remaining_ms = total_timeout_ms - elapsed_ms;
-          return true;
-        }
-      }
+  auto entry_it = executor_list_.find(be_desc.backend_id());
+  if (entry_it != executor_list_.end()) {
+    const Entry& entry = entry_it->second;
+    if (entry.state == State::BLACKLISTED) {
+      if (cause != nullptr) *cause = entry.cause;
+      int64_t elapsed_ms = MonotonicMillis() - entry.blacklist_time_ms;
+      int64_t total_timeout_ms =
+          GetBlacklistTimeoutMs() * entry.num_consecutive_blacklistings;
+      *time_remaining_ms = total_timeout_ms - elapsed_ms;
+      return true;
     }
   }
   return false;
@@ -169,12 +149,9 @@ bool ExecutorBlacklist::IsBlacklisted(
 
 std::string ExecutorBlacklist::BlacklistToString() const {
   std::stringstream ss;
-  for (auto executor_it : executor_list_) {
-    for (auto entry_it : executor_it.second) {
-      if (entry_it.state == State::BLACKLISTED) {
-        ss << entry_it.be_desc.address() << " ";
-      }
-    }
+  for (auto entry_it : executor_list_) {
+    const Entry& entry = entry_it.second;
+    if (entry.state == State::BLACKLISTED) ss << entry.be_desc.address() << " ";
   }
   return ss.str();
 }
@@ -182,11 +159,11 @@ std::string ExecutorBlacklist::BlacklistToString() const {
 std::string ExecutorBlacklist::DebugString() const {
   std::stringstream ss;
   ss << "ExecutorBlacklist[";
-  for (auto executor_it : executor_list_) {
-    for (auto entry_it : executor_it.second) {
-      ss << entry_it.be_desc.address() << " ("
-         << (entry_it.state == BLACKLISTED ? "blacklisted" : "on probation") << ") ";
-    }
+  for (auto entry_it : executor_list_) {
+    const Entry& entry = entry_it.second;
+    DCHECK(entry.state == BLACKLISTED || entry.state == ON_PROBATION);
+    ss << entry.be_desc.address() << " ("
+       << (entry.state == BLACKLISTED ? "blacklisted" : "on probation") << ") ";
   }
   ss << "]";
   return ss.str();
@@ -200,11 +177,4 @@ int64_t ExecutorBlacklist::GetBlacklistTimeoutMs() const {
   return BLACKLIST_TIMEOUT_PADDING * Statestore::FailedExecutorDetectionTimeMs();
 }
 
-bool ExecutorBlacklist::eqBePort(
-    const BackendDescriptorPB& be_desc, const Entry& existing) {
-  // The IP addresses must already match, so it is sufficient to check the port.
-  DCHECK_EQ(existing.be_desc.ip_address(), be_desc.ip_address());
-  return existing.be_desc.address().port() == be_desc.address().port();
-}
-
 } // namespace impala
diff --git a/be/src/scheduling/executor-blacklist.h b/be/src/scheduling/executor-blacklist.h
index 5731bb5..3fce211 100644
--- a/be/src/scheduling/executor-blacklist.h
+++ b/be/src/scheduling/executor-blacklist.h
@@ -21,7 +21,9 @@
 #include <vector>
 
 #include "gen-cpp/statestore_service.pb.h"
+#include "util/container-util.h"
 #include "util/network-util.h"
+#include "util/unique-id-hash.h"
 
 namespace impala {
 
@@ -131,16 +133,10 @@ class ExecutorBlacklist {
   /// passed the timeout.
   int64_t GetBlacklistTimeoutMs() const;
 
-  /// Predicate that returns true if the port number in 'be_desc' matches that in
-  /// 'existing.be_desc'. Assumes that they have the same 'ip_address'. Used to do find()
-  /// on the values of 'executor_list_'.
-  static bool eqBePort(const BackendDescriptorPB& be_desc, const Entry& exiting);
-
-  /// Map from node ip address to a list of executors for that node that have been
-  /// blacklisted. Note that in normal operation there will be a single impalad per node
-  /// all executor lists will be length one. Contains both executors that are blacklisted
-  /// ones that are on probation.
-  std::unordered_map<IpAddr, std::vector<Entry>> executor_list_;
+  /// Map from executor backend_id to executor entry for those nodes which have been
+  /// blacklisted. Note that the map contains executors that are either blacklisted or
+  /// on probation.
+  std::unordered_map<UniqueIdPB, Entry> executor_list_;
 
   /// The amount to multiply the blacklist timeout by for the probation timeout.
   static const int32_t PROBATION_TIMEOUT_MULTIPLIER;