Posted to commits@impala.apache.org by ta...@apache.org on 2020/01/21 00:50:13 UTC

[impala] 01/03: IMPALA-4224: part 1: schedule join builds

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 195bd7635f326061e999fe2a5e4a20ce6a4a9874
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Sun Dec 22 01:13:26 2019 -0800

    IMPALA-4224: part 1: schedule join builds
    
    This adds the scheduler logic for fragments with join builds at their
    root. These fragments need to be co-located with the fragment with the
    join node.
    
    The new code is not active yet because the planner does not generate
    plans with join builds (except for planner tests). This change
    was validated in the context of a larger patch that enables the join
    build plans via the planner and makes query execution work.
    
    Change-Id: I779463cfa2ea9b372607d2be6d5d2252a6469e34
    Reviewed-on: http://gerrit.cloudera.org:8080/14944
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/scheduling/query-schedule.cc                |  7 +-
 be/src/scheduling/query-schedule.h                 |  9 ++-
 be/src/scheduling/scheduler.cc                     | 77 ++++++++++++++++------
 be/src/scheduling/scheduler.h                      |  9 ++-
 common/thrift/DataSinks.thrift                     |  3 +-
 common/thrift/Frontend.thrift                      |  3 +-
 common/thrift/ImpalaInternalService.thrift         | 12 ++++
 .../org/apache/impala/planner/JoinBuildSink.java   |  6 +-
 8 files changed, 98 insertions(+), 28 deletions(-)

diff --git a/be/src/scheduling/query-schedule.cc b/be/src/scheduling/query-schedule.cc
index 857a182..25fe3b2 100644
--- a/be/src/scheduling/query-schedule.cc
+++ b/be/src/scheduling/query-schedule.cc
@@ -124,7 +124,7 @@ void QuerySchedule::Init() {
       PlanNodeId dest_node_id = fragment.output_sink.stream_sink.dest_node_id;
       FragmentIdx dest_idx = plan_node_to_fragment_idx_[dest_node_id];
       FragmentExecParams& dest_params = fragment_exec_params_[dest_idx];
-      dest_params.input_fragments.push_back(fragment.idx);
+      dest_params.exchange_input_fragments.push_back(fragment.idx);
     }
   }
 }
@@ -181,6 +181,11 @@ void QuerySchedule::Validate() const {
     }
   }
 
+  // Check that all fragments have instances.
+  for (const FragmentExecParams& fp: fragment_exec_params_) {
+    DCHECK_GT(fp.instance_exec_params.size(), 0) << fp.fragment;
+  }
+
   for (const auto& elem: per_backend_exec_params_) {
     const BackendExecParams& bp = elem.second;
     DCHECK(!bp.instance_params.empty() || bp.is_coord_backend);
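
The new Validate() check above guards against fragments that were never assigned
instances. A natural companion invariant for this patch is co-location itself:
every join build finstance must run on the same backend as the consumer
finstance it feeds. The following minimal sketch illustrates such a check using
hypothetical flattened structs (InstanceSketch and ValidateCoLocation are
illustrative stand-ins, not part of this patch or of Impala's code):

    #include <cassert>
    #include <map>
    #include <string>
    #include <vector>

    // Sketch only: hypothetical stand-in for a scheduled fragment instance.
    struct InstanceSketch {
      std::string instance_id;
      std::string host;                          // backend the finstance runs on
      std::vector<std::string> build_input_ids;  // ids of its join build finstances
    };

    // Verify the co-location invariant: every build finstance referenced by a
    // consumer runs on the consumer's host.
    void ValidateCoLocation(const std::vector<InstanceSketch>& instances) {
      std::map<std::string, const InstanceSketch*> by_id;
      for (const InstanceSketch& inst : instances) by_id[inst.instance_id] = &inst;
      for (const InstanceSketch& consumer : instances) {
        for (const std::string& build_id : consumer.build_input_ids) {
          const InstanceSketch* build = by_id.at(build_id);
          assert(build->host == consumer.host);  // must be scheduled together
        }
      }
    }
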
diff --git a/be/src/scheduling/query-schedule.h b/be/src/scheduling/query-schedule.h
index d019eb2..a57ebb7 100644
--- a/be/src/scheduling/query-schedule.h
+++ b/be/src/scheduling/query-schedule.h
@@ -106,6 +106,9 @@ struct FInstanceExecParams {
   /// uniquely identify it to a receiver. -1 = invalid.
   int sender_id;
 
+  // List of input join build finstances for joins in this finstance.
+  std::vector<TJoinBuildInput> join_build_inputs;
+
   /// The parent FragmentExecParams
   const FragmentExecParams& fragment_exec_params;
   const TPlanFragment& fragment() const;
@@ -136,7 +139,9 @@ struct FragmentExecParams {
 
   bool is_coord_fragment;
   const TPlanFragment& fragment;
-  std::vector<FragmentIdx> input_fragments;
+
+  // Fragments that are inputs to an ExchangeNode of this fragment.
+  std::vector<FragmentIdx> exchange_input_fragments;
   std::vector<FInstanceExecParams> instance_exec_params;
 
   FragmentExecParams(const TPlanFragment& fragment)
@@ -372,7 +377,7 @@ class QuerySchedule {
   string executor_group_;
 
   /// Populate fragment_exec_params_ from request_.plan_exec_info.
-  /// Sets is_coord_fragment and input_fragments.
+  /// Sets is_coord_fragment and exchange_input_fragments.
   /// Also populates plan_node_to_fragment_idx_ and plan_node_to_plan_node_idx_.
   void Init();
 
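
The new join_build_inputs field gives each finstance the identities of the
build finstances feeding its joins. At execution time a join node can then be
matched to its build input by node id, as in this minimal lookup sketch
(JoinBuildInputSketch and FindBuildInput are hypothetical stand-ins; the real
struct is the Thrift TJoinBuildInput introduced later in this patch):

    #include <optional>
    #include <string>
    #include <vector>

    // Hypothetical stand-in for TJoinBuildInput (see the Thrift change below).
    struct JoinBuildInputSketch {
      int join_node_id;                // join node consuming the build
      std::string input_finstance_id;  // finstance producing the build
    };

    // Return the build finstance id feeding 'join_node_id', if any.
    std::optional<std::string> FindBuildInput(
        const std::vector<JoinBuildInputSketch>& inputs, int join_node_id) {
      for (const JoinBuildInputSketch& in : inputs) {
        if (in.join_node_id == join_node_id) return in.input_finstance_id;
      }
      return std::nullopt;
    }
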
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index 626afde..b458fa6 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -202,7 +202,10 @@ void Scheduler::ComputeFragmentExecParams(
     const ExecutorConfig& executor_config, QuerySchedule* schedule) {
   const TQueryExecRequest& exec_request = schedule->request();
 
-  // for each plan, compute the FInstanceExecParams for the tree of fragments
+  // for each plan, compute the FInstanceExecParams for the tree of fragments.
+  // The plans are in dependency order, so we compute parameters for each plan
+  // *before* its input join build plans. This allows the join build plans to
+  // be easily co-located with the plans consuming their output.
   for (const TPlanExecInfo& plan_exec_info : exec_request.plan_exec_info) {
     // set instance_id, host, per_node_scan_ranges
     ComputeFragmentExecParams(executor_config, plan_exec_info,
@@ -210,13 +213,11 @@ void Scheduler::ComputeFragmentExecParams(
 
     // Set destinations, per_exch_num_senders, sender_id.
     for (const TPlanFragment& src_fragment : plan_exec_info.fragments) {
+      VLOG(3) << "Computing exec params for fragment " << src_fragment.display_name;
       if (!src_fragment.output_sink.__isset.stream_sink) continue;
       FragmentIdx dest_idx =
           schedule->GetFragmentIdx(src_fragment.output_sink.stream_sink.dest_node_id);
-      DCHECK_LT(dest_idx, plan_exec_info.fragments.size());
-      const TPlanFragment& dest_fragment = plan_exec_info.fragments[dest_idx];
-      FragmentExecParams* dest_params =
-          schedule->GetFragmentExecParams(dest_fragment.idx);
+      FragmentExecParams* dest_params = schedule->GetFragmentExecParams(dest_idx);
       FragmentExecParams* src_params = schedule->GetFragmentExecParams(src_fragment.idx);
 
       // populate src_params->destinations
@@ -254,15 +255,24 @@ void Scheduler::ComputeFragmentExecParams(
 void Scheduler::ComputeFragmentExecParams(const ExecutorConfig& executor_config,
     const TPlanExecInfo& plan_exec_info, FragmentExecParams* fragment_params,
     QuerySchedule* schedule) {
-  // traverse input fragments
-  for (FragmentIdx input_fragment_idx : fragment_params->input_fragments) {
+  // Create exec params for child fragments connected by an exchange. Instance creation
+  // for this fragment depends on where the input fragment instances are scheduled.
+  for (FragmentIdx input_fragment_idx : fragment_params->exchange_input_fragments) {
     ComputeFragmentExecParams(executor_config, plan_exec_info,
         schedule->GetFragmentExecParams(input_fragment_idx), schedule);
   }
 
   const TPlanFragment& fragment = fragment_params->fragment;
-  // case 1: single instance executed at coordinator
-  if (fragment.partition.type == TPartitionType::UNPARTITIONED) {
+  if (fragment.output_sink.__isset.join_build_sink) {
+    // case 0: join build fragment, co-located with its parent fragment. Join build
+    // fragments may be unpartitioned if they are co-located with the root fragment.
+    VLOG(3) << "Computing exec params for collocated join build fragment "
+            << fragment_params->fragment.display_name;
+    CreateCollocatedJoinBuildInstances(fragment_params, schedule);
+  } else if (fragment.partition.type == TPartitionType::UNPARTITIONED) {
+    // case 1: root fragment instance executed at coordinator
+    VLOG(3) << "Computing exec params for coordinator fragment "
+            << fragment_params->fragment.display_name;
     const TBackendDescriptor& local_be_desc = executor_config.local_be_desc;
     const TNetworkAddress& coord = local_be_desc.address;
     DCHECK(local_be_desc.__isset.krpc_address);
@@ -282,20 +292,18 @@ void Scheduler::ComputeFragmentExecParams(const ExecutorConfig& executor_config,
       auto first_entry = fragment_params->scan_range_assignment.begin();
       instance_params.per_node_scan_ranges = first_entry->second;
     }
-
-    return;
-  }
-
-  if (ContainsNode(fragment.plan, TPlanNodeType::UNION_NODE)
+  } else if (ContainsNode(fragment.plan, TPlanNodeType::UNION_NODE)
       || ContainsScanNode(fragment.plan)) {
+    VLOG(3) << "Computing exec params for scan and/or union fragment.";
     // case 2: leaf fragment (i.e. no input fragments) with a single scan node.
     // case 3: union fragment, which may have scan nodes and may have input fragments.
     CreateCollocatedAndScanInstances(executor_config, fragment_params, schedule);
   } else {
+    VLOG(3) << "Computing exec params for interior fragment.";
     // case 4: interior (non-leaf) fragment without a scan or union.
     // We assign the same hosts as those of our leftmost input fragment (so that a
     // merge aggregation fragment runs on the hosts that provide the input data).
-    CreateCollocatedInstances(fragment_params, schedule);
+    CreateInputCollocatedInstances(fragment_params, schedule);
   }
 }
 
@@ -367,7 +375,7 @@ void Scheduler::CreateCollocatedAndScanInstances(const ExecutorConfig& executor_
   // the input scan, for consistency with the previous behaviour of only using
   // the parallelism of the scan.
   if (has_union) {
-    for (FragmentIdx idx : fragment_params->input_fragments) {
+    for (FragmentIdx idx : fragment_params->exchange_input_fragments) {
       std::unordered_map<TNetworkAddress, int> input_fragment_instances_per_host;
       const FragmentExecParams& input_params = *schedule->GetFragmentExecParams(idx);
       for (const FInstanceExecParams& instance_params :
@@ -503,20 +511,47 @@ vector<vector<TScanRangeParams>> Scheduler::AssignRangesToInstances(
   return per_instance_ranges;
 }
 
-void Scheduler::CreateCollocatedInstances(
+void Scheduler::CreateInputCollocatedInstances(
     FragmentExecParams* fragment_params, QuerySchedule* schedule) {
-  DCHECK_GE(fragment_params->input_fragments.size(), 1);
-  const FragmentExecParams* input_fragment_params =
-      schedule->GetFragmentExecParams(fragment_params->input_fragments[0]);
+  DCHECK_GE(fragment_params->exchange_input_fragments.size(), 1);
+  const FragmentExecParams& input_fragment_params =
+      *schedule->GetFragmentExecParams(fragment_params->exchange_input_fragments[0]);
   int per_fragment_instance_idx = 0;
   for (const FInstanceExecParams& input_instance_params :
-      input_fragment_params->instance_exec_params) {
+      input_fragment_params.instance_exec_params) {
     fragment_params->instance_exec_params.emplace_back(schedule->GetNextInstanceId(),
         input_instance_params.host, input_instance_params.krpc_host,
         per_fragment_instance_idx++, *fragment_params);
   }
 }
 
+void Scheduler::CreateCollocatedJoinBuildInstances(
+    FragmentExecParams* fragment_params, QuerySchedule* schedule) {
+  const TPlanFragment& fragment = fragment_params->fragment;
+  DCHECK(fragment.output_sink.__isset.join_build_sink);
+  const TJoinBuildSink& sink = fragment.output_sink.join_build_sink;
+  int join_fragment_idx = schedule->GetFragmentIdx(sink.dest_node_id);
+  FragmentExecParams* join_fragment_params =
+      schedule->GetFragmentExecParams(join_fragment_idx);
+  DCHECK(!join_fragment_params->instance_exec_params.empty())
+      << "Parent fragment instances must already be created.";
+  int per_fragment_instance_idx = 0;
+  for (FInstanceExecParams& parent_exec_params :
+      join_fragment_params->instance_exec_params) {
+    TUniqueId instance_id = schedule->GetNextInstanceId();
+    fragment_params->instance_exec_params.emplace_back(instance_id,
+        parent_exec_params.host, parent_exec_params.krpc_host,
+        per_fragment_instance_idx++, *fragment_params);
+    TJoinBuildInput build_input;
+    build_input.__set_join_node_id(sink.dest_node_id);
+    build_input.__set_input_finstance_id(instance_id);
+    parent_exec_params.join_build_inputs.emplace_back(build_input);
+    VLOG(3) << "Linked join build for node id=" << sink.dest_node_id
+            << " build finstance=" << PrintId(instance_id)
+            << " dst finstance=" << PrintId(parent_exec_params.instance_id);
+  }
+}
+
 Status Scheduler::ComputeScanRangeAssignment(const ExecutorConfig& executor_config,
     PlanNodeId node_id, const TReplicaPreference::type* node_replica_preference,
     bool node_random_replica, const vector<TScanRangeLocationList>& locations,
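
The core of the new scheduling logic is CreateCollocatedJoinBuildInstances()
above: one build instance is created per consumer instance, placed on the
consumer's host, and the link is recorded on the consumer via TJoinBuildInput.
Stripped of Impala's types, the pairing reduces to the following sketch (the
struct and function names here are hypothetical, not the patch's code):

    #include <string>
    #include <vector>

    // Sketch only: hypothetical stand-in for a scheduled fragment instance.
    struct FInstanceSketch {
      std::string instance_id;
      std::string host;
      std::vector<std::string> build_input_ids;  // links to build finstances
    };

    // Create one build instance per consumer instance, on the same host, and
    // record the link on the consumer (mirrors the pairing loop in the patch).
    std::vector<FInstanceSketch> PairBuildWithConsumers(
        std::vector<FInstanceSketch>& consumer_instances, int& next_id) {
      std::vector<FInstanceSketch> build_instances;
      for (FInstanceSketch& consumer : consumer_instances) {
        std::string build_id = "finstance-" + std::to_string(next_id++);
        build_instances.push_back({build_id, consumer.host, {}});
        consumer.build_input_ids.push_back(build_id);  // consumer -> build link
      }
      return build_instances;
    }
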
diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h
index a2da46f..f5f5c2e 100644
--- a/be/src/scheduling/scheduler.h
+++ b/be/src/scheduling/scheduler.h
@@ -395,7 +395,14 @@ class Scheduler {
   /// For each instance of fragment_params's input fragment, create a collocated
   /// instance for fragment_params's fragment.
   /// Expects that fragment_params only has a single input fragment.
-  void CreateCollocatedInstances(
+  void CreateInputCollocatedInstances(
+      FragmentExecParams* fragment_params, QuerySchedule* schedule);
+
+  /// Create instances for a fragment that has a join build sink as its root.
+  /// These instances will be collocated with the fragment instances that consume
+  /// the join build. Therefore, those instances must have already been created
+  /// by the scheduler.
+  void CreateCollocatedJoinBuildInstances(
       FragmentExecParams* fragment_params, QuerySchedule* schedule);
 
   /// Add all hosts that the scans identified by 'scan_ids' are executed on to
diff --git a/common/thrift/DataSinks.thrift b/common/thrift/DataSinks.thrift
index 19dd86b..fdf8f6c 100644
--- a/common/thrift/DataSinks.thrift
+++ b/common/thrift/DataSinks.thrift
@@ -100,7 +100,8 @@ struct TKuduTableSink {
 
 // Sink to create the build side of a JoinNode.
 struct TJoinBuildSink {
-  1: required Types.TJoinTableId join_table_id
+  // destination join node id
+  1: required Types.TPlanNodeId dest_node_id
 
   // only set for hash join build sinks
   2: required list<Exprs.TExpr> build_exprs
diff --git a/common/thrift/Frontend.thrift b/common/thrift/Frontend.thrift
index e6e1fd8..716818b 100644
--- a/common/thrift/Frontend.thrift
+++ b/common/thrift/Frontend.thrift
@@ -406,7 +406,8 @@ struct TPlanExecInfo {
 
 // Result of call to ImpalaPlanService/JniFrontend.CreateQueryRequest()
 struct TQueryExecRequest {
-  // exec info for all plans; the first one materializes the query result
+  // exec info for all plans; the first one materializes the query result and subsequent
+  // ones materialize join builds that are input for preceding plans in the list.
   1: optional list<TPlanExecInfo> plan_exec_info
 
   // Metadata of the query result set (only for select)
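
This ordering is what makes the single forward pass in
Scheduler::ComputeFragmentExecParams() sufficient: a build plan always appears
after the plan that consumes it, so the consumer's instances already exist when
the build plan is scheduled. A minimal sketch of that invariant (PlanSketch and
consumer_plan_idx are hypothetical simplifications of TPlanExecInfo):

    #include <cassert>
    #include <set>
    #include <vector>

    // Hypothetical flattened view of plan_exec_info: each plan records the
    // index of the plan that consumes its join build (-1 for the result plan).
    struct PlanSketch {
      int consumer_plan_idx;  // -1 if this plan materializes the query result
    };

    // A single forward pass works because every build plan appears *after*
    // its consumer: by the time we schedule a build, the consumer is done.
    void SchedulePlans(const std::vector<PlanSketch>& plans) {
      std::set<int> scheduled;
      for (int i = 0; i < static_cast<int>(plans.size()); ++i) {
        if (plans[i].consumer_plan_idx >= 0) {
          assert(scheduled.count(plans[i].consumer_plan_idx) > 0);
        }
        scheduled.insert(i);  // stand-in for computing the plan's exec params
      }
    }
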
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 2285d4b..55eb0cf 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -613,6 +613,15 @@ struct TRuntimeFilterSource {
   2: required i32 filter_id
 }
 
+// Information about the input fragment instance of a join node.
+struct TJoinBuildInput {
+  // The join node id that will consume this join build.
+  1: required Types.TPlanNodeId join_node_id
+
+  // Fragment instance id of the input fragment instance.
+  2: required Types.TUniqueId input_finstance_id
+}
+
 // Execution parameters of a single fragment instance.
 struct TPlanFragmentInstanceCtx {
   // TPlanFragment.idx
@@ -647,6 +656,9 @@ struct TPlanFragmentInstanceCtx {
 
   // List of runtime filters produced by nodes in the finstance.
   8: optional list<TRuntimeFilterSource> filters_produced
+
+  // List of input join build finstances for joins in this finstance.
+  9: optional list<TJoinBuildInput> join_build_inputs
 }
 
 
diff --git a/fe/src/main/java/org/apache/impala/planner/JoinBuildSink.java b/fe/src/main/java/org/apache/impala/planner/JoinBuildSink.java
index 5610646..63e733f 100644
--- a/fe/src/main/java/org/apache/impala/planner/JoinBuildSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/JoinBuildSink.java
@@ -38,6 +38,9 @@ public class JoinBuildSink extends DataSink {
   // id of join's build-side table assigned during planning
   private final JoinTableId joinTableId_;
 
+  // Reference to the join node that consumes the build side.
+  private final JoinNode joinNode_;
+
   private final List<Expr> buildExprs_ = new ArrayList<>();
 
   /**
@@ -46,6 +49,7 @@ public class JoinBuildSink extends DataSink {
   public JoinBuildSink(JoinTableId joinTableId, JoinNode joinNode) {
     Preconditions.checkState(joinTableId.isValid());
     joinTableId_ = joinTableId;
+    joinNode_ = joinNode;
     Preconditions.checkNotNull(joinNode);
     Preconditions.checkState(joinNode instanceof JoinNode);
     if (!(joinNode instanceof HashJoinNode)) return;
@@ -61,7 +65,7 @@ public class JoinBuildSink extends DataSink {
   @Override
   protected void toThriftImpl(TDataSink tsink) {
     TJoinBuildSink tBuildSink = new TJoinBuildSink();
-    tBuildSink.setJoin_table_id(joinTableId_.asInt());
+    tBuildSink.setDest_node_id(joinNode_.getId().asInt());
     for (Expr buildExpr: buildExprs_) {
       tBuildSink.addToBuild_exprs(buildExpr.treeToThrift());
     }