You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2020/02/21 23:38:49 UTC

[impala] 02/02: IMPALA-9395: fix duplicate broadcast SetFilter() calls

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit fa2fabfc025b87add7fac64e52593b36a14acd4b
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Wed Feb 19 10:01:03 2020 -0800

    IMPALA-9395: fix duplicate broadcast SetFilter() calls
    
    For broadcast join filters, we can get duplicate filters
    - one set from the coordinator and one set locally.
    The intent is that the first-arriving filter is used
    (it's exposed to consumers via SetFilter()) and the
    later filters are ignored.
    
    This should be guaranteed by checking HasFilter()
    in the same critical section as SetFilter(). I
    checked all code paths in runtime-filter-bank.cc
    that call SetFilter() for broadcast filters and
    fixed the single exception, UpdateFilterFromLocal().
    
    Testing:
    I wasn't able to reproduce this issue locally. Ran
    runtime filter tests to check for regressions.
    
    Change-Id: I95d0620c4dbb5e4066702db48442cebee7389f5a
    Reviewed-on: http://gerrit.cloudera.org:8080/15242
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/runtime-filter-bank.cc | 28 +++++++++++++++-------------
 be/src/runtime/runtime-filter-bank.h  |  4 ++++
 2 files changed, 19 insertions(+), 13 deletions(-)

diff --git a/be/src/runtime/runtime-filter-bank.cc b/be/src/runtime/runtime-filter-bank.cc
index e2ce451..5b02ceb 100644
--- a/be/src/runtime/runtime-filter-bank.cc
+++ b/be/src/runtime/runtime-filter-bank.cc
@@ -230,19 +230,21 @@ void RuntimeFilterBank::UpdateFilterFromLocal(
       lock_guard<SpinLock> l(fs->lock);
       if (fs->consumed_filter == nullptr) return;
       consumed_filter = fs->consumed_filter;
-    }
-    if (consumed_filter->HasFilter()) {
-      // Multiple instances may produce the same filter for broadcast joins.
-      // TODO: we would ideally update the coordinator logic to avoid creating duplicates
-      // on the same node, but sending out a few duplicate filters is relatively
-      // inconsequential for performance.
-      DCHECK(consumed_filter->filter_desc().is_broadcast_join)
-          << consumed_filter->filter_desc();
-    } else {
-      consumed_filter->SetFilter(complete_filter);
-      query_state_->host_profile()->AddInfoString(
-          Substitute("Filter $0 arrival", filter_id),
-          PrettyPrinter::Print(consumed_filter->arrival_delay_ms(), TUnit::TIME_MS));
+      // Update the filter while still holding the lock to avoid racing with the
+      // SetFilter() call in PublishGlobalFilter().
+      if (consumed_filter->HasFilter()) {
+        // Multiple instances may produce the same filter for broadcast joins.
+        // TODO: we would ideally update the coordinator logic to avoid creating duplicates
+        // on the same node, but sending out a few duplicate filters is relatively
+        // inconsequential for performance.
+        DCHECK(consumed_filter->filter_desc().is_broadcast_join)
+            << consumed_filter->filter_desc();
+      } else {
+        consumed_filter->SetFilter(complete_filter);
+        query_state_->host_profile()->AddInfoString(
+            Substitute("Filter $0 arrival", filter_id),
+            PrettyPrinter::Print(consumed_filter->arrival_delay_ms(), TUnit::TIME_MS));
+      }
     }
   }
 
diff --git a/be/src/runtime/runtime-filter-bank.h b/be/src/runtime/runtime-filter-bank.h
index 5c06e09..e8977e4 100644
--- a/be/src/runtime/runtime-filter-bank.h
+++ b/be/src/runtime/runtime-filter-bank.h
@@ -194,6 +194,10 @@ class RuntimeFilterBank {
 
     /// The filter that is returned to consumers that call RegisterFilter(producer=false).
     /// Initialised when RegisterFilter(producer=false) is called for this filter id.
+    ///
+    /// For broadcast joins, SetFilter() must be called while holding 'lock' and after
+    /// checking HasFilter() to avoid SetFilter() being called multiple times for
+    /// broadcast join filters.
     RuntimeFilter* consumed_filter = nullptr;
 
     /// Contains references to all the bloom filters generated. Used in Close() to safely