You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/10/25 13:52:02 UTC
[arrow-ballista] branch master updated: Reorder joins after resolving stage inputs (#441)
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/master by this push:
new 660e248b Reorder joins after resolving stage inputs (#441)
660e248b is described below
commit 660e248bb553a9c9510610dcfa3ab0f722935dcb
Author: Daniël Heres <da...@gmail.com>
AuthorDate: Tue Oct 25 15:51:57 2022 +0200
Reorder joins after resolving stage inputs (#441)
---
ballista/scheduler/src/state/execution_graph/execution_stage.rs | 9 ++++++++-
1 file changed, 8 insertions(+), 1 deletion(-)
diff --git a/ballista/scheduler/src/state/execution_graph/execution_stage.rs b/ballista/scheduler/src/state/execution_graph/execution_stage.rs
index 582b34f1..c46c0a91 100644
--- a/ballista/scheduler/src/state/execution_graph/execution_stage.rs
+++ b/ballista/scheduler/src/state/execution_graph/execution_stage.rs
@@ -22,10 +22,12 @@ use std::iter::FromIterator;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
+use datafusion::physical_optimizer::hash_build_probe_order::HashBuildProbeOrder;
+use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::metrics::{MetricValue, MetricsSet};
use datafusion::physical_plan::{ExecutionPlan, Metric, Partitioning};
-use datafusion::prelude::SessionContext;
+use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_proto::logical_plan::AsLogicalPlan;
use log::{debug, warn};
@@ -359,6 +361,11 @@ impl UnresolvedStage {
self.plan.clone(),
&input_locations,
)?;
+
+ // Optimize join order based on new resolved statistics
+ let optimize_join = HashBuildProbeOrder::new();
+ let plan = optimize_join.optimize(plan, &SessionConfig::new())?;
+
Ok(ResolvedStage::new(
self.stage_id,
self.stage_attempt_num,