You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2021/01/29 16:25:06 UTC
[airflow] branch master updated: Only allow passing JSON
Serializable conf to TriggerDagRunOperator (#13964)
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new b4885b2 Only allow passing JSON Serializable conf to TriggerDagRunOperator (#13964)
b4885b2 is described below
commit b4885b25871ae7ede2028f81b0d88def3e22f23a
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Fri Jan 29 16:24:46 2021 +0000
Only allow passing JSON Serializable conf to TriggerDagRunOperator (#13964)
closes https://github.com/apache/airflow/issues/13414
---
airflow/operators/trigger_dagrun.py | 6 ++++++
tests/operators/test_trigger_dagrun.py | 11 +++++++++++
2 files changed, 17 insertions(+)
diff --git a/airflow/operators/trigger_dagrun.py b/airflow/operators/trigger_dagrun.py
index 63d3361..3cf7a4f 100644
--- a/airflow/operators/trigger_dagrun.py
+++ b/airflow/operators/trigger_dagrun.py
@@ -17,6 +17,7 @@
# under the License.
import datetime
+import json
import time
from typing import Dict, List, Optional, Union
@@ -108,6 +109,11 @@ class TriggerDagRunOperator(BaseOperator):
self.execution_date: Optional[datetime.datetime] = execution_date # type: ignore
+ try:
+ json.dumps(self.conf)
+ except TypeError:
+ raise AirflowException("conf parameter should be JSON Serializable")
+
def execute(self, context: Dict):
if isinstance(self.execution_date, datetime.datetime):
execution_date = self.execution_date
diff --git a/tests/operators/test_trigger_dagrun.py b/tests/operators/test_trigger_dagrun.py
index c17d43c..1bdc59b 100644
--- a/tests/operators/test_trigger_dagrun.py
+++ b/tests/operators/test_trigger_dagrun.py
@@ -164,6 +164,17 @@ class TestDagRunOperator(TestCase):
assert len(dagruns) == 1
assert dagruns[0].conf, {"foo": "bar"}
+ def test_trigger_dagrun_operator_templated_invalid_conf(self):
+ """Test passing a conf that is not JSON Serializable raise error."""
+
+ with pytest.raises(AirflowException, match="^conf parameter should be JSON Serializable$"):
+ TriggerDagRunOperator(
+ task_id="test_trigger_dagrun_with_invalid_conf",
+ trigger_dag_id=TRIGGERED_DAG_ID,
+ conf={"foo": "{{ dag.dag_id }}", "datetime": timezone.utcnow()},
+ dag=self.dag,
+ )
+
def test_trigger_dagrun_operator_templated_conf(self):
"""Test passing a templated conf to the triggered DagRun."""
task = TriggerDagRunOperator(