Posted to commits@airflow.apache.org by pi...@apache.org on 2023/01/11 23:15:54 UTC

[airflow] branch v2-5-test updated: Add back join to zombie query that was dropped in #28198 (#28544)

This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch v2-5-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v2-5-test by this push:
     new 0902436028 Add back join to zombie query that was dropped in #28198 (#28544)
0902436028 is described below

commit 09024360285c2d3945ef3d4964b871141471e4a6
Author: Jed Cunningham <66...@users.noreply.github.com>
AuthorDate: Fri Dec 23 09:24:30 2022 -0600

    Add back join to zombie query that was dropped in #28198 (#28544)
    
    #28198 accidentally dropped a join in a query, leading to this SQLAlchemy warning:
    
        airflow/jobs/scheduler_job.py:1547 SAWarning: SELECT statement has a
        cartesian product between FROM element(s) "dag_run_1", "task_instance",
        "job" and FROM element "dag". Apply join condition(s) between each element to resolve.
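
    A minimal sketch of why the missing join matters. It uses the stdlib
    sqlite3 module rather than SQLAlchemy, and the two tables are
    hypothetical, simplified stand-ins for Airflow's task_instance and dag
    tables, not the real schema. The real zombie query also joins job
    (LocalTaskJob); that join is omitted here.

```python
import sqlite3

# Hypothetical, heavily simplified stand-ins for Airflow's task_instance and
# dag tables; the real schemas have many more columns.
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE dag (dag_id TEXT PRIMARY KEY, fileloc TEXT);
    CREATE TABLE task_instance (task_id TEXT, dag_id TEXT, state TEXT);
    INSERT INTO dag VALUES ('dag_a', '/dags/a.py'), ('dag_b', '/dags/b.py');
    INSERT INTO task_instance VALUES
        ('t1', 'dag_a', 'running'),
        ('t2', 'dag_b', 'running');
    """
)

# Without a join condition, every task_instance row is paired with every dag
# row (a cartesian product), so each task picks up every dag's fileloc.
cartesian = conn.execute(
    "SELECT ti.task_id, dag.fileloc FROM task_instance AS ti, dag "
    "WHERE ti.state = 'running' ORDER BY ti.task_id, dag.fileloc"
).fetchall()

# With the join condition restored (analogous to .join(DM, TI.dag_id == DM.dag_id)),
# each task instance is matched only to its own dag's file location.
joined = conn.execute(
    "SELECT ti.task_id, dag.fileloc FROM task_instance AS ti "
    "JOIN dag ON ti.dag_id = dag.dag_id "
    "WHERE ti.state = 'running' ORDER BY ti.task_id"
).fetchall()

print(len(cartesian))  # 4 rows: 2 task instances x 2 dags
print(joined)  # [('t1', '/dags/a.py'), ('t2', '/dags/b.py')]
```

    With N running task instances and M DAG rows, the unjoined query
    returns N x M rows instead of N, so each task instance can be reported
    with the wrong DAG's file location.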
    
    (cherry picked from commit a24d18a534ddbcefbcf0d8790d140ff496781f8b)
---
 airflow/jobs/scheduler_job.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index baeabdc2ec..b8b608efcd 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -1525,7 +1525,8 @@ class SchedulerJob(BaseJob):
             zombies: list[tuple[TI, str, str]] = (
                 session.query(TI, DM.fileloc, DM.processor_subdir)
                 .with_hint(TI, "USE INDEX (ti_state)", dialect_name="mysql")
-                .join(LocalTaskJob, TaskInstance.job_id == LocalTaskJob.id)
+                .join(LocalTaskJob, TI.job_id == LocalTaskJob.id)
+                .join(DM, TI.dag_id == DM.dag_id)
                 .filter(TI.state == TaskInstanceState.RUNNING)
                 .filter(
                     or_(