You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ur...@apache.org on 2022/07/15 01:30:09 UTC

[airflow] branch main updated: Explicitly list @dag arguments (#25044)

This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new be63c36bf1 Explicitly list @dag arguments (#25044)
be63c36bf1 is described below

commit be63c36bf1667c8a420d34e70e5a5efd7ca42815
Author: Tzu-ping Chung <ur...@gmail.com>
AuthorDate: Fri Jul 15 09:29:57 2022 +0800

    Explicitly list @dag arguments (#25044)
---
 airflow/example_dags/example_dag_decorator.py |  2 +-
 airflow/example_dags/example_sla_dag.py       |  2 +-
 airflow/models/dag.py                         | 86 ++++++++++++++++++++++-----
 docs/spelling_wordlist.txt                    |  4 ++
 4 files changed, 78 insertions(+), 16 deletions(-)

diff --git a/airflow/example_dags/example_dag_decorator.py b/airflow/example_dags/example_dag_decorator.py
index 88e0282016..a47766f8a7 100644
--- a/airflow/example_dags/example_dag_decorator.py
+++ b/airflow/example_dags/example_dag_decorator.py
@@ -67,5 +67,5 @@ def example_dag_decorator(email: str = 'example@example.com'):
     )
 
 
-dag = example_dag_decorator()
+example_dag = example_dag_decorator()
 # [END dag_decorator_usage]
diff --git a/airflow/example_dags/example_sla_dag.py b/airflow/example_dags/example_sla_dag.py
index 0db6bc1ba7..985b2549d5 100644
--- a/airflow/example_dags/example_sla_dag.py
+++ b/airflow/example_dags/example_sla_dag.py
@@ -60,6 +60,6 @@ def example_sla_dag():
     sleep_20() >> sleep_30()
 
 
-dag = example_sla_dag()
+example_dag = example_sla_dag()
 
 # [END howto_task_sla]
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 48da0a8b77..9a26e61294 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -112,6 +112,8 @@ ScheduleInterval = Union[None, str, timedelta, relativedelta]
 # See also: https://discuss.python.org/t/9126/7
 ScheduleIntervalArg = Union[ArgNotSet, ScheduleInterval]
 
+SLAMissCallback = Callable[["DAG", str, str, List["SlaMiss"], List[TaskInstance]], None]
+
 
 # Backward compatibility: If neither schedule_interval nor timetable is
 # *provided by the user*, default to a one-day interval.
@@ -308,6 +310,8 @@ class DAG(LoggingMixin):
 
     parent_dag: Optional["DAG"] = None  # Gets set when DAGs are loaded
 
+    # NOTE: When updating arguments here, please also keep arguments in @dag()
+    # below in sync. (Search for 'def dag(' in this file.)
     def __init__(
         self,
         dag_id: str,
@@ -326,9 +330,7 @@ class DAG(LoggingMixin):
         max_active_tasks: int = conf.getint('core', 'max_active_tasks_per_dag'),
         max_active_runs: int = conf.getint('core', 'max_active_runs_per_dag'),
         dagrun_timeout: Optional[timedelta] = None,
-        sla_miss_callback: Optional[
-            Callable[["DAG", str, str, List["SlaMiss"], List[TaskInstance]], None]
-        ] = None,
+        sla_miss_callback: Optional[SLAMissCallback] = None,
         default_view: str = conf.get_mandatory_value('webserver', 'dag_default_view').lower(),
         orientation: str = conf.get_mandatory_value('webserver', 'dag_orientation'),
         catchup: bool = conf.getboolean('scheduler', 'catchup_by_default'),
@@ -3060,7 +3062,40 @@ class DagModel(Base):
         )
 
 
-def dag(*dag_args, **dag_kwargs):
+# NOTE: Please keep the list of arguments in sync with DAG.__init__.
+# Only exception: dag_id here should have a default value, but not in DAG.
+def dag(
+    dag_id: str = "",
+    description: Optional[str] = None,
+    schedule_interval: ScheduleIntervalArg = NOTSET,
+    timetable: Optional[Timetable] = None,
+    start_date: Optional[datetime] = None,
+    end_date: Optional[datetime] = None,
+    full_filepath: Optional[str] = None,
+    template_searchpath: Optional[Union[str, Iterable[str]]] = None,
+    template_undefined: Type[jinja2.StrictUndefined] = jinja2.StrictUndefined,
+    user_defined_macros: Optional[Dict] = None,
+    user_defined_filters: Optional[Dict] = None,
+    default_args: Optional[Dict] = None,
+    concurrency: Optional[int] = None,
+    max_active_tasks: int = conf.getint('core', 'max_active_tasks_per_dag'),
+    max_active_runs: int = conf.getint('core', 'max_active_runs_per_dag'),
+    dagrun_timeout: Optional[timedelta] = None,
+    sla_miss_callback: Optional[SLAMissCallback] = None,
+    default_view: str = conf.get_mandatory_value('webserver', 'dag_default_view').lower(),
+    orientation: str = conf.get_mandatory_value('webserver', 'dag_orientation'),
+    catchup: bool = conf.getboolean('scheduler', 'catchup_by_default'),
+    on_success_callback: Optional[DagStateChangeCallback] = None,
+    on_failure_callback: Optional[DagStateChangeCallback] = None,
+    doc_md: Optional[str] = None,
+    params: Optional[Dict] = None,
+    access_control: Optional[Dict] = None,
+    is_paused_upon_creation: Optional[bool] = None,
+    jinja_environment_kwargs: Optional[Dict] = None,
+    render_template_as_native_obj: bool = False,
+    tags: Optional[List[str]] = None,
+    schedule_on: Optional[Sequence["Dataset"]] = None,
+) -> Callable[[Callable], Callable[..., DAG]]:
     """
     Python dag decorator. Wraps a function into an Airflow DAG.
     Accepts kwargs for operator kwarg. Can be used to parameterize DAGs.
@@ -3069,11 +3104,7 @@ def dag(*dag_args, **dag_kwargs):
     :param dag_kwargs: Kwargs for DAG object.
     """
 
-    def wrapper(f: Callable):
-        # Get dag initializer signature and bind it to validate that dag_args, and dag_kwargs are correct
-        dag_sig = signature(DAG.__init__)
-        dag_bound_args = dag_sig.bind_partial(*dag_args, **dag_kwargs)
-
+    def wrapper(f: Callable) -> Callable[..., DAG]:
         @functools.wraps(f)
         def factory(*args, **kwargs):
             # Generate signature for decorated function and bind the arguments when called
@@ -3083,12 +3114,39 @@ def dag(*dag_args, **dag_kwargs):
             # Apply defaults to capture default values if set.
             f_sig.apply_defaults()
 
-            # Set function name as dag_id if not set
-            dag_id = dag_bound_args.arguments.get('dag_id', f.__name__)
-            dag_bound_args.arguments['dag_id'] = dag_id
-
             # Initialize DAG with bound arguments
-            with DAG(*dag_bound_args.args, **dag_bound_args.kwargs) as dag_obj:
+            with DAG(
+                dag_id or f.__name__,
+                description=description,
+                schedule_interval=schedule_interval,
+                timetable=timetable,
+                start_date=start_date,
+                end_date=end_date,
+                full_filepath=full_filepath,
+                template_searchpath=template_searchpath,
+                template_undefined=template_undefined,
+                user_defined_macros=user_defined_macros,
+                user_defined_filters=user_defined_filters,
+                default_args=default_args,
+                concurrency=concurrency,
+                max_active_tasks=max_active_tasks,
+                max_active_runs=max_active_runs,
+                dagrun_timeout=dagrun_timeout,
+                sla_miss_callback=sla_miss_callback,
+                default_view=default_view,
+                orientation=orientation,
+                catchup=catchup,
+                on_success_callback=on_success_callback,
+                on_failure_callback=on_failure_callback,
+                doc_md=doc_md,
+                params=params,
+                access_control=access_control,
+                is_paused_upon_creation=is_paused_upon_creation,
+                jinja_environment_kwargs=jinja_environment_kwargs,
+                render_template_as_native_obj=render_template_as_native_obj,
+                tags=tags,
+                schedule_on=schedule_on,
+            ) as dag_obj:
                 # Set DAG documentation from function documentation.
                 if f.__doc__:
                     dag_obj.doc_md = f.__doc__
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index 9f7be831d9..5dd5d794a0 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -384,6 +384,7 @@ StatefulSets
 StatsD
 StorageClass
 StoredInfoType
+StrictUndefined
 Stringified
 Subclasses
 Subdirectory
@@ -864,8 +865,10 @@ gdbm
 generateUploadUrl
 geq
 getattr
+getboolean
 getfqdn
 getframe
+getint
 getsource
 gevent
 gid
@@ -1323,6 +1326,7 @@ schedulable
 schedulername
 schemas
 sdk
+searchpath
 secretRef
 secretRefs
 securable