Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/07/29 20:51:35 UTC

[GitHub] [airflow] ephraimbuddy opened a new pull request #17324: Improve `dag_maker` fixture

ephraimbuddy opened a new pull request #17324:
URL: https://github.com/apache/airflow/pull/17324


   This PR improves the `dag_maker` fixture so that the DAG, the DagRun and the DagModel can each be created separately.
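The following is a minimal sketch of the idea. It uses hypothetical stand-in classes (`FakeDag`, `FakeDagRun`, `FakeDagModel`, `DagMaker`) rather than Airflow's real `DAG`, `DagRun` and `DagModel`. The context manager only builds the DAG, and the DagRun and DagModel are created by separate, explicit calls.

```python
from dataclasses import dataclass
from datetime import datetime

# Hypothetical default, standing in for the DEFAULT_DATE used by the test suite.
DEFAULT_DATE = datetime(2016, 1, 1)


# Hypothetical stand-ins; these are NOT Airflow's DAG, DagRun or DagModel.
@dataclass
class FakeDag:
    dag_id: str
    start_date: datetime


@dataclass
class FakeDagRun:
    dag_id: str
    run_id: str
    state: str
    execution_date: datetime


@dataclass
class FakeDagModel:
    dag_id: str
    is_active: bool = True


class DagMaker:
    """Builds the DAG up front; DagRun and DagModel are made only on request."""

    def __call__(self, dag_id="test_dag", start_date=DEFAULT_DATE):
        self.dag = FakeDag(dag_id, start_date)
        return self

    def __enter__(self):
        return self.dag

    def __exit__(self, exc_type, exc, tb):
        # No DagRun is created implicitly on exit; tests ask for one explicitly.
        return False

    def create_dagrun(self, run_id="test", state="running", execution_date=None):
        self.dag_run = FakeDagRun(
            self.dag.dag_id, run_id, state, execution_date or self.dag.start_date
        )
        return self.dag_run

    def make_dagmodel(self, is_active=True):
        return FakeDagModel(self.dag.dag_id, is_active)
```

With this split, a test that only needs the DAG never pays for a DagRun row. A test that needs a run calls `create_dagrun(...)` after the `with` block.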
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on pull request #17324: Improve `dag_maker` fixture

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #17324:
URL: https://github.com/apache/airflow/pull/17324#issuecomment-890755220


   Seems to work!



[GitHub] [airflow] ephraimbuddy commented on pull request #17324: Improve `dag_maker` fixture

ephraimbuddy commented on pull request #17324:
URL: https://github.com/apache/airflow/pull/17324#issuecomment-890763646


   > Seems to work!
   
   Yeah. Thanks for your help.



[GitHub] [airflow] ephraimbuddy commented on a change in pull request #17324: Improve `dag_maker` fixture

ephraimbuddy commented on a change in pull request #17324:
URL: https://github.com/apache/airflow/pull/17324#discussion_r679708043



##########
File path: tests/conftest.py
##########
@@ -444,33 +445,44 @@ def __exit__(self, type, value, traceback):
             dag.__exit__(type, value, traceback)
             if type is None:
                 dag.clear()
-                self.dag_run = dag.create_dagrun(
-                    run_id=self.kwargs.get("run_id", "test"),
-                    state=self.kwargs.get('state', State.RUNNING),
-                    execution_date=self.kwargs.get('execution_date', self.kwargs['start_date']),
-                    start_date=self.kwargs['start_date'],
-                )
+
+        def _set_default_args(self, kwargs, defaults):
+            for k, v in defaults.items():
+                if k not in kwargs:
+                    kwargs.setdefault(k, v)
+
+        @provide_session
+        def make_dagmodel(self, session=None, **kwargs):
+            dag = self.dag
+            defaults = dict(dag_id=dag.dag_id, next_dagrun=dag.start_date, is_active=True)
+            self._set_default_args(kwargs, defaults)

Review comment:
       This is so apt! Thanks!
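For reference, `dict.setdefault(k, v)` only inserts `k` when it is missing. That makes the `if k not in kwargs` guard in `_set_default_args` redundant. Here is a quick check; the function names are illustrative only.

```python
def set_default_args_guarded(kwargs, defaults):
    # As in the diff: the membership test repeats what setdefault already does.
    for k, v in defaults.items():
        if k not in kwargs:
            kwargs.setdefault(k, v)


def set_default_args(kwargs, defaults):
    # setdefault leaves existing keys alone, so the guard can be dropped.
    for k, v in defaults.items():
        kwargs.setdefault(k, v)


defaults = {"run_id": "test", "state": "running"}
guarded = {"run_id": "custom"}
plain = {"run_id": "custom"}
set_default_args_guarded(guarded, defaults)
set_default_args(plain, defaults)
```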





[GitHub] [airflow] ephraimbuddy closed pull request #17324: Improve `dag_maker` fixture

ephraimbuddy closed pull request #17324:
URL: https://github.com/apache/airflow/pull/17324


   



[GitHub] [airflow] ashb commented on a change in pull request #17324: Improve `dag_maker` fixture

ashb commented on a change in pull request #17324:
URL: https://github.com/apache/airflow/pull/17324#discussion_r681605562



##########
File path: tests/jobs/test_local_task_job.py
##########
@@ -715,6 +699,8 @@ def test_fast_follow(
                 for upstream, downstream in dependencies.items():
                     dag.set_dependency(upstream, downstream)
 
+            dag_maker.make_dagmodel()

Review comment:
       This shouldn't have to be called -- I think we should _always_ create the DagModel. My high-level goal in asking for this fixture is to make tests more representative of _how_ Airflow behaves when running, specifically that the SerializedDag and DagModel rows are always present for a DAG.
   
   And in the _rare_ cases where we don't want either of these we could then delete them in the tests.
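Below is a minimal sketch of the behaviour described above, built on hypothetical stand-ins. Dicts keyed by `dag_id` play the role of the DagModel and serialized-DAG tables, and `AlwaysPersistingDagMaker` is an illustrative name, not the fixture's real class. Both rows are written on every clean exit, and a test that needs them absent deletes them afterwards.

```python
class AlwaysPersistingDagMaker:
    """Hypothetical sketch: every DAG built in the context gets its rows."""

    def __init__(self):
        # Dicts keyed by dag_id stand in for the DagModel and SerializedDag tables.
        self.dag_models = {}
        self.serialized_dags = {}

    def __call__(self, dag_id="test_dag"):
        self.dag_id = dag_id
        return self

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        if exc_type is None:
            # Mirror a running Airflow: both rows always exist for the DAG.
            self.dag_models[self.dag_id] = {"dag_id": self.dag_id, "is_active": True}
            self.serialized_dags[self.dag_id] = {"dag_id": self.dag_id}
        return False


# Usage: the rare test that must not see a DagModel removes it explicitly.
maker = AlwaysPersistingDagMaker()
with maker("example_dag"):
    pass
del maker.dag_models["example_dag"]
```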





[GitHub] [airflow] ephraimbuddy merged pull request #17324: Improve `dag_maker` fixture

ephraimbuddy merged pull request #17324:
URL: https://github.com/apache/airflow/pull/17324


   



[GitHub] [airflow] ephraimbuddy commented on a change in pull request #17324: Improve `dag_maker` fixture

ephraimbuddy commented on a change in pull request #17324:
URL: https://github.com/apache/airflow/pull/17324#discussion_r681611854



##########
File path: tests/jobs/test_local_task_job.py
##########
@@ -715,6 +699,8 @@ def test_fast_follow(
                 for upstream, downstream in dependencies.items():
                     dag.set_dependency(upstream, downstream)
 
+            dag_maker.make_dagmodel()

Review comment:
       The problem was separating the arguments that DagModel takes from the ones that DAG takes.
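One generic way to route a single `**kwargs` dict to two constructors is to inspect their signatures. This sketch uses hypothetical `make_dag` and `make_dag_model` functions, and it only works for targets with explicit keyword parameters. A SQLAlchemy model using the default declarative constructor accepts arbitrary `**kwargs`, so for such a class you would need to match against its mapped column names instead.

```python
import inspect


def split_kwargs(kwargs, *targets):
    """Route each key to every target whose signature names it.

    Returns one dict per target plus a dict of keys no target accepts.
    Keys hidden behind a target's own **kwargs are not detected.
    """
    accepted = [set(inspect.signature(t).parameters) for t in targets]
    buckets = [{} for _ in targets]
    leftover = {}
    for key, value in kwargs.items():
        matched = False
        for names, bucket in zip(accepted, buckets):
            if key in names:
                bucket[key] = value
                matched = True
        if not matched:
            leftover[key] = value
    return buckets, leftover


# Hypothetical targets standing in for the DAG and DagModel constructors.
def make_dag(dag_id, start_date=None, schedule_interval=None):
    return {"dag_id": dag_id, "start_date": start_date, "schedule_interval": schedule_interval}


def make_dag_model(dag_id, is_active=True, next_dagrun=None):
    return {"dag_id": dag_id, "is_active": is_active, "next_dagrun": next_dagrun}
```

Returning the unmatched keys separately lets the caller raise on typos instead of silently dropping them.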





[GitHub] [airflow] uranusjr commented on a change in pull request #17324: Improve `dag_maker` fixture

uranusjr commented on a change in pull request #17324:
URL: https://github.com/apache/airflow/pull/17324#discussion_r679648318



##########
File path: tests/conftest.py
##########
@@ -444,33 +445,44 @@ def __exit__(self, type, value, traceback):
             dag.__exit__(type, value, traceback)
             if type is None:
                 dag.clear()
-                self.dag_run = dag.create_dagrun(
-                    run_id=self.kwargs.get("run_id", "test"),
-                    state=self.kwargs.get('state', State.RUNNING),
-                    execution_date=self.kwargs.get('execution_date', self.kwargs['start_date']),
-                    start_date=self.kwargs['start_date'],
-                )
+
+        def _set_default_args(self, kwargs, defaults):
+            for k, v in defaults.items():
+                if k not in kwargs:
+                    kwargs.setdefault(k, v)
+
+        @provide_session
+        def make_dagmodel(self, session=None, **kwargs):
+            dag = self.dag
+            defaults = dict(dag_id=dag.dag_id, next_dagrun=dag.start_date, is_active=True)
+            self._set_default_args(kwargs, defaults)
+            dag_model = DagModel(**kwargs)
+            session.add(dag_model)
+            session.flush()
+            return dag_model
+
+        def create_dagrun(self, **kwargs):
+            dag = self.dag
+            defaults = dict(
+                run_id='test',
+                state=State.RUNNING,
+                execution_date=self.start_date,
+                start_date=self.start_date,
+            )
+            self._set_default_args(kwargs, defaults)
+            self.dag_run = dag.create_dagrun(**kwargs)
+            return self.dag_run
 
         def __call__(self, dag_id='test_dag', **kwargs):
             self.kwargs = kwargs
-            if "start_date" not in kwargs:
+            self.start_date = self.kwargs.get('start_date', None)
+            if not self.start_date:
                 if hasattr(request.module, 'DEFAULT_DATE'):
-                    kwargs['start_date'] = getattr(request.module, 'DEFAULT_DATE')
+                    self.start_date = getattr(request.module, 'DEFAULT_DATE')
                 else:
-                    kwargs['start_date'] = DEFAULT_DATE
-            dagrun_fields_not_in_dag = [
-                'state',
-                'execution_date',
-                'run_type',
-                'queued_at',
-                "run_id",
-                "creating_job_id",
-                "external_trigger",
-                "last_scheduling_decision",
-                "dag_hash",
-            ]
-            kwargs = {k: v for k, v in kwargs.items() if k not in dagrun_fields_not_in_dag}
-            self.dag = DAG(dag_id, **kwargs)
+                    self.start_date = DEFAULT_DATE
+            self.kwargs.update(dict(start_date=self.start_date))

Review comment:
       ```suggestion
               self.kwargs['start_date'] = self.start_date
   ```
   
   But if `start_date` is unconditionally overridden, maybe we should instead…
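The fallback chain in the diff checks the explicit `start_date` first, then the test module's `DEFAULT_DATE`, then the global one. It can be written with `getattr`'s default argument. This sketch assumes plain module objects in place of pytest's `request.module` and a hypothetical global `DEFAULT_DATE`. Because `datetime` instances are always truthy, `or` behaves like the diff's `if not self.start_date` check.

```python
import types
from datetime import datetime

# Hypothetical global fallback, standing in for the one in tests/conftest.py.
DEFAULT_DATE = datetime(2016, 1, 1)


def resolve_start_date(kwargs, module):
    """Set kwargs['start_date']: caller value, else module DEFAULT_DATE, else global."""
    start_date = kwargs.get("start_date") or getattr(module, "DEFAULT_DATE", DEFAULT_DATE)
    # Plain item assignment, as in the review suggestion above.
    kwargs["start_date"] = start_date
    return start_date


# Usage: throwaway module objects stand in for pytest's request.module.
module_without_default = types.ModuleType("module_without_default")
module_with_default = types.ModuleType("module_with_default")
module_with_default.DEFAULT_DATE = datetime(2020, 5, 17)
```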

##########
File path: tests/conftest.py
##########
@@ -444,33 +445,44 @@ def __exit__(self, type, value, traceback):
             dag.__exit__(type, value, traceback)
             if type is None:
                 dag.clear()
-                self.dag_run = dag.create_dagrun(
-                    run_id=self.kwargs.get("run_id", "test"),
-                    state=self.kwargs.get('state', State.RUNNING),
-                    execution_date=self.kwargs.get('execution_date', self.kwargs['start_date']),
-                    start_date=self.kwargs['start_date'],
-                )
+
+        def _set_default_args(self, kwargs, defaults):
+            for k, v in defaults.items():
+                if k not in kwargs:
+                    kwargs.setdefault(k, v)
+
+        @provide_session
+        def make_dagmodel(self, session=None, **kwargs):
+            dag = self.dag
+            defaults = dict(dag_id=dag.dag_id, next_dagrun=dag.start_date, is_active=True)
+            self._set_default_args(kwargs, defaults)

Review comment:
       I think this can be instead
   
   ```suggestion
               kwargs = {**defaults, **kwargs}
   ```
   
   [PEP 448](https://www.python.org/dev/peps/pep-0448/#specification):
   
   > The keys in a dictionary remain in a right-to-left priority order, so `{**{'a': 1}, 'a': 2, **{'a': 3}}` evaluates to `{'a': 3}`. There is no restriction on the number or position of unpackings.
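The precedence the quoted passage describes is easy to check. Later unpackings win, so `{**defaults, **kwargs}` lets caller-supplied values override the defaults. `with_defaults` is just an illustrative name.

```python
def with_defaults(defaults, kwargs):
    # Right-most unpacking wins (PEP 448), so explicit kwargs beat defaults.
    return {**defaults, **kwargs}


merged = with_defaults({"run_id": "test", "state": "running"}, {"state": "queued"})

# The example from the PEP itself.
pep_example = {**{'a': 1}, 'a': 2, **{'a': 3}}
```

Unlike the `setdefault` loop, this builds a new dict rather than mutating `kwargs` in place. On Python 3.9+, `defaults | kwargs` (PEP 584) gives the same result.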

##########
File path: tests/conftest.py
##########
@@ -444,33 +445,44 @@ def __exit__(self, type, value, traceback):
             dag.__exit__(type, value, traceback)
             if type is None:
                 dag.clear()
-                self.dag_run = dag.create_dagrun(
-                    run_id=self.kwargs.get("run_id", "test"),
-                    state=self.kwargs.get('state', State.RUNNING),
-                    execution_date=self.kwargs.get('execution_date', self.kwargs['start_date']),
-                    start_date=self.kwargs['start_date'],
-                )
+
+        def _set_default_args(self, kwargs, defaults):
+            for k, v in defaults.items():
+                if k not in kwargs:
+                    kwargs.setdefault(k, v)
+
+        @provide_session
+        def make_dagmodel(self, session=None, **kwargs):
+            dag = self.dag
+            defaults = dict(dag_id=dag.dag_id, next_dagrun=dag.start_date, is_active=True)
+            self._set_default_args(kwargs, defaults)
+            dag_model = DagModel(**kwargs)
+            session.add(dag_model)
+            session.flush()
+            return dag_model
+
+        def create_dagrun(self, **kwargs):
+            dag = self.dag
+            defaults = dict(
+                run_id='test',
+                state=State.RUNNING,
+                execution_date=self.start_date,
+                start_date=self.start_date,
+            )
+            self._set_default_args(kwargs, defaults)
+            self.dag_run = dag.create_dagrun(**kwargs)
+            return self.dag_run
 
         def __call__(self, dag_id='test_dag', **kwargs):
             self.kwargs = kwargs
-            if "start_date" not in kwargs:
+            self.start_date = self.kwargs.get('start_date', None)
+            if not self.start_date:
                 if hasattr(request.module, 'DEFAULT_DATE'):
-                    kwargs['start_date'] = getattr(request.module, 'DEFAULT_DATE')
+                    self.start_date = getattr(request.module, 'DEFAULT_DATE')
                 else:
-                    kwargs['start_date'] = DEFAULT_DATE
-            dagrun_fields_not_in_dag = [
-                'state',
-                'execution_date',
-                'run_type',
-                'queued_at',
-                "run_id",
-                "creating_job_id",
-                "external_trigger",
-                "last_scheduling_decision",
-                "dag_hash",
-            ]
-            kwargs = {k: v for k, v in kwargs.items() if k not in dagrun_fields_not_in_dag}
-            self.dag = DAG(dag_id, **kwargs)
+                    self.start_date = DEFAULT_DATE
+            self.kwargs.update(dict(start_date=self.start_date))
+            self.dag = DAG(dag_id, **self.kwargs)

Review comment:
       ```suggestion
               self.dag = DAG(dag_id, start_date=self.start_date, **self.kwargs)
   ```
   This makes the error clearer if the caller accidentally passes `start_date` into `dag_maker`.
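The suggestion relies on how Python handles duplicate keywords. When the same keyword arrives both explicitly and through `**` unpacking, Python raises `TypeError` instead of letting one value silently win. This sketch uses a hypothetical `make_dag` stand-in. The suggested line presumably assumes that `start_date` has been kept out of `self.kwargs`. After the `update` call in the diff, the key would always be present, and the call would fail every time.

```python
from datetime import datetime


def make_dag(dag_id, start_date=None, **kwargs):
    """Hypothetical stand-in for the DAG constructor."""
    return {"dag_id": dag_id, "start_date": start_date, **kwargs}


def build_dag(dag_id, start_date, extra_kwargs):
    # start_date is passed explicitly; a duplicate key in extra_kwargs makes
    # the call itself raise TypeError before make_dag's body runs.
    return make_dag(dag_id, start_date=start_date, **extra_kwargs)


ok = build_dag("test_dag", datetime(2016, 1, 1), {"owner": "airflow"})

error_message = None
try:
    build_dag("test_dag", datetime(2016, 1, 1), {"start_date": datetime(2021, 1, 1)})
except TypeError as err:
    error_message = str(err)
```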





[GitHub] [airflow] github-actions[bot] commented on pull request #17324: Improve `dag_maker` fixture

github-actions[bot] commented on pull request #17324:
URL: https://github.com/apache/airflow/pull/17324#issuecomment-889724423


   The PR most likely needs to run the full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly, please rebase it onto the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.

