You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by to...@apache.org on 2018/08/09 21:48:09 UTC

[3/9] impala git commit: IMPALA-6153: Execute UpdateFilter() only for executing queries

IMPALA-6153: Execute UpdateFilter() only for executing queries

This change ensures that Coordinator::UpdateFilter() is executed
only for queries which are in the EXECUTING state. Additionally,
it guarantees that Coordinator::ReleaseExecResources() is
executed only when no other thread is executing UpdateFilter().

Testing: Ran all back-end tests.
Additionally, ran the exhaustive tests with DCHECKs that would
be triggered if ReleaseExecResources() were executed while any
other thread was executing UpdateFilter().

Change-Id: I669db217f86db5ff2802335e7b1ae8027ea7161c
Reviewed-on: http://gerrit.cloudera.org:8080/11005
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/2ced56c0
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/2ced56c0
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/2ced56c0

Branch: refs/heads/master
Commit: 2ced56c00b73ecdc92f1021f5d0129a41553e4f2
Parents: b4d20ab
Author: poojanilangekar <po...@cloudera.com>
Authored: Mon Jul 16 11:02:39 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Thu Aug 9 01:04:41 2018 +0000

----------------------------------------------------------------------
 be/src/runtime/coordinator.cc | 43 +++++++++++++++++++++++++-------------
 be/src/runtime/coordinator.h  | 18 ++++++++++++++--
 2 files changed, 45 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/2ced56c0/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 4b87b69..8a3213e 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -76,7 +76,7 @@ Coordinator::Coordinator(
 
 Coordinator::~Coordinator() {
   // Must have entered a terminal exec state guaranteeing resources were released.
-  DCHECK_NE(exec_state_.Load(), ExecState::EXECUTING);
+  DCHECK(!IsExecuting());
   DCHECK_LE(backend_exec_complete_barrier_->pending(), 0);
   // Release the coordinator's reference to the query control structures.
   if (query_state_ != nullptr) {
@@ -261,6 +261,7 @@ void Coordinator::InitFilterRoutingTable() {
   DCHECK(!filter_routing_table_complete_)
       << "InitFilterRoutingTable() called after setting filter_routing_table_complete_";
 
+  lock_guard<shared_mutex> lock(filter_lock_);
   for (const FragmentExecParams& fragment_params: schedule_.fragment_exec_params()) {
     int num_instances = fragment_params.instance_exec_params.size();
     DCHECK_GT(num_instances, 0);
@@ -384,7 +385,6 @@ string Coordinator::FilterDebugString() {
   table_printer.AddColumn("Tgt. Node(s)", false);
   table_printer.AddColumn("Target type", false);
   table_printer.AddColumn("Partition filter", false);
-
   // Distribution metrics are only meaningful if the coordinator is routing the filter.
   if (filter_mode_ == TRuntimeFilterMode::GLOBAL) {
     table_printer.AddColumn("Pending (Expected)", false);
@@ -392,7 +392,6 @@ string Coordinator::FilterDebugString() {
     table_printer.AddColumn("Completed", false);
   }
   table_printer.AddColumn("Enabled", false);
-  lock_guard<SpinLock> l(filter_lock_);
   for (FilterRoutingTable::value_type& v: filter_routing_table_) {
     vector<string> row;
     const FilterState& state = v.second;
@@ -449,7 +448,7 @@ Status Coordinator::SetNonErrorTerminalState(const ExecState state) {
   {
     lock_guard<SpinLock> l(exec_state_lock_);
     // May have already entered a terminal state, in which case nothing to do.
-    if (exec_state_.Load() != ExecState::EXECUTING) return exec_status_;
+    if (!IsExecuting()) return exec_status_;
     DCHECK(exec_status_.ok()) << exec_status_;
     exec_state_.Store(state);
     if (state == ExecState::CANCELLED) exec_status_ = Status::CANCELLED;
@@ -709,7 +708,7 @@ Status Coordinator::UpdateBackendExecStatus(const TReportExecStatusParams& param
   }
   // If query execution has terminated, return a cancelled status to force the fragment
   // instance to stop executing.
-  return exec_state_.Load() == ExecState::EXECUTING ? Status::OK() : Status::CANCELLED;
+  return IsExecuting() ? Status::OK() : Status::CANCELLED;
 }
 
 // TODO: add histogram/percentile
@@ -750,16 +749,14 @@ string Coordinator::GetErrorLog() {
 }
 
 void Coordinator::ReleaseExecResources() {
+  lock_guard<shared_mutex> lock(filter_lock_);
   if (filter_routing_table_.size() > 0) {
     query_profile_->AddInfoString("Final filter table", FilterDebugString());
   }
 
-  {
-    lock_guard<SpinLock> l(filter_lock_);
-    for (auto& filter : filter_routing_table_) {
-      FilterState* state = &filter.second;
-      state->Disable(filter_mem_tracker_);
-    }
+  for (auto& filter : filter_routing_table_) {
+    FilterState* state = &filter.second;
+    state->Disable(filter_mem_tracker_);
   }
   // This may be NULL while executing UDFs.
   if (filter_mem_tracker_ != nullptr) filter_mem_tracker_->Close();
@@ -779,6 +776,7 @@ void Coordinator::ReleaseAdmissionControlResources() {
 }
 
 void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
+  shared_lock<shared_mutex> lock(filter_lock_);
   DCHECK_NE(filter_mode_, TRuntimeFilterMode::OFF)
       << "UpdateFilter() called although runtime filters are disabled";
   DCHECK(exec_rpcs_complete_barrier_.get() != nullptr)
@@ -791,7 +789,12 @@ void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
   TPublishFilterParams rpc_params;
   unordered_set<int> target_fragment_idxs;
   {
-    lock_guard<SpinLock> l(filter_lock_);
+    lock_guard<SpinLock> l(filter_update_lock_);
+    if (!IsExecuting()) {
+      LOG(INFO) << "Filter update received for non-executing query with id: "
+                << query_id();
+      return;
+    }
     FilterRoutingTable::iterator it = filter_routing_table_.find(params.filter_id);
     if (it == filter_routing_table_.end()) {
       LOG(INFO) << "Could not find filter with id: " << params.filter_id;
@@ -834,9 +837,7 @@ void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
     if (state->is_bloom_filter()) {
       // Assign outgoing bloom filter.
       TBloomFilter& aggregated_filter = state->bloom_filter();
-      filter_mem_tracker_->Release(aggregated_filter.directory.size());
 
-      // TODO: Track memory used by 'rpc_params'.
       swap(rpc_params.bloom_filter, aggregated_filter);
       DCHECK(rpc_params.bloom_filter.always_false || rpc_params.bloom_filter.always_true
           || !rpc_params.bloom_filter.directory.empty());
@@ -858,10 +859,19 @@ void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
   // Waited for exec_rpcs_complete_barrier_ so backend_states_ is valid.
   for (BackendState* bs: backend_states_) {
     for (int fragment_idx: target_fragment_idxs) {
+      if (!IsExecuting()) goto cleanup;
       rpc_params.__set_dst_fragment_idx(fragment_idx);
       bs->PublishFilter(rpc_params);
     }
   }
+
+cleanup:
+  // For bloom filters, the memory used in the filter_routing_table_ is transferred to
+  // rpc_params. Hence the Release() function on the filter_mem_tracker_ is called
+  // here to ensure that the MemTracker is updated after the memory is actually freed.
+  if (rpc_params.__isset.bloom_filter) {
+    filter_mem_tracker_->Release(rpc_params.bloom_filter.directory.size());
+  }
 }
 
 void Coordinator::FilterState::ApplyUpdate(const TUpdateFilterParams& params,
@@ -976,3 +986,8 @@ const TFinalizeParams* Coordinator::finalize_params() const {
   return schedule_.request().__isset.finalize_params
       ? &schedule_.request().finalize_params : nullptr;
 }
+
+bool Coordinator::IsExecuting() {
+  // True only while exec_state_ currently holds ExecState::EXECUTING.
+  return exec_state_.Load() == ExecState::EXECUTING;
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/2ced56c0/be/src/runtime/coordinator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index a0dce35..0b0312c 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -21,6 +21,7 @@
 #include <string>
 #include <vector>
 #include <boost/scoped_ptr.hpp>
+#include <boost/thread/shared_mutex.hpp>
 #include <boost/unordered_map.hpp>
 #include <rapidjson/document.h>
 
@@ -89,7 +90,8 @@ class QueryState;
 ///
 /// Lock ordering: (lower-numbered acquired before higher-numbered)
 /// 1. wait_lock_
-/// 2. exec_state_lock_, backend_states_init_lock_, filter_lock_,
+/// 2. filter_lock_
+/// 3. exec_state_lock_, backend_states_init_lock_, filter_update_lock_,
 ///    ExecSummary::lock (leafs)
 ///
 /// TODO: move into separate subdirectory and move nested classes into separate files
@@ -296,8 +298,16 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// - ERROR: error status
   Status exec_status_;
 
+  /// Synchronizes updates to the filter_routing_table_.
+  SpinLock filter_update_lock_;
+
   /// Protects filter_routing_table_.
-  SpinLock filter_lock_;
+  /// Usage pattern:
+  /// 1. To update filter_routing_table_: Acquire shared access on filter_lock_ and
+  ///    upgrade to exclusive access by subsequently acquiring filter_update_lock_.
+  /// 2. To read, initialize/destroy filter_routing_table_: Directly acquire exclusive
+  ///    access on filter_lock_.
+  boost::shared_mutex filter_lock_;
 
   /// Map from filter ID to filter.
   typedef boost::unordered_map<int32_t, FilterState> FilterRoutingTable;
@@ -319,6 +329,7 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   const TUniqueId& query_id() const;
 
   /// Returns a pretty-printed table of the current filter state.
+  /// Caller must have exclusive access to filter_lock_.
   std::string FilterDebugString();
 
   /// Called when the query is done executing due to reaching EOS or client
@@ -445,6 +456,9 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   ///
   /// The ExecState state-machine ensures this is called exactly once.
   void ReleaseAdmissionControlResources();
+
+  /// Checks the exec_state_ of the query and returns true if the query is executing.
+  bool IsExecuting();
 };
 
 }