You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2021/09/11 11:00:19 UTC

[airflow] 01/05: Do not let create_dagrun overwrite explicit run_id (#17728)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit a3728cf83ddee9e1458fea9c706aa55916d42f2b
Author: Tzu-ping Chung <tp...@astronomer.io>
AuthorDate: Thu Aug 19 22:33:09 2021 +0800

    Do not let create_dagrun overwrite explicit run_id (#17728)
    
    Previously, DAG.create_dagrun() had a weird behavior: when *all* of
    run_id, execution_date, and run_type were provided, the function would
    ignore the run_id argument and overwrite it with a run_id auto-generated
    by DagRun.generate_run_id(). This fixes the logic to respect the
    explicit run_id value.
    
    I don't think any of the "Airflow proper" code would be affected by
    this, but the dag_maker fixture used in the test suite needs to be
    tweaked a bit to continue working.
    
    (cherry picked from commit 50771e0f66803d0a0a0b552ab77f4e6be7d1088b)
---
 airflow/models/dag.py |  9 +++++----
 tests/conftest.py     | 17 ++++++++++-------
 2 files changed, 15 insertions(+), 11 deletions(-)

diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 4ac2ace..a1419fe 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -1767,15 +1767,16 @@ class DAG(LoggingMixin):
         :param dag_hash: Hash of Serialized DAG
         :type dag_hash: str
         """
-        if run_id and not run_type:
+        if run_id:  # Infer run_type from run_id if needed.
             if not isinstance(run_id, str):
                 raise ValueError(f"`run_id` expected to be a str is {type(run_id)}")
-            run_type: DagRunType = DagRunType.from_run_id(run_id)
-        elif run_type and execution_date:
+            if not run_type:
+                run_type = DagRunType.from_run_id(run_id)
+        elif run_type and execution_date is not None:  # Generate run_id from run_type and execution_date.
             if not isinstance(run_type, DagRunType):
                 raise ValueError(f"`run_type` expected to be a DagRunType is {type(run_type)}")
             run_id = DagRun.generate_run_id(run_type, execution_date)
-        elif not run_id:
+        else:
             raise AirflowException(
                 "Creating DagRun needs either `run_id` or both `run_type` and `execution_date`"
             )
diff --git a/tests/conftest.py b/tests/conftest.py
index 0873ac4..3d053cf 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -459,13 +459,16 @@ def dag_maker(request):
 
         def create_dagrun(self, **kwargs):
             dag = self.dag
-            defaults = dict(
-                run_id='test',
-                state=State.RUNNING,
-                execution_date=self.start_date,
-                start_date=self.start_date,
-            )
-            kwargs = {**defaults, **kwargs}
+            kwargs = {
+                "state": State.RUNNING,
+                "execution_date": self.start_date,
+                "start_date": self.start_date,
+                **kwargs,
+            }
+            # Need to provide run_id if the user does not either provide one
+            # explicitly, or pass run_type for inference in dag.create_dagrun().
+            if "run_id" not in kwargs and "run_type" not in kwargs:
+                kwargs["run_id"] = "test"
             self.dag_run = dag.create_dagrun(**kwargs)
             return self.dag_run