You are viewing a plain text version of this content. The canonical link to it (a hyperlink in the original archive page) is not preserved in this plain-text version.
Posted to commits@airflow.apache.org by ur...@apache.org on 2022/07/15 01:30:09 UTC
[airflow] branch main updated: Explicitly list @dag arguments (#25044)
This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new be63c36bf1 Explicitly list @dag arguments (#25044)
be63c36bf1 is described below
commit be63c36bf1667c8a420d34e70e5a5efd7ca42815
Author: Tzu-ping Chung <ur...@gmail.com>
AuthorDate: Fri Jul 15 09:29:57 2022 +0800
Explicitly list @dag arguments (#25044)
---
airflow/example_dags/example_dag_decorator.py | 2 +-
airflow/example_dags/example_sla_dag.py | 2 +-
airflow/models/dag.py | 86 ++++++++++++++++++++++-----
docs/spelling_wordlist.txt | 4 ++
4 files changed, 78 insertions(+), 16 deletions(-)
diff --git a/airflow/example_dags/example_dag_decorator.py b/airflow/example_dags/example_dag_decorator.py
index 88e0282016..a47766f8a7 100644
--- a/airflow/example_dags/example_dag_decorator.py
+++ b/airflow/example_dags/example_dag_decorator.py
@@ -67,5 +67,5 @@ def example_dag_decorator(email: str = 'example@example.com'):
)
-dag = example_dag_decorator()
+example_dag = example_dag_decorator()
# [END dag_decorator_usage]
diff --git a/airflow/example_dags/example_sla_dag.py b/airflow/example_dags/example_sla_dag.py
index 0db6bc1ba7..985b2549d5 100644
--- a/airflow/example_dags/example_sla_dag.py
+++ b/airflow/example_dags/example_sla_dag.py
@@ -60,6 +60,6 @@ def example_sla_dag():
sleep_20() >> sleep_30()
-dag = example_sla_dag()
+example_dag = example_sla_dag()
# [END howto_task_sla]
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 48da0a8b77..9a26e61294 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -112,6 +112,8 @@ ScheduleInterval = Union[None, str, timedelta, relativedelta]
# See also: https://discuss.python.org/t/9126/7
ScheduleIntervalArg = Union[ArgNotSet, ScheduleInterval]
+SLAMissCallback = Callable[["DAG", str, str, List["SlaMiss"], List[TaskInstance]], None]
+
# Backward compatibility: If neither schedule_interval nor timetable is
# *provided by the user*, default to a one-day interval.
@@ -308,6 +310,8 @@ class DAG(LoggingMixin):
parent_dag: Optional["DAG"] = None # Gets set when DAGs are loaded
+ # NOTE: When updating arguments here, please also keep arguments in @dag()
+ # below in sync. (Search for 'def dag(' in this file.)
def __init__(
self,
dag_id: str,
@@ -326,9 +330,7 @@ class DAG(LoggingMixin):
max_active_tasks: int = conf.getint('core', 'max_active_tasks_per_dag'),
max_active_runs: int = conf.getint('core', 'max_active_runs_per_dag'),
dagrun_timeout: Optional[timedelta] = None,
- sla_miss_callback: Optional[
- Callable[["DAG", str, str, List["SlaMiss"], List[TaskInstance]], None]
- ] = None,
+ sla_miss_callback: Optional[SLAMissCallback] = None,
default_view: str = conf.get_mandatory_value('webserver', 'dag_default_view').lower(),
orientation: str = conf.get_mandatory_value('webserver', 'dag_orientation'),
catchup: bool = conf.getboolean('scheduler', 'catchup_by_default'),
@@ -3060,7 +3062,40 @@ class DagModel(Base):
)
-def dag(*dag_args, **dag_kwargs):
+# NOTE: Please keep the list of arguments in sync with DAG.__init__.
+# Only exception: dag_id here should have a default value, but not in DAG.
+def dag(
+ dag_id: str = "",
+ description: Optional[str] = None,
+ schedule_interval: ScheduleIntervalArg = NOTSET,
+ timetable: Optional[Timetable] = None,
+ start_date: Optional[datetime] = None,
+ end_date: Optional[datetime] = None,
+ full_filepath: Optional[str] = None,
+ template_searchpath: Optional[Union[str, Iterable[str]]] = None,
+ template_undefined: Type[jinja2.StrictUndefined] = jinja2.StrictUndefined,
+ user_defined_macros: Optional[Dict] = None,
+ user_defined_filters: Optional[Dict] = None,
+ default_args: Optional[Dict] = None,
+ concurrency: Optional[int] = None,
+ max_active_tasks: int = conf.getint('core', 'max_active_tasks_per_dag'),
+ max_active_runs: int = conf.getint('core', 'max_active_runs_per_dag'),
+ dagrun_timeout: Optional[timedelta] = None,
+ sla_miss_callback: Optional[SLAMissCallback] = None,
+ default_view: str = conf.get_mandatory_value('webserver', 'dag_default_view').lower(),
+ orientation: str = conf.get_mandatory_value('webserver', 'dag_orientation'),
+ catchup: bool = conf.getboolean('scheduler', 'catchup_by_default'),
+ on_success_callback: Optional[DagStateChangeCallback] = None,
+ on_failure_callback: Optional[DagStateChangeCallback] = None,
+ doc_md: Optional[str] = None,
+ params: Optional[Dict] = None,
+ access_control: Optional[Dict] = None,
+ is_paused_upon_creation: Optional[bool] = None,
+ jinja_environment_kwargs: Optional[Dict] = None,
+ render_template_as_native_obj: bool = False,
+ tags: Optional[List[str]] = None,
+ schedule_on: Optional[Sequence["Dataset"]] = None,
+) -> Callable[[Callable], Callable[..., DAG]]:
"""
Python dag decorator. Wraps a function into an Airflow DAG.
Accepts kwargs for operator kwarg. Can be used to parameterize DAGs.
@@ -3069,11 +3104,7 @@ def dag(*dag_args, **dag_kwargs):
:param dag_kwargs: Kwargs for DAG object.
"""
- def wrapper(f: Callable):
- # Get dag initializer signature and bind it to validate that dag_args, and dag_kwargs are correct
- dag_sig = signature(DAG.__init__)
- dag_bound_args = dag_sig.bind_partial(*dag_args, **dag_kwargs)
-
+ def wrapper(f: Callable) -> Callable[..., DAG]:
@functools.wraps(f)
def factory(*args, **kwargs):
# Generate signature for decorated function and bind the arguments when called
@@ -3083,12 +3114,39 @@ def dag(*dag_args, **dag_kwargs):
# Apply defaults to capture default values if set.
f_sig.apply_defaults()
- # Set function name as dag_id if not set
- dag_id = dag_bound_args.arguments.get('dag_id', f.__name__)
- dag_bound_args.arguments['dag_id'] = dag_id
-
# Initialize DAG with bound arguments
- with DAG(*dag_bound_args.args, **dag_bound_args.kwargs) as dag_obj:
+ with DAG(
+ dag_id or f.__name__,
+ description=description,
+ schedule_interval=schedule_interval,
+ timetable=timetable,
+ start_date=start_date,
+ end_date=end_date,
+ full_filepath=full_filepath,
+ template_searchpath=template_searchpath,
+ template_undefined=template_undefined,
+ user_defined_macros=user_defined_macros,
+ user_defined_filters=user_defined_filters,
+ default_args=default_args,
+ concurrency=concurrency,
+ max_active_tasks=max_active_tasks,
+ max_active_runs=max_active_runs,
+ dagrun_timeout=dagrun_timeout,
+ sla_miss_callback=sla_miss_callback,
+ default_view=default_view,
+ orientation=orientation,
+ catchup=catchup,
+ on_success_callback=on_success_callback,
+ on_failure_callback=on_failure_callback,
+ doc_md=doc_md,
+ params=params,
+ access_control=access_control,
+ is_paused_upon_creation=is_paused_upon_creation,
+ jinja_environment_kwargs=jinja_environment_kwargs,
+ render_template_as_native_obj=render_template_as_native_obj,
+ tags=tags,
+ schedule_on=schedule_on,
+ ) as dag_obj:
# Set DAG documentation from function documentation.
if f.__doc__:
dag_obj.doc_md = f.__doc__
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index 9f7be831d9..5dd5d794a0 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -384,6 +384,7 @@ StatefulSets
StatsD
StorageClass
StoredInfoType
+StrictUndefined
Stringified
Subclasses
Subdirectory
@@ -864,8 +865,10 @@ gdbm
generateUploadUrl
geq
getattr
+getboolean
getfqdn
getframe
+getint
getsource
gevent
gid
@@ -1323,6 +1326,7 @@ schedulable
schedulername
schemas
sdk
+searchpath
secretRef
secretRefs
securable