You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/02/25 07:57:18 UTC

[impala] 04/14: IMPALA-7207: make Coordinator::exec_state_ an atomic enum

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/impala.git

commit ac4219563b5c0a454946f41b3f0c0e54d4e454bf
Author: Dan Hecht <dh...@cloudera.com>
AuthorDate: Mon Jun 25 10:47:54 2018 -0700

    IMPALA-7207: make Coordinator::exec_state_ an atomic enum
    
    Making exec_state_ atomic allows us to avoid taking exec_state_lock_
    in cases where only the exec_state_ field needs to be read (as
    opposed to needing to read both exec_state_ and exec_status_). In
    particular, it avoids the lock on the non-terminating paths, which
    are the common case.
    
    Change-Id: Ie6c5d5c6ccfdfd533cd0607aab6f554e664b90ac
    Reviewed-on: http://gerrit.cloudera.org:8080/10811
    Reviewed-by: Tim Armstrong <ta...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/common/atomic.h                   |  3 ++-
 be/src/runtime/coordinator.cc            | 21 +++++++--------------
 be/src/runtime/coordinator.h             | 12 ++++++++----
 be/src/runtime/fragment-instance-state.h |  3 ++-
 4 files changed, 19 insertions(+), 20 deletions(-)

diff --git a/be/src/common/atomic.h b/be/src/common/atomic.h
index 4c72826..5aa5f86 100644
--- a/be/src/common/atomic.h
+++ b/be/src/common/atomic.h
@@ -153,11 +153,12 @@ class AtomicEnum {
       "Underlying enum type must fit into 4 bytes");
 
  public:
+  AtomicEnum(T initial) : enum_(static_cast<int32_t>(initial)) {}
   /// Atomic load with "acquire" memory-ordering semantic.
   ALWAYS_INLINE T Load() const { return static_cast<T>(enum_.Load()); }
 
   /// Atomic store with "release" memory-ordering semantic.
-  ALWAYS_INLINE void Store(T val) { enum_.Store(val); }
+  ALWAYS_INLINE void Store(T val) { enum_.Store(static_cast<int32_t>(val)); }
 
  private:
   internal::AtomicInt<int32_t> enum_;
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 22930a4..3489312 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -76,7 +76,7 @@ Coordinator::Coordinator(
 
 Coordinator::~Coordinator() {
   // Must have entered a terminal exec state guaranteeing resources were released.
-  DCHECK_NE(exec_state_, ExecState::EXECUTING);
+  DCHECK_NE(exec_state_.Load(), ExecState::EXECUTING);
   DCHECK_LE(backend_exec_complete_barrier_->pending(), 0);
   // Release the coordinator's reference to the query control structures.
   if (query_state_ != nullptr) {
@@ -450,9 +450,9 @@ Status Coordinator::SetNonErrorTerminalState(const ExecState state) {
   {
     lock_guard<SpinLock> l(exec_state_lock_);
     // May have already entered a terminal state, in which case nothing to do.
-    if (exec_state_ != ExecState::EXECUTING) return exec_status_;
+    if (exec_state_.Load() != ExecState::EXECUTING) return exec_status_;
     DCHECK(exec_status_.ok()) << exec_status_;
-    exec_state_ = state;
+    exec_state_.Store(state);
     if (state == ExecState::CANCELLED) exec_status_ = Status::CANCELLED;
     ret_status = exec_status_;
   }
@@ -468,13 +468,13 @@ Status Coordinator::UpdateExecState(const Status& status,
   ExecState old_state, new_state;
   {
     lock_guard<SpinLock> l(exec_state_lock_);
-    old_state = exec_state_;
+    old_state = exec_state_.Load();
     if (old_state == ExecState::EXECUTING) {
       DCHECK(exec_status_.ok()) << exec_status_;
       if (!status.ok()) {
         // Error while executing - go to ERROR state.
         exec_status_ = status;
-        exec_state_ = ExecState::ERROR;
+        exec_state_.Store(ExecState::ERROR);
       }
     } else if (old_state == ExecState::RETURNED_RESULTS) {
       // Already returned all results. Leave exec status as ok, stay in this state.
@@ -492,7 +492,7 @@ Status Coordinator::UpdateExecState(const Status& status,
         exec_status_ = status;
       }
     }
-    new_state = exec_state_;
+    new_state = exec_state_.Load();
     ret_status = exec_status_;
   }
   // Log interesting status: a non-cancelled error or a cancellation if was executing.
@@ -508,11 +508,6 @@ Status Coordinator::UpdateExecState(const Status& status,
   return ret_status;
 }
 
-bool Coordinator::ReturnedAllResults() {
-  lock_guard<SpinLock> l(exec_state_lock_);
-  return exec_state_ == ExecState::RETURNED_RESULTS;
-}
-
 void Coordinator::HandleExecStateTransition(
     const ExecState old_state, const ExecState new_state) {
   static const unordered_map<ExecState, const char *> exec_state_to_event{
@@ -623,9 +618,7 @@ Status Coordinator::GetNext(QueryResultSet* results, int max_rows, bool* eos) {
   DCHECK(has_called_wait_);
   SCOPED_TIMER(query_profile_->total_time_counter());
 
-  // exec_state_lock_ not needed here though since this path won't execute concurrently
-  // with itself or DML finalization.
-  if (exec_state_ == ExecState::RETURNED_RESULTS) {
+  if (ReturnedAllResults()) {
     // Nothing left to do: already in a terminal state and no more results.
     *eos = true;
     return Status::OK();
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index 5bb399f..a0dce35 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -29,7 +29,6 @@
 #include "gen-cpp/Frontend_types.h"
 #include "gen-cpp/Types_types.h"
 #include "runtime/dml-exec-state.h"
-#include "util/condition-variable.h"
 #include "util/progress-updater.h"
 #include "util/runtime-profile-counters.h"
 #include "util/spinlock.h"
@@ -276,7 +275,10 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// cancellation. Initialized in StartBackendExec().
   boost::scoped_ptr<CountingBarrier> backend_exec_complete_barrier_;
 
-  SpinLock exec_state_lock_; // protects exec-state_ and exec_status_
+  // Protects exec_state_ and exec_status_. exec_state_ can be read independently via
+  // the atomic, but the lock is held when writing either field and when reading both
+  // fields together.
+  SpinLock exec_state_lock_;
 
   /// EXECUTING: in-flight; the only non-terminal state
   /// RETURNED_RESULTS: GetNext() set eos to true, or for DML, the request is complete
@@ -285,7 +287,7 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   enum class ExecState {
     EXECUTING, RETURNED_RESULTS, CANCELLED, ERROR
   };
-  ExecState exec_state_ = ExecState::EXECUTING;
+  AtomicEnum<ExecState> exec_state_{ExecState::EXECUTING};
 
   /// Overall execution status; only set on exec_state_ transitions:
   /// - EXECUTING: OK
@@ -357,7 +359,9 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
 
   /// Return true if 'exec_state_' is RETURNED_RESULTS.
   /// TODO: remove with IMPALA-6984.
-  bool ReturnedAllResults() WARN_UNUSED_RESULT;
+  bool ReturnedAllResults() WARN_UNUSED_RESULT {
+    return exec_state_.Load() == ExecState::RETURNED_RESULTS;
+  }
 
   /// Return the string representation of 'state'.
   static const char* ExecStateToString(const ExecState state);
diff --git a/be/src/runtime/fragment-instance-state.h b/be/src/runtime/fragment-instance-state.h
index 8295c8f..df27f9c 100644
--- a/be/src/runtime/fragment-instance-state.h
+++ b/be/src/runtime/fragment-instance-state.h
@@ -204,7 +204,8 @@ class FragmentInstanceState {
 
   /// The current state of this fragment instance's execution. Only updated by the
   /// fragment instance thread in UpdateState() and read by the profile reporting threads.
-  AtomicEnum<TFInstanceExecState::type> current_state_;
+  AtomicEnum<TFInstanceExecState::type> current_state_{
+    TFInstanceExecState::WAITING_FOR_EXEC};
 
   /// Output sink for rows sent to this fragment. Created in Prepare(), lives in
   /// obj_pool().