You are viewing a plain-text version of this content. The canonical link for it is here [hyperlink not preserved in this plain-text copy].
Posted to commits@airflow.apache.org by je...@apache.org on 2022/02/08 20:30:06 UTC

[airflow] branch v2-2-test updated (b92cf65 -> f0a8dac)

This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a change to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git.


    from b92cf65  Add possibility to create user in the Remote User mode (#19963)
     new 0823fd2  Fix TriggerDagRunOperator extra link (#19410)
     new f0a8dac  Fix mismatch in generated run_id and logical date of DAG run (#18707)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 airflow/operators/trigger_dagrun.py    | 39 +++++++++++++++-------
 tests/operators/test_trigger_dagrun.py | 60 +++++++++++++++++++++++++---------
 2 files changed, 71 insertions(+), 28 deletions(-)

[airflow] 01/02: Fix TriggerDagRunOperator extra link (#19410)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 0823fd2fbf38ebe76cb83409f4aca20f5f710abb
Author: Niko <on...@amazon.com>
AuthorDate: Thu Dec 9 05:46:59 2021 -0800

    Fix TriggerDagRunOperator extra link (#19410)
    
    The extra link provided by the operator was previously using the
    execution date of the triggering dag, not the triggered dag. Store the
    execution date of the triggered dag in xcom so that it can be read back
    later within the webserver when the link is being created.
    
    (cherry picked from commit 820e836c4a2e45239279d4d71e1db9434022fec5)
---
 airflow/operators/trigger_dagrun.py    | 19 ++++++++++++-
 tests/operators/test_trigger_dagrun.py | 49 +++++++++++++++++++++++++++-------
 2 files changed, 57 insertions(+), 11 deletions(-)

diff --git a/airflow/operators/trigger_dagrun.py b/airflow/operators/trigger_dagrun.py
index 421c796..a346db1 100644
--- a/airflow/operators/trigger_dagrun.py
+++ b/airflow/operators/trigger_dagrun.py
@@ -24,11 +24,15 @@ from typing import Dict, List, Optional, Union
 from airflow.api.common.trigger_dag import trigger_dag
 from airflow.exceptions import AirflowException, DagNotFound, DagRunAlreadyExists
 from airflow.models import BaseOperator, BaseOperatorLink, DagBag, DagModel, DagRun
+from airflow.models.xcom import XCom
 from airflow.utils import timezone
 from airflow.utils.helpers import build_airflow_url_with_query
 from airflow.utils.state import State
 from airflow.utils.types import DagRunType
 
+XCOM_EXECUTION_DATE_ISO = "trigger_execution_date_iso"
+XCOM_RUN_ID = "trigger_run_id"
+
 
 class TriggerDagRunLink(BaseOperatorLink):
     """
@@ -39,7 +43,13 @@ class TriggerDagRunLink(BaseOperatorLink):
     name = 'Triggered DAG'
 
     def get_link(self, operator, dttm):
-        query = {"dag_id": operator.trigger_dag_id, "execution_date": dttm.isoformat()}
+        # Fetch the correct execution date for the triggerED dag which is
+        # stored in xcom during execution of the triggerING task.
+        trigger_execution_date_iso = XCom.get_one(
+            execution_date=dttm, key=XCOM_EXECUTION_DATE_ISO, task_id=operator.task_id, dag_id=operator.dag_id
+        )
+
+        query = {"dag_id": operator.trigger_dag_id, "base_date": trigger_execution_date_iso}
         return build_airflow_url_with_query(query)
 
 
@@ -140,6 +150,7 @@ class TriggerDagRunOperator(BaseOperator):
                 execution_date=self.execution_date,
                 replace_microseconds=False,
             )
+
         except DagRunAlreadyExists as e:
             if self.reset_dag_run:
                 self.log.info("Clearing %s on %s", self.trigger_dag_id, self.execution_date)
@@ -157,6 +168,12 @@ class TriggerDagRunOperator(BaseOperator):
             else:
                 raise e
 
+        # Store the execution date from the dag run (either created or found above) to
+        # be used when creating the extra link on the webserver.
+        ti = context['task_instance']
+        ti.xcom_push(key=XCOM_EXECUTION_DATE_ISO, value=dag_run.execution_date.isoformat())
+        ti.xcom_push(key=XCOM_RUN_ID, value=dag_run.run_id)
+
         if self.wait_for_completion:
             # wait for dag to complete
             while True:
diff --git a/tests/operators/test_trigger_dagrun.py b/tests/operators/test_trigger_dagrun.py
index 9ff8735..1934c4d 100644
--- a/tests/operators/test_trigger_dagrun.py
+++ b/tests/operators/test_trigger_dagrun.py
@@ -19,7 +19,7 @@
 import pathlib
 import tempfile
 from datetime import datetime
-from unittest import TestCase
+from unittest import TestCase, mock
 
 import pytest
 
@@ -76,6 +76,25 @@ class TestDagRunOperator(TestCase):
 
         pathlib.Path(self._tmpfile).unlink()
 
+    @mock.patch('airflow.operators.trigger_dagrun.build_airflow_url_with_query')
+    def assert_extra_link(self, triggering_exec_date, triggered_dag_run, triggering_task, mock_build_url):
+        """
+        Asserts whether the correct extra links url will be created.
+
+        Specifically it tests whether the correct dag id and date are passed to
+        the method which constructs the final url.
+        Note: We can't run that method to generate the url itself because the Flask app context
+        isn't available within the test logic, so it is mocked here.
+        """
+        triggering_task.get_extra_links(triggering_exec_date, 'Triggered DAG')
+        assert mock_build_url.called
+        args, _ = mock_build_url.call_args
+        expected_args = {
+            'dag_id': triggered_dag_run.dag_id,
+            'base_date': triggered_dag_run.execution_date.isoformat(),
+        }
+        assert expected_args in args
+
     def test_trigger_dagrun(self):
         """Test TriggerDagRunOperator."""
         task = TriggerDagRunOperator(task_id="test_task", trigger_dag_id=TRIGGERED_DAG_ID, dag=self.dag)
@@ -84,7 +103,9 @@ class TestDagRunOperator(TestCase):
         with create_session() as session:
             dagruns = session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).all()
             assert len(dagruns) == 1
-            assert dagruns[0].external_trigger
+            triggered_dag_run = dagruns[0]
+            assert triggered_dag_run.external_trigger
+            self.assert_extra_link(DEFAULT_DATE, triggered_dag_run, task)
 
     def test_trigger_dagrun_custom_run_id(self):
         task = TriggerDagRunOperator(
@@ -114,8 +135,10 @@ class TestDagRunOperator(TestCase):
         with create_session() as session:
             dagruns = session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).all()
             assert len(dagruns) == 1
-            assert dagruns[0].external_trigger
-            assert dagruns[0].execution_date == utc_now
+            triggered_dag_run = dagruns[0]
+            assert triggered_dag_run.external_trigger
+            assert triggered_dag_run.execution_date == utc_now
+            self.assert_extra_link(DEFAULT_DATE, triggered_dag_run, task)
 
     def test_trigger_dagrun_twice(self):
         """Test TriggerDagRunOperator with custom execution_date."""
@@ -140,12 +163,14 @@ class TestDagRunOperator(TestCase):
             )
             session.add(dag_run)
             session.commit()
-            task.execute(None)
+            task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
             dagruns = session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).all()
             assert len(dagruns) == 1
-            assert dagruns[0].external_trigger
-            assert dagruns[0].execution_date == utc_now
+            triggered_dag_run = dagruns[0]
+            assert triggered_dag_run.external_trigger
+            assert triggered_dag_run.execution_date == utc_now
+            self.assert_extra_link(DEFAULT_DATE, triggered_dag_run, task)
 
     def test_trigger_dagrun_with_templated_execution_date(self):
         """Test TriggerDagRunOperator with templated execution_date."""
@@ -160,8 +185,10 @@ class TestDagRunOperator(TestCase):
         with create_session() as session:
             dagruns = session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).all()
             assert len(dagruns) == 1
-            assert dagruns[0].external_trigger
-            assert dagruns[0].execution_date == DEFAULT_DATE
+            triggered_dag_run = dagruns[0]
+            assert triggered_dag_run.external_trigger
+            assert triggered_dag_run.execution_date == DEFAULT_DATE
+            self.assert_extra_link(DEFAULT_DATE, triggered_dag_run, task)
 
     def test_trigger_dagrun_operator_conf(self):
         """Test passing conf to the triggered DagRun."""
@@ -288,7 +315,9 @@ class TestDagRunOperator(TestCase):
                 .all()
             )
             assert len(dagruns) == 2
-            assert dagruns[1].state == State.QUEUED
+            triggered_dag_run = dagruns[1]
+            assert triggered_dag_run.state == State.QUEUED
+            self.assert_extra_link(execution_date, triggered_dag_run, task)
 
     def test_trigger_dagrun_triggering_itself_with_execution_date(self):
         """Test TriggerDagRunOperator that triggers itself with execution date,

[airflow] 02/02: Fix mismatch in generated run_id and logical date of DAG run (#18707)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit f0a8dacb97d0e7e89f5554904427ec3c076eeec4
Author: David Caron <dc...@gmail.com>
AuthorDate: Thu Feb 3 21:14:19 2022 -0500

    Fix mismatch in generated run_id and logical date of DAG run (#18707)
    
    Co-authored-by: Tzu-ping Chung <tp...@astronomer.io>
    Co-authored-by: Jed Cunningham <je...@apache.org>
    (cherry picked from commit 1f08d281632670aef1de8dfc62c9f63aeec18760)
---
 airflow/operators/trigger_dagrun.py    | 20 +++++++++-----------
 tests/operators/test_trigger_dagrun.py | 25 ++++++++++++-------------
 2 files changed, 21 insertions(+), 24 deletions(-)

diff --git a/airflow/operators/trigger_dagrun.py b/airflow/operators/trigger_dagrun.py
index a346db1..7dae196 100644
--- a/airflow/operators/trigger_dagrun.py
+++ b/airflow/operators/trigger_dagrun.py
@@ -115,13 +115,13 @@ class TriggerDagRunOperator(BaseOperator):
         self.allowed_states = allowed_states or [State.SUCCESS]
         self.failed_states = failed_states or [State.FAILED]
 
-        if not isinstance(execution_date, (str, datetime.datetime, type(None))):
+        if execution_date is not None and not isinstance(execution_date, (str, datetime.datetime)):
             raise TypeError(
                 "Expected str or datetime.datetime type for execution_date."
                 "Got {}".format(type(execution_date))
             )
 
-        self.execution_date: Optional[datetime.datetime] = execution_date  # type: ignore
+        self.execution_date = execution_date
 
         try:
             json.dumps(self.conf)
@@ -130,30 +130,28 @@ class TriggerDagRunOperator(BaseOperator):
 
     def execute(self, context: Dict):
         if isinstance(self.execution_date, datetime.datetime):
-            execution_date = self.execution_date
+            parsed_execution_date = self.execution_date
         elif isinstance(self.execution_date, str):
-            execution_date = timezone.parse(self.execution_date)
-            self.execution_date = execution_date
+            parsed_execution_date = timezone.parse(self.execution_date)
         else:
-            execution_date = timezone.utcnow()
+            parsed_execution_date = timezone.utcnow()
 
         if self.trigger_run_id:
             run_id = self.trigger_run_id
         else:
-            run_id = DagRun.generate_run_id(DagRunType.MANUAL, execution_date)
-
+            run_id = DagRun.generate_run_id(DagRunType.MANUAL, parsed_execution_date)
         try:
             dag_run = trigger_dag(
                 dag_id=self.trigger_dag_id,
                 run_id=run_id,
                 conf=self.conf,
-                execution_date=self.execution_date,
+                execution_date=parsed_execution_date,
                 replace_microseconds=False,
             )
 
         except DagRunAlreadyExists as e:
             if self.reset_dag_run:
-                self.log.info("Clearing %s on %s", self.trigger_dag_id, self.execution_date)
+                self.log.info("Clearing %s on %s", self.trigger_dag_id, parsed_execution_date)
 
                 # Get target dag object and call clear()
 
@@ -163,7 +161,7 @@ class TriggerDagRunOperator(BaseOperator):
 
                 dag_bag = DagBag(dag_folder=dag_model.fileloc, read_dags_from_db=True)
                 dag = dag_bag.get_dag(self.trigger_dag_id)
-                dag.clear(start_date=self.execution_date, end_date=self.execution_date)
+                dag.clear(start_date=parsed_execution_date, end_date=parsed_execution_date)
                 dag_run = DagRun.find(dag_id=dag.dag_id, run_id=run_id)[0]
             else:
                 raise e
diff --git a/tests/operators/test_trigger_dagrun.py b/tests/operators/test_trigger_dagrun.py
index 1934c4d..180781e 100644
--- a/tests/operators/test_trigger_dagrun.py
+++ b/tests/operators/test_trigger_dagrun.py
@@ -30,6 +30,7 @@ from airflow.operators.trigger_dagrun import TriggerDagRunOperator
 from airflow.utils import timezone
 from airflow.utils.session import create_session
 from airflow.utils.state import State
+from airflow.utils.types import DagRunType
 
 DEFAULT_DATE = datetime(2019, 1, 1, tzinfo=timezone.utc)
 TEST_DAG_ID = "testdag"
@@ -101,11 +102,10 @@ class TestDagRunOperator(TestCase):
         task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
         with create_session() as session:
-            dagruns = session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).all()
-            assert len(dagruns) == 1
-            triggered_dag_run = dagruns[0]
-            assert triggered_dag_run.external_trigger
-            self.assert_extra_link(DEFAULT_DATE, triggered_dag_run, task)
+            dagrun = session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).one()
+            assert dagrun.external_trigger
+            assert dagrun.run_id == DagRun.generate_run_id(DagRunType.MANUAL, dagrun.execution_date)
+            self.assert_extra_link(DEFAULT_DATE, dagrun, task)
 
     def test_trigger_dagrun_custom_run_id(self):
         task = TriggerDagRunOperator(
@@ -123,22 +123,21 @@ class TestDagRunOperator(TestCase):
 
     def test_trigger_dagrun_with_execution_date(self):
         """Test TriggerDagRunOperator with custom execution_date."""
-        utc_now = timezone.utcnow()
+        custom_execution_date = timezone.datetime(2021, 1, 2, 3, 4, 5)
         task = TriggerDagRunOperator(
             task_id="test_trigger_dagrun_with_execution_date",
             trigger_dag_id=TRIGGERED_DAG_ID,
-            execution_date=utc_now,
+            execution_date=custom_execution_date,
             dag=self.dag,
         )
         task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
         with create_session() as session:
-            dagruns = session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).all()
-            assert len(dagruns) == 1
-            triggered_dag_run = dagruns[0]
-            assert triggered_dag_run.external_trigger
-            assert triggered_dag_run.execution_date == utc_now
-            self.assert_extra_link(DEFAULT_DATE, triggered_dag_run, task)
+            dagrun = session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).one()
+            assert dagrun.external_trigger
+            assert dagrun.execution_date == custom_execution_date
+            assert dagrun.run_id == DagRun.generate_run_id(DagRunType.MANUAL, custom_execution_date)
+            self.assert_extra_link(DEFAULT_DATE, dagrun, task)
 
     def test_trigger_dagrun_twice(self):
         """Test TriggerDagRunOperator with custom execution_date."""