You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ar...@apache.org on 2021/10/20 04:44:00 UTC

[impala] 04/04: IMPALA-10973: Do not schedule empty scan nodes to coordinator

This is an automated email from the ASF dual-hosted git repository.

arawat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 7bf39968bb95ac3aa66ff50b03495d4bdb97293b
Author: Csaba Ringhofer <cs...@cloudera.com>
AuthorDate: Tue Oct 19 15:00:13 2021 +0200

    IMPALA-10973: Do not schedule empty scan nodes to coordinator
    
    Until now, fragments with scan nodes that have no scan ranges were
    scheduled to the coordinator, even if it was an exclusive coordinator.
    
    This could lead to a lot of work being scheduled on the
    coordinator. This patch changes the logic to choose a random executor
    instead.
    
    Change-Id: Ie31df3861aad2e3e91cab621ff122a4f721905ef
    Reviewed-on: http://gerrit.cloudera.org:8080/17954
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Abhishek Rawat <ar...@cloudera.com>
    Reviewed-by: Bikramjeet Vig <bi...@cloudera.com>
---
 be/src/scheduling/scheduler.cc | 28 ++++++++++++++++------------
 be/src/scheduling/scheduler.h  |  3 +--
 2 files changed, 17 insertions(+), 14 deletions(-)

diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index 1034fce..4ed5007 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -409,7 +409,20 @@ void Scheduler::CreateCollocatedAndScanInstances(const ExecutorConfig& executor_
   DCHECK(has_union || scan_node_ids.size() == 1) << "This method may need revisiting "
       << "for plans with no union and multiple scans per fragment";
   vector<NetworkAddressPB> scan_hosts;
-  GetScanHosts(executor_config.coord_desc, scan_node_ids, *fragment_state, &scan_hosts);
+  GetScanHosts(scan_node_ids, *fragment_state, &scan_hosts);
+  if (scan_hosts.empty()) {
+    // None of the scan nodes have any scan ranges; run it on a random executor.
+    // TODO: re-check the TODO below; it seems partially stale
+    // TODO: we'll need to revisit this strategy once we can partition joins
+    // (in which case this fragment might be executing a right outer join
+    // with a large build table)
+    vector<BackendDescriptorPB> all_executors =
+        executor_config.group.GetAllExecutorDescriptors();
+    int idx = std::uniform_int_distribution<int>(0, all_executors.size() - 1)(
+        *state->rng());
+    const BackendDescriptorPB& be_desc = all_executors[idx];
+    scan_hosts.push_back(be_desc.address());
+  }
   for (const NetworkAddressPB& host_addr : scan_hosts) {
     // Ensure that the num instances is at least as many as input fragments. We don't
     // want to increment if there were already some instances from the input fragment,
@@ -784,9 +797,8 @@ std::vector<TPlanNodeId> Scheduler::FindScanNodes(const TPlan& plan) {
   return FindNodes(plan, SCAN_NODE_TYPES);
 }
 
-void Scheduler::GetScanHosts(const BackendDescriptorPB& coord_desc,
-    const vector<TPlanNodeId>& scan_ids, const FragmentScheduleState& fragment_state,
-    vector<NetworkAddressPB>* scan_hosts) {
+void Scheduler::GetScanHosts(const vector<TPlanNodeId>& scan_ids,
+    const FragmentScheduleState& fragment_state, vector<NetworkAddressPB>* scan_hosts) {
   for (const TPlanNodeId& scan_id : scan_ids) {
     // Get the list of impalad host from scan_range_assignment_
     for (const FragmentScanRangeAssignment::value_type& scan_range_assignment :
@@ -796,14 +808,6 @@ void Scheduler::GetScanHosts(const BackendDescriptorPB& coord_desc,
         scan_hosts->push_back(scan_range_assignment.first);
       }
     }
-
-    if (scan_hosts->empty()) {
-      // this scan node doesn't have any scan ranges; run it on the coordinator
-      // TODO: we'll need to revisit this strategy once we can partition joins
-      // (in which case this fragment might be executing a right outer join
-      // with a large build table)
-      scan_hosts->push_back(coord_desc.address());
-    }
   }
 }
 
diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h
index a9093cb..f75bace 100644
--- a/be/src/scheduling/scheduler.h
+++ b/be/src/scheduling/scheduler.h
@@ -415,8 +415,7 @@ class Scheduler {
 
   /// Add all hosts that the scans identified by 'scan_ids' are executed on to
   /// 'scan_hosts'.
-  void GetScanHosts(const BackendDescriptorPB& coord_desc,
-      const std::vector<TPlanNodeId>& scan_ids,
+  void GetScanHosts(const std::vector<TPlanNodeId>& scan_ids,
       const FragmentScheduleState& fragment_state,
       std::vector<NetworkAddressPB>* scan_hosts);