Posted to commits@impala.apache.org by ta...@apache.org on 2019/10/22 05:30:38 UTC

[impala] 01/02: IMPALA-9019: fix runtime filter propagation with mt_dop

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 9549d3859ed502b109abb916be818e0909e9e207
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Fri Oct 18 10:03:18 2019 -0700

    IMPALA-9019: fix runtime filter propagation with mt_dop
    
    The bug was that filter routing table construction removed
    filters from the TPlanNode structure for the join when
    a finstance was not a producer of that filter. The
    TPlanNode is shared between all instances of a fragment
    on a backend, so this meant that the filter was removed
    for all instances on that backend, often meaning that
    no filters would be produced at all.
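    
    The failure mode, as a minimal sketch with hypothetical names (not the
    actual Impala structures): each instance prunes a plan-node structure
    that is shared by every instance on the backend, so whichever instance
    runs last decides which filters survive.
    
        #include <vector>
        
        struct FilterDesc { int filter_id; };
        // One PlanNode object is shared by all fragment instances on a backend.
        struct PlanNode { std::vector<FilterDesc> runtime_filters; };
        
        void PrepareInstance(PlanNode* shared_node, bool instance_is_producer) {
          std::vector<FilterDesc> kept;
          for (const FilterDesc& d : shared_node->runtime_filters) {
            if (instance_is_producer) kept.push_back(d);
          }
          // Bug: mutating the shared node. A non-producer instance that runs
          // last wipes the filter list for every instance on this backend.
          shared_node->runtime_filters = kept;
        }
    
    The fix keeps the shared plan untouched and instead tells each instance,
    per instance, which filters it must produce.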
    
    It was awkward fixing the bug within the framework of
    the current data structures, where the routing table
    is keyed by filter_id, so I ended up refactoring
    the routing table somewhat. This also allowed
    fixing a TODO about O(n^2) construction of the
    routing table.
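    
    A simplified sketch, with hypothetical names, of the reshaped routing
    table (the real definitions are in coordinator-filter-state.h below):
    
        #include <unordered_map>
        #include <vector>
        
        struct FilterState { int pending_count = 0; int num_producers = 0; };
        struct RuntimeFilterSource { int src_node_id; int filter_id; };
        
        struct RoutingTable {
          // Filter ID -> aggregation state for that filter.
          std::unordered_map<int, FilterState> id_to_filter;
          // Instance index -> filters that instance produces. Built in one
          // pass over the plan, so per-instance Exec RPC setup is a map
          // lookup instead of a rescan of every plan node (the old O(n^2)
          // behaviour).
          std::unordered_map<int, std::vector<RuntimeFilterSource>>
              produced_by_instance;
        };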
    
    Testing:
    Add a regression test that timed out without the fix.
    
    Perf:
    Ran a single-node TPC-H workload at scale factor 30.
    No perf change.
    
    Change-Id: I26e3628a982d5d9b8b24eb96b28aff11f8aa6669
    Reviewed-on: http://gerrit.cloudera.org:8080/14511
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/partitioned-hash-join-builder.cc       |  9 ++++
 be/src/runtime/coordinator-backend-state.cc        | 32 +++--------
 be/src/runtime/coordinator-filter-state.h          | 62 ++++++++++++++++------
 be/src/runtime/coordinator.cc                      | 59 +++++++++++---------
 be/src/runtime/coordinator.h                       | 27 +++-------
 common/thrift/ImpalaInternalService.thrift         |  9 ++++
 .../queries/QueryTest/joins_mt_dop.test            | 24 +++++++++
 7 files changed, 132 insertions(+), 90 deletions(-)

diff --git a/be/src/exec/partitioned-hash-join-builder.cc b/be/src/exec/partitioned-hash-join-builder.cc
index 2f9017d..f896571 100644
--- a/be/src/exec/partitioned-hash-join-builder.cc
+++ b/be/src/exec/partitioned-hash-join-builder.cc
@@ -98,6 +98,15 @@ Status PhjBuilder::InitExprsAndFilters(RuntimeState* state,
         filter_desc.is_broadcast_join || state->query_options().num_nodes == 1);
     DCHECK(!state->query_options().disable_row_runtime_filtering ||
         filter_desc.applied_on_partition_columns);
+    // Skip over filters that are not produced by this instance of the join, i.e.
+    // broadcast filters where this instance was not selected as a filter producer.
+    const vector<TRuntimeFilterSource> filters_produced =
+        state->instance_ctx().filters_produced;
+    auto it = std::find_if(filters_produced.begin(), filters_produced.end(),
+        [this, &filter_desc](const TRuntimeFilterSource f) {
+          return f.src_node_id == join_node_id_ && f.filter_id == filter_desc.filter_id;
+        });
+    if (it == filters_produced.end()) continue;
     ScalarExpr* filter_expr;
     RETURN_IF_ERROR(
         ScalarExpr::Create(filter_desc.src_expr, *row_desc_, state, &filter_expr));
diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index ea1691e..5ab055b 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -145,33 +145,13 @@ void Coordinator::BackendState::SetRpcParams(const DebugOptions& debug_options,
 
     if (filter_mode_ == TRuntimeFilterMode::OFF) continue;
 
-    // Remove filters that weren't selected during filter routing table construction.
-    // TODO: do this more efficiently, we're looping over the entire plan for each
-    // instance separately
     int instance_idx = GetInstanceIdx(params.instance_id);
-    for (TPlanNode& plan_node : sidecar->fragment_ctxs.back().fragment.plan.nodes) {
-      if (!plan_node.__isset.hash_join_node) continue;
-      if (!plan_node.__isset.runtime_filters) continue;
-
-      vector<TRuntimeFilterDesc> required_filters;
-      for (const TRuntimeFilterDesc& desc: plan_node.runtime_filters) {
-        FilterRoutingTable::const_iterator filter_it =
-            filter_routing_table.find(desc.filter_id);
-        // filter was dropped in Coordinator::InitFilterRoutingTable()
-        if (filter_it == filter_routing_table.end()) continue;
-        const FilterState& f = filter_it->second;
-        if (f.src_fragment_instance_idxs().find(instance_idx)
-            == f.src_fragment_instance_idxs().end()) {
-          DCHECK(desc.is_broadcast_join);
-          continue;
-        }
-        // We don't need a target-side check here, because a filter is either sent to
-        // all its targets or none, and the none case is handled by checking if the
-        // filter is in the routing table.
-        required_filters.push_back(desc);
-      }
-      plan_node.__set_runtime_filters(required_filters);
-    }
+    auto& produced_map = filter_routing_table.finstance_filters_produced;
+    auto produced_it = produced_map.find(instance_idx);
+    if (produced_it == produced_map.end()) continue;
+    // Finstance needs list of source filters that were selected during filter routing
+    // table construction.
+    instance_ctx.__set_filters_produced(produced_it->second);
   }
 }
 
diff --git a/be/src/runtime/coordinator-filter-state.h b/be/src/runtime/coordinator-filter-state.h
index 2cb0602..e16abea 100644
--- a/be/src/runtime/coordinator-filter-state.h
+++ b/be/src/runtime/coordinator-filter-state.h
@@ -17,6 +17,7 @@
 
 
 #include <memory>
+#include <utility>
 #include <vector>
 #include <boost/unordered_set.hpp>
 
@@ -58,8 +59,7 @@ struct Coordinator::FilterTarget {
 class Coordinator::FilterState {
  public:
   FilterState(const TRuntimeFilterDesc& desc, const TPlanNodeId& src)
-    : desc_(desc), src_(src), pending_count_(0), first_arrival_time_(0L),
-      completion_time_(0L) {
+    : desc_(desc), src_(src) {
     // bloom_filter_ is a disjunction so the unit value is always_false.
     bloom_filter_.always_false = true;
     min_max_filter_.always_false = true;
@@ -67,12 +67,6 @@ class Coordinator::FilterState {
 
   TBloomFilter& bloom_filter() { return bloom_filter_; }
   TMinMaxFilter& min_max_filter() { return min_max_filter_; }
-  boost::unordered_set<int>* src_fragment_instance_idxs() {
-    return &src_fragment_instance_idxs_;
-  }
-  const boost::unordered_set<int>& src_fragment_instance_idxs() const {
-    return src_fragment_instance_idxs_;
-  }
   std::vector<FilterTarget>* targets() { return &targets_; }
   const std::vector<FilterTarget>& targets() const { return targets_; }
   int64_t first_arrival_time() const { return first_arrival_time_; }
@@ -83,6 +77,8 @@ class Coordinator::FilterState {
   bool is_min_max_filter() const { return desc_.type == TRuntimeFilterType::MIN_MAX; }
   int pending_count() const { return pending_count_; }
   void set_pending_count(int pending_count) { pending_count_ = pending_count; }
+  int num_producers() const { return num_producers_; }
+  void set_num_producers(int num_producers) { num_producers_ = num_producers; }
   bool disabled() const {
     if (is_bloom_filter()) {
       return bloom_filter_.always_true;
@@ -106,11 +102,12 @@ class Coordinator::FilterState {
   TPlanNodeId src_;
   std::vector<FilterTarget> targets_;
 
-  // Indices of source fragment instances (as returned by GetInstanceIdx()).
-  boost::unordered_set<int> src_fragment_instance_idxs_;
-
   /// Number of remaining backends to hear from before filter is complete.
-  int pending_count_;
+  int pending_count_ = 0;
+
+  /// Number of fragment instances producing this filter. The full information about the
+  /// producer instances is tracked in 'finstance_filters_produced'.
+  int num_producers_ = 0;
 
   /// Filters aggregated from all source plan nodes, to be broadcast to all
   /// destination plan fragment instances. Only set for partitioned joins (broadcast joins
@@ -122,14 +119,45 @@ class Coordinator::FilterState {
   TMinMaxFilter min_max_filter_;
 
   /// Time at which first local filter arrived.
-  int64_t first_arrival_time_;
+  int64_t first_arrival_time_ = 0L;
 
   /// Time at which all local filters arrived.
-  int64_t completion_time_;
-
-  /// TODO: Add a per-object lock so that we can avoid holding the global filter_lock_
-  /// for every filter update.
+  int64_t completion_time_ = 0L;
 
+  /// TODO: Add a per-object lock so that we can avoid holding the global routing table
+  /// lock for every filter update.
 };
 
+/// Struct to contain all of the data structures for filter routing. Coordinator
+/// has access to all the internals of this structure and must protect invariants.
+struct Coordinator::FilterRoutingTable {
+  int64_t num_filters() const { return id_to_filter.size(); }
+
+  /// Maps the filter ID to the state of that filter.
+  boost::unordered_map<int32_t, FilterState> id_to_filter;
+
+  // List of runtime filters that this fragment instance is the source for.
+  // The key of the map is the instance index returned by GetInstanceIdx().
+  // The value is source plan node id and the filter ID.
+  boost::unordered_map<int, std::vector<TRuntimeFilterSource>> finstance_filters_produced;
+
+  /// Synchronizes updates to the state of this routing table.
+  SpinLock update_lock;
+
+  /// Protects this routing table.
+  /// Usage pattern:
+  /// 1. To update the routing table: Acquire shared access on 'lock' and
+  ///    upgrade to exclusive access by subsequently acquiring 'update_lock'.
+  /// 2. To read the routing table: if 'is_complete' is true and no threads
+  ///    will be destroying the table concurrently, it is safe to read the
+  ///    routing table without acquiring a lock. Otherwise, acquire shared
+  ///    access on 'lock'
+  /// 3. To initialize/destroy the routing table: Directly acquire exclusive
+  ///    access on 'lock'.
+  boost::shared_mutex lock;
+
+  /// Set to true when all calls to UpdateFilterRoutingTable() have finished, and it's
+  /// safe to concurrently read from this routing table.
+  bool is_complete = false;
+};
 }
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 2b5a13c..d2a5ddd 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -78,7 +78,8 @@ Coordinator::Coordinator(ClientRequestState* parent, const QuerySchedule& schedu
     obj_pool_(new ObjectPool()),
     query_events_(events),
     exec_rpcs_complete_barrier_(schedule_.per_backend_exec_params().size()),
-    backend_released_barrier_(schedule_.per_backend_exec_params().size()) {}
+    backend_released_barrier_(schedule_.per_backend_exec_params().size()),
+    filter_routing_table_(new FilterRoutingTable) {}
 
 Coordinator::~Coordinator() {
   // Must have entered a terminal exec state guaranteeing resources were released.
@@ -123,10 +124,6 @@ Status Coordinator::Exec() {
   InitBackendStates();
   exec_summary_.Init(schedule_);
 
-  // TODO-MT: populate the runtime filter routing table
-  // This requires local aggregation of filters prior to sending
-  // for broadcast joins in order to avoid more complicated merge logic here.
-
   if (filter_mode_ != TRuntimeFilterMode::OFF) {
     DCHECK_EQ(request.plan_exec_info.size(), 1);
     // Populate the runtime filter routing table. This should happen before starting the
@@ -290,10 +287,10 @@ void Coordinator::ExecSummary::Init(const QuerySchedule& schedule) {
 void Coordinator::InitFilterRoutingTable() {
   DCHECK_NE(filter_mode_, TRuntimeFilterMode::OFF)
       << "InitFilterRoutingTable() called although runtime filters are disabled";
-  DCHECK(!filter_routing_table_complete_)
-      << "InitFilterRoutingTable() called after setting filter_routing_table_complete_";
+  DCHECK(!filter_routing_table_->is_complete)
+      << "InitFilterRoutingTable() called after table marked as complete";
 
-  lock_guard<shared_mutex> lock(filter_lock_);
+  lock_guard<shared_mutex> lock(filter_routing_table_->lock); // Exclusive lock.
   for (const FragmentExecParams& fragment_params: schedule_.fragment_exec_params()) {
     int num_instances = fragment_params.instance_exec_params.size();
     DCHECK_GT(num_instances, 0);
@@ -302,7 +299,7 @@ void Coordinator::InitFilterRoutingTable() {
       if (!plan_node.__isset.runtime_filters) continue;
       for (const TRuntimeFilterDesc& filter: plan_node.runtime_filters) {
         DCHECK(filter_mode_ == TRuntimeFilterMode::GLOBAL || filter.has_local_targets);
-        FilterRoutingTable::iterator i = filter_routing_table_.emplace(
+        auto i = filter_routing_table_->id_to_filter.emplace(
             filter.filter_id, FilterState(filter, plan_node.node_id)).first;
         FilterState* f = &(i->second);
 
@@ -328,9 +325,14 @@ void Coordinator::InitFilterRoutingTable() {
             random_shuffle(src_idxs.begin(), src_idxs.end());
             src_idxs.resize(MAX_BROADCAST_FILTER_PRODUCERS);
           }
-          f->src_fragment_instance_idxs()->insert(src_idxs.begin(), src_idxs.end());
-
-        // target plan node of filter
+          for (int src_idx : src_idxs) {
+            TRuntimeFilterSource filter_src;
+            filter_src.src_node_id = plan_node.node_id;
+            filter_src.filter_id = filter.filter_id;
+            filter_routing_table_->finstance_filters_produced[src_idx].emplace_back(
+                filter_src);
+          }
+          f->set_num_producers(src_idxs.size());
         } else if (plan_node.__isset.hdfs_scan_node || plan_node.__isset.kudu_scan_node) {
           auto it = filter.planid_to_target_ndx.find(plan_node.node_id);
           DCHECK(it != filter.planid_to_target_ndx.end());
@@ -346,10 +348,10 @@ void Coordinator::InitFilterRoutingTable() {
   }
 
   query_profile_->AddInfoString(
-      "Number of filters", Substitute("$0", filter_routing_table_.size()));
+      "Number of filters", Substitute("$0", filter_routing_table_->num_filters()));
   query_profile_->AddInfoString("Filter routing table", FilterDebugString());
   if (VLOG_IS_ON(2)) VLOG_QUERY << FilterDebugString();
-  filter_routing_table_complete_ = true;
+  filter_routing_table_->is_complete = true;
 }
 
 void Coordinator::StartBackendExec() {
@@ -366,8 +368,14 @@ void Coordinator::StartBackendExec() {
     ExecEnv::GetInstance()->exec_rpc_thread_pool()->Offer(
         [backend_state, this, &debug_options]() {
           DebugActionNoFail(schedule_.query_options(), "COORD_BEFORE_EXEC_RPC");
+          // Safe for Exec() to read 'filter_routing_table_' because it is complete
+          // at this point and won't be destroyed while this function is executing,
+          // because it won't be torn down until 'exec_rpcs_complete_barrier_' is
+          // signalled.
+          DCHECK(filter_mode_ == TRuntimeFilterMode::OFF
+              || filter_routing_table_->is_complete);
           backend_state->Exec(
-              debug_options, filter_routing_table_, &exec_rpcs_complete_barrier_);
+              debug_options, *filter_routing_table_, &exec_rpcs_complete_barrier_);
         });
   }
   exec_rpcs_complete_barrier_.Wait();
@@ -436,7 +444,7 @@ string Coordinator::FilterDebugString() {
     table_printer.AddColumn("Completed", false);
   }
   table_printer.AddColumn("Enabled", false);
-  for (FilterRoutingTable::value_type& v: filter_routing_table_) {
+  for (auto& v: filter_routing_table_->id_to_filter) {
     vector<string> row;
     const FilterState& state = v.second;
     row.push_back(lexical_cast<string>(v.first));
@@ -455,8 +463,7 @@ string Coordinator::FilterDebugString() {
 
     if (filter_mode_ == TRuntimeFilterMode::GLOBAL) {
       int pending_count = state.completion_time() != 0L ? 0 : state.pending_count();
-      row.push_back(Substitute("$0 ($1)", pending_count,
-          state.src_fragment_instance_idxs().size()));
+      row.push_back(Substitute("$0 ($1)", pending_count, state.num_producers()));
       if (state.first_arrival_time() == 0L) {
         row.push_back("N/A");
       } else {
@@ -922,12 +929,12 @@ string Coordinator::GetErrorLog() {
 }
 
 void Coordinator::ReleaseExecResources() {
-  lock_guard<shared_mutex> lock(filter_lock_);
-  if (filter_routing_table_.size() > 0) {
+  lock_guard<shared_mutex> lock(filter_routing_table_->lock); // Exclusive lock.
+  if (filter_routing_table_->num_filters() > 0) {
     query_profile_->AddInfoString("Final filter table", FilterDebugString());
   }
 
-  for (auto& filter : filter_routing_table_) {
+  for (auto& filter : filter_routing_table_->id_to_filter) {
     FilterState* state = &filter.second;
     state->Disable(filter_mem_tracker_);
   }
@@ -995,27 +1002,27 @@ vector<TNetworkAddress> Coordinator::GetActiveBackends(
 }
 
 void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
-  shared_lock<shared_mutex> lock(filter_lock_);
+  shared_lock<shared_mutex> lock(filter_routing_table_->lock);
   DCHECK_NE(filter_mode_, TRuntimeFilterMode::OFF)
       << "UpdateFilter() called although runtime filters are disabled";
   DCHECK(backend_exec_complete_barrier_.get() != nullptr)
       << "Filters received before fragments started!";
 
   exec_rpcs_complete_barrier_.Wait();
-  DCHECK(filter_routing_table_complete_)
+  DCHECK(filter_routing_table_->is_complete)
       << "Filter received before routing table complete";
 
   TPublishFilterParams rpc_params;
   unordered_set<int> target_fragment_idxs;
   {
-    lock_guard<SpinLock> l(filter_update_lock_);
+    lock_guard<SpinLock> l(filter_routing_table_->update_lock);
     if (!IsExecuting()) {
       LOG(INFO) << "Filter update received for non-executing query with id: "
                 << query_id();
       return;
     }
-    FilterRoutingTable::iterator it = filter_routing_table_.find(params.filter_id);
-    if (it == filter_routing_table_.end()) {
+    auto it = filter_routing_table_->id_to_filter.find(params.filter_id);
+    if (it == filter_routing_table_->id_to_filter.end()) {
       LOG(INFO) << "Could not find filter with id: " << params.filter_id;
       return;
     }
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index 5650403..c8db6d7 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -18,9 +18,9 @@
 #ifndef IMPALA_RUNTIME_COORDINATOR_H
 #define IMPALA_RUNTIME_COORDINATOR_H
 
+#include <memory>
 #include <string>
 #include <vector>
-#include <boost/scoped_ptr.hpp>
 #include <boost/thread/shared_mutex.hpp>
 #include <boost/unordered_map.hpp>
 #include <rapidjson/document.h>
@@ -230,6 +230,7 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   class BackendState;
   class BackendResourceState;
   struct FilterTarget;
+  struct FilterRoutingTable;
   class FilterState;
   class FragmentStats;
 
@@ -312,7 +313,7 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   MemTracker* filter_mem_tracker_ = nullptr;
 
   /// Object pool owned by the coordinator.
-  boost::scoped_ptr<ObjectPool> obj_pool_;
+  std::unique_ptr<ObjectPool> obj_pool_;
 
   /// Execution summary for a single query.
   /// A wrapper around TExecSummary, with supporting structures.
@@ -354,7 +355,7 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// Barrier that is released when all backends have indicated execution completion,
   /// or when all backends are cancelled due to an execution error or client requested
   /// cancellation. Initialized in StartBackendExec().
-  boost::scoped_ptr<CountingBarrier> backend_exec_complete_barrier_;
+  std::unique_ptr<CountingBarrier> backend_exec_complete_barrier_;
 
   /// Barrier that is released when all Backends have released their admission control
   /// resources.
@@ -381,24 +382,8 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// - ERROR: error status
   Status exec_status_;
 
-  /// Synchronizes updates to the filter_routing_table_.
-  SpinLock filter_update_lock_;
-
-  /// Protects filter_routing_table_.
-  /// Usage pattern:
-  /// 1. To update filter_routing_table_: Acquire shared access on filter_lock_ and
-  ///    upgrade to exclusive access by subsequently acquiring filter_update_lock_.
-  /// 2. To read, initialize/destroy filter_routing_table: Directly acquire exclusive
-  ///    access on filter_lock_.
-  boost::shared_mutex filter_lock_;
-
-  /// Map from filter ID to filter.
-  typedef boost::unordered_map<int32_t, FilterState> FilterRoutingTable;
-  FilterRoutingTable filter_routing_table_;
-
-  /// Set to true when all calls to UpdateFilterRoutingTable() have finished, and it's
-  /// safe to concurrently read from filter_routing_table_.
-  bool filter_routing_table_complete_ = false;
+  /// Contains all the state about filters being handled by this coordinator.
+  std::unique_ptr<FilterRoutingTable> filter_routing_table_;
 
   /// True if the first row has been fetched, false otherwise.
   bool first_row_fetched_ = false;
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 48e89fc..62b396e 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -603,6 +603,12 @@ struct TScanRangeParams {
   4: optional bool is_remote
 }
 
+// Descriptor that indicates that a runtime filter is produced by a plan node.
+struct TRuntimeFilterSource {
+  1: required Types.TPlanNodeId src_node_id
+  2: required i32 filter_id
+}
+
 // Execution parameters of a single fragment instance.
 struct TPlanFragmentInstanceCtx {
   // TPlanFragment.idx
@@ -634,6 +640,9 @@ struct TPlanFragmentInstanceCtx {
   6: optional i32 sender_id
 
   7: optional TDebugOptions debug_options
+
+  // List of runtime filters produced by nodes in the finstance.
+  8: optional list<TRuntimeFilterSource> filters_produced
 }
 
 
diff --git a/testdata/workloads/functional-query/queries/QueryTest/joins_mt_dop.test b/testdata/workloads/functional-query/queries/QueryTest/joins_mt_dop.test
index ec8243a..02c5613 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/joins_mt_dop.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/joins_mt_dop.test
@@ -25,3 +25,27 @@ where a.id < 10 and c.bigint_col = 10
 ---- RESULTS
 291990
 ====
+---- QUERY
+# Regression IMPALA-9019: fix runtime filter propagation with mt_dop.
+# This plan has a global BROADCAST filter sent from the scan of alltypestiny
+# to the scan of alltypes. Before the bug was fixed, this filter would not
+# be produced, but was expected by the destination. Setting the wait time
+# higher than the time limit caused this query to reliably fail because
+# the destination scan would block waiting for the filter to arrive.
+set exec_time_limit_s=30;
+set runtime_filter_wait_time_ms=120000;
+select straight_join t1.id, t1.int_col
+from (select distinct * from alltypes) t1
+  join /*+broadcast*/ alltypestiny t2 on t1.id = t2.id;
+---- TYPES
+int,int
+---- RESULTS
+0,0
+1,1
+2,2
+3,3
+4,4
+5,5
+6,6
+7,7
+====