Posted to commits@impala.apache.org by ta...@apache.org on 2020/01/21 00:50:15 UTC

[impala] 03/03: IMPALA-9154: Make runtime filter propagation asynchronous

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 79aae231443a305ce8503dbc7b4335e8ae3f3946
Author: Fang-Yu Rao <fa...@cloudera.com>
AuthorDate: Wed Nov 27 14:06:34 2019 -0800

    IMPALA-9154: Make runtime filter propagation asynchronous
    
    This patch fixes a bug introduced by IMPALA-7984, which ported the
    functions implementing the aggregation and propagation of runtime
    filters from Thrift RPC to KRPC.
    
    Specifically, IMPALA-7984 implemented the propagation of an
    aggregated runtime filter with a synchronous KRPC call. Hence, when
    the number of KRPC threads for Impala's data stream service is very
    limited (e.g., 1), the query deadlocks if the node running the
    Coordinator tries to propagate the aggregated filter to itself:
    while the sender blocks, there is no thread left to receive the
    aggregated filter.
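    
    To make the failure mode concrete, below is a minimal,
    self-contained C++ sketch (not Impala code; all names are
    illustrative) of a service with a single worker thread whose task
    issues a synchronous call back into the same service. The only
    thread that could serve the inner call is blocked waiting on it,
    which is the deadlock described above:
    
        #include <chrono>
        #include <condition_variable>
        #include <functional>
        #include <future>
        #include <iostream>
        #include <memory>
        #include <mutex>
        #include <queue>
        #include <thread>
    
        // A "service" with exactly one worker thread, standing in for a
        // data stream service started with one service thread.
        class SingleThreadService {
         public:
          SingleThreadService() : worker_([this] { Run(); }) {}
          ~SingleThreadService() {
            {
              std::lock_guard<std::mutex> l(mu_);
              done_ = true;
            }
            cv_.notify_one();
            worker_.join();
          }
          void Submit(std::function<void()> task) {
            {
              std::lock_guard<std::mutex> l(mu_);
              tasks_.push(std::move(task));
            }
            cv_.notify_one();
          }
    
         private:
          void Run() {
            for (;;) {
              std::function<void()> task;
              {
                std::unique_lock<std::mutex> l(mu_);
                cv_.wait(l, [this] { return done_ || !tasks_.empty(); });
                if (done_ && tasks_.empty()) return;
                task = std::move(tasks_.front());
                tasks_.pop();
              }
              task();  // the single thread serves one request at a time
            }
          }
          std::mutex mu_;
          std::condition_variable cv_;
          std::queue<std::function<void()>> tasks_;
          bool done_ = false;
          std::thread worker_;
        };
    
        int main() {
          SingleThreadService svc;
          // "Coordinator" work running on the only service thread...
          svc.Submit([&svc] {
            auto reply = std::make_shared<std::promise<void>>();
            auto replied = reply->get_future();
            // ...issues a call back into the same service and blocks on
            // the response, like a synchronous PublishFilter() to self.
            svc.Submit([reply] { reply->set_value(); });
            // The inner task can never run while we block here, since
            // the only service thread is the one waiting. A timed wait
            // lets the demo report the deadlock instead of hanging.
            if (replied.wait_for(std::chrono::seconds(2)) ==
                std::future_status::timeout) {
              std::cout << "deadlocked: self-call never completes\n";
            }
          });
          std::this_thread::sleep_for(std::chrono::seconds(3));
          return 0;
        }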
    
    This patch makes the propagation of an aggregated runtime filter
    asynchronous to address the issue described above. To prevent the
    memory consumed by an aggregated filter from being reclaimed while
    the filter is still referenced by inflight KRPCs, we add a field to
    the class Coordinator::FilterState that tracks the number of
    inflight KRPCs propagating that filter, so that the memory is
    reclaimed only after all of the associated KRPCs have completed.
    Likewise, when the Coordinator invokes ReleaseExecResources() to
    release all the resources associated with query execution, including
    the memory consumed by the aggregated runtime filters, that memory
    is released only once the inflight KRPCs associated with each
    aggregated filter have finished.
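    
    As a condensed sketch of this lifetime protocol (std::mutex stands
    in for Impala's SpinLock and plain threads stand in for KRPC; the
    names mirror the patch but the code is illustrative): every
    asynchronous publish increments a counter under the filter's lock,
    every completion callback decrements it, and the release path waits
    for the counter to reach zero before freeing the filter's memory.
    
        #include <condition_variable>
        #include <iostream>
        #include <mutex>
        #include <string>
        #include <thread>
        #include <vector>
    
        struct FilterState {
          std::mutex lock;
          std::condition_variable_any publish_filter_done_cv;
          int num_inflight_rpcs = 0;
          // Stands in for the tracked bloom filter memory.
          std::string bloom_filter_directory = std::string(1024, 'x');
        };
    
        int main() {
          FilterState state;
          std::vector<std::thread> rpcs;
    
          {  // UpdateFilter(): issue the async rpcs under the lock.
            std::lock_guard<std::mutex> l(state.lock);
            for (int i = 0; i < 4; ++i) {
              ++state.num_inflight_rpcs;  // IncrementNumInflightRpcs(1)
              // Each thread plays PublishFilterAsync() plus its
              // completion callback, PublishFilterCompleteCb().
              rpcs.emplace_back([&state] {
                std::lock_guard<std::mutex> l(state.lock);
                if (--state.num_inflight_rpcs == 0) {
                  state.publish_filter_done_cv.notify_one();
                }
              });
            }
          }
    
          {  // ReleaseExecResources(): wait for zero, then free.
            std::unique_lock<std::mutex> l(state.lock);
            state.publish_filter_done_cv.wait(
                l, [&state] { return state.num_inflight_rpcs == 0; });
            state.bloom_filter_directory.clear();  // safe: no rpc uses it
            state.bloom_filter_directory.shrink_to_fit();
          }
          for (auto& t : rpcs) t.join();
          std::cout << "memory released after all rpcs completed\n";
          return 0;
        }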
    
    Testing:
    - Passed primitive_many_fragments.test with the database tpch30 in an
      Impala minicluster started with the parameter
      --impalad_args=--datastream_service_num_svc_threads=1.
    - Passed the exhaustive tests in the DEBUG build.
    - Passed the core tests in the ASAN build.
    
    Change-Id: Ifb6726d349be701f3a0602b2ad5a934082f188a0
    Reviewed-on: http://gerrit.cloudera.org:8080/14975
    Reviewed-by: Tim Armstrong <ta...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/coordinator-backend-state.cc |  48 +++++++---
 be/src/runtime/coordinator-backend-state.h  |   7 +-
 be/src/runtime/coordinator-filter-state.h   |  37 ++++++--
 be/src/runtime/coordinator.cc               | 131 ++++++++++++++++------------
 be/src/runtime/coordinator.h                |   4 +-
 5 files changed, 151 insertions(+), 76 deletions(-)

diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index 72e245e..20d7b6f 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -551,8 +551,9 @@ bool Coordinator::BackendState::Cancel() {
   return true;
 }
 
-void Coordinator::BackendState::PublishFilter(
-    const PublishFilterParamsPB& rpc_params, RpcController& controller) {
+void Coordinator::BackendState::PublishFilter(FilterState* state,
+    MemTracker* mem_tracker, const PublishFilterParamsPB& rpc_params,
+    RpcController& controller, PublishFilterResultPB& res) {
   DCHECK_EQ(ProtoToQueryId(rpc_params.dst_query_id()), query_id_);
   // If the backend is already done, it's not waiting for this filter, so we skip
   // sending it in this case.
@@ -571,15 +572,42 @@ void Coordinator::BackendState::PublishFilter(
     return;
   }
 
-  PublishFilterResultPB res;
-  kudu::Status rpc_status = proxy->PublishFilter(rpc_params, &res, &controller);
-  if (!rpc_status.ok()) {
-    LOG(ERROR) << "PublishFilter() rpc failed: " << rpc_status.ToString();
-    return;
+  state->IncrementNumInflightRpcs(1);
+
+  proxy->PublishFilterAsync(rpc_params, &res, &controller,
+      boost::bind(&Coordinator::BackendState::PublishFilterCompleteCb, this,
+          &controller, state, mem_tracker));
+}
+
+void Coordinator::BackendState::PublishFilterCompleteCb(
+    const kudu::rpc::RpcController* rpc_controller, FilterState* state,
+    MemTracker* mem_tracker) {
+  const kudu::Status controller_status = rpc_controller->status();
+
+  // In the case of an unsuccessful KRPC call, we only log the event without retrying.
+  // Failing to send a filter is not a query-wide error - the remote fragment will
+  // continue regardless.
+  if (!controller_status.ok()) {
+    LOG(ERROR) << "PublishFilter() failed: " << controller_status.message().ToString();
   }
-  if (res.status().status_code() != TErrorCode::OK) {
-    LOG(ERROR) << "PublishFilter() operation failed: "
-               << Status(res.status()).GetDetail();
+
+  {
+    lock_guard<SpinLock> l(state->lock());
+
+    state->IncrementNumInflightRpcs(-1);
+
+    if (state->num_inflight_rpcs() == 0) {
+      // Since we disabled the filter once complete and held FilterState::lock_ while
+      // issuing all PublishFilter() rpcs, at this point there can't be any more
+      // PublishFilter() rpcs issued.
+      DCHECK(state->disabled());
+      if (state->is_bloom_filter() && state->bloom_filter_directory().size() > 0) {
+        mem_tracker->Release(state->bloom_filter_directory().size());
+        state->bloom_filter_directory().clear();
+        state->bloom_filter_directory().shrink_to_fit();
+      }
+      state->get_publish_filter_done_cv().notify_one();
+    }
   }
 }
 
diff --git a/be/src/runtime/coordinator-backend-state.h b/be/src/runtime/coordinator-backend-state.h
index fd80b86..432857a 100644
--- a/be/src/runtime/coordinator-backend-state.h
+++ b/be/src/runtime/coordinator-backend-state.h
@@ -106,8 +106,11 @@ class Coordinator::BackendState {
 
   /// Make a PublishFilter rpc with given params if this backend has instances of the
   /// fragment with idx == rpc_params->dst_fragment_idx, otherwise do nothing.
-  void PublishFilter(
-      const PublishFilterParamsPB& rpc_params, kudu::rpc::RpcController& controller);
+  void PublishFilter(FilterState* state, MemTracker* mem_tracker,
+      const PublishFilterParamsPB& rpc_params, kudu::rpc::RpcController& controller,
+      PublishFilterResultPB& res);
+  void PublishFilterCompleteCb(const kudu::rpc::RpcController* rpc_controller,
+      FilterState* state, MemTracker* mem_tracker);
 
   /// Cancel execution at this backend if anything is running. Returns true
   /// if cancellation was attempted, false otherwise.
diff --git a/be/src/runtime/coordinator-filter-state.h b/be/src/runtime/coordinator-filter-state.h
index 55c6b3d..7e92883 100644
--- a/be/src/runtime/coordinator-filter-state.h
+++ b/be/src/runtime/coordinator-filter-state.h
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-
+#include <condition_variable>
 #include <memory>
 #include <utility>
 #include <vector>
@@ -56,6 +56,9 @@ struct Coordinator::FilterTarget {
 /// A filter is disabled if an always_true filter update is received, an OOM is hit,
 /// filter aggregation is complete or if the query is complete.
 /// Once a filter is disabled, subsequent updates for that filter are ignored.
+///
+/// This class is not thread-safe. Callers must hold 'lock()' themselves when
+/// calling any FilterState function if thread safety is needed.
 class Coordinator::FilterState {
  public:
   FilterState(const TRuntimeFilterDesc& desc, const TPlanNodeId& src)
@@ -88,14 +91,30 @@ class Coordinator::FilterState {
       return min_max_filter_.always_true();
     }
   }
+  int num_inflight_rpcs() const { return num_inflight_publish_filter_rpcs_; }
+  SpinLock& lock() { return lock_; }
+  std::condition_variable_any& get_publish_filter_done_cv() {
+    return publish_filter_done_cv_;
+  }
 
   /// Aggregates partitioned join filters and updates memory consumption.
   /// Disables filter if always_true filter is received or OOM is hit.
   void ApplyUpdate(const UpdateFilterParamsPB& params, Coordinator* coord,
       kudu::rpc::RpcContext* context);
 
-  /// Disables a filter. A disabled filter consumes no memory.
-  void Disable(MemTracker* tracker);
+  /// Disables the filter and releases the consumed memory if the filter is a Bloom
+  /// filter.
+  void DisableAndRelease(MemTracker* tracker);
+  /// Disables the filter but does not release the consumed memory.
+  void Disable();
+
+  void IncrementNumInflightRpcs(int i) {
+    num_inflight_publish_filter_rpcs_ += i;
+    DCHECK_GE(num_inflight_publish_filter_rpcs_, 0);
+  }
+
+  /// Waits until all inflight PublishFilter rpcs have completed.
+  void WaitForPublishFilter();
 
  private:
   /// Contains the specification of the runtime filter.
@@ -129,8 +148,15 @@ class Coordinator::FilterState {
   /// Time at which all local filters arrived.
   int64_t completion_time_ = 0L;
 
-  /// TODO: Add a per-object lock so that we can avoid holding the global routing table
+  /// Per-object lock so that we can avoid holding the global routing table
   /// lock for every filter update.
+  SpinLock lock_;
+
+  /// Keeps track of the number of inflight PublishFilter rpcs.
+  int num_inflight_publish_filter_rpcs_ = 0;
+
+  /// Signaled when 'num_inflight_publish_filter_rpcs_' reaches 0.
+  std::condition_variable_any publish_filter_done_cv_;
 };
 
 /// Struct to contain all of the data structures for filter routing. Coordinator
@@ -146,9 +172,6 @@ struct Coordinator::FilterRoutingTable {
   // The value is source plan node id and the filter ID.
   boost::unordered_map<int, std::vector<TRuntimeFilterSource>> finstance_filters_produced;
 
-  /// Synchronizes updates to the state of this routing table.
-  SpinLock update_lock;
-
   /// Protects this routing table.
   /// Usage pattern:
   /// 1. To update the routing table: Acquire shared access on 'lock' and
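
A note on the primitives introduced above: FilterState pairs Impala's
SpinLock with std::condition_variable_any rather than
std::condition_variable, because the latter only works with
std::unique_lock<std::mutex>, while the "_any" variant accepts any
BasicLockable type. A toy, standalone sketch of that pairing (the
SpinLock below is illustrative, not Impala's implementation):

    #include <atomic>
    #include <condition_variable>
    #include <iostream>
    #include <mutex>
    #include <thread>

    // Minimal BasicLockable spinlock (provides lock()/unlock()).
    class SpinLock {
     public:
      void lock() {
        while (flag_.test_and_set(std::memory_order_acquire)) {}
      }
      void unlock() { flag_.clear(std::memory_order_release); }

     private:
      std::atomic_flag flag_ = ATOMIC_FLAG_INIT;
    };

    int main() {
      SpinLock lock;
      std::condition_variable_any cv;  // std::condition_variable would
                                       // not compile with a SpinLock
      int inflight = 1;

      std::thread completion([&] {
        std::lock_guard<SpinLock> l(lock);
        --inflight;
        if (inflight == 0) cv.notify_one();
      });

      {
        // Analogous to WaitForPublishFilter(): wait() releases the
        // spinlock while blocked and reacquires it before returning.
        std::unique_lock<SpinLock> l(lock);
        cv.wait(l, [&] { return inflight == 0; });
      }
      completion.join();
      std::cout << "all inflight rpcs done\n";
      return 0;
    }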
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 48d81f9..9458578 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -348,10 +348,13 @@ void Coordinator::InitFilterRoutingTable() {
       if (!plan_node.__isset.runtime_filters) continue;
       for (const TRuntimeFilterDesc& filter: plan_node.runtime_filters) {
         DCHECK(filter_mode_ == TRuntimeFilterMode::GLOBAL || filter.has_local_targets);
-        auto i = filter_routing_table_->id_to_filter.emplace(
-            filter.filter_id, FilterState(filter, plan_node.node_id)).first;
-        FilterState* f = &(i->second);
+        auto i = filter_routing_table_->id_to_filter
+                     .emplace(std::piecewise_construct,
+                         std::forward_as_tuple(filter.filter_id),
+                         std::forward_as_tuple(filter, plan_node.node_id))
+                     .first;
 
+        FilterState* f = &(i->second);
         // source plan node of filter
         if (plan_node.__isset.hash_join_node) {
           // Set the 'pending_count_' to zero to indicate that for a filter with
@@ -1045,9 +1048,11 @@ void Coordinator::ReleaseExecResources() {
   }
 
   for (auto& filter : filter_routing_table_->id_to_filter) {
-    FilterState* state = &filter.second;
-    state->Disable(filter_mem_tracker_);
+    unique_lock<SpinLock> l(filter.second.lock());
+    filter.second.WaitForPublishFilter();
+    filter.second.DisableAndRelease(filter_mem_tracker_);
   }
+
   // This may be NULL while executing UDFs.
   if (filter_mem_tracker_ != nullptr) filter_mem_tracker_->Close();
   // At this point some tracked memory may still be used in the coordinator for result
@@ -1124,21 +1129,22 @@ void Coordinator::UpdateFilter(const UpdateFilterParamsPB& params, RpcContext* c
 
   PublishFilterParamsPB rpc_params;
   unordered_set<int> target_fragment_idxs;
-  string bloom_filter_directory;
+  if (!IsExecuting()) {
+    LOG(INFO) << "Filter update received for non-executing query with id: "
+        << query_id();
+    return;
+  }
+  auto it = filter_routing_table_->id_to_filter.find(params.filter_id());
+  if (it == filter_routing_table_->id_to_filter.end()) {
+    // This should not be possible since 'id_to_filter' is never changed after
+    // InitFilterRoutingTable().
+    DCHECK(false);
+    LOG(INFO) << "Could not find filter with id: " << rpc_params.filter_id();
+    return;
+  }
+  FilterState* state = &it->second;
   {
-    lock_guard<SpinLock> l(filter_routing_table_->update_lock);
-    if (!IsExecuting()) {
-      LOG(INFO) << "Filter update received for non-executing query with id: "
-                << query_id();
-      return;
-    }
-    auto it = filter_routing_table_->id_to_filter.find(params.filter_id());
-    if (it == filter_routing_table_->id_to_filter.end()) {
-      LOG(INFO) << "Could not find filter with id: " << params.filter_id();
-      return;
-    }
-    FilterState* state = &it->second;
-
+    lock_guard<SpinLock> l(state->lock());
     DCHECK(state->desc().has_remote_targets)
         << "Coordinator received filter that has only local targets";
 
@@ -1165,7 +1171,7 @@ void Coordinator::UpdateFilter(const UpdateFilterParamsPB& params, RpcContext* c
 
     // No more updates are pending on this filter ID. Create a distribution payload and
     // offer it to the queue.
-    for (const FilterTarget& target: *state->targets()) {
+    for (const FilterTarget& target : *state->targets()) {
       // Don't publish the filter to targets that are in the same fragment as the join
       // that produced it.
       if (target.is_local) continue;
@@ -1175,44 +1181,47 @@ void Coordinator::UpdateFilter(const UpdateFilterParamsPB& params, RpcContext* c
     if (state->is_bloom_filter()) {
       // Assign an outgoing bloom filter.
       *rpc_params.mutable_bloom_filter() = state->bloom_filter();
-      bloom_filter_directory.swap(state->bloom_filter_directory());
+
       DCHECK(rpc_params.bloom_filter().always_false()
-          || rpc_params.bloom_filter().always_true() || !bloom_filter_directory.empty());
+          || rpc_params.bloom_filter().always_true()
+          || !state->bloom_filter_directory().empty());
+
     } else {
       DCHECK(state->is_min_max_filter());
       MinMaxFilter::Copy(state->min_max_filter(), rpc_params.mutable_min_max_filter());
     }
 
-    // Filter is complete, and can be released.
-    state->Disable(filter_mem_tracker_);
-  }
+    // Filter is complete. We disable it so future UpdateFilter rpcs will be ignored,
+    // e.g., if it was a broadcast join.
+    state->Disable();
+
+    TUniqueIdToUniqueIdPB(query_id(), rpc_params.mutable_dst_query_id());
+    rpc_params.set_filter_id(params.filter_id());
+
+    // Waited for exec_rpcs_complete_barrier_ so backend_states_ is valid.
+    for (BackendState* bs : backend_states_) {
+      for (int fragment_idx : target_fragment_idxs) {
+        if (!IsExecuting()) {
+          if (rpc_params.has_bloom_filter()) {
+            filter_mem_tracker_->Release(state->bloom_filter_directory().size());
+            state->bloom_filter_directory().clear();
+            state->bloom_filter_directory().shrink_to_fit();
+          }
+          return;
+        }
 
-  TUniqueIdToUniqueIdPB(query_id(), rpc_params.mutable_dst_query_id());
-  rpc_params.set_filter_id(params.filter_id());
-
-  // Waited for exec_rpcs_complete_barrier_ so backend_states_ is valid.
-  for (BackendState* bs: backend_states_) {
-    for (int fragment_idx: target_fragment_idxs) {
-      if (!IsExecuting()) goto cleanup;
-      rpc_params.set_dst_fragment_idx(fragment_idx);
-      RpcController controller;
-      if (rpc_params.has_bloom_filter() && !rpc_params.bloom_filter().always_false()
-          && !rpc_params.bloom_filter().always_true()) {
-        BloomFilter::AddDirectorySidecar(rpc_params.mutable_bloom_filter(), &controller,
-            bloom_filter_directory);
+        rpc_params.set_dst_fragment_idx(fragment_idx);
+        RpcController* controller = obj_pool()->Add(new RpcController);
+        PublishFilterResultPB* res = obj_pool()->Add(new PublishFilterResultPB);
+        if (rpc_params.has_bloom_filter() && !rpc_params.bloom_filter().always_false()
+            && !rpc_params.bloom_filter().always_true()) {
+          BloomFilter::AddDirectorySidecar(rpc_params.mutable_bloom_filter(), controller,
+              state->bloom_filter_directory());
+        }
+        bs->PublishFilter(state, filter_mem_tracker_, rpc_params, *controller, *res);
       }
-      // TODO: make this asynchronous.
-      bs->PublishFilter(rpc_params, controller);
     }
   }
-
-cleanup:
-  // For bloom filters, the memory used in the filter_routing_table_ is transfered to
-  // rpc_params. Hence the Release() function on the filter_mem_tracker_ is called
-  // here to ensure that the MemTracker is updated after the memory is actually freed.
-  if (rpc_params.has_bloom_filter()) {
-    filter_mem_tracker_->Release(bloom_filter_directory.size());
-  }
 }
 
 void Coordinator::FilterState::ApplyUpdate(
@@ -1228,7 +1237,7 @@ void Coordinator::FilterState::ApplyUpdate(
   if (is_bloom_filter()) {
     DCHECK(params.has_bloom_filter());
     if (params.bloom_filter().always_true()) {
-      Disable(coord->filter_mem_tracker_);
+      DisableAndRelease(coord->filter_mem_tracker_);
     } else if (params.bloom_filter().always_false()) {
       if (!bloom_filter_.has_log_bufferpool_space()) {
         bloom_filter_ = BloomFilterPB(params.bloom_filter());
@@ -1243,7 +1252,7 @@ void Coordinator::FilterState::ApplyUpdate(
           params.bloom_filter().directory_sidecar_idx(), &sidecar_slice);
       if (!status.ok()) {
         LOG(ERROR) << "Cannot get inbound sidecar: " << status.message().ToString();
-        Disable(coord->filter_mem_tracker_);
+        DisableAndRelease(coord->filter_mem_tracker_);
       } else if (bloom_filter_.always_false()) {
         int64_t heap_space = sidecar_slice.size();
         if (!coord->filter_mem_tracker_->TryConsume(heap_space)) {
@@ -1251,7 +1260,7 @@ void Coordinator::FilterState::ApplyUpdate(
                      << PrettyPrinter::Print(heap_space, TUnit::BYTES)
                      << " (query_id=" << PrintId(coord->query_id()) << ")";
           // Disable, as one missing update means a correct filter cannot be produced.
-          Disable(coord->filter_mem_tracker_);
+          DisableAndRelease(coord->filter_mem_tracker_);
         } else {
           bloom_filter_ = params.bloom_filter();
           bloom_filter_directory_ = sidecar_slice.ToString();
@@ -1267,7 +1276,7 @@ void Coordinator::FilterState::ApplyUpdate(
     DCHECK(is_min_max_filter());
     DCHECK(params.has_min_max_filter());
     if (params.min_max_filter().always_true()) {
-      Disable(coord->filter_mem_tracker_);
+      DisableAndRelease(coord->filter_mem_tracker_);
     } else if (min_max_filter_.always_false()) {
       MinMaxFilter::Copy(params.min_max_filter(), &min_max_filter_);
     } else {
@@ -1281,13 +1290,19 @@ void Coordinator::FilterState::ApplyUpdate(
   }
 }
 
-void Coordinator::FilterState::Disable(MemTracker* tracker) {
+void Coordinator::FilterState::DisableAndRelease(MemTracker* tracker) {
+  Disable();
   if (is_bloom_filter()) {
-    bloom_filter_.set_always_true(true);
-    bloom_filter_.set_always_false(false);
     tracker->Release(bloom_filter_directory_.size());
     bloom_filter_directory_.clear();
     bloom_filter_directory_.shrink_to_fit();
+  }
+}
+
+void Coordinator::FilterState::Disable() {
+  if (is_bloom_filter()) {
+    bloom_filter_.set_always_true(true);
+    bloom_filter_.set_always_false(false);
   } else {
     DCHECK(is_min_max_filter());
     min_max_filter_.set_always_true(true);
@@ -1295,6 +1310,12 @@ void Coordinator::FilterState::Disable(MemTracker* tracker) {
   }
 }
 
+void Coordinator::FilterState::WaitForPublishFilter() {
+  while (num_inflight_publish_filter_rpcs_ > 0) {
+    publish_filter_done_cv_.wait(lock_);
+  }
+}
+
 void Coordinator::GetTExecSummary(TExecSummary* exec_summary) {
   lock_guard<SpinLock> l(exec_summary_.lock);
   *exec_summary = exec_summary_.thrift_exec_summary;
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index 7b05fde..aa03f30 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -100,8 +100,8 @@ class TUpdateCatalogRequest;
 ///
 /// Lock ordering: (lower-numbered acquired before higher-numbered)
 /// 1. wait_lock_
-/// 2. filter_lock_
-/// 3. exec_state_lock_, backend_states_init_lock_, filter_update_lock_, ExecSummary::lock
+/// 2. Coordinator::FilterRoutingTable::lock
+/// 3. exec_state_lock_, backend_states_init_lock_, FilterState::lock_, ExecSummary::lock
 /// 4. Coordinator::BackendState::lock_ (leafs)
 ///
 /// TODO: move into separate subdirectory and move nested classes into separate files