You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/02/25 07:57:18 UTC
[impala] 04/14: IMPALA-7207: make Coordinator::exec_state_ an atomic enum
This is an automated email from the ASF dual-hosted git repository.
tarmstrong pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/impala.git
commit ac4219563b5c0a454946f41b3f0c0e54d4e454bf
Author: Dan Hecht <dh...@cloudera.com>
AuthorDate: Mon Jun 25 10:47:54 2018 -0700
IMPALA-7207: make Coordinator::exec_state_ an atomic enum
Making exec_state_ an atomic enum allows us to avoid taking the lock
in cases where only the exec_state_ field needs to be read (as opposed
to needing to read both exec_state_ and exec_status_). In particular,
it avoids the lock on the non-terminating paths, which is
the common case.
Change-Id: Ie6c5d5c6ccfdfd533cd0607aab6f554e664b90ac
Reviewed-on: http://gerrit.cloudera.org:8080/10811
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
be/src/common/atomic.h | 3 ++-
be/src/runtime/coordinator.cc | 21 +++++++--------------
be/src/runtime/coordinator.h | 12 ++++++++----
be/src/runtime/fragment-instance-state.h | 3 ++-
4 files changed, 19 insertions(+), 20 deletions(-)
diff --git a/be/src/common/atomic.h b/be/src/common/atomic.h
index 4c72826..5aa5f86 100644
--- a/be/src/common/atomic.h
+++ b/be/src/common/atomic.h
@@ -153,11 +153,12 @@ class AtomicEnum {
"Underlying enum type must fit into 4 bytes");
public:
+ AtomicEnum(T initial) : enum_(static_cast<int32_t>(initial)) {}
/// Atomic load with "acquire" memory-ordering semantic.
ALWAYS_INLINE T Load() const { return static_cast<T>(enum_.Load()); }
/// Atomic store with "release" memory-ordering semantic.
- ALWAYS_INLINE void Store(T val) { enum_.Store(val); }
+ ALWAYS_INLINE void Store(T val) { enum_.Store(static_cast<int32_t>(val)); }
private:
internal::AtomicInt<int32_t> enum_;
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 22930a4..3489312 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -76,7 +76,7 @@ Coordinator::Coordinator(
Coordinator::~Coordinator() {
// Must have entered a terminal exec state guaranteeing resources were released.
- DCHECK_NE(exec_state_, ExecState::EXECUTING);
+ DCHECK_NE(exec_state_.Load(), ExecState::EXECUTING);
DCHECK_LE(backend_exec_complete_barrier_->pending(), 0);
// Release the coordinator's reference to the query control structures.
if (query_state_ != nullptr) {
@@ -450,9 +450,9 @@ Status Coordinator::SetNonErrorTerminalState(const ExecState state) {
{
lock_guard<SpinLock> l(exec_state_lock_);
// May have already entered a terminal state, in which case nothing to do.
- if (exec_state_ != ExecState::EXECUTING) return exec_status_;
+ if (exec_state_.Load() != ExecState::EXECUTING) return exec_status_;
DCHECK(exec_status_.ok()) << exec_status_;
- exec_state_ = state;
+ exec_state_.Store(state);
if (state == ExecState::CANCELLED) exec_status_ = Status::CANCELLED;
ret_status = exec_status_;
}
@@ -468,13 +468,13 @@ Status Coordinator::UpdateExecState(const Status& status,
ExecState old_state, new_state;
{
lock_guard<SpinLock> l(exec_state_lock_);
- old_state = exec_state_;
+ old_state = exec_state_.Load();
if (old_state == ExecState::EXECUTING) {
DCHECK(exec_status_.ok()) << exec_status_;
if (!status.ok()) {
// Error while executing - go to ERROR state.
exec_status_ = status;
- exec_state_ = ExecState::ERROR;
+ exec_state_.Store(ExecState::ERROR);
}
} else if (old_state == ExecState::RETURNED_RESULTS) {
// Already returned all results. Leave exec status as ok, stay in this state.
@@ -492,7 +492,7 @@ Status Coordinator::UpdateExecState(const Status& status,
exec_status_ = status;
}
}
- new_state = exec_state_;
+ new_state = exec_state_.Load();
ret_status = exec_status_;
}
// Log interesting status: a non-cancelled error or a cancellation if was executing.
@@ -508,11 +508,6 @@ Status Coordinator::UpdateExecState(const Status& status,
return ret_status;
}
-bool Coordinator::ReturnedAllResults() {
- lock_guard<SpinLock> l(exec_state_lock_);
- return exec_state_ == ExecState::RETURNED_RESULTS;
-}
-
void Coordinator::HandleExecStateTransition(
const ExecState old_state, const ExecState new_state) {
static const unordered_map<ExecState, const char *> exec_state_to_event{
@@ -623,9 +618,7 @@ Status Coordinator::GetNext(QueryResultSet* results, int max_rows, bool* eos) {
DCHECK(has_called_wait_);
SCOPED_TIMER(query_profile_->total_time_counter());
- // exec_state_lock_ not needed here though since this path won't execute concurrently
- // with itself or DML finalization.
- if (exec_state_ == ExecState::RETURNED_RESULTS) {
+ if (ReturnedAllResults()) {
// Nothing left to do: already in a terminal state and no more results.
*eos = true;
return Status::OK();
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index 5bb399f..a0dce35 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -29,7 +29,6 @@
#include "gen-cpp/Frontend_types.h"
#include "gen-cpp/Types_types.h"
#include "runtime/dml-exec-state.h"
-#include "util/condition-variable.h"
#include "util/progress-updater.h"
#include "util/runtime-profile-counters.h"
#include "util/spinlock.h"
@@ -276,7 +275,10 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
/// cancellation. Initialized in StartBackendExec().
boost::scoped_ptr<CountingBarrier> backend_exec_complete_barrier_;
- SpinLock exec_state_lock_; // protects exec-state_ and exec_status_
+ // Protects exec_state_ and exec_status_. exec_state_ can be read independently via
+ // the atomic, but the lock is held when writing either field and when reading both
+ // fields together.
+ SpinLock exec_state_lock_;
/// EXECUTING: in-flight; the only non-terminal state
/// RETURNED_RESULTS: GetNext() set eos to true, or for DML, the request is complete
@@ -285,7 +287,7 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
enum class ExecState {
EXECUTING, RETURNED_RESULTS, CANCELLED, ERROR
};
- ExecState exec_state_ = ExecState::EXECUTING;
+ AtomicEnum<ExecState> exec_state_{ExecState::EXECUTING};
/// Overall execution status; only set on exec_state_ transitions:
/// - EXECUTING: OK
@@ -357,7 +359,9 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
/// Return true if 'exec_state_' is RETURNED_RESULTS.
/// TODO: remove with IMPALA-6984.
- bool ReturnedAllResults() WARN_UNUSED_RESULT;
+ bool ReturnedAllResults() WARN_UNUSED_RESULT {
+ return exec_state_.Load() == ExecState::RETURNED_RESULTS;
+ }
/// Return the string representation of 'state'.
static const char* ExecStateToString(const ExecState state);
diff --git a/be/src/runtime/fragment-instance-state.h b/be/src/runtime/fragment-instance-state.h
index 8295c8f..df27f9c 100644
--- a/be/src/runtime/fragment-instance-state.h
+++ b/be/src/runtime/fragment-instance-state.h
@@ -204,7 +204,8 @@ class FragmentInstanceState {
/// The current state of this fragment instance's execution. Only updated by the
/// fragment instance thread in UpdateState() and read by the profile reporting threads.
- AtomicEnum<TFInstanceExecState::type> current_state_;
+ AtomicEnum<TFInstanceExecState::type> current_state_{
+ TFInstanceExecState::WAITING_FOR_EXEC};
/// Output sink for rows sent to this fragment. Created in Prepare(), lives in
/// obj_pool().