You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/10/25 13:52:02 UTC

[arrow-ballista] branch master updated: Reorder joins after resolving stage inputs (#441)

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/master by this push:
     new 660e248b Reorder joins after resolving stage inputs (#441)
660e248b is described below

commit 660e248bb553a9c9510610dcfa3ab0f722935dcb
Author: Daniël Heres <da...@gmail.com>
AuthorDate: Tue Oct 25 15:51:57 2022 +0200

    Reorder joins after resolving stage inputs (#441)
---
 ballista/scheduler/src/state/execution_graph/execution_stage.rs | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/ballista/scheduler/src/state/execution_graph/execution_stage.rs b/ballista/scheduler/src/state/execution_graph/execution_stage.rs
index 582b34f1..c46c0a91 100644
--- a/ballista/scheduler/src/state/execution_graph/execution_stage.rs
+++ b/ballista/scheduler/src/state/execution_graph/execution_stage.rs
@@ -22,10 +22,12 @@ use std::iter::FromIterator;
 use std::sync::Arc;
 use std::time::{SystemTime, UNIX_EPOCH};
 
+use datafusion::physical_optimizer::hash_build_probe_order::HashBuildProbeOrder;
+use datafusion::physical_optimizer::PhysicalOptimizerRule;
 use datafusion::physical_plan::display::DisplayableExecutionPlan;
 use datafusion::physical_plan::metrics::{MetricValue, MetricsSet};
 use datafusion::physical_plan::{ExecutionPlan, Metric, Partitioning};
-use datafusion::prelude::SessionContext;
+use datafusion::prelude::{SessionConfig, SessionContext};
 use datafusion_proto::logical_plan::AsLogicalPlan;
 use log::{debug, warn};
 
@@ -359,6 +361,11 @@ impl UnresolvedStage {
             self.plan.clone(),
             &input_locations,
         )?;
+
+        // Optimize join order based on new resolved statistics
+        let optimize_join = HashBuildProbeOrder::new();
+        let plan = optimize_join.optimize(plan, &SessionConfig::new())?;
+
         Ok(ResolvedStage::new(
             self.stage_id,
             self.stage_attempt_num,