You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/10/16 18:21:28 UTC

[GitHub] [airflow] casassg commented on a change in pull request #10587: Add @dag decorator

casassg commented on a change in pull request #10587:
URL: https://github.com/apache/airflow/pull/10587#discussion_r506649427



##########
File path: airflow/models/dag.py
##########
@@ -2187,6 +2199,33 @@ def calculate_dagrun_date_fields(
 
         log.info("Setting next_dagrun for %s to %s", dag.dag_id, self.next_dagrun)
 
+def dag(*dag_args, **dag_kwargs):
+    """
+    Python dag decorator. Wraps a function into an Airflow DAG.
+    Accepts kwargs for operator kwarg. Can be used to parametrize DAGs.
+
+    :param dag_args: Arguments for DAG object
+    :type dag_args: list
+    :param dag_kwargs: Kwargs for DAG object.
+    :type dag_kwargs: dict
+    """
+    def wrapper(f: Callable):
+        dag_sig = signature(DAG.__init__)
+        dag_sig = dag_sig.bind_partial(*dag_args, **dag_kwargs)
+
+        @functools.wraps(f)
+        def factory(*args, **kwargs):
+            f_sig = signature(f).bind(*args, **kwargs)
+            f_sig.apply_defaults()
+            with DAG(*dag_sig.args, dag_id=f.__name__, **dag_sig.kwargs) as dag_obj:
+                f_kwargs = {}
+                for name, value in f_sig.arguments.items():
+                    f_kwargs[name] = dag_obj.param(name, value)
+                f(**f_kwargs)
+            return dag_obj

Review comment:
       Lmk if this is enough. Adding more unit tests as well




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org