You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by di...@apache.org on 2020/12/05 04:12:05 UTC
[airflow] branch master updated: Dagrun object doesn't exist in the
TriggerDagRunOperator (#12819)
This is an automated email from the ASF dual-hosted git repository.
dimberman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new e82cf0d Dagrun object doesn't exist in the TriggerDagRunOperator (#12819)
e82cf0d is described below
commit e82cf0d01d6c1e1ec65d8e1b70d65158947fccd2
Author: Daniel Imberman <da...@gmail.com>
AuthorDate: Fri Dec 4 20:10:45 2020 -0800
Dagrun object doesn't exist in the TriggerDagRunOperator (#12819)
* Dagrun object doesn't exist in the TriggerDagRunOperator
fixes https://github.com/apache/airflow/issues/12587
Fixes issue where dag_run object is not populated if the dag_run already
exists and is reset
* change to get_last_dag_run
* Update airflow/operators/dagrun_operator.py
Co-authored-by: Tomek Urbaszek <tu...@gmail.com>
Co-authored-by: Kaxil Naik <ka...@gmail.com>
Co-authored-by: Tomek Urbaszek <tu...@gmail.com>
---
airflow/operators/dagrun_operator.py | 2 ++
tests/operators/test_dagrun_operator.py | 30 ++++++++++++++++++++++++++++++
2 files changed, 32 insertions(+)
diff --git a/airflow/operators/dagrun_operator.py b/airflow/operators/dagrun_operator.py
index 61bcac7..63d3361 100644
--- a/airflow/operators/dagrun_operator.py
+++ b/airflow/operators/dagrun_operator.py
@@ -144,6 +144,8 @@ class TriggerDagRunOperator(BaseOperator):
dag = dag_bag.get_dag(self.trigger_dag_id)
dag.clear(start_date=self.execution_date, end_date=self.execution_date)
+
+ dag_run = DagRun.find(dag_id=dag.dag_id, run_id=run_id)[0]
else:
raise e
diff --git a/tests/operators/test_dagrun_operator.py b/tests/operators/test_dagrun_operator.py
index fc5b2e8..bb85979 100644
--- a/tests/operators/test_dagrun_operator.py
+++ b/tests/operators/test_dagrun_operator.py
@@ -101,6 +101,36 @@ class TestDagRunOperator(TestCase):
self.assertTrue(dagruns[0].external_trigger)
self.assertEqual(dagruns[0].execution_date, utc_now)
+ def test_trigger_dagrun_twice(self):
+ """Test TriggerDagRunOperator with custom execution_date."""
+ utc_now = timezone.utcnow()
+ task = TriggerDagRunOperator(
+ task_id="test_trigger_dagrun_with_execution_date",
+ trigger_dag_id=TRIGGERED_DAG_ID,
+ execution_date=utc_now,
+ dag=self.dag,
+ poke_interval=1,
+ reset_dag_run=True,
+ wait_for_completion=True,
+ )
+ run_id = f"manual__{utc_now.isoformat()}"
+ with create_session() as session:
+ dag_run = DagRun(
+ dag_id=TRIGGERED_DAG_ID,
+ execution_date=utc_now,
+ state=State.SUCCESS,
+ run_type="manual",
+ run_id=run_id,
+ )
+ session.add(dag_run)
+ session.commit()
+ task.execute(None)
+
+ dagruns = session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).all()
+ self.assertEqual(len(dagruns), 1)
+ self.assertTrue(dagruns[0].external_trigger)
+ self.assertEqual(dagruns[0].execution_date, utc_now)
+
def test_trigger_dagrun_with_templated_execution_date(self):
"""Test TriggerDagRunOperator with templated execution_date."""
task = TriggerDagRunOperator(