You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2020/02/21 23:38:49 UTC
[impala] 02/02: IMPALA-9395: fix duplicate broadcast SetFilter() calls
This is an automated email from the ASF dual-hosted git repository.
tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit fa2fabfc025b87add7fac64e52593b36a14acd4b
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Wed Feb 19 10:01:03 2020 -0800
IMPALA-9395: fix duplicate broadcast SetFilter() calls
For broadcast join filters, a consumer can receive the same filter
twice: one copy from the coordinator and one copy produced locally.
The intent is that the first-arriving copy is used
(it is exposed to consumers via SetFilter()) and any
later-arriving copies are ignored.
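This is only guaranteed if HasFilter() is checked
in the same critical section as the SetFilter() call. I
checked all code paths in runtime-filter-bank.cc
that call SetFilter() for broadcast filters and
fixed the single exception: UpdateFilterFromLocal()
released the filter's lock before checking HasFilter(),
so it could race with the SetFilter() call in
PublishGlobalFilter().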
Testing:
I wasn't able to reproduce this issue locally. Ran
runtime filter tests to check for regressions.
Change-Id: I95d0620c4dbb5e4066702db48442cebee7389f5a
Reviewed-on: http://gerrit.cloudera.org:8080/15242
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
be/src/runtime/runtime-filter-bank.cc | 28 +++++++++++++++-------------
be/src/runtime/runtime-filter-bank.h | 4 ++++
2 files changed, 19 insertions(+), 13 deletions(-)
diff --git a/be/src/runtime/runtime-filter-bank.cc b/be/src/runtime/runtime-filter-bank.cc
index e2ce451..5b02ceb 100644
--- a/be/src/runtime/runtime-filter-bank.cc
+++ b/be/src/runtime/runtime-filter-bank.cc
@@ -230,19 +230,21 @@ void RuntimeFilterBank::UpdateFilterFromLocal(
lock_guard<SpinLock> l(fs->lock);
if (fs->consumed_filter == nullptr) return;
consumed_filter = fs->consumed_filter;
- }
- if (consumed_filter->HasFilter()) {
- // Multiple instances may produce the same filter for broadcast joins.
- // TODO: we would ideally update the coordinator logic to avoid creating duplicates
- // on the same node, but sending out a few duplicate filters is relatively
- // inconsequential for performance.
- DCHECK(consumed_filter->filter_desc().is_broadcast_join)
- << consumed_filter->filter_desc();
- } else {
- consumed_filter->SetFilter(complete_filter);
- query_state_->host_profile()->AddInfoString(
- Substitute("Filter $0 arrival", filter_id),
- PrettyPrinter::Print(consumed_filter->arrival_delay_ms(), TUnit::TIME_MS));
+ // Update the filter while still holding the lock to avoid racing with the
+ // SetFilter() call in PublishGlobalFilter().
+ if (consumed_filter->HasFilter()) {
+ // Multiple instances may produce the same filter for broadcast joins.
+ // TODO: we would ideally update the coordinator logic to avoid creating duplicates
+ // on the same node, but sending out a few duplicate filters is relatively
+ // inconsequential for performance.
+ DCHECK(consumed_filter->filter_desc().is_broadcast_join)
+ << consumed_filter->filter_desc();
+ } else {
+ consumed_filter->SetFilter(complete_filter);
+ query_state_->host_profile()->AddInfoString(
+ Substitute("Filter $0 arrival", filter_id),
+ PrettyPrinter::Print(consumed_filter->arrival_delay_ms(), TUnit::TIME_MS));
+ }
}
}
diff --git a/be/src/runtime/runtime-filter-bank.h b/be/src/runtime/runtime-filter-bank.h
index 5c06e09..e8977e4 100644
--- a/be/src/runtime/runtime-filter-bank.h
+++ b/be/src/runtime/runtime-filter-bank.h
@@ -194,6 +194,10 @@ class RuntimeFilterBank {
/// The filter that is returned to consumers that call RegisterFilter(producer=false).
/// Initialised when RegisterFilter(producer=false) is called for this filter id.
+ ///
+ /// For broadcast joins, SetFilter() must be called while holding 'lock' and after
+ /// checking HasFilter() to avoid SetFilter() being called multiple times for
+ /// broadcast join filters.
RuntimeFilter* consumed_filter = nullptr;
/// Contains references to all the bloom filters generated. Used in Close() to safely