You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ho...@apache.org on 2022/04/08 04:58:34 UTC

[airflow] branch main updated: Support conf param override for backfill runs (#22837)

This is an automated email from the ASF dual-hosted git repository.

houqp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 913eaa0e95 Support conf param override for backfill runs (#22837)
913eaa0e95 is described below

commit 913eaa0e95a4afbaeb66e533bf771fc4842c24c3
Author: QP Hou <qp...@scribd.com>
AuthorDate: Thu Apr 7 21:58:25 2022 -0700

    Support conf param override for backfill runs (#22837)
    
    Co-authored-by: Dmitry Suvorov <dm...@scribd.com>
---
 airflow/jobs/backfill_job.py    |  2 ++
 tests/jobs/test_backfill_job.py | 27 +++++++++++++++++++++++++++
 2 files changed, 29 insertions(+)

diff --git a/airflow/jobs/backfill_job.py b/airflow/jobs/backfill_job.py
index 5d8434647d..c9f0a48b58 100644
--- a/airflow/jobs/backfill_job.py
+++ b/airflow/jobs/backfill_job.py
@@ -297,6 +297,8 @@ class BackfillJob(BaseJob):
             run = runs[0]
             if run.state == DagRunState.RUNNING:
                 respect_dag_max_active_limit = False
+            # Fixes --conf overwrite for backfills with already existing DagRuns
+            run.conf = self.conf or {}
         else:
             run = None
 
diff --git a/tests/jobs/test_backfill_job.py b/tests/jobs/test_backfill_job.py
index ce258e407d..f64009ebb8 100644
--- a/tests/jobs/test_backfill_job.py
+++ b/tests/jobs/test_backfill_job.py
@@ -581,6 +581,33 @@ class TestBackfillJob:
         ti.refresh_from_db()
         assert ti.state == State.SUCCESS
 
+    def test_backfill_override_conf(self, dag_maker):
+        dag = self._get_dummy_dag(
+            dag_maker, dag_id="test_backfill_override_conf", task_id="test_backfill_override_conf-1"
+        )
+        dr = dag_maker.create_dagrun(
+            start_date=DEFAULT_DATE,
+        )
+
+        executor = MockExecutor()
+
+        job = BackfillJob(
+            dag=dag,
+            executor=executor,
+            start_date=DEFAULT_DATE,
+            end_date=DEFAULT_DATE + datetime.timedelta(days=2),
+            conf={"a": 1},
+        )
+
+        with patch.object(
+            job,
+            "_task_instances_for_dag_run",
+            wraps=job._task_instances_for_dag_run,
+        ) as wrapped_task_instances_for_dag_run:
+            job.run()
+            dr = wrapped_task_instances_for_dag_run.call_args_list[0][0][0]
+            assert dr.conf == {"a": 1}
+
     def test_backfill_rerun_failed_tasks(self, dag_maker):
         dag = self._get_dummy_dag(
             dag_maker, dag_id="test_backfill_rerun_failed", task_id="test_backfill_rerun_failed_task-1"