You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by to...@apache.org on 2018/08/09 21:48:09 UTC
[3/9] impala git commit: IMPALA-6153: Execute UpdateFilter() only for
executing queries
IMPALA-6153: Execute UpdateFilter() only for executing queries
This change ensures that Coordinator::UpdateFilter() is executed
only for queries which are in the EXECUTING state. Additionally,
it guarantees that Coordinator::ReleaseExecResources() is
executed only when no other thread is executing UpdateFilter().
Testing: Ran all back-end tests.
Additionally, ran the exhaustive tests with DCHECKs which would
be triggered if ReleaseExecResources() is executed while any
other thread is executing UpdateFilter().
Change-Id: I669db217f86db5ff2802335e7b1ae8027ea7161c
Reviewed-on: http://gerrit.cloudera.org:8080/11005
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/2ced56c0
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/2ced56c0
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/2ced56c0
Branch: refs/heads/master
Commit: 2ced56c00b73ecdc92f1021f5d0129a41553e4f2
Parents: b4d20ab
Author: poojanilangekar <po...@cloudera.com>
Authored: Mon Jul 16 11:02:39 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Thu Aug 9 01:04:41 2018 +0000
----------------------------------------------------------------------
be/src/runtime/coordinator.cc | 43 +++++++++++++++++++++++++-------------
be/src/runtime/coordinator.h | 18 ++++++++++++++--
2 files changed, 45 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/2ced56c0/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 4b87b69..8a3213e 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -76,7 +76,7 @@ Coordinator::Coordinator(
Coordinator::~Coordinator() {
// Must have entered a terminal exec state guaranteeing resources were released.
- DCHECK_NE(exec_state_.Load(), ExecState::EXECUTING);
+ DCHECK(!IsExecuting());
DCHECK_LE(backend_exec_complete_barrier_->pending(), 0);
// Release the coordinator's reference to the query control structures.
if (query_state_ != nullptr) {
@@ -261,6 +261,7 @@ void Coordinator::InitFilterRoutingTable() {
DCHECK(!filter_routing_table_complete_)
<< "InitFilterRoutingTable() called after setting filter_routing_table_complete_";
+ lock_guard<shared_mutex> lock(filter_lock_);
for (const FragmentExecParams& fragment_params: schedule_.fragment_exec_params()) {
int num_instances = fragment_params.instance_exec_params.size();
DCHECK_GT(num_instances, 0);
@@ -384,7 +385,6 @@ string Coordinator::FilterDebugString() {
table_printer.AddColumn("Tgt. Node(s)", false);
table_printer.AddColumn("Target type", false);
table_printer.AddColumn("Partition filter", false);
-
// Distribution metrics are only meaningful if the coordinator is routing the filter.
if (filter_mode_ == TRuntimeFilterMode::GLOBAL) {
table_printer.AddColumn("Pending (Expected)", false);
@@ -392,7 +392,6 @@ string Coordinator::FilterDebugString() {
table_printer.AddColumn("Completed", false);
}
table_printer.AddColumn("Enabled", false);
- lock_guard<SpinLock> l(filter_lock_);
for (FilterRoutingTable::value_type& v: filter_routing_table_) {
vector<string> row;
const FilterState& state = v.second;
@@ -449,7 +448,7 @@ Status Coordinator::SetNonErrorTerminalState(const ExecState state) {
{
lock_guard<SpinLock> l(exec_state_lock_);
// May have already entered a terminal state, in which case nothing to do.
- if (exec_state_.Load() != ExecState::EXECUTING) return exec_status_;
+ if (!IsExecuting()) return exec_status_;
DCHECK(exec_status_.ok()) << exec_status_;
exec_state_.Store(state);
if (state == ExecState::CANCELLED) exec_status_ = Status::CANCELLED;
@@ -709,7 +708,7 @@ Status Coordinator::UpdateBackendExecStatus(const TReportExecStatusParams& param
}
// If query execution has terminated, return a cancelled status to force the fragment
// instance to stop executing.
- return exec_state_.Load() == ExecState::EXECUTING ? Status::OK() : Status::CANCELLED;
+ return IsExecuting() ? Status::OK() : Status::CANCELLED;
}
// TODO: add histogram/percentile
@@ -750,16 +749,14 @@ string Coordinator::GetErrorLog() {
}
void Coordinator::ReleaseExecResources() {
+ lock_guard<shared_mutex> lock(filter_lock_);
if (filter_routing_table_.size() > 0) {
query_profile_->AddInfoString("Final filter table", FilterDebugString());
}
- {
- lock_guard<SpinLock> l(filter_lock_);
- for (auto& filter : filter_routing_table_) {
- FilterState* state = &filter.second;
- state->Disable(filter_mem_tracker_);
- }
+ for (auto& filter : filter_routing_table_) {
+ FilterState* state = &filter.second;
+ state->Disable(filter_mem_tracker_);
}
// This may be NULL while executing UDFs.
if (filter_mem_tracker_ != nullptr) filter_mem_tracker_->Close();
@@ -779,6 +776,7 @@ void Coordinator::ReleaseAdmissionControlResources() {
}
void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
+ shared_lock<shared_mutex> lock(filter_lock_);
DCHECK_NE(filter_mode_, TRuntimeFilterMode::OFF)
<< "UpdateFilter() called although runtime filters are disabled";
DCHECK(exec_rpcs_complete_barrier_.get() != nullptr)
@@ -791,7 +789,12 @@ void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
TPublishFilterParams rpc_params;
unordered_set<int> target_fragment_idxs;
{
- lock_guard<SpinLock> l(filter_lock_);
+ lock_guard<SpinLock> l(filter_update_lock_);
+ if (!IsExecuting()) {
+ LOG(INFO) << "Filter update received for non-executing query with id: "
+ << query_id();
+ return;
+ }
FilterRoutingTable::iterator it = filter_routing_table_.find(params.filter_id);
if (it == filter_routing_table_.end()) {
LOG(INFO) << "Could not find filter with id: " << params.filter_id;
@@ -834,9 +837,7 @@ void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
if (state->is_bloom_filter()) {
// Assign outgoing bloom filter.
TBloomFilter& aggregated_filter = state->bloom_filter();
- filter_mem_tracker_->Release(aggregated_filter.directory.size());
- // TODO: Track memory used by 'rpc_params'.
swap(rpc_params.bloom_filter, aggregated_filter);
DCHECK(rpc_params.bloom_filter.always_false || rpc_params.bloom_filter.always_true
|| !rpc_params.bloom_filter.directory.empty());
@@ -858,10 +859,19 @@ void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
// Waited for exec_rpcs_complete_barrier_ so backend_states_ is valid.
for (BackendState* bs: backend_states_) {
for (int fragment_idx: target_fragment_idxs) {
+ if (!IsExecuting()) goto cleanup;
rpc_params.__set_dst_fragment_idx(fragment_idx);
bs->PublishFilter(rpc_params);
}
}
+
+cleanup:
+ // For bloom filters, the memory used in the filter_routing_table_ is transferred to
+ // rpc_params. Hence the Release() function on the filter_mem_tracker_ is called
+ // here to ensure that the MemTracker is updated after the memory is actually freed.
+ if (rpc_params.__isset.bloom_filter) {
+ filter_mem_tracker_->Release(rpc_params.bloom_filter.directory.size());
+ }
}
void Coordinator::FilterState::ApplyUpdate(const TUpdateFilterParams& params,
@@ -976,3 +986,8 @@ const TFinalizeParams* Coordinator::finalize_params() const {
return schedule_.request().__isset.finalize_params
? &schedule_.request().finalize_params : nullptr;
}
+
+bool Coordinator::IsExecuting() {
+ ExecState current_state = exec_state_.Load();
+ return current_state == ExecState::EXECUTING;
+}
http://git-wip-us.apache.org/repos/asf/impala/blob/2ced56c0/be/src/runtime/coordinator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index a0dce35..0b0312c 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -21,6 +21,7 @@
#include <string>
#include <vector>
#include <boost/scoped_ptr.hpp>
+#include <boost/thread/shared_mutex.hpp>
#include <boost/unordered_map.hpp>
#include <rapidjson/document.h>
@@ -89,7 +90,8 @@ class QueryState;
///
/// Lock ordering: (lower-numbered acquired before higher-numbered)
/// 1. wait_lock_
-/// 2. exec_state_lock_, backend_states_init_lock_, filter_lock_,
+/// 2. filter_lock_
+/// 3. exec_state_lock_, backend_states_init_lock_, filter_update_lock_,
/// ExecSummary::lock (leafs)
///
/// TODO: move into separate subdirectory and move nested classes into separate files
@@ -296,8 +298,16 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
/// - ERROR: error status
Status exec_status_;
+ /// Synchronizes updates to the filter_routing_table_.
+ SpinLock filter_update_lock_;
+
/// Protects filter_routing_table_.
- SpinLock filter_lock_;
+ /// Usage pattern:
+ /// 1. To update filter_routing_table_: Acquire shared access on filter_lock_ and
+ /// upgrade to exclusive access by subsequently acquiring filter_update_lock_.
+ /// 2. To read, initialize/destroy filter_routing_table_: Directly acquire exclusive
+ /// access on filter_lock_.
+ boost::shared_mutex filter_lock_;
/// Map from filter ID to filter.
typedef boost::unordered_map<int32_t, FilterState> FilterRoutingTable;
@@ -319,6 +329,7 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
const TUniqueId& query_id() const;
/// Returns a pretty-printed table of the current filter state.
+ /// Caller must have exclusive access to filter_lock_.
std::string FilterDebugString();
/// Called when the query is done executing due to reaching EOS or client
@@ -445,6 +456,9 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
///
/// The ExecState state-machine ensures this is called exactly once.
void ReleaseAdmissionControlResources();
+
+ /// Checks the exec_state_ of the query and returns true if the query is executing.
+ bool IsExecuting();
};
}