You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jo...@apache.org on 2021/02/23 16:28:15 UTC

[impala] 02/02: IMPALA-9767: Do not clean up filter while PublishFilter is ongoing

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit b9593d572514c3d72dea79aee5d841760d841e05
Author: Riza Suminto <ri...@cloudera.com>
AuthorDate: Thu Feb 18 17:36:24 2021 -0800

    IMPALA-9767: Do not clean up filter while PublishFilter is ongoing
    
    There have been occurrences of heap-use-after-free in ASAN builds during
    runtime filter publishing. The issue happens because the memory
    consumed by an aggregated Bloom filter is released while the
    coordinator is still sending the aggregated filter via KRPC to workers.
    This patch removes the offending cleanup routine from
    Coordinator::UpdateFilter(). It also factors the cleanup out of
    FilterState::DisableAndRelease() into a separate method,
    FilterState::Release(), which asserts that the filter is disabled and
    that no RPC is in flight while the filter is being cleaned up.
    
    Testing:
    - Reproduced the bug by instrumenting Coordinator::UpdateFilter().
    - Manually verified that the bug no longer happens after the patch.
    - Passed core tests in an ASAN build.
    
    Change-Id: I1c408bdedab83c4b9249e2c0c493cb0f894a3d08
    Reviewed-on: http://gerrit.cloudera.org:8080/17095
    Reviewed-by: Joe McDonnell <jo...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/coordinator-backend-state.cc | 10 +++-------
 be/src/runtime/coordinator-filter-state.h   |  3 +++
 be/src/runtime/coordinator.cc               | 25 ++++++++++++-------------
 3 files changed, 18 insertions(+), 20 deletions(-)

diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index f24f73a..98e0938 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -724,13 +724,9 @@ void Coordinator::BackendState::PublishFilterCompleteCb(
     if (state->num_inflight_rpcs() == 0) {
       // Since we disabled the filter once complete and held FilterState::lock_ while
       // issuing all PublishFilter() rpcs, at this point there can't be any more
-      // PublishFilter() rpcs issued.
-      DCHECK(state->disabled());
-      if (state->is_bloom_filter() && state->bloom_filter_directory().size() > 0) {
-        mem_tracker->Release(state->bloom_filter_directory().size());
-        state->bloom_filter_directory().clear();
-        state->bloom_filter_directory().shrink_to_fit();
-      }
+      // PublishFilter() rpcs issued. As the last callback completed, we must release the
+      // filter state.
+      state->Release(mem_tracker);
       state->get_publish_filter_done_cv().notify_one();
     }
   }
diff --git a/be/src/runtime/coordinator-filter-state.h b/be/src/runtime/coordinator-filter-state.h
index bc094ab..eec5da9 100644
--- a/be/src/runtime/coordinator-filter-state.h
+++ b/be/src/runtime/coordinator-filter-state.h
@@ -109,6 +109,9 @@ class Coordinator::FilterState {
   void DisableAndRelease(MemTracker* tracker, const bool all_updates_received);
   /// Disables the filter but does not release the consumed memory.
   void Disable(const bool all_updates_received);
+  /// Release consumed memory of this filter. Caller must hold `lock_` and make sure
+  /// filter already disabled.
+  void Release(MemTracker* tracker);
 
   void IncrementNumInflightRpcs(int i) {
     num_inflight_publish_filter_rpcs_ += i;
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 015afbb..3e1a00a 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -1472,14 +1472,7 @@ void Coordinator::UpdateFilter(const UpdateFilterParamsPB& params, RpcContext* c
 
     // Called WaitForExecRpcs() so backend_states_ is valid.
     for (BackendState* bs : backend_states_) {
-      if (!IsExecuting()) {
-        if (rpc_params.has_bloom_filter()) {
-          filter_mem_tracker_->Release(state->bloom_filter_directory().size());
-          state->bloom_filter_directory().clear();
-          state->bloom_filter_directory().shrink_to_fit();
-          return;
-        }
-      }
+      if (!IsExecuting()) break;
 
       if (bs->HasFragmentIdx(target_fragment_idxs)) {
         rpc_params.set_filter_id(params.filter_id());
@@ -1569,11 +1562,7 @@ void Coordinator::FilterState::ApplyUpdate(
 void Coordinator::FilterState::DisableAndRelease(
     MemTracker* tracker, const bool all_updates_received) {
   Disable(all_updates_received);
-  if (is_bloom_filter()) {
-    tracker->Release(bloom_filter_directory_.size());
-    bloom_filter_directory_.clear();
-    bloom_filter_directory_.shrink_to_fit();
-  }
+  Release(tracker);
 }
 
 void Coordinator::FilterState::Disable(const bool all_updates_received) {
@@ -1588,6 +1577,16 @@ void Coordinator::FilterState::Disable(const bool all_updates_received) {
   }
 }
 
+void Coordinator::FilterState::Release(MemTracker* tracker) {
+  DCHECK(disabled());
+  DCHECK(num_inflight_publish_filter_rpcs_ == 0);
+  if (is_bloom_filter()) {
+    tracker->Release(bloom_filter_directory_.size());
+    bloom_filter_directory_.clear();
+    bloom_filter_directory_.shrink_to_fit();
+  }
+}
+
 void Coordinator::FilterState::WaitForPublishFilter() {
   while (num_inflight_publish_filter_rpcs_ > 0) {
     publish_filter_done_cv_.wait(lock_);