Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/08/12 13:01:08 UTC

[GitHub] [airflow] ashb opened a new pull request #17577: Have the dag_maker fixture (optionally) give SerializedDAGs

ashb opened a new pull request #17577:
URL: https://github.com/apache/airflow/pull/17577


   All but one of the tests in test_scheduler_job.py want to operate on serialized
   DAGs, so it makes sense to have the dag_maker do this for us and
   make each test "smaller".
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #17577: Have the dag_maker fixture (optionally) give SerializedDAGs

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17577:
URL: https://github.com/apache/airflow/pull/17577#discussion_r690367118



##########
File path: tests/conftest.py
##########
@@ -451,49 +451,88 @@ def dag_maker(request):
 
     The dag_maker.create_dagrun takes the same arguments as dag.create_dagrun
 
+    If you want to operate on serialized DAGs, then either pass ``serialized=True`` to the ``dag_maker()``
+    call, or you can mark your test/class/file with ``@pytest.mark.need_serialized_dag(True)``. In both of
+    these cases the ``dag`` returned by the context manager will be a lazily-evaluated proxy object to the
+    SerializedDAG.
     """
-    from airflow.models import DAG, DagModel
-    from airflow.utils import timezone
-    from airflow.utils.session import provide_session
-    from airflow.utils.state import State
+    import lazy_object_proxy
 
-    DEFAULT_DATE = timezone.datetime(2016, 1, 1)
+    # IMPORTANT: Delay _all_ imports from `airflow.*` to _inside a method_.
+    # This fixture is "called" early on in the pytest collection process, and
+    # if we import airflow.* here the wrong (non-test) config will be loaded
+    # and "baked" in to various constants
+
+    want_serialized = False
+
+    # Allow changing default serialized behaviour with `@ptest.mark.need_serialized_dag(True)`

Review comment:
       We don't, good call.







[GitHub] [airflow] kaxil commented on a change in pull request #17577: Have the dag_maker fixture (optionally) give SerializedDAGs

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #17577:
URL: https://github.com/apache/airflow/pull/17577#discussion_r687693111



##########
File path: tests/conftest.py
##########
@@ -451,34 +451,60 @@ def dag_maker(request):
 
     The dag_maker.create_dagrun takes the same arguments as dag.create_dagrun
 
+    If you want to operate on serialized DAGs, then either pass ``serialized=True`` to the ``dag_maker()``
+    call, or you can mark your test/class/file with ``@pytest.mark.need_serialized_dag(True)``. In both of
+    these cases the ``dag`` returned by the context manager will be a lazily-evaluated proxy object to the
+    SerializedDAG.
     """
-    from airflow.models import DAG, DagModel
+    import lazy_object_proxy
+
+    from airflow.models import DAG, DagBag, DagModel, DagRun, TaskInstance
+    from airflow.models.serialized_dag import SerializedDagModel
     from airflow.utils import timezone
     from airflow.utils.session import provide_session
     from airflow.utils.state import State
 
     DEFAULT_DATE = timezone.datetime(2016, 1, 1)
 
+    want_serialized = False
+    #
+    # Allow changing default serialized behaviour with `@ptest.mark.need_serialized_dag(True)`
+    serialized_marker = request.node.get_closest_marker("need_serialized_dag")
+    if serialized_marker:
+        want_serialized = serialized_marker.args[0]
+
     class DagFactory:
+        def __init__(self):
+            # Keep all the serialized dags we've created in this test
+            self.dagbag = DagBag(os.devnull, include_examples=False, read_dags_from_db=False)
+
         def __enter__(self):
             self.dag.__enter__()
+            if self.want_serialized:
+                return lazy_object_proxy.Proxy(self._serialized_dag)
             return self.dag
 
+        def _serialized_dag(self):
+            return self.serilized_model.dag

Review comment:
       Wait.. looks like a typo in serialized
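
A misspelling like this is easy to miss because the proxy returned from `__enter__` does not call `_serialized_dag` until the `dag` object is first used, so the `AttributeError` only surfaces at that point. Below is a minimal sketch of that behaviour. `LazyProxy` is a hand-rolled, hypothetical stand-in for `lazy_object_proxy.Proxy`, and `Factory` with its two lookup methods is illustrative only, not code from the PR.

```python
from types import SimpleNamespace


class LazyProxy:
    """Hypothetical stand-in for lazy_object_proxy.Proxy (attribute access only)."""

    def __init__(self, factory):
        self._factory = factory
        self._resolved = False
        self._target = None

    def __getattr__(self, name):
        # Only called for names not found on the proxy itself, so the
        # factory runs on first real use, not when the proxy is created.
        if not self._resolved:
            self._target = self._factory()
            self._resolved = True
        return getattr(self._target, name)


class Factory:
    """Illustrative factory; serialized_model is only set later, as on __exit__."""

    def __init__(self):
        self.serialized_model = None

    def broken_lookup(self):
        return self.serilized_model.dag  # misspelled on purpose, mirrors the hunk

    def working_lookup(self):
        return self.serialized_model.dag


factory = Factory()
broken = LazyProxy(factory.broken_lookup)    # no error yet: nothing is evaluated
working = LazyProxy(factory.working_lookup)  # created before the model exists

# Stand-in for what __exit__ does once the DAG has been built.
factory.serialized_model = SimpleNamespace(dag=SimpleNamespace(dag_id="example"))

try:
    broken.dag_id
    error = ""
except AttributeError as exc:
    error = str(exc)
```

Here `working.dag_id` resolves normally, while the misspelled lookup fails only when `broken.dag_id` is touched.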







[GitHub] [airflow] ashb commented on a change in pull request #17577: Have the dag_maker fixture (optionally) give SerializedDAGs

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17577:
URL: https://github.com/apache/airflow/pull/17577#discussion_r691158255



##########
File path: tests/conftest.py
##########
@@ -451,49 +451,89 @@ def dag_maker(request):
 
     The dag_maker.create_dagrun takes the same arguments as dag.create_dagrun
 
+    If you want to operate on serialized DAGs, then either pass ``serialized=True`` to the ``dag_maker()``
+    call, or you can mark your test/class/file with ``@pytest.mark.need_serialized_dag(True)``. In both of
+    these cases the ``dag`` returned by the context manager will be a lazily-evaluated proxy object to the
+    SerializedDAG.
     """
-    from airflow.models import DAG, DagModel
-    from airflow.utils import timezone
-    from airflow.utils.session import provide_session
-    from airflow.utils.state import State
+    import lazy_object_proxy
 
-    DEFAULT_DATE = timezone.datetime(2016, 1, 1)
+    # IMPORTANT: Delay _all_ imports from `airflow.*` to _inside a method_.
+    # This fixture is "called" early on in the pytest collection process, and
+    # if we import airflow.* here the wrong (non-test) config will be loaded
+    # and "baked" in to various constants
+
+    want_serialized = False
+
+    # Allow changing default serialized behaviour with `@ptest.mark.need_serialized_dag` or
+    # `@ptest.mark.need_serialized_dag(False)`
+    serialized_marker = request.node.get_closest_marker("need_serialized_dag")
+    if serialized_marker:
+        (want_serialized,) = serialized_marker.args or (True,)
 
     class DagFactory:
+        def __init__(self):
+            from airflow.models import DagBag
+
+            # Keep all the serialized dags we've created in this test
+            self.dagbag = DagBag(os.devnull, include_examples=False, read_dags_from_db=False)
+
         def __enter__(self):
             self.dag.__enter__()
+            if self.want_serialized:
+                return lazy_object_proxy.Proxy(self._serialized_dag)
             return self.dag
 
+        def _serialized_dag(self):
+            return self.serialized_model.dag
+
         def __exit__(self, type, value, traceback):
+            from airflow.models import DagModel
+            from airflow.models.serialized_dag import SerializedDagModel
+
             dag = self.dag
             dag.__exit__(type, value, traceback)
-            if type is None:
-                dag.clear()
-                self.dag_model = DagModel(
-                    dag_id=dag.dag_id,
-                    next_dagrun=dag.start_date,
-                    is_active=True,
-                    is_paused=False,
-                    max_active_tasks=dag.max_active_tasks,
-                    has_task_concurrency_limits=False,
-                )
-                self.session.add(self.dag_model)
+            if type is not None:
+                return
+
+            dag.clear()
+            dag.sync_to_db(self.session)
+            self.dag_model = self.session.query(DagModel).get(dag.dag_id)
+
+            if self.want_serialized:
+                self.serialized_model = SerializedDagModel(dag)
+                self.session.merge(self.serialized_model)
+                serialized_dag = self._serialized_dag()
+                self.dagbag.bag_dag(serialized_dag, root_dag=serialized_dag)
                 self.session.flush()

Review comment:
       DagBag doesn't touch the session unless `bulk_write_to_db` (or whatever the method is) is called.







[GitHub] [airflow] uranusjr commented on a change in pull request #17577: Have the dag_maker fixture (optionally) give SerializedDAGs

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #17577:
URL: https://github.com/apache/airflow/pull/17577#discussion_r690448660



##########
File path: pytest.ini
##########
@@ -32,3 +32,5 @@ faulthandler_timeout = 480
 log_level = INFO
 filterwarnings =
     error::pytest.PytestCollectionWarning
+markers =
+  need_serialized_dag

Review comment:
       ```suggestion
   markers =
       need_serialized_dag
   ```
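
For comparison, the same marker can be registered from Python instead of `pytest.ini`. This is only a sketch of the alternative, not something this PR does. `pytest_configure` and `config.addinivalue_line` are standard pytest hooks and APIs, but the description text after the colon is an assumption made up for illustration.

```python
def pytest_configure(config):
    # Appends one line to the "markers" ini option at start-up, which has the
    # same effect as listing the marker under `markers =` in pytest.ini.
    config.addinivalue_line(
        "markers",
        # The description below is illustrative wording, not from the PR.
        "need_serialized_dag: make the dag_maker fixture return a serialized DAG",
    )
```

Keeping the registration in `pytest.ini`, as the PR does, has the advantage that all markers are visible in one place next to the other pytest settings.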







[GitHub] [airflow] uranusjr commented on a change in pull request #17577: Have the dag_maker fixture (optionally) give SerializedDAGs

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #17577:
URL: https://github.com/apache/airflow/pull/17577#discussion_r690963746



##########
File path: tests/conftest.py
##########
@@ -451,49 +451,89 @@ def dag_maker(request):
 
     The dag_maker.create_dagrun takes the same arguments as dag.create_dagrun
 
+    If you want to operate on serialized DAGs, then either pass ``serialized=True`` to the ``dag_maker()``
+    call, or you can mark your test/class/file with ``@pytest.mark.need_serialized_dag(True)``. In both of
+    these cases the ``dag`` returned by the context manager will be a lazily-evaluated proxy object to the
+    SerializedDAG.
     """
-    from airflow.models import DAG, DagModel
-    from airflow.utils import timezone
-    from airflow.utils.session import provide_session
-    from airflow.utils.state import State
+    import lazy_object_proxy
 
-    DEFAULT_DATE = timezone.datetime(2016, 1, 1)
+    # IMPORTANT: Delay _all_ imports from `airflow.*` to _inside a method_.
+    # This fixture is "called" early on in the pytest collection process, and
+    # if we import airflow.* here the wrong (non-test) config will be loaded
+    # and "baked" in to various constants
+
+    want_serialized = False
+
+    # Allow changing default serialized behaviour with `@ptest.mark.need_serialized_dag` or
+    # `@ptest.mark.need_serialized_dag(False)`
+    serialized_marker = request.node.get_closest_marker("need_serialized_dag")
+    if serialized_marker:
+        (want_serialized,) = serialized_marker.args or (True,)
 
     class DagFactory:
+        def __init__(self):
+            from airflow.models import DagBag
+
+            # Keep all the serialized dags we've created in this test
+            self.dagbag = DagBag(os.devnull, include_examples=False, read_dags_from_db=False)
+
         def __enter__(self):
             self.dag.__enter__()
+            if self.want_serialized:
+                return lazy_object_proxy.Proxy(self._serialized_dag)
             return self.dag
 
+        def _serialized_dag(self):
+            return self.serialized_model.dag
+
         def __exit__(self, type, value, traceback):
+            from airflow.models import DagModel
+            from airflow.models.serialized_dag import SerializedDagModel
+
             dag = self.dag
             dag.__exit__(type, value, traceback)
-            if type is None:
-                dag.clear()
-                self.dag_model = DagModel(
-                    dag_id=dag.dag_id,
-                    next_dagrun=dag.start_date,
-                    is_active=True,
-                    is_paused=False,
-                    max_active_tasks=dag.max_active_tasks,
-                    has_task_concurrency_limits=False,
-                )
-                self.session.add(self.dag_model)
+            if type is not None:
+                return
+
+            dag.clear()
+            dag.sync_to_db(self.session)
+            self.dag_model = self.session.query(DagModel).get(dag.dag_id)
+
+            if self.want_serialized:
+                self.serialized_model = SerializedDagModel(dag)
+                self.session.merge(self.serialized_model)
+                serialized_dag = self._serialized_dag()
+                self.dagbag.bag_dag(serialized_dag, root_dag=serialized_dag)
                 self.session.flush()

Review comment:
       Should we flush the session before bagging? (Is there a difference?)







[GitHub] [airflow] ashb commented on pull request #17577: Have the dag_maker fixture (optionally) give SerializedDAGs

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #17577:
URL: https://github.com/apache/airflow/pull/17577#issuecomment-901045884


   > LGTM except one thing I’m not sure about. We should probably add some comments explaining why the imports need to be local.
   > 
   > Feel free to merge without me reviewing again.
   
   https://github.com/apache/airflow/pull/17577/files#diff-e52e4ddd58b7ef887ab03c04116e676f6280b824ab7469d5d3080e5cba4f2128R461-R464
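
The code comment linked above says that importing `airflow.*` too early loads the wrong config and "bakes" it into constants. The sketch below shows the same effect by analogy, without Airflow: a value computed once at definition time keeps the old setting, while a lookup deferred into a method sees the later test setup. The `DEMO_MODE` variable, `read_mode` and `Factory` are made-up names for illustration.

```python
import os


def read_mode():
    # Stand-in for a config lookup; "DEMO_MODE" is a hypothetical variable.
    return os.environ.get("DEMO_MODE", "production")


# Evaluated once, like a module-level constant set during an early import.
os.environ.pop("DEMO_MODE", None)
EAGER_MODE = read_mode()


class Factory:
    def mode(self):
        # Deferred: evaluated on each call, after test setup has run.
        return read_mode()


# Test setup that only happens after the "import" above.
os.environ["DEMO_MODE"] = "unit_test"
deferred_mode = Factory().mode()
```

`EAGER_MODE` stays at the pre-setup value, which is why the fixture delays its `airflow.*` imports until a method is actually called.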





[GitHub] [airflow] ashb commented on pull request #17577: Have the dag_maker fixture (optionally) give SerializedDAGs

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #17577:
URL: https://github.com/apache/airflow/pull/17577#issuecomment-900191247


   The mssql failures are of the `ERROR: for airflow  Container "73ad017505bd" is unhealthy.` variety, but _one_ of them passed, so I think this is now, finally, good to merge once the rest of the tests pass.





[GitHub] [airflow] github-actions[bot] commented on pull request #17577: Have the dag_maker fixture (optionally) give SerializedDAGs

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #17577:
URL: https://github.com/apache/airflow/pull/17577#issuecomment-897623173


   The PR most likely needs to run the full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly, please rebase it onto the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] kaxil commented on pull request #17577: Have the dag_maker fixture (optionally) give SerializedDAGs

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #17577:
URL: https://github.com/apache/airflow/pull/17577#issuecomment-898687421


   Merge conflicts!





[GitHub] [airflow] ashb merged pull request #17577: Have the dag_maker fixture (optionally) give SerializedDAGs

Posted by GitBox <gi...@apache.org>.
ashb merged pull request #17577:
URL: https://github.com/apache/airflow/pull/17577


   





[GitHub] [airflow] uranusjr commented on pull request #17577: Have the dag_maker fixture (optionally) give SerializedDAGs

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #17577:
URL: https://github.com/apache/airflow/pull/17577#issuecomment-900879051


   LGTM except one thing I’m not sure about. We should probably add some comments explaining why the imports need to be local.
   
   Feel free to merge without me reviewing again.





[GitHub] [airflow] uranusjr commented on a change in pull request #17577: Have the dag_maker fixture (optionally) give SerializedDAGs

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #17577:
URL: https://github.com/apache/airflow/pull/17577#discussion_r690283457



##########
File path: tests/conftest.py
##########
@@ -451,49 +451,88 @@ def dag_maker(request):
 
     The dag_maker.create_dagrun takes the same arguments as dag.create_dagrun
 
+    If you want to operate on serialized DAGs, then either pass ``serialized=True`` to the ``dag_maker()``
+    call, or you can mark your test/class/file with ``@pytest.mark.need_serialized_dag(True)``. In both of
+    these cases the ``dag`` returned by the context manager will be a lazily-evaluated proxy object to the
+    SerializedDAG.
     """
-    from airflow.models import DAG, DagModel
-    from airflow.utils import timezone
-    from airflow.utils.session import provide_session
-    from airflow.utils.state import State
+    import lazy_object_proxy
 
-    DEFAULT_DATE = timezone.datetime(2016, 1, 1)
+    # IMPORTANT: Delay _all_ imports from `airflow.*` to _inside a method_.
+    # This fixture is "called" early on in the pytest collection process, and
+    # if we import airflow.* here the wrong (non-test) config will be loaded
+    # and "baked" in to various constants
+
+    want_serialized = False
+
+    # Allow changing default serialized behaviour with `@ptest.mark.need_serialized_dag(True)`

Review comment:
       ```suggestion
       # Allow changing default serialized behaviour with `@pytest.mark.need_serialized_dag(True)`
   ```
   
   Why do we need the `True` though? We don’t ever need to do `@pytest.mark.need_serialized_dag(False)` (and this does not work in the current implementation anyway).
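
For reference, the revision of this hunk quoted elsewhere in the thread accepts both the bare marker and an explicit argument by unpacking `serialized_marker.args or (True,)`. The sketch below isolates that rule. `resolve_want_serialized` is a hypothetical helper, and the `SimpleNamespace` objects only imitate the `args` attribute of a real pytest marker.

```python
from types import SimpleNamespace


def resolve_want_serialized(marker, default=False):
    """Hypothetical helper mirroring the marker handling in the fixture."""
    if not marker:
        # No marker on the test, class or module: keep the default.
        return default
    # A bare marker has empty args, so fall back to (True,);
    # an explicit argument such as (False,) is used as given.
    (want_serialized,) = marker.args or (True,)
    return want_serialized


bare_marker = SimpleNamespace(args=())           # @pytest.mark.need_serialized_dag
explicit_false = SimpleNamespace(args=(False,))  # @pytest.mark.need_serialized_dag(False)
explicit_true = SimpleNamespace(args=(True,))    # @pytest.mark.need_serialized_dag(True)
```

With the earlier `serialized_marker.args[0]` form, the bare marker would instead raise an `IndexError`, since its `args` tuple is empty.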



