You are viewing a plain-text version of this content. The canonical link to it (the "here" hyperlink in the original page) is not preserved in this copy.
Posted to commits@airflow.apache.org by ur...@apache.org on 2022/02/04 02:15:01 UTC

[airflow] branch main updated: Fix mismatch in generated run_id and logical date of DAG run (#18707)

This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 1f08d28  Fix mismatch in generated run_id and logical date of DAG run (#18707)
1f08d28 is described below

commit 1f08d281632670aef1de8dfc62c9f63aeec18760
Author: David Caron <dc...@gmail.com>
AuthorDate: Thu Feb 3 21:14:19 2022 -0500

    Fix mismatch in generated run_id and logical date of DAG run (#18707)
    
    Co-authored-by: Tzu-ping Chung <tp...@astronomer.io>
    Co-authored-by: Jed Cunningham <je...@apache.org>
---
 airflow/operators/trigger_dagrun.py    | 20 +++++++++-----------
 tests/operators/test_trigger_dagrun.py | 25 ++++++++++++-------------
 2 files changed, 21 insertions(+), 24 deletions(-)

diff --git a/airflow/operators/trigger_dagrun.py b/airflow/operators/trigger_dagrun.py
index 3306dde..526b04c 100644
--- a/airflow/operators/trigger_dagrun.py
+++ b/airflow/operators/trigger_dagrun.py
@@ -107,12 +107,12 @@ class TriggerDagRunOperator(BaseOperator):
         self.allowed_states = allowed_states or [State.SUCCESS]
         self.failed_states = failed_states or [State.FAILED]
 
-        if not isinstance(execution_date, (str, datetime.datetime, type(None))):
+        if execution_date is not None and not isinstance(execution_date, (str, datetime.datetime)):
             raise TypeError(
                 f"Expected str or datetime.datetime type for execution_date.Got {type(execution_date)}"
             )
 
-        self.execution_date: Optional[datetime.datetime] = execution_date  # type: ignore
+        self.execution_date = execution_date
 
         try:
             json.dumps(self.conf)
@@ -121,30 +121,28 @@ class TriggerDagRunOperator(BaseOperator):
 
     def execute(self, context: Context):
         if isinstance(self.execution_date, datetime.datetime):
-            execution_date = self.execution_date
+            parsed_execution_date = self.execution_date
         elif isinstance(self.execution_date, str):
-            execution_date = timezone.parse(self.execution_date)
-            self.execution_date = execution_date
+            parsed_execution_date = timezone.parse(self.execution_date)
         else:
-            execution_date = timezone.utcnow()
+            parsed_execution_date = timezone.utcnow()
 
         if self.trigger_run_id:
             run_id = self.trigger_run_id
         else:
-            run_id = DagRun.generate_run_id(DagRunType.MANUAL, execution_date)
-
+            run_id = DagRun.generate_run_id(DagRunType.MANUAL, parsed_execution_date)
         try:
             dag_run = trigger_dag(
                 dag_id=self.trigger_dag_id,
                 run_id=run_id,
                 conf=self.conf,
-                execution_date=self.execution_date,
+                execution_date=parsed_execution_date,
                 replace_microseconds=False,
             )
 
         except DagRunAlreadyExists as e:
             if self.reset_dag_run:
-                self.log.info("Clearing %s on %s", self.trigger_dag_id, self.execution_date)
+                self.log.info("Clearing %s on %s", self.trigger_dag_id, parsed_execution_date)
 
                 # Get target dag object and call clear()
 
@@ -154,7 +152,7 @@ class TriggerDagRunOperator(BaseOperator):
 
                 dag_bag = DagBag(dag_folder=dag_model.fileloc, read_dags_from_db=True)
                 dag = dag_bag.get_dag(self.trigger_dag_id)
-                dag.clear(start_date=self.execution_date, end_date=self.execution_date)
+                dag.clear(start_date=parsed_execution_date, end_date=parsed_execution_date)
                 dag_run = DagRun.find(dag_id=dag.dag_id, run_id=run_id)[0]
             else:
                 raise e
diff --git a/tests/operators/test_trigger_dagrun.py b/tests/operators/test_trigger_dagrun.py
index 1934c4d..180781e 100644
--- a/tests/operators/test_trigger_dagrun.py
+++ b/tests/operators/test_trigger_dagrun.py
@@ -30,6 +30,7 @@ from airflow.operators.trigger_dagrun import TriggerDagRunOperator
 from airflow.utils import timezone
 from airflow.utils.session import create_session
 from airflow.utils.state import State
+from airflow.utils.types import DagRunType
 
 DEFAULT_DATE = datetime(2019, 1, 1, tzinfo=timezone.utc)
 TEST_DAG_ID = "testdag"
@@ -101,11 +102,10 @@ class TestDagRunOperator(TestCase):
         task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
         with create_session() as session:
-            dagruns = session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).all()
-            assert len(dagruns) == 1
-            triggered_dag_run = dagruns[0]
-            assert triggered_dag_run.external_trigger
-            self.assert_extra_link(DEFAULT_DATE, triggered_dag_run, task)
+            dagrun = session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).one()
+            assert dagrun.external_trigger
+            assert dagrun.run_id == DagRun.generate_run_id(DagRunType.MANUAL, dagrun.execution_date)
+            self.assert_extra_link(DEFAULT_DATE, dagrun, task)
 
     def test_trigger_dagrun_custom_run_id(self):
         task = TriggerDagRunOperator(
@@ -123,22 +123,21 @@ class TestDagRunOperator(TestCase):
 
     def test_trigger_dagrun_with_execution_date(self):
         """Test TriggerDagRunOperator with custom execution_date."""
-        utc_now = timezone.utcnow()
+        custom_execution_date = timezone.datetime(2021, 1, 2, 3, 4, 5)
         task = TriggerDagRunOperator(
             task_id="test_trigger_dagrun_with_execution_date",
             trigger_dag_id=TRIGGERED_DAG_ID,
-            execution_date=utc_now,
+            execution_date=custom_execution_date,
             dag=self.dag,
         )
         task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
         with create_session() as session:
-            dagruns = session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).all()
-            assert len(dagruns) == 1
-            triggered_dag_run = dagruns[0]
-            assert triggered_dag_run.external_trigger
-            assert triggered_dag_run.execution_date == utc_now
-            self.assert_extra_link(DEFAULT_DATE, triggered_dag_run, task)
+            dagrun = session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).one()
+            assert dagrun.external_trigger
+            assert dagrun.execution_date == custom_execution_date
+            assert dagrun.run_id == DagRun.generate_run_id(DagRunType.MANUAL, custom_execution_date)
+            self.assert_extra_link(DEFAULT_DATE, dagrun, task)
 
     def test_trigger_dagrun_twice(self):
         """Test TriggerDagRunOperator with custom execution_date."""