You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2020/01/31 15:40:59 UTC

[impala] 01/02: IMPALA-4224: part 2: refactor filter routing table to support sinks

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit b12f0506080971f1e44bb732116fb3197862e378
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Mon Nov 25 18:28:07 2019 -0800

    IMPALA-4224: part 2: refactor filter routing table to support sinks
    
    This refactoring enables us to correctly construct a
    routing table where join build sinks produce filters:
    * The previous code relied (non-obviously) on producers being
      visited before consumers when walking the plan tree. This is no
      longer true when build sinks produce filters. The code is
      changed so it can construct the routing table entry when the
      first producer or consumer is visited (using GetOrCreateFilterState())
    * The source plan node ID is added to TRuntimeFilterDesc, so that
      it's available when visiting consumer nodes.
    * The logic for adding a filter source to the routing table is factored
      out into a separate function AddFilterSource(), which can be used
      (in a later patch) for join build sinks.
    
    This change is not expected to change behaviour in any way.
    
    Change-Id: Idc53ae0167af45e32093a024ff2d6e2c6466c876
    Reviewed-on: http://gerrit.cloudera.org:8080/15083
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/coordinator-filter-state.h          |  17 +++-
 be/src/runtime/coordinator.cc                      | 100 +++++++++++----------
 be/src/runtime/coordinator.h                       |  13 ++-
 common/thrift/PlanNodes.thrift                     |   3 +
 .../impala/planner/RuntimeFilterGenerator.java     |   1 +
 5 files changed, 80 insertions(+), 54 deletions(-)

diff --git a/be/src/runtime/coordinator-filter-state.h b/be/src/runtime/coordinator-filter-state.h
index 7e92883..cb018b8 100644
--- a/be/src/runtime/coordinator-filter-state.h
+++ b/be/src/runtime/coordinator-filter-state.h
@@ -61,8 +61,8 @@ struct Coordinator::FilterTarget {
 /// calling any FilterState functions if thread safety is needed.
 class Coordinator::FilterState {
  public:
-  FilterState(const TRuntimeFilterDesc& desc, const TPlanNodeId& src)
-    : desc_(desc), src_(src) {
+  FilterState(const TRuntimeFilterDesc& desc)
+    : desc_(desc) {
     // bloom_filter_ is a disjunction so the unit value is always_false.
     bloom_filter_.set_always_false(true);
     min_max_filter_.set_always_false(true);
@@ -75,7 +75,6 @@ class Coordinator::FilterState {
   const std::vector<FilterTarget>& targets() const { return targets_; }
   int64_t first_arrival_time() const { return first_arrival_time_; }
   int64_t completion_time() const { return completion_time_; }
-  const TPlanNodeId& src() const { return src_; }
   const TRuntimeFilterDesc& desc() const { return desc_; }
   bool is_bloom_filter() const { return desc_.type == TRuntimeFilterType::BLOOM; }
   bool is_min_max_filter() const { return desc_.type == TRuntimeFilterType::MIN_MAX; }
@@ -120,7 +119,6 @@ class Coordinator::FilterState {
   /// Contains the specification of the runtime filter.
   TRuntimeFilterDesc desc_;
 
-  TPlanNodeId src_;
   std::vector<FilterTarget> targets_;
 
   /// Number of remaining backends to hear from before filter is complete.
@@ -164,6 +162,17 @@ class Coordinator::FilterState {
 struct Coordinator::FilterRoutingTable {
   int64_t num_filters() const { return id_to_filter.size(); }
 
+  /// Returns the FilterState for 'filter.filter_id', creating it from 'filter' if absent.
+  FilterState* GetOrCreateFilterState(const TRuntimeFilterDesc& filter) {
+    auto i = id_to_filter.find(filter.filter_id);
+    if (i == id_to_filter.end()) {
+      i = id_to_filter.emplace(std::piecewise_construct,
+                           std::forward_as_tuple(filter.filter_id),
+                           std::forward_as_tuple(filter)).first;
+    }
+    return &(i->second);
+  }
+
   /// Maps the filter ID to the state of that filter.
   boost::unordered_map<int32_t, FilterState> id_to_filter;
 
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 6188ecc..94a85e5 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -346,59 +346,18 @@ void Coordinator::InitFilterRoutingTable() {
     int num_backends = fragment_params.GetNumBackends();
     DCHECK_GT(num_backends, 0);
 
+    // TODO: IMPALA-4224: also call AddFilterSource for build sinks that produce filters.
     for (const TPlanNode& plan_node: fragment_params.fragment.plan.nodes) {
       if (!plan_node.__isset.runtime_filters) continue;
       for (const TRuntimeFilterDesc& filter: plan_node.runtime_filters) {
         DCHECK(filter_mode_ == TRuntimeFilterMode::GLOBAL || filter.has_local_targets);
-        auto i = filter_routing_table_->id_to_filter
-                     .emplace(std::piecewise_construct,
-                         std::forward_as_tuple(filter.filter_id),
-                         std::forward_as_tuple(filter, plan_node.node_id))
-                     .first;
-
-        FilterState* f = &(i->second);
-        // source plan node of filter
+        // Currently hash joins are the only filter sources. Otherwise it must be
+        // a filter consumer.
         if (plan_node.__isset.hash_join_node) {
-          // Set the 'pending_count_' to zero to indicate that for a filter with
-          // local-only targets the coordinator does not expect to receive any filter
-          // updates. We expect to receive a single aggregated filter from each backend
-          // for partitioned joins.
-          int pending_count = filter.is_broadcast_join
-              ? (filter.has_remote_targets ? 1 : 0) : num_backends;
-          f->set_pending_count(pending_count);
-
-          // determine source instances
-          // TODO: IMPALA-9333: having a shared RuntimeFilterBank between all fragments on
-          // a backend allows further optimizations to reduce the number of broadcast join
-          // filters sent over the network, by considering cross-fragment filters on
-          // the same backend as local filters:
-          // 1. Produce a local filter on any backend with a destination fragment.
-          // 2. Only produce one local filter per backend (although, this would be made
-          //    redundant by IMPALA-4224 - sharing broadcast join hash tables).
-          // 3. Don't produce a global filter if all targets can be satisfied with
-          //    local producers.
-          // This work was deferred from the IMPALA-4400 change because it provides only
-          // incremental performance benefits.
-          vector<int> src_idxs = fragment_params.GetInstanceIdxs();
-
-          // If this is a broadcast join with only non-local targets, build and publish it
-          // on MAX_BROADCAST_FILTER_PRODUCERS instances. If this is not a broadcast join
-          // or it is a broadcast join with local targets, it should be generated
-          // everywhere the join is executed.
-          if (filter.is_broadcast_join && !filter.has_local_targets
-              && num_instances > MAX_BROADCAST_FILTER_PRODUCERS) {
-            random_shuffle(src_idxs.begin(), src_idxs.end());
-            src_idxs.resize(MAX_BROADCAST_FILTER_PRODUCERS);
-          }
-          for (int src_idx : src_idxs) {
-            TRuntimeFilterSource filter_src;
-            filter_src.src_node_id = plan_node.node_id;
-            filter_src.filter_id = filter.filter_id;
-            filter_routing_table_->finstance_filters_produced[src_idx].emplace_back(
-                filter_src);
-          }
-          f->set_num_producers(src_idxs.size());
+          AddFilterSource(
+              fragment_params, num_instances, num_backends, filter, plan_node.node_id);
         } else if (plan_node.__isset.hdfs_scan_node || plan_node.__isset.kudu_scan_node) {
+          FilterState* f = filter_routing_table_->GetOrCreateFilterState(filter);
           auto it = filter.planid_to_target_ndx.find(plan_node.node_id);
           DCHECK(it != filter.planid_to_target_ndx.end());
           const TRuntimeFilterTargetDesc& t_target = filter.targets[it->second];
@@ -419,6 +378,51 @@ void Coordinator::InitFilterRoutingTable() {
   filter_routing_table_->is_complete = true;
 }
 
+void Coordinator::AddFilterSource(const FragmentExecParams& src_fragment_params,
+    int num_instances, int num_backends, const TRuntimeFilterDesc& filter,
+    int join_node_id) {
+  FilterState* f = filter_routing_table_->GetOrCreateFilterState(filter);
+  // The number of filter updates the coordinator expects to receive. For a broadcast
+  // join this is 1 if the filter has remote targets and 0 otherwise, i.e. no updates
+  // are expected for a filter with local-only targets. For a partitioned join a single
+  // aggregated filter is expected from each backend.
+  int pending_count = filter.is_broadcast_join
+      ? (filter.has_remote_targets ? 1 : 0) : num_backends;
+  f->set_pending_count(pending_count);
+
+  // Determine which fragment instances will produce the filter.
+  // TODO: IMPALA-9333: having a shared RuntimeFilterBank between all fragments on
+  // a backend allows further optimizations to reduce the number of broadcast join
+  // filters sent over the network, by considering cross-fragment filters on
+  // the same backend as local filters:
+  // 1. Produce a local filter on any backend with a destination fragment.
+  // 2. Only produce one local filter per backend (although, this would be made
+  //    redundant by IMPALA-4224 - sharing broadcast join hash tables).
+  // 3. Don't produce a global filter if all targets can be satisfied with
+  //    local producers.
+  // This work was deferred from the IMPALA-4400 change because it provides only
+  // incremental performance benefits.
+  vector<int> src_idxs = src_fragment_params.GetInstanceIdxs();
+
+  // If this is a broadcast join with only non-local targets, build and publish it on
+  // MAX_BROADCAST_FILTER_PRODUCERS randomly chosen instances. If this is not a broadcast
+  // join or it is a broadcast join with local targets, it should be generated
+  // everywhere the join is executed.
+  if (filter.is_broadcast_join && !filter.has_local_targets
+      && num_instances > MAX_BROADCAST_FILTER_PRODUCERS) {
+    random_shuffle(src_idxs.begin(), src_idxs.end());
+    src_idxs.resize(MAX_BROADCAST_FILTER_PRODUCERS);
+  }
+  for (int src_idx : src_idxs) {
+    TRuntimeFilterSource filter_src;
+    filter_src.src_node_id = join_node_id;
+    filter_src.filter_id = filter.filter_id;
+    filter_routing_table_->finstance_filters_produced[src_idx].emplace_back(
+        filter_src);
+  }
+  f->set_num_producers(src_idxs.size());
+}
+
 Status Coordinator::StartBackendExec() {
   int num_backends = backend_states_.size();
   backend_exec_complete_barrier_.reset(new CountingBarrier(num_backends));
@@ -526,7 +530,7 @@ string Coordinator::FilterDebugString() {
     vector<string> row;
     const FilterState& state = v.second;
     row.push_back(lexical_cast<string>(v.first));
-    row.push_back(lexical_cast<string>(state.src()));
+    row.push_back(lexical_cast<string>(state.desc().src_node_id));
     vector<string> target_ids;
     vector<string> target_types;
     vector<string> partition_filter;
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index aa03f30..c295d66 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -61,6 +61,7 @@ class RuntimeState;
 class TPlanExecRequest;
 class TRuntimeProfileTree;
 class TUpdateCatalogRequest;
+struct FragmentExecParams;
 
 /// Query coordinator: handles execution of fragment instances on remote nodes, given a
 /// TQueryExecRequest. As part of that, it handles all interactions with the executing
@@ -509,10 +510,18 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// possible at this point if no fragment instances were assigned to it).
   Status FinishBackendStartup() WARN_UNUSED_RESULT;
 
-  /// Build the filter routing table by iterating over all plan nodes and collecting the
-  /// filters that they either produce or consume.
+  /// Build the filter routing table by iterating over all plan nodes and data sinks and
+  /// collecting the filters that they either produce or consume.
   void InitFilterRoutingTable();
 
+  /// Helper for InitFilterRoutingTable() that adds a source join node or join build sink
+  /// for 'filter' to the routing table. 'src_fragment_params' describes the fragment
+  /// containing the join node (integrated build) or build sink (separate build), which
+  /// runs on 'num_instances' instances and 'num_backends' backends. 'join_node_id' is
+  /// recorded as the source node ID of each filter source that is added.
+  void AddFilterSource(const FragmentExecParams& src_fragment_params,int num_instances,
+      int num_backends, const TRuntimeFilterDesc& filter, int join_node_id);
+
   /// Helper for HandleExecStateTransition(). Releases all resources associated with
   /// query execution. The ExecState state-machine ensures this is called exactly once.
   void ReleaseExecResources();
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index ab4ae66..4d5a556 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -157,6 +157,9 @@ struct TRuntimeFilterDesc {
   // The size of the filter based on the ndv estimate and the min/max limit specified in
   // the query options. Should be greater than zero for bloom filters, zero otherwise.
   11: optional i64 filter_size_bytes
+
+  // The ID of the plan node that produces this filter.
+  12: optional Types.TPlanNodeId src_node_id
 }
 
 // The information contained in subclasses of ScanNode captured in two separate
diff --git a/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java b/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
index 73fd624..21e3227 100644
--- a/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
+++ b/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
@@ -282,6 +282,7 @@ public final class RuntimeFilterGenerator {
       TRuntimeFilterDesc tFilter = new TRuntimeFilterDesc();
       tFilter.setFilter_id(id_.asInt());
       tFilter.setSrc_expr(srcExpr_.treeToThrift());
+      tFilter.setSrc_node_id(src_.getId().asInt());
       tFilter.setIs_broadcast_join(isBroadcastJoin_);
       tFilter.setNdv_estimate(ndvEstimate_);
       tFilter.setHas_local_targets(hasLocalTargets_);