Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/07/19 07:45:33 UTC

[GitHub] [airflow] uranusjr commented on a change in pull request #16889: Add Pytest fixture to create dag and dagrun and use it on local task job tests

uranusjr commented on a change in pull request #16889:
URL: https://github.com/apache/airflow/pull/16889#discussion_r672062375



##########
File path: tests/conftest.py
##########
@@ -424,3 +429,54 @@ def app():
     from airflow.www import app
 
     return app.create_app(testing=True)
+
+
+@pytest.fixture()
+def clear_db():
+    def _clean_up():
+        db.clear_db_runs()
+        db.clear_db_dags()
+        db.clear_db_serialized_dags()
+        db.clear_db_jobs()
+        db.clear_db_task_fail()
+        db.clear_db_task_reschedule()
+
+    _clean_up()
+    yield
+
+
+@pytest.fixture
+def dag_maker(request):
+
+    DEFAULT_DATE = timezone.datetime(2016, 1, 1)
+
+    class DagFactory:
+        def __enter__(self):
+            self.dag.__enter__()
+            return self.dag
+
+        def __exit__(self, type, value, traceback):
+            dag = self.dag
+            dag.__exit__(type, value, traceback)
+            if type is None:
+                dag.sync_to_db()
+                dag.clear()
+                self.dag_run = dag.create_dagrun(
+                    run_id="test",
+                    state=self.kwargs.get('state', State.RUNNING),
+                    execution_date=self.kwargs.get('execution_date', self.kwargs['start_date']),
+                    start_date=self.kwargs['start_date'],
+                )
+
+        def __call__(self, dag_id='test_dag', **kwargs):
+            self.kwargs = kwargs
+            if "start_date" not in kwargs:
+                if hasattr(request.module, 'DEFAULT_DATE'):
+                    kwargs['start_date'] = getattr(request.module, 'DEFAULT_DATE')
+                else:
+                    kwargs['start_date'] = DEFAULT_DATE
+            kwargs = {k: v for k, v in kwargs.items() if k not in ['state', 'execution_date', 'run_type']}
+            self.dag = DAG(dag_id, **kwargs)
+            return self
+
+    return DagFactory()

Review comment:
       Setting state on a context manager instance in `__enter__` without cleaning it up in `__exit__` is usually not a good idea, since the side effect can leak out of the manager. But this is only used in tests, so as long as we use it correctly it’s not a big deal.
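
       To illustrate the concern, here is a minimal, Airflow-free sketch. The class names `LeakyFactory` and `TidyFactory` are hypothetical and are not part of this PR. They only show how an attribute set while the manager is in use stays on the instance after the block ends, and how resetting it in `__exit__` avoids that.

```python
class LeakyFactory:
    """Hypothetical reusable context manager that keeps per-use state."""

    def __call__(self, name):
        # Per-use configuration is stored on the shared instance.
        self.name = name
        return self

    def __enter__(self):
        self.resource = f"resource-for-{self.name}"
        return self.resource

    def __exit__(self, exc_type, exc_value, tb):
        # Nothing is reset here, so `self.resource` outlives the block.
        return False


class TidyFactory(LeakyFactory):
    """Same manager, but per-use state is dropped on exit."""

    def __exit__(self, exc_type, exc_value, tb):
        # Remove per-use attributes so a later use cannot see stale values.
        del self.resource
        del self.name
        return False


leaky = LeakyFactory()
with leaky("first"):
    pass
leaked = hasattr(leaky, "resource")  # state is still visible after the block

tidy = TidyFactory()
with tidy("first"):
    pass
cleaned = not hasattr(tidy, "resource")  # state was removed in __exit__
```

       In the fixture under review, keeping `dag_run` on the instance after the block looks intentional, since it is assigned in `__exit__` and is presumably read by the test afterwards, so the cleanup variant is not a drop-in replacement there.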



