You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by di...@apache.org on 2020/12/05 04:12:05 UTC

[airflow] branch master updated: Dagrun object doesn't exist in the TriggerDagRunOperator (#12819)

This is an automated email from the ASF dual-hosted git repository.

dimberman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new e82cf0d  Dagrun object doesn't exist in the TriggerDagRunOperator (#12819)
e82cf0d is described below

commit e82cf0d01d6c1e1ec65d8e1b70d65158947fccd2
Author: Daniel Imberman <da...@gmail.com>
AuthorDate: Fri Dec 4 20:10:45 2020 -0800

    Dagrun object doesn't exist in the TriggerDagRunOperator (#12819)
    
    * Dagrun object doesn't exist in the TriggerDagRunOperator
    
    Fixes https://github.com/apache/airflow/issues/12587
    
    Fixes issue where dag_run object is not populated if the dag_run already
    exists and is reset
    
    * change to get_last_dag_run
    
    * Update airflow/operators/dagrun_operator.py
    
    Co-authored-by: Tomek Urbaszek <tu...@gmail.com>
    
    Co-authored-by: Kaxil Naik <ka...@gmail.com>
    Co-authored-by: Tomek Urbaszek <tu...@gmail.com>
---
 airflow/operators/dagrun_operator.py    |  2 ++
 tests/operators/test_dagrun_operator.py | 30 ++++++++++++++++++++++++++++++
 2 files changed, 32 insertions(+)

diff --git a/airflow/operators/dagrun_operator.py b/airflow/operators/dagrun_operator.py
index 61bcac7..63d3361 100644
--- a/airflow/operators/dagrun_operator.py
+++ b/airflow/operators/dagrun_operator.py
@@ -144,6 +144,8 @@ class TriggerDagRunOperator(BaseOperator):
                 dag = dag_bag.get_dag(self.trigger_dag_id)
 
                 dag.clear(start_date=self.execution_date, end_date=self.execution_date)
+
+                dag_run = DagRun.find(dag_id=dag.dag_id, run_id=run_id)[0]
             else:
                 raise e
 
diff --git a/tests/operators/test_dagrun_operator.py b/tests/operators/test_dagrun_operator.py
index fc5b2e8..bb85979 100644
--- a/tests/operators/test_dagrun_operator.py
+++ b/tests/operators/test_dagrun_operator.py
@@ -101,6 +101,36 @@ class TestDagRunOperator(TestCase):
             self.assertTrue(dagruns[0].external_trigger)
             self.assertEqual(dagruns[0].execution_date, utc_now)
 
+    def test_trigger_dagrun_twice(self):
+        """Test TriggerDagRunOperator with reset_dag_run=True when the DagRun already exists."""
+        utc_now = timezone.utcnow()
+        task = TriggerDagRunOperator(
+            task_id="test_trigger_dagrun_with_execution_date",
+            trigger_dag_id=TRIGGERED_DAG_ID,
+            execution_date=utc_now,
+            dag=self.dag,
+            poke_interval=1,
+            reset_dag_run=True,
+            wait_for_completion=True,
+        )
+        run_id = f"manual__{utc_now.isoformat()}"
+        with create_session() as session:
+            dag_run = DagRun(
+                dag_id=TRIGGERED_DAG_ID,
+                execution_date=utc_now,
+                state=State.SUCCESS,
+                run_type="manual",
+                run_id=run_id,
+            )
+            session.add(dag_run)
+            session.commit()
+            task.execute(None)
+
+            dagruns = session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).all()
+            self.assertEqual(len(dagruns), 1)
+            self.assertTrue(dagruns[0].external_trigger)
+            self.assertEqual(dagruns[0].execution_date, utc_now)
+
     def test_trigger_dagrun_with_templated_execution_date(self):
         """Test TriggerDagRunOperator with templated execution_date."""
         task = TriggerDagRunOperator(