You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2021/09/11 11:00:19 UTC
[airflow] 01/05: Do not let create_dagrun overwrite explicit run_id
(#17728)
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit a3728cf83ddee9e1458fea9c706aa55916d42f2b
Author: Tzu-ping Chung <tp...@astronomer.io>
AuthorDate: Thu Aug 19 22:33:09 2021 +0800
Do not let create_dagrun overwrite explicit run_id (#17728)
Previous DAG.create_dagrun() has an weird behavior that when *all* of
run_id, execution_date, and run_type are provided, the function would
ignore the run_id argument and overwrite it by auto-generating a run_id
with DagRun.generate_run_id(). This fix the logic to respect the
explicit run_id value.
I don't think any of the "Airflow proper" code would be affected by
this, but the dag_maker fixture used in the test suite needs to be
tweaked a bit to continue working.
(cherry picked from commit 50771e0f66803d0a0a0b552ab77f4e6be7d1088b)
---
airflow/models/dag.py | 9 +++++----
tests/conftest.py | 17 ++++++++++-------
2 files changed, 15 insertions(+), 11 deletions(-)
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 4ac2ace..a1419fe 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -1767,15 +1767,16 @@ class DAG(LoggingMixin):
:param dag_hash: Hash of Serialized DAG
:type dag_hash: str
"""
- if run_id and not run_type:
+ if run_id: # Infer run_type from run_id if needed.
if not isinstance(run_id, str):
raise ValueError(f"`run_id` expected to be a str is {type(run_id)}")
- run_type: DagRunType = DagRunType.from_run_id(run_id)
- elif run_type and execution_date:
+ if not run_type:
+ run_type = DagRunType.from_run_id(run_id)
+ elif run_type and execution_date is not None: # Generate run_id from run_type and execution_date.
if not isinstance(run_type, DagRunType):
raise ValueError(f"`run_type` expected to be a DagRunType is {type(run_type)}")
run_id = DagRun.generate_run_id(run_type, execution_date)
- elif not run_id:
+ else:
raise AirflowException(
"Creating DagRun needs either `run_id` or both `run_type` and `execution_date`"
)
diff --git a/tests/conftest.py b/tests/conftest.py
index 0873ac4..3d053cf 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -459,13 +459,16 @@ def dag_maker(request):
def create_dagrun(self, **kwargs):
dag = self.dag
- defaults = dict(
- run_id='test',
- state=State.RUNNING,
- execution_date=self.start_date,
- start_date=self.start_date,
- )
- kwargs = {**defaults, **kwargs}
+ kwargs = {
+ "state": State.RUNNING,
+ "execution_date": self.start_date,
+ "start_date": self.start_date,
+ **kwargs,
+ }
+ # Need to provide run_id if the user does not either provide one
+ # explicitly, or pass run_type for inference in dag.create_dagrun().
+ if "run_id" not in kwargs and "run_type" not in kwargs:
+ kwargs["run_id"] = "test"
self.dag_run = dag.create_dagrun(**kwargs)
return self.dag_run