Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/07/20 16:16:43 UTC

[GitHub] [airflow] ephraimbuddy opened a new pull request #16889: Add Pytest fixture to create dag and dagrun and use it on local task job tests

ephraimbuddy opened a new pull request #16889:
URL: https://github.com/apache/airflow/pull/16889


   This change adds a pytest fixture for creating DAGs and DAG runs in tests.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #16889: Add fixture to create dag, task and dagrun in tests and modify /job tests

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16889:
URL: https://github.com/apache/airflow/pull/16889#discussion_r666348163



##########
File path: tests/jobs/conftest.py
##########
@@ -0,0 +1,83 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+
+import pytest
+
+from airflow.models.dag import DAG
+from airflow.models.dagbag import DagBag
+from airflow.operators.dummy import DummyOperator
+from airflow.operators.python import PythonOperator
+from airflow.utils import timezone
+from airflow.utils.state import State
+from tests.test_utils import db
+
+TEST_DAG_FOLDER = os.environ['AIRFLOW__CORE__DAGS_FOLDER']
+
+
+@pytest.fixture()
+def clear_db():
+    def _clean_up():
+        db.clear_db_dags()
+        db.clear_db_jobs()
+        db.clear_db_runs()
+        db.clear_db_task_fail()
+
+    _clean_up()
+    yield
+
+
+@pytest.fixture
+def dag_maker(request):
+
+    DEFAULT_DATE = timezone.datetime(2016, 1, 1)
+
+    def create_dag_ti_dr(
+        dag_id='test_dag', task_id='op1', python_callable=None, py_kwargs=None, with_dagbag=False, **kwargs
+    ):
+        # If python_callable we create a PythonOperator task
+        # For ease of use, set start date to "DEFAULT_DATE" module variable from request
+        if "start_date" not in kwargs and hasattr(request.module, 'DEFAULT_DATE'):
+            kwargs['start_date'] = getattr(request.module, 'DEFAULT_DATE')
+        else:
+            kwargs['start_date'] = DEFAULT_DATE
+        if with_dagbag:
+            dagbag = DagBag(
+                dag_folder=TEST_DAG_FOLDER,
+                include_examples=False,
+            )
+            dag = dagbag.get_dag(dag_id)
+            task = dag.get_task(task_id)
+        else:
+            dag = DAG(dag_id, start_date=kwargs['start_date'])
+            if python_callable:
+                task = PythonOperator(task_id=task_id, python_callable=python_callable, dag=dag, **py_kwargs)
+            else:
+                task = DummyOperator(task_id=task_id, run_as_user=kwargs.get("run_as_user", None), dag=dag)

Review comment:
       Perhaps this structure:
   
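   ```python
   import contextlib

   import pytest

   from airflow.models.dag import DAG


   @pytest.fixture
   def dag_maker():
       class DagMaker:
           @contextlib.contextmanager
           def __call__(self, dag_id="test_dag", **kwargs):
               # The proposed behaviour: tasks created inside the with-block
               # attach to the DAG, then the DAG run is created on exit.
               dag = DAG(dag_id, **kwargs)
               with dag:
                   yield dag
               self.dag_run = dag.create_dagrun(...)

       return DagMaker()
   ```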
   
   
   We could then use it like this in tests:
   
   ```python
       with dag_maker():
           task = PythonOperator(task_id=task_id, python_callable=python_callable)
       dag_run = dag_maker.dag_run
   ```
   
   Thoughts?







[GitHub] [airflow] ashb commented on a change in pull request #16889: Add fixture to create dag, task and dagrun in tests and modify /job tests

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16889:
URL: https://github.com/apache/airflow/pull/16889#discussion_r666345945



##########
File path: tests/jobs/conftest.py
##########
@@ -0,0 +1,83 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+
+import pytest
+
+from airflow.models.dag import DAG
+from airflow.models.dagbag import DagBag
+from airflow.operators.dummy import DummyOperator
+from airflow.operators.python import PythonOperator
+from airflow.utils import timezone
+from airflow.utils.state import State
+from tests.test_utils import db
+
+TEST_DAG_FOLDER = os.environ['AIRFLOW__CORE__DAGS_FOLDER']
+
+
+@pytest.fixture()
+def clear_db():
+    def _clean_up():
+        db.clear_db_dags()
+        db.clear_db_jobs()
+        db.clear_db_runs()
+        db.clear_db_task_fail()
+
+    _clean_up()
+    yield
+
+
+@pytest.fixture
+def dag_maker(request):
+
+    DEFAULT_DATE = timezone.datetime(2016, 1, 1)
+
+    def create_dag_ti_dr(
+        dag_id='test_dag', task_id='op1', python_callable=None, py_kwargs=None, with_dagbag=False, **kwargs
+    ):
+        # If python_callable we create a PythonOperator task
+        # For ease of use, set start date to "DEFAULT_DATE" module variable from request
+        if "start_date" not in kwargs and hasattr(request.module, 'DEFAULT_DATE'):
+            kwargs['start_date'] = getattr(request.module, 'DEFAULT_DATE')
+        else:
+            kwargs['start_date'] = DEFAULT_DATE
+        if with_dagbag:
+            dagbag = DagBag(
+                dag_folder=TEST_DAG_FOLDER,
+                include_examples=False,
+            )
+            dag = dagbag.get_dag(dag_id)
+            task = dag.get_task(task_id)
+        else:
+            dag = DAG(dag_id, start_date=kwargs['start_date'])
+            if python_callable:
+                task = PythonOperator(task_id=task_id, python_callable=python_callable, dag=dag, **py_kwargs)
+            else:
+                task = DummyOperator(task_id=task_id, run_as_user=kwargs.get("run_as_user", None), dag=dag)

Review comment:
       Hmm, though it is not clear how to get the DAG run out.







[GitHub] [airflow] ashb commented on a change in pull request #16889: Add fixture to create dag, task and dagrun in tests and modify /job tests

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16889:
URL: https://github.com/apache/airflow/pull/16889#discussion_r666319721



##########
File path: tests/jobs/conftest.py
##########
@@ -0,0 +1,83 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+
+import pytest
+
+from airflow.models.dag import DAG
+from airflow.models.dagbag import DagBag
+from airflow.operators.dummy import DummyOperator
+from airflow.operators.python import PythonOperator
+from airflow.utils import timezone
+from airflow.utils.state import State
+from tests.test_utils import db
+
+TEST_DAG_FOLDER = os.environ['AIRFLOW__CORE__DAGS_FOLDER']
+
+
+@pytest.fixture()
+def clear_db():
+    def _clean_up():
+        db.clear_db_dags()
+        db.clear_db_jobs()
+        db.clear_db_runs()
+        db.clear_db_task_fail()
+
+    _clean_up()
+    yield
+
+
+@pytest.fixture
+def dag_maker(request):
+
+    DEFAULT_DATE = timezone.datetime(2016, 1, 1)
+
+    def create_dag_ti_dr(
+        dag_id='test_dag', task_id='op1', python_callable=None, py_kwargs=None, with_dagbag=False, **kwargs
+    ):
+        # If python_callable we create a PythonOperator task
+        # For ease of use, set start date to "DEFAULT_DATE" module variable from request
+        if "start_date" not in kwargs and hasattr(request.module, 'DEFAULT_DATE'):
+            kwargs['start_date'] = getattr(request.module, 'DEFAULT_DATE')
+        else:
+            kwargs['start_date'] = DEFAULT_DATE

Review comment:
       ```suggestion
           if "start_date" not in kwargs:
               if hasattr(request.module, 'DEFAULT_DATE'):
                   kwargs['start_date'] = getattr(request.module, 'DEFAULT_DATE')
               else:
                   kwargs['start_date'] = DEFAULT_DATE
   ```
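
To illustrate why the nested check matters, here is a minimal standalone sketch, not the PR's code. `FIXTURE_DEFAULT` and both function names are hypothetical, and the `module_default` argument stands in for `request.module.DEFAULT_DATE` (with `None` meaning the test module defines no `DEFAULT_DATE`). With the combined condition, the shared `else` branch also runs when the caller passed `start_date`, so the explicit value is overwritten.

```python
from datetime import datetime

# Hypothetical stand-in for the DEFAULT_DATE defined inside the fixture.
FIXTURE_DEFAULT = datetime(2016, 1, 1)


def start_date_combined(kwargs, module_default=None):
    """Original shape: one `if ... and ...` with a shared `else`."""
    kwargs = dict(kwargs)
    if "start_date" not in kwargs and module_default is not None:
        kwargs["start_date"] = module_default
    else:
        # Also reached when the caller DID pass start_date, so it is replaced.
        kwargs["start_date"] = FIXTURE_DEFAULT
    return kwargs["start_date"]


def start_date_nested(kwargs, module_default=None):
    """Suggested shape: only fill in a default when start_date is absent."""
    kwargs = dict(kwargs)
    if "start_date" not in kwargs:
        if module_default is not None:
            kwargs["start_date"] = module_default
        else:
            kwargs["start_date"] = FIXTURE_DEFAULT
    return kwargs["start_date"]
```

Passing an explicit `start_date` to `start_date_combined` returns `FIXTURE_DEFAULT`, while `start_date_nested` returns the value the caller supplied.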







[GitHub] [airflow] uranusjr commented on a change in pull request #16889: Add fixture to create dag, task and dagrun in tests and modify /job tests

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #16889:
URL: https://github.com/apache/airflow/pull/16889#discussion_r671593835



##########
File path: tests/conftest.py
##########
@@ -424,3 +429,54 @@ def app():
     from airflow.www import app
 
     return app.create_app(testing=True)
+
+
+@pytest.fixture()
+def clear_db():
+    def _clean_up():
+        db.clear_db_runs()
+        db.clear_db_dags()
+        db.clear_db_serialized_dags()
+        db.clear_db_jobs()
+        db.clear_db_task_fail()
+        db.clear_db_task_reschedule()
+
+    _clean_up()
+    yield
+
+
+@pytest.fixture
+def dag_maker(request):
+
+    DEFAULT_DATE = timezone.datetime(2016, 1, 1)
+
+    class DagFactory:
+        def __enter__(self):
+            self.dag.__enter__()
+            return self.dag
+
+        def __exit__(self, type, value, traceback):
+            dag = self.dag
+            dag.__exit__(type, value, traceback)
+            if type is None:
+                dag.sync_to_db()
+                dag.clear()
+                self.dag_run = dag.create_dagrun(
+                    run_id="test",
+                    state=self.kwargs.get('state', State.RUNNING),
+                    execution_date=self.kwargs.get('execution_date', self.kwargs['start_date']),
+                    start_date=self.kwargs['start_date'],
+                )
+
+        def __call__(self, dag_id='test_dag', **kwargs):
+            self.kwargs = kwargs
+            if "start_date" not in kwargs:
+                if hasattr(request.module, 'DEFAULT_DATE'):
+                    kwargs['start_date'] = getattr(request.module, 'DEFAULT_DATE')
+                else:
+                    kwargs['start_date'] = DEFAULT_DATE
+            kwargs = {k: v for k, v in kwargs.items() if k not in ['state', 'execution_date', 'run_type']}
+            self.dag = DAG(dag_id, **kwargs)
+            return self
+
+    return DagFactory()

Review comment:
       Can this not be defined like this instead?
   
   ```python
   class DagFactory:
       def __init__(self, dag_id="test_dag", **kwargs):
           # what's currently in __call__
   
       # __enter__ and __exit__ are unchanged
   
   return DagFactory
   ```
   
   This is easier to reason about IMO.
   
   Or even simpler:
   
   ```python
   @contextlib.contextmanager
   def factory(dag_id="test_dag", **kwargs):
       # What's currently in __call__ but drop the "self" part
       dag.__enter__()
       yield dag
       # What's currently in __exit__
   ```
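
A minimal standalone sketch of the `contextlib.contextmanager` variant, under stated assumptions: `FakeDag` is a hypothetical stand-in for Airflow's `DAG`, and the `runs` list is one illustrative way (not something proposed in this thread) to hand the created run back to the test, since a generator-based context manager has no instance on which to store `dag_run`.

```python
import contextlib
from datetime import datetime


class FakeDag:
    """Hypothetical stand-in for airflow.models.dag.DAG, just enough to show the flow."""

    def __init__(self, dag_id, start_date):
        self.dag_id = dag_id
        self.start_date = start_date
        self.tasks = []

    def create_dagrun(self, run_id, execution_date):
        return {"run_id": run_id, "dag_id": self.dag_id, "execution_date": execution_date}


@contextlib.contextmanager
def factory(dag_id="test_dag", start_date=datetime(2016, 1, 1), runs=None):
    # Setup: what the class-based version does in __call__ and __enter__.
    dag = FakeDag(dag_id, start_date)
    yield dag
    # Teardown: only reached when the with-body did not raise,
    # mirroring the `if type is None` guard in __exit__.
    run = dag.create_dagrun(run_id="test", execution_date=start_date)
    if runs is not None:
        runs.append(run)
```

A test would collect the run with `runs = []` followed by `with factory(runs=runs) as dag: ...`. If the body raises, the code after `yield` is skipped and no run is recorded.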










[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16889: Add fixture to create dag, task and dagrun in tests and modify /job tests

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16889:
URL: https://github.com/apache/airflow/pull/16889#discussion_r671641165



##########
File path: tests/conftest.py
##########
@@ -424,3 +429,54 @@ def app():
     from airflow.www import app
 
     return app.create_app(testing=True)
+
+
+@pytest.fixture()
+def clear_db():
+    def _clean_up():
+        db.clear_db_runs()
+        db.clear_db_dags()
+        db.clear_db_serialized_dags()
+        db.clear_db_jobs()
+        db.clear_db_task_fail()
+        db.clear_db_task_reschedule()

Review comment:
       Yea. I will be moving this out...







[GitHub] [airflow] uranusjr commented on a change in pull request #16889: Add Pytest fixture to create dag and dagrun and use it on local task job tests

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #16889:
URL: https://github.com/apache/airflow/pull/16889#discussion_r672924194



##########
File path: tests/jobs/test_local_task_job.py
##########
@@ -55,45 +54,35 @@
 TEST_DAG_FOLDER = os.environ['AIRFLOW__CORE__DAGS_FOLDER']
 
 
-class TestLocalTaskJob(unittest.TestCase):
-    def setUp(self):
-        db.clear_db_dags()
-        db.clear_db_jobs()
-        db.clear_db_runs()
-        db.clear_db_task_fail()
-        patcher = patch('airflow.jobs.base_job.sleep')
-        self.addCleanup(patcher.stop)
-        self.mock_base_job_sleep = patcher.start()
-
-    def tearDown(self) -> None:
+class TestLocalTaskJob:
+    @pytest.fixture(autouse=True)
+    def clean_db_and_set_instance_attrs(self):
         db.clear_db_dags()
         db.clear_db_jobs()
         db.clear_db_runs()
         db.clear_db_task_fail()

Review comment:
       Just tried, and it seems that `usefixtures()` on a class is the same as marking every test in the class with `usefixtures()`. So we are missing a class-level teardown that runs after all tests in the class have run.







[GitHub] [airflow] ephraimbuddy merged pull request #16889: Add Pytest fixture to create dag and dagrun and use it on local task job tests

Posted by GitBox <gi...@apache.org>.
ephraimbuddy merged pull request #16889:
URL: https://github.com/apache/airflow/pull/16889


   





[GitHub] [airflow] ashb commented on a change in pull request #16889: Add fixture to create dag, task and dagrun in tests and modify /job tests

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16889:
URL: https://github.com/apache/airflow/pull/16889#discussion_r666348163



##########
File path: tests/jobs/conftest.py
##########
@@ -0,0 +1,83 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+
+import pytest
+
+from airflow.models.dag import DAG
+from airflow.models.dagbag import DagBag
+from airflow.operators.dummy import DummyOperator
+from airflow.operators.python import PythonOperator
+from airflow.utils import timezone
+from airflow.utils.state import State
+from tests.test_utils import db
+
+TEST_DAG_FOLDER = os.environ['AIRFLOW__CORE__DAGS_FOLDER']
+
+
+@pytest.fixture()
+def clear_db():
+    def _clean_up():
+        db.clear_db_dags()
+        db.clear_db_jobs()
+        db.clear_db_runs()
+        db.clear_db_task_fail()
+
+    _clean_up()
+    yield
+
+
+@pytest.fixture
+def dag_maker(request):
+
+    DEFAULT_DATE = timezone.datetime(2016, 1, 1)
+
+    def create_dag_ti_dr(
+        dag_id='test_dag', task_id='op1', python_callable=None, py_kwargs=None, with_dagbag=False, **kwargs
+    ):
+        # If python_callable we create a PythonOperator task
+        # For ease of use, set start date to "DEFAULT_DATE" module variable from request
+        if "start_date" not in kwargs and hasattr(request.module, 'DEFAULT_DATE'):
+            kwargs['start_date'] = getattr(request.module, 'DEFAULT_DATE')
+        else:
+            kwargs['start_date'] = DEFAULT_DATE
+        if with_dagbag:
+            dagbag = DagBag(
+                dag_folder=TEST_DAG_FOLDER,
+                include_examples=False,
+            )
+            dag = dagbag.get_dag(dag_id)
+            task = dag.get_task(task_id)
+        else:
+            dag = DAG(dag_id, start_date=kwargs['start_date'])
+            if python_callable:
+                task = PythonOperator(task_id=task_id, python_callable=python_callable, dag=dag, **py_kwargs)
+            else:
+                task = DummyOperator(task_id=task_id, run_as_user=kwargs.get("run_as_user", None), dag=dag)

Review comment:
       Perhaps this structure:
   
   ```python
   @pytest.fixture
   def dag_maker(request):
   
       DEFAULT_DATE = timezone.datetime(2016, 1, 1)
   
       class DagFactory():
           def __enter__(self):
               self.dag.__enter__()
               return self.dag
   
           def __exit__(self, type, value, traceback):
               dag = self.dag
               dag.__exit__(type, value, traceback)
               if type is None:
                   dag.sync_to_db()
                   dag.clear()
                   self.dag_run = dag.create_dagrun(
                       run_id="test",
                       state=self.kwargs.get('state', State.SUCCESS),
                       execution_date=self.kwargs.get('execution_date', self.kwargs['start_date']),
                       start_date=self.kwargs['start_date'],
                   )
   
           def __call__(self, dag_id='test_dag', **kwargs):
               self.kwargs = kwargs
               if "start_date" not in kwargs:
                   if hasattr(request.module, 'DEFAULT_DATE'):
                       kwargs['start_date'] = getattr(request.module, 'DEFAULT_DATE')
                   else:
                       kwargs['start_date'] = DEFAULT_DATE
               # TODO: filter out of kwargs things we use to control the dag-run we create.
               self.dag = DAG(dag_id, **kwargs)
               return self
   
   
       return DagFactory()
   
   ```
   
   
   We could then use it like this in tests:
   
   ```python
       with dag_maker():
           task = PythonOperator(task_id=task_id, python_callable=python_callable)
       dag_run = dag_maker.dag_run
   ```
   
   Thoughts?
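
For readers without an Airflow environment, the control flow of this pattern can be sketched with a stand-in. Everything here is an assumption for illustration: `FakeDag` is hypothetical and only mimics the context-manager protocol and a `create_dagrun` call, and states are plain strings rather than `State` constants. The point is that calling the factory configures it, entering it enters the DAG, and the run is created on exit only when the body did not raise.

```python
class FakeDag:
    """Hypothetical stand-in for DAG: tracks context entry and creates fake runs."""

    def __init__(self, dag_id):
        self.dag_id = dag_id
        self.entered = False

    def __enter__(self):
        self.entered = True
        return self

    def __exit__(self, exc_type, exc, tb):
        self.entered = False

    def create_dagrun(self, run_id, state):
        return {"dag_id": self.dag_id, "run_id": run_id, "state": state}


class DagFactory:
    def __call__(self, dag_id="test_dag", **kwargs):
        # Configure the factory; the same instance is then used as the context manager.
        self.kwargs = kwargs
        self.dag_run = None
        self.dag = FakeDag(dag_id)
        return self

    def __enter__(self):
        self.dag.__enter__()
        return self.dag

    def __exit__(self, exc_type, exc, tb):
        self.dag.__exit__(exc_type, exc, tb)
        if exc_type is None:
            # Only create the run when the with-body completed without error.
            self.dag_run = self.dag.create_dagrun(
                run_id="test", state=self.kwargs.get("state", "running")
            )


# In the real fixture this instance would be what `dag_maker` returns.
dag_maker = DagFactory()
```

Because `__call__` returns the factory itself, the test can read `dag_maker.dag_run` after the `with` block, which is how this structure gets the DAG run out.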







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16889: Add Pytest fixture to create dag and dagrun and use it on local task job tests

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16889:
URL: https://github.com/apache/airflow/pull/16889#discussion_r672864233



##########
File path: tests/jobs/test_local_task_job.py
##########
@@ -55,45 +54,35 @@
 TEST_DAG_FOLDER = os.environ['AIRFLOW__CORE__DAGS_FOLDER']
 
 
-class TestLocalTaskJob(unittest.TestCase):
-    def setUp(self):
-        db.clear_db_dags()
-        db.clear_db_jobs()
-        db.clear_db_runs()
-        db.clear_db_task_fail()
-        patcher = patch('airflow.jobs.base_job.sleep')
-        self.addCleanup(patcher.stop)
-        self.mock_base_job_sleep = patcher.start()
-
-    def tearDown(self) -> None:
+class TestLocalTaskJob:
+    @pytest.fixture(autouse=True)
+    def clean_db_and_set_instance_attrs(self):
         db.clear_db_dags()
         db.clear_db_jobs()
         db.clear_db_runs()
         db.clear_db_task_fail()

Review comment:
       Hi @uranusjr, can you take a look again







[GitHub] [airflow] github-actions[bot] commented on pull request #16889: Add Pytest fixture to create dag and dagrun and use it on local task job tests

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #16889:
URL: https://github.com/apache/airflow/pull/16889#issuecomment-883356738


   The PR most likely needs to run the full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly, please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.








[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16889: Add Pytest fixture to create dag and dagrun and use it on local task job tests

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16889:
URL: https://github.com/apache/airflow/pull/16889#discussion_r672104055



##########
File path: tests/jobs/test_local_task_job.py
##########
@@ -55,45 +54,35 @@
 TEST_DAG_FOLDER = os.environ['AIRFLOW__CORE__DAGS_FOLDER']
 
 
-class TestLocalTaskJob(unittest.TestCase):
-    def setUp(self):
-        db.clear_db_dags()
-        db.clear_db_jobs()
-        db.clear_db_runs()
-        db.clear_db_task_fail()
-        patcher = patch('airflow.jobs.base_job.sleep')
-        self.addCleanup(patcher.stop)
-        self.mock_base_job_sleep = patcher.start()
-
-    def tearDown(self) -> None:
+class TestLocalTaskJob:
+    @pytest.fixture(autouse=True)
+    def clean_db_and_set_instance_attrs(self):
         db.clear_db_dags()
         db.clear_db_jobs()
         db.clear_db_runs()
         db.clear_db_task_fail()

Review comment:
       Cool. I think it is fixed now?

##########
File path: tests/jobs/test_local_task_job.py
##########
@@ -55,45 +54,35 @@
 TEST_DAG_FOLDER = os.environ['AIRFLOW__CORE__DAGS_FOLDER']
 
 
-class TestLocalTaskJob(unittest.TestCase):
-    def setUp(self):
-        db.clear_db_dags()
-        db.clear_db_jobs()
-        db.clear_db_runs()
-        db.clear_db_task_fail()
-        patcher = patch('airflow.jobs.base_job.sleep')
-        self.addCleanup(patcher.stop)
-        self.mock_base_job_sleep = patcher.start()
-
-    def tearDown(self) -> None:
+class TestLocalTaskJob:
+    @pytest.fixture(autouse=True)
+    def clean_db_and_set_instance_attrs(self):
         db.clear_db_dags()
         db.clear_db_jobs()
         db.clear_db_runs()
         db.clear_db_task_fail()

Review comment:
       I do not fully understand pytest fixtures, but from what I have read so far, if you don't specify the scope, the default is function scope, which means the fixture is set up and torn down around every test function, even though we are using it on the class.
   
   I think that if it were not doing 1 & 2 right now, many of the tests in the class would have failed. What do you think?

##########
File path: tests/jobs/test_local_task_job.py
##########
@@ -55,45 +54,35 @@
 TEST_DAG_FOLDER = os.environ['AIRFLOW__CORE__DAGS_FOLDER']
 
 
-class TestLocalTaskJob(unittest.TestCase):
-    def setUp(self):
-        db.clear_db_dags()
-        db.clear_db_jobs()
-        db.clear_db_runs()
-        db.clear_db_task_fail()
-        patcher = patch('airflow.jobs.base_job.sleep')
-        self.addCleanup(patcher.stop)
-        self.mock_base_job_sleep = patcher.start()
-
-    def tearDown(self) -> None:
+class TestLocalTaskJob:
+    @pytest.fixture(autouse=True)
+    def clean_db_and_set_instance_attrs(self):
         db.clear_db_dags()
         db.clear_db_jobs()
         db.clear_db_runs()
         db.clear_db_task_fail()

Review comment:
       Hi @uranusjr, can you take a look again




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16889: Add Pytest fixture to create dag and dagrun and use it on local task job tests

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16889:
URL: https://github.com/apache/airflow/pull/16889#discussion_r673009693



##########
File path: tests/jobs/test_local_task_job.py
##########
@@ -55,45 +54,35 @@
 TEST_DAG_FOLDER = os.environ['AIRFLOW__CORE__DAGS_FOLDER']
 
 
-class TestLocalTaskJob(unittest.TestCase):
-    def setUp(self):
-        db.clear_db_dags()
-        db.clear_db_jobs()
-        db.clear_db_runs()
-        db.clear_db_task_fail()
-        patcher = patch('airflow.jobs.base_job.sleep')
-        self.addCleanup(patcher.stop)
-        self.mock_base_job_sleep = patcher.start()
-
-    def tearDown(self) -> None:
+class TestLocalTaskJob:
+    @pytest.fixture(autouse=True)
+    def clean_db_and_set_instance_attrs(self):
         db.clear_db_dags()
         db.clear_db_jobs()
         db.clear_db_runs()
         db.clear_db_task_fail()

Review comment:
       Thanks @uranusjr. After a lot of searching, it seems the fixture is destroyed at the end of each test in the class, leaving nothing else to destroy. I experimented by running the tests in /jobs/ and saw no side effects. Another option is to use autouse=True without adding it to the class; then it would be used by every test in the module and destroyed at the end of each one.
   I think we are on the right path, since it runs and is destroyed on every test, leaving nothing behind. I may be missing something though.
   https://docs.pytest.org/en/6.2.x/fixture.html#fixture-scopes 
   







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16889: Add fixture to create dag, task and dagrun in tests and modify /job tests

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16889:
URL: https://github.com/apache/airflow/pull/16889#discussion_r671841722



##########
File path: tests/conftest.py
##########
@@ -424,3 +429,54 @@ def app():
     from airflow.www import app
 
     return app.create_app(testing=True)
+
+
+@pytest.fixture()
+def clear_db():
+    def _clean_up():
+        db.clear_db_runs()
+        db.clear_db_dags()
+        db.clear_db_serialized_dags()
+        db.clear_db_jobs()
+        db.clear_db_task_fail()
+        db.clear_db_task_reschedule()
+
+    _clean_up()
+    yield
+
+
+@pytest.fixture
+def dag_maker(request):
+
+    DEFAULT_DATE = timezone.datetime(2016, 1, 1)
+
+    class DagFactory:
+        def __enter__(self):
+            self.dag.__enter__()
+            return self.dag
+
+        def __exit__(self, type, value, traceback):
+            dag = self.dag
+            dag.__exit__(type, value, traceback)
+            if type is None:
+                dag.sync_to_db()
+                dag.clear()
+                self.dag_run = dag.create_dagrun(
+                    run_id="test",
+                    state=self.kwargs.get('state', State.RUNNING),
+                    execution_date=self.kwargs.get('execution_date', self.kwargs['start_date']),
+                    start_date=self.kwargs['start_date'],
+                )
+
+        def __call__(self, dag_id='test_dag', **kwargs):
+            self.kwargs = kwargs
+            if "start_date" not in kwargs:
+                if hasattr(request.module, 'DEFAULT_DATE'):
+                    kwargs['start_date'] = getattr(request.module, 'DEFAULT_DATE')
+                else:
+                    kwargs['start_date'] = DEFAULT_DATE
+            kwargs = {k: v for k, v in kwargs.items() if k not in ['state', 'execution_date', 'run_type']}
+            self.dag = DAG(dag_id, **kwargs)
+            return self
+
+    return DagFactory()

Review comment:
       It turns out the current code is already fine in terms of speed. However, I was not able to get it to work the way you suggested.
   We are using it as a context manager so that we can define tasks under the DAG and create the dag run on exit, which makes sense to me. What do you think?
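
For illustration, the context-manager shape described above can be sketched with the standard library alone. This is a simplified sketch, not the fixture from the PR: `FakeDag` is a hypothetical stand-in for Airflow's `DAG`, and this `DagFactory` keeps only the call, enter, and exit steps.

```python
class FakeDag:
    """Hypothetical stand-in for a DAG; tracks only what the sketch needs."""

    def __init__(self, dag_id):
        self.dag_id = dag_id
        self.tasks = []
        self.runs = []

    def create_dagrun(self, run_id):
        self.runs.append(run_id)
        return run_id


class DagFactory:
    def __call__(self, dag_id="test_dag"):
        # Build the DAG when the factory is called, then return the factory
        # itself so the call can be used directly in a `with` statement.
        self.dag = FakeDag(dag_id)
        self.dag_run = None
        return self

    def __enter__(self):
        return self.dag

    def __exit__(self, exc_type, exc_value, traceback):
        # Create the run only if the block finished without an exception.
        if exc_type is None:
            self.dag_run = self.dag.create_dagrun(run_id="test")
        return False  # never swallow exceptions raised inside the block


dag_maker = DagFactory()

with dag_maker("example") as dag:
    dag.tasks.append("op1")  # tasks are defined inside the block
```

Once the block exits, the run is available as `dag_maker.dag_run`, which is why the test body only has to declare tasks.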







[GitHub] [airflow] uranusjr commented on a change in pull request #16889: Add Pytest fixture to create dag and dagrun and use it on local task job tests

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #16889:
URL: https://github.com/apache/airflow/pull/16889#discussion_r672920031



##########
File path: tests/jobs/test_local_task_job.py
##########
@@ -55,45 +54,35 @@
 TEST_DAG_FOLDER = os.environ['AIRFLOW__CORE__DAGS_FOLDER']
 
 
-class TestLocalTaskJob(unittest.TestCase):
-    def setUp(self):
-        db.clear_db_dags()
-        db.clear_db_jobs()
-        db.clear_db_runs()
-        db.clear_db_task_fail()
-        patcher = patch('airflow.jobs.base_job.sleep')
-        self.addCleanup(patcher.stop)
-        self.mock_base_job_sleep = patcher.start()
-
-    def tearDown(self) -> None:
+class TestLocalTaskJob:
+    @pytest.fixture(autouse=True)
+    def clean_db_and_set_instance_attrs(self):
         db.clear_db_dags()
         db.clear_db_jobs()
         db.clear_db_runs()
         db.clear_db_task_fail()

Review comment:
       Yes but you only use the `clear_db` on the class. I’m not sure tbh what that actually does (pytest documentation is quite sparse on this), but I assume it either applies the fixture to be run for every test in the class, or once for that class. Which means you wouldn’t be running the fixture for all of the cases listed above.







[GitHub] [airflow] ashb commented on a change in pull request #16889: Add fixture to create dag, task and dagrun in tests and modify /job tests

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16889:
URL: https://github.com/apache/airflow/pull/16889#discussion_r666322338



##########
File path: tests/jobs/conftest.py
##########
@@ -0,0 +1,83 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+
+import pytest
+
+from airflow.models.dag import DAG
+from airflow.models.dagbag import DagBag
+from airflow.operators.dummy import DummyOperator
+from airflow.operators.python import PythonOperator
+from airflow.utils import timezone
+from airflow.utils.state import State
+from tests.test_utils import db
+
+TEST_DAG_FOLDER = os.environ['AIRFLOW__CORE__DAGS_FOLDER']
+
+
+@pytest.fixture()
+def clear_db():
+    def _clean_up():
+        db.clear_db_dags()
+        db.clear_db_jobs()
+        db.clear_db_runs()
+        db.clear_db_task_fail()
+
+    _clean_up()
+    yield
+
+
+@pytest.fixture
+def dag_maker(request):
+
+    DEFAULT_DATE = timezone.datetime(2016, 1, 1)
+
+    def create_dag_ti_dr(
+        dag_id='test_dag', task_id='op1', python_callable=None, py_kwargs=None, with_dagbag=False, **kwargs
+    ):
+        # If python_callable we create a PythonOperator task
+        # For ease of use, set start date to "DEFAULT_DATE" module variable from request
+        if "start_date" not in kwargs and hasattr(request.module, 'DEFAULT_DATE'):
+            kwargs['start_date'] = getattr(request.module, 'DEFAULT_DATE')
+        else:
+            kwargs['start_date'] = DEFAULT_DATE
+        if with_dagbag:
+            dagbag = DagBag(
+                dag_folder=TEST_DAG_FOLDER,
+                include_examples=False,
+            )
+            dag = dagbag.get_dag(dag_id)
+            task = dag.get_task(task_id)
+        else:
+            dag = DAG(dag_id, start_date=kwargs['start_date'])
+            if python_callable:
+                task = PythonOperator(task_id=task_id, python_callable=python_callable, dag=dag, **py_kwargs)
+            else:
+                task = DummyOperator(task_id=task_id, run_as_user=kwargs.get("run_as_user", None), dag=dag)
+        dag.clear()
+        dr = dag.create_dagrun(
+            run_id="test",
+            state=kwargs.get('state', State.SUCCESS),
+            execution_date=kwargs.get('execution_date', kwargs['start_date']),
+            start_date=kwargs['start_date'],
+        )
+        yield dag
+        yield task
+        yield dr

Review comment:
       This probably doesn't do what you think.
   
   To get these values you'd have to do something like
   
   ```python
   for i, val in enumerate(dag_maker()):
       if i == 0:
           print("DAG")
       elif i == 1:
           print("Task")
       elif i == 2:
           print("DagRun")
   ```
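
A minimal sketch of why the three `yield`s behave this way, using placeholder strings instead of real DAG, task, and DagRun objects (the function names here are hypothetical). Calling a function that contains `yield` only builds a generator, so nothing in the body runs until the caller iterates over it. Returning a tuple is one simple way to hand back all three values from a single call.

```python
import types

calls = []


def create_objects():
    # Placeholder strings stand in for the real DAG, task and DagRun.
    calls.append("body started")
    yield "dag"
    yield "task"
    yield "dagrun"


result = create_objects()

# Calling the function only builds a generator; the body has not run yet.
assert isinstance(result, types.GeneratorType)
assert calls == []

# The values only appear once the generator is consumed, one per yield.
values = list(result)


def create_objects_tuple():
    # Returning a tuple hands all three objects back from a single call.
    return "dag", "task", "dagrun"


dag, task, dagrun = create_objects_tuple()
```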







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16889: Add Pytest fixture to create dag and dagrun and use it on local task job tests

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16889:
URL: https://github.com/apache/airflow/pull/16889#discussion_r672215533



##########
File path: tests/jobs/test_local_task_job.py
##########
@@ -55,45 +54,35 @@
 TEST_DAG_FOLDER = os.environ['AIRFLOW__CORE__DAGS_FOLDER']
 
 
-class TestLocalTaskJob(unittest.TestCase):
-    def setUp(self):
-        db.clear_db_dags()
-        db.clear_db_jobs()
-        db.clear_db_runs()
-        db.clear_db_task_fail()
-        patcher = patch('airflow.jobs.base_job.sleep')
-        self.addCleanup(patcher.stop)
-        self.mock_base_job_sleep = patcher.start()
-
-    def tearDown(self) -> None:
+class TestLocalTaskJob:
+    @pytest.fixture(autouse=True)
+    def clean_db_and_set_instance_attrs(self):
         db.clear_db_dags()
         db.clear_db_jobs()
         db.clear_db_runs()
         db.clear_db_task_fail()

Review comment:
       I do not fully understand pytest fixtures, but from what I have read so far, if you don't specify the scope, the default is function scope, which means the fixture is created and torn down on every test function run, even though we are applying it to the class.
   
   I think that if it were not doing 1 & 2 right now, many of the tests in the class would have failed. What do you think?







[GitHub] [airflow] uranusjr commented on a change in pull request #16889: Add Pytest fixture to create dag and dagrun and use it on local task job tests

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #16889:
URL: https://github.com/apache/airflow/pull/16889#discussion_r672114155



##########
File path: tests/jobs/test_local_task_job.py
##########
@@ -55,45 +54,35 @@
 TEST_DAG_FOLDER = os.environ['AIRFLOW__CORE__DAGS_FOLDER']
 
 
-class TestLocalTaskJob(unittest.TestCase):
-    def setUp(self):
-        db.clear_db_dags()
-        db.clear_db_jobs()
-        db.clear_db_runs()
-        db.clear_db_task_fail()
-        patcher = patch('airflow.jobs.base_job.sleep')
-        self.addCleanup(patcher.stop)
-        self.mock_base_job_sleep = patcher.start()
-
-    def tearDown(self) -> None:
+class TestLocalTaskJob:
+    @pytest.fixture(autouse=True)
+    def clean_db_and_set_instance_attrs(self):
         db.clear_db_dags()
         db.clear_db_jobs()
         db.clear_db_runs()
         db.clear_db_task_fail()

Review comment:
       So there are three points at which we need to clean the db:
   
   1. Before any tests in the class run
   2. Between each test in the class
   3. After all tests in the class run
   
   Previously you were missing 3, but now you’re missing 1 and 2 (I think). We need them all.

##########
File path: tests/jobs/test_local_task_job.py
##########
@@ -55,45 +54,35 @@
 TEST_DAG_FOLDER = os.environ['AIRFLOW__CORE__DAGS_FOLDER']
 
 
-class TestLocalTaskJob(unittest.TestCase):
-    def setUp(self):
-        db.clear_db_dags()
-        db.clear_db_jobs()
-        db.clear_db_runs()
-        db.clear_db_task_fail()
-        patcher = patch('airflow.jobs.base_job.sleep')
-        self.addCleanup(patcher.stop)
-        self.mock_base_job_sleep = patcher.start()
-
-    def tearDown(self) -> None:
+class TestLocalTaskJob:
+    @pytest.fixture(autouse=True)
+    def clean_db_and_set_instance_attrs(self):
         db.clear_db_dags()
         db.clear_db_jobs()
         db.clear_db_runs()
         db.clear_db_task_fail()

Review comment:
       Just tried it; it seems that `usefixtures()` on a class is the same as marking `usefixtures()` on every test in the class. So we are missing a class-level teardown after all tests in the class are run.







[GitHub] [airflow] uranusjr commented on a change in pull request #16889: Add Pytest fixture to create dag and dagrun and use it on local task job tests

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #16889:
URL: https://github.com/apache/airflow/pull/16889#discussion_r672062375



##########
File path: tests/conftest.py
##########
@@ -424,3 +429,54 @@ def app():
     from airflow.www import app
 
     return app.create_app(testing=True)
+
+
+@pytest.fixture()
+def clear_db():
+    def _clean_up():
+        db.clear_db_runs()
+        db.clear_db_dags()
+        db.clear_db_serialized_dags()
+        db.clear_db_jobs()
+        db.clear_db_task_fail()
+        db.clear_db_task_reschedule()
+
+    _clean_up()
+    yield
+
+
+@pytest.fixture
+def dag_maker(request):
+
+    DEFAULT_DATE = timezone.datetime(2016, 1, 1)
+
+    class DagFactory:
+        def __enter__(self):
+            self.dag.__enter__()
+            return self.dag
+
+        def __exit__(self, type, value, traceback):
+            dag = self.dag
+            dag.__exit__(type, value, traceback)
+            if type is None:
+                dag.sync_to_db()
+                dag.clear()
+                self.dag_run = dag.create_dagrun(
+                    run_id="test",
+                    state=self.kwargs.get('state', State.RUNNING),
+                    execution_date=self.kwargs.get('execution_date', self.kwargs['start_date']),
+                    start_date=self.kwargs['start_date'],
+                )
+
+        def __call__(self, dag_id='test_dag', **kwargs):
+            self.kwargs = kwargs
+            if "start_date" not in kwargs:
+                if hasattr(request.module, 'DEFAULT_DATE'):
+                    kwargs['start_date'] = getattr(request.module, 'DEFAULT_DATE')
+                else:
+                    kwargs['start_date'] = DEFAULT_DATE
+            kwargs = {k: v for k, v in kwargs.items() if k not in ['state', 'execution_date', 'run_type']}
+            self.dag = DAG(dag_id, **kwargs)
+            return self
+
+    return DagFactory()

Review comment:
       Setting things on a context manager instance in `__enter__` (but not cleaned up in `__exit__`) is usually not a good idea since the side effect can leak out of the manager. But this is only used in tests, so as long as we use it correctly it’s not a big deal.
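
To make the leak concrete, here is a small standard-library sketch. `LeakyFactory` and `ResettingFactory` are hypothetical names, and the `run` attribute stands in for `dag_run`. When one factory instance is reused and the second block fails, the attribute set by the first block is still there. Resetting it at the start of each call is one way to avoid that.

```python
class LeakyFactory:
    def __call__(self, name):
        self.name = name
        return self

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # `run` is only (re)assigned on success and is never cleared.
        if exc_type is None:
            self.run = f"run-of-{self.name}"
        return False


class ResettingFactory(LeakyFactory):
    def __call__(self, name):
        # Clear state left over from any earlier use before starting again.
        self.run = None
        return super().__call__(name)


def use_twice(factory):
    with factory("first"):
        pass
    try:
        with factory("second"):
            raise RuntimeError("block failed")
    except RuntimeError:
        pass
    return factory.run


stale_run = use_twice(LeakyFactory())      # value left over from "first"
clean_run = use_twice(ResettingFactory())  # None, nothing leaked
```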







[GitHub] [airflow] ashb commented on a change in pull request #16889: Add fixture to create dag, task and dagrun in tests and modify /job tests

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16889:
URL: https://github.com/apache/airflow/pull/16889#discussion_r666320758



##########
File path: tests/jobs/conftest.py
##########
@@ -0,0 +1,83 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+
+import pytest
+
+from airflow.models.dag import DAG
+from airflow.models.dagbag import DagBag
+from airflow.operators.dummy import DummyOperator
+from airflow.operators.python import PythonOperator
+from airflow.utils import timezone
+from airflow.utils.state import State
+from tests.test_utils import db
+
+TEST_DAG_FOLDER = os.environ['AIRFLOW__CORE__DAGS_FOLDER']
+
+
+@pytest.fixture()
+def clear_db():
+    def _clean_up():
+        db.clear_db_dags()
+        db.clear_db_jobs()
+        db.clear_db_runs()
+        db.clear_db_task_fail()
+
+    _clean_up()
+    yield
+
+
+@pytest.fixture
+def dag_maker(request):
+
+    DEFAULT_DATE = timezone.datetime(2016, 1, 1)
+
+    def create_dag_ti_dr(
+        dag_id='test_dag', task_id='op1', python_callable=None, py_kwargs=None, with_dagbag=False, **kwargs
+    ):
+        # If python_callable we create a PythonOperator task
+        # For ease of use, set start date to "DEFAULT_DATE" module variable from request
+        if "start_date" not in kwargs and hasattr(request.module, 'DEFAULT_DATE'):
+            kwargs['start_date'] = getattr(request.module, 'DEFAULT_DATE')
+        else:
+            kwargs['start_date'] = DEFAULT_DATE
+        if with_dagbag:
+            dagbag = DagBag(
+                dag_folder=TEST_DAG_FOLDER,
+                include_examples=False,
+            )
+            dag = dagbag.get_dag(dag_id)
+            task = dag.get_task(task_id)

Review comment:
       Not sure we need this for pulling out of a dag bag -- that is already fairly easy -- and _creating_ a new DagBag each time would be slow.
   
   So I think remove this, and have it only for cases where we do `dag = DAG(...)` in tests







[GitHub] [airflow] uranusjr commented on a change in pull request #16889: Add Pytest fixture to create dag and dagrun and use it on local task job tests

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #16889:
URL: https://github.com/apache/airflow/pull/16889#discussion_r673040289



##########
File path: tests/jobs/test_local_task_job.py
##########
@@ -55,45 +54,35 @@
 TEST_DAG_FOLDER = os.environ['AIRFLOW__CORE__DAGS_FOLDER']
 
 
-class TestLocalTaskJob(unittest.TestCase):
-    def setUp(self):
-        db.clear_db_dags()
-        db.clear_db_jobs()
-        db.clear_db_runs()
-        db.clear_db_task_fail()
-        patcher = patch('airflow.jobs.base_job.sleep')
-        self.addCleanup(patcher.stop)
-        self.mock_base_job_sleep = patcher.start()
-
-    def tearDown(self) -> None:
+class TestLocalTaskJob:
+    @pytest.fixture(autouse=True)
+    def clean_db_and_set_instance_attrs(self):
         db.clear_db_dags()
         db.clear_db_jobs()
         db.clear_db_runs()
         db.clear_db_task_fail()

Review comment:
       The fixture is defined like this
   
   ```python
   @pytest.fixture
   def clear_db():
       db.clear_db_dags()
       db.clear_db_jobs()
       db.clear_db_runs()
       db.clear_db_task_fail()
       yield
   ```
   
   The db clear calls are done in the fixture’s *setup* phase (before yield), so after the last test runs, the fixture itself is destroyed, but that destroy phase does not clear db. So I think we need one more fixture:
   
   ```python
   @pytest.fixture(scope="class")
   def clear_db_class():
       yield
       db.clear_db_dags()
       db.clear_db_jobs()
       db.clear_db_runs()
       db.clear_db_task_fail()
   ```
   
   to handle that?
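
To illustrate the setup/teardown split, the sketch below drives a generator by hand in the same order a yield fixture is run. It is not pytest itself: `run_with_fixture` is a hypothetical driver, and `clean_tables` stands in for the `db.clear_db_*()` calls. It also shows that putting the cleanup on both sides of the `yield` clears the tables before the first case, between cases, and after the last one.

```python
events = []


def clean_tables():
    # Stand-in for the db.clear_db_*() calls in the fixtures above.
    events.append("clean")


def clear_db():
    # Same shape as a yield fixture: code before `yield` is setup,
    # code after `yield` is teardown.
    clean_tables()
    yield
    clean_tables()


def run_with_fixture(fixture, case):
    # Hypothetical hand-written driver: setup, then the case, then teardown.
    gen = fixture()
    next(gen)  # setup: runs up to the yield
    try:
        case()
    finally:
        next(gen, None)  # teardown: runs the code after the yield


def first_case():
    events.append("first_case")


def second_case():
    events.append("second_case")


for case in (first_case, second_case):
    run_with_fixture(clear_db, case)
```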







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16889: Add fixture to create dag, task and dagrun in tests and modify /job tests

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16889:
URL: https://github.com/apache/airflow/pull/16889#discussion_r671641516



##########
File path: tests/conftest.py
##########
@@ -424,3 +429,54 @@ def app():
     from airflow.www import app
 
     return app.create_app(testing=True)
+
+
+@pytest.fixture()
+def clear_db():
+    def _clean_up():
+        db.clear_db_runs()
+        db.clear_db_dags()
+        db.clear_db_serialized_dags()
+        db.clear_db_jobs()
+        db.clear_db_task_fail()
+        db.clear_db_task_reschedule()
+
+    _clean_up()
+    yield
+
+
+@pytest.fixture
+def dag_maker(request):
+
+    DEFAULT_DATE = timezone.datetime(2016, 1, 1)
+
+    class DagFactory:
+        def __enter__(self):
+            self.dag.__enter__()
+            return self.dag
+
+        def __exit__(self, type, value, traceback):
+            dag = self.dag
+            dag.__exit__(type, value, traceback)
+            if type is None:
+                dag.sync_to_db()
+                dag.clear()
+                self.dag_run = dag.create_dagrun(
+                    run_id="test",
+                    state=self.kwargs.get('state', State.RUNNING),
+                    execution_date=self.kwargs.get('execution_date', self.kwargs['start_date']),
+                    start_date=self.kwargs['start_date'],
+                )
+
+        def __call__(self, dag_id='test_dag', **kwargs):
+            self.kwargs = kwargs
+            if "start_date" not in kwargs:
+                if hasattr(request.module, 'DEFAULT_DATE'):
+                    kwargs['start_date'] = getattr(request.module, 'DEFAULT_DATE')
+                else:
+                    kwargs['start_date'] = DEFAULT_DATE
+            kwargs = {k: v for k, v in kwargs.items() if k not in ['state', 'execution_date', 'run_type']}
+            self.dag = DAG(dag_id, **kwargs)
+            return self
+
+    return DagFactory()

Review comment:
       I'm reviewing it too, because the tests take longer to run.







[GitHub] [airflow] uranusjr commented on a change in pull request #16889: Add fixture to create dag, task and dagrun in tests and modify /job tests

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #16889:
URL: https://github.com/apache/airflow/pull/16889#discussion_r671593835



##########
File path: tests/conftest.py
##########
@@ -424,3 +429,54 @@ def app():
     from airflow.www import app
 
     return app.create_app(testing=True)
+
+
+@pytest.fixture()
+def clear_db():
+    def _clean_up():
+        db.clear_db_runs()
+        db.clear_db_dags()
+        db.clear_db_serialized_dags()
+        db.clear_db_jobs()
+        db.clear_db_task_fail()
+        db.clear_db_task_reschedule
+
+    _clean_up()
+    yield
+
+
+@pytest.fixture
+def dag_maker(request):
+
+    DEFAULT_DATE = timezone.datetime(2016, 1, 1)
+
+    class DagFactory:
+        def __enter__(self):
+            self.dag.__enter__()
+            return self.dag
+
+        def __exit__(self, type, value, traceback):
+            dag = self.dag
+            dag.__exit__(type, value, traceback)
+            if type is None:
+                dag.sync_to_db()
+                dag.clear()
+                self.dag_run = dag.create_dagrun(
+                    run_id="test",
+                    state=self.kwargs.get('state', State.RUNNING),
+                    execution_date=self.kwargs.get('execution_date', self.kwargs['start_date']),
+                    start_date=self.kwargs['start_date'],
+                )
+
+        def __call__(self, dag_id='test_dag', **kwargs):
+            self.kwargs = kwargs
+            if "start_date" not in kwargs:
+                if hasattr(request.module, 'DEFAULT_DATE'):
+                    kwargs['start_date'] = getattr(request.module, 'DEFAULT_DATE')
+                else:
+                    kwargs['start_date'] = DEFAULT_DATE
+            kwargs = {k: v for k, v in kwargs.items() if k not in ['state', 'execution_date', 'run_type']}
+            self.dag = DAG(dag_id, **kwargs)
+            return self
+
+    return DagFactory()

Review comment:
       Can this not be defined like this instead?
   
   ```python
   class DagFactory:
       def __init__(self, dag_id="test_dag", **kwargs):
          # what's currently in __call__
   
       # __enter__ and __exit__ are unchanged
   
   return DagFactory
   ```
   
   This is easier to reason about IMO.
   
   Or even simpler:
   
   ```python
   @contextlib.contextmanager
   def factory(dag_id="test_dag", **kwargs):
       # What's currently in __call__ but drop the "self" part
       with dag:
           yield dag
       # What's currently in __exit__
   ```
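
As a rough illustration of the `contextlib` variant suggested above, here is a minimal runnable sketch. `FakeDag` is a hypothetical stand-in for `airflow.models.dag.DAG` so the example runs without Airflow, and `dag_factory` is an assumed name; the real fixture would also call `sync_to_db()` and `clear()` as the current `__exit__` does.

```python
import contextlib
from datetime import datetime

DEFAULT_DATE = datetime(2016, 1, 1)


class FakeDag:
    """Hypothetical stand-in for airflow.models.dag.DAG (not the real class)."""

    def __init__(self, dag_id, start_date):
        self.dag_id = dag_id
        self.start_date = start_date
        self.events = []  # records the order of calls for inspection

    def __enter__(self):
        self.events.append("enter")
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.events.append("exit")  # returns None, so exceptions propagate

    def create_dagrun(self, run_id, execution_date):
        self.events.append("create_dagrun")
        return {"run_id": run_id, "execution_date": execution_date}


@contextlib.contextmanager
def dag_factory(dag_id="test_dag", **kwargs):
    # Separate run-level options from DAG-level options, as __call__ does.
    execution_date = kwargs.pop("execution_date", None)
    kwargs.setdefault("start_date", DEFAULT_DATE)
    dag = FakeDag(dag_id, **kwargs)
    with dag:
        yield dag
    # Only reached when the with-body raised nothing, which mirrors the
    # `if type is None` check in the class-based version.
    dag.dag_run = dag.create_dagrun(
        run_id="test",
        execution_date=execution_date or dag.start_date,
    )
```

Under these assumptions the fixture body would simply return `dag_factory`, and a test would write `with dag_maker() as dag: ...` exactly as before.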







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16889: Add Pytest fixture to create dag and dagrun and use it on local task job tests

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16889:
URL: https://github.com/apache/airflow/pull/16889#discussion_r673052917



##########
File path: tests/jobs/test_local_task_job.py
##########
@@ -55,45 +54,35 @@
 TEST_DAG_FOLDER = os.environ['AIRFLOW__CORE__DAGS_FOLDER']
 
 
-class TestLocalTaskJob(unittest.TestCase):
-    def setUp(self):
-        db.clear_db_dags()
-        db.clear_db_jobs()
-        db.clear_db_runs()
-        db.clear_db_task_fail()
-        patcher = patch('airflow.jobs.base_job.sleep')
-        self.addCleanup(patcher.stop)
-        self.mock_base_job_sleep = patcher.start()
-
-    def tearDown(self) -> None:
+class TestLocalTaskJob:
+    @pytest.fixture(autouse=True)
+    def clean_db_and_set_instance_attrs(self):
         db.clear_db_dags()
         db.clear_db_jobs()
         db.clear_db_runs()
         db.clear_db_task_fail()

Review comment:
       Ok. I will add that







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16889: Add fixture to create dag, task and dagrun in tests and modify /job tests

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16889:
URL: https://github.com/apache/airflow/pull/16889#discussion_r671641165



##########
File path: tests/conftest.py
##########
@@ -424,3 +429,54 @@ def app():
     from airflow.www import app
 
     return app.create_app(testing=True)
+
+
+@pytest.fixture()
+def clear_db():
+    def _clean_up():
+        db.clear_db_runs()
+        db.clear_db_dags()
+        db.clear_db_serialized_dags()
+        db.clear_db_jobs()
+        db.clear_db_task_fail()
+        db.clear_db_task_reschedule

Review comment:
       Yea. I will be moving this out from here too...







[GitHub] [airflow] uranusjr commented on a change in pull request #16889: Add Pytest fixture to create dag and dagrun and use it on local task job tests

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #16889:
URL: https://github.com/apache/airflow/pull/16889#discussion_r673077769



##########
File path: tests/jobs/test_local_task_job.py
##########
@@ -55,45 +54,51 @@
 TEST_DAG_FOLDER = os.environ['AIRFLOW__CORE__DAGS_FOLDER']
 
 
-class TestLocalTaskJob(unittest.TestCase):
-    def setUp(self):
-        db.clear_db_dags()
-        db.clear_db_jobs()
-        db.clear_db_runs()
-        db.clear_db_task_fail()
-        patcher = patch('airflow.jobs.base_job.sleep')
-        self.addCleanup(patcher.stop)
-        self.mock_base_job_sleep = patcher.start()
+@pytest.fixture
+def clear_db():
+    db.clear_db_dags()
+    db.clear_db_jobs()
+    db.clear_db_runs()
+    db.clear_db_task_fail()
+    yield
+
+
+@pytest.fixture(scope='class')
+def clear_db_class():
+    yield
+    db.clear_db_dags()
+    db.clear_db_jobs()
+    db.clear_db_runs()
+    db.clear_db_task_fail()
+
 
-    def tearDown(self) -> None:
-        db.clear_db_dags()
-        db.clear_db_jobs()
-        db.clear_db_runs()
-        db.clear_db_task_fail()
+@pytest.mark.usefixtures('clear_db_class')
+@pytest.mark.usefixtures('clear_db')

Review comment:
       ```suggestion
   @pytest.mark.usefixtures('clear_db_class', 'clear_db')
   ```







[GitHub] [airflow] ashb commented on a change in pull request #16889: Add fixture to create dag, task and dagrun in tests and modify /job tests

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16889:
URL: https://github.com/apache/airflow/pull/16889#discussion_r666345243



##########
File path: tests/jobs/conftest.py
##########
@@ -0,0 +1,83 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+
+import pytest
+
+from airflow.models.dag import DAG
+from airflow.models.dagbag import DagBag
+from airflow.operators.dummy import DummyOperator
+from airflow.operators.python import PythonOperator
+from airflow.utils import timezone
+from airflow.utils.state import State
+from tests.test_utils import db
+
+TEST_DAG_FOLDER = os.environ['AIRFLOW__CORE__DAGS_FOLDER']
+
+
+@pytest.fixture()
+def clear_db():
+    def _clean_up():
+        db.clear_db_dags()
+        db.clear_db_jobs()
+        db.clear_db_runs()
+        db.clear_db_task_fail()
+
+    _clean_up()
+    yield
+
+
+@pytest.fixture
+def dag_maker(request):
+
+    DEFAULT_DATE = timezone.datetime(2016, 1, 1)
+
+    def create_dag_ti_dr(
+        dag_id='test_dag', task_id='op1', python_callable=None, py_kwargs=None, with_dagbag=False, **kwargs
+    ):
+        # If python_callable we create a PythonOperator task
+        # For ease of use, set start date to "DEFAULT_DATE" module variable from request
+        if "start_date" not in kwargs and hasattr(request.module, 'DEFAULT_DATE'):
+            kwargs['start_date'] = getattr(request.module, 'DEFAULT_DATE')
+        else:
+            kwargs['start_date'] = DEFAULT_DATE
+        if with_dagbag:
+            dagbag = DagBag(
+                dag_folder=TEST_DAG_FOLDER,
+                include_examples=False,
+            )
+            dag = dagbag.get_dag(dag_id)
+            task = dag.get_task(task_id)
+        else:
+            dag = DAG(dag_id, start_date=kwargs['start_date'])
+            if python_callable:
+                task = PythonOperator(task_id=task_id, python_callable=python_callable, dag=dag, **py_kwargs)
+            else:
+                task = DummyOperator(task_id=task_id, run_as_user=kwargs.get("run_as_user", None), dag=dag)

Review comment:
       I think this might be too much magic. If we do this instead:
   
   ```suggestion
               dag = DAG(dag_id, **kwargs)
               with dag:
                   yield dag
   ```
   
   Then we can use it like this
   
   ```python
       with dag_maker():
           task = PythonOperator(task_id=task_id, python_callable=python_callable)
   ```
   
   Which I think gives a lot of freedom, whilst still removing a lot of boilerplate from tests.







[GitHub] [airflow] uranusjr commented on a change in pull request #16889: Add Pytest fixture to create dag and dagrun and use it on local task job tests

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #16889:
URL: https://github.com/apache/airflow/pull/16889#discussion_r672114155



##########
File path: tests/jobs/test_local_task_job.py
##########
@@ -55,45 +54,35 @@
 TEST_DAG_FOLDER = os.environ['AIRFLOW__CORE__DAGS_FOLDER']
 
 
-class TestLocalTaskJob(unittest.TestCase):
-    def setUp(self):
-        db.clear_db_dags()
-        db.clear_db_jobs()
-        db.clear_db_runs()
-        db.clear_db_task_fail()
-        patcher = patch('airflow.jobs.base_job.sleep')
-        self.addCleanup(patcher.stop)
-        self.mock_base_job_sleep = patcher.start()
-
-    def tearDown(self) -> None:
+class TestLocalTaskJob:
+    @pytest.fixture(autouse=True)
+    def clean_db_and_set_instance_attrs(self):
         db.clear_db_dags()
         db.clear_db_jobs()
         db.clear_db_runs()
         db.clear_db_task_fail()

Review comment:
       So there are three points at which we need to clean the db:
   
   1. Before any tests in the class run
   2. Between each test in the class
   3. After all tests in the class run
   
   Previously you were missing 3, but now you’re missing 1 and 2 (I think). We need them all.







[GitHub] [airflow] ephraimbuddy closed pull request #16889: Add Pytest fixture to create dag and dagrun and use it on local task job tests

Posted by GitBox <gi...@apache.org>.
ephraimbuddy closed pull request #16889:
URL: https://github.com/apache/airflow/pull/16889


   





[GitHub] [airflow] uranusjr commented on a change in pull request #16889: Add Pytest fixture to create dag and dagrun and use it on local task job tests

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #16889:
URL: https://github.com/apache/airflow/pull/16889#discussion_r672920031



##########
File path: tests/jobs/test_local_task_job.py
##########
@@ -55,45 +54,35 @@
 TEST_DAG_FOLDER = os.environ['AIRFLOW__CORE__DAGS_FOLDER']
 
 
-class TestLocalTaskJob(unittest.TestCase):
-    def setUp(self):
-        db.clear_db_dags()
-        db.clear_db_jobs()
-        db.clear_db_runs()
-        db.clear_db_task_fail()
-        patcher = patch('airflow.jobs.base_job.sleep')
-        self.addCleanup(patcher.stop)
-        self.mock_base_job_sleep = patcher.start()
-
-    def tearDown(self) -> None:
+class TestLocalTaskJob:
+    @pytest.fixture(autouse=True)
+    def clean_db_and_set_instance_attrs(self):
         db.clear_db_dags()
         db.clear_db_jobs()
         db.clear_db_runs()
         db.clear_db_task_fail()

Review comment:
       Yes, but you only use `clear_db` on the class. I’m not sure tbh what that actually does (pytest documentation is quite sparse on this), but I assume it either runs the fixture for every test in the class, or once for the class. Either way, you wouldn’t be running the fixture for all of the cases listed above.

##########
File path: tests/jobs/test_local_task_job.py
##########
@@ -55,45 +54,35 @@
 TEST_DAG_FOLDER = os.environ['AIRFLOW__CORE__DAGS_FOLDER']
 
 
-class TestLocalTaskJob(unittest.TestCase):
-    def setUp(self):
-        db.clear_db_dags()
-        db.clear_db_jobs()
-        db.clear_db_runs()
-        db.clear_db_task_fail()
-        patcher = patch('airflow.jobs.base_job.sleep')
-        self.addCleanup(patcher.stop)
-        self.mock_base_job_sleep = patcher.start()
-
-    def tearDown(self) -> None:
+class TestLocalTaskJob:
+    @pytest.fixture(autouse=True)
+    def clean_db_and_set_instance_attrs(self):
         db.clear_db_dags()
         db.clear_db_jobs()
         db.clear_db_runs()
         db.clear_db_task_fail()

Review comment:
       Just tried, it seems that `usefixtures()` on a class is the same as marking `usefixtures()` on every test in the class. So we are missing a class-level teardown after all tests in the class are run.

##########
File path: tests/jobs/test_local_task_job.py
##########
@@ -55,45 +54,35 @@
 TEST_DAG_FOLDER = os.environ['AIRFLOW__CORE__DAGS_FOLDER']
 
 
-class TestLocalTaskJob(unittest.TestCase):
-    def setUp(self):
-        db.clear_db_dags()
-        db.clear_db_jobs()
-        db.clear_db_runs()
-        db.clear_db_task_fail()
-        patcher = patch('airflow.jobs.base_job.sleep')
-        self.addCleanup(patcher.stop)
-        self.mock_base_job_sleep = patcher.start()
-
-    def tearDown(self) -> None:
+class TestLocalTaskJob:
+    @pytest.fixture(autouse=True)
+    def clean_db_and_set_instance_attrs(self):
         db.clear_db_dags()
         db.clear_db_jobs()
         db.clear_db_runs()
         db.clear_db_task_fail()

Review comment:
       The fixture is defined like this
   
   ```python
   @pytest.fixture
   def clear_db():
       db.clear_db_dags()
       db.clear_db_jobs()
       db.clear_db_runs()
       db.clear_db_task_fail()
       yield
   ```
   
   The db clear calls are done in the fixture’s *setup* phase (before yield), so after the last test runs, the fixture itself is destroyed, but that teardown phase does not clear the db. So I think we need one more fixture:
   
   ```python
   @pytest.fixture(scope="class")
   def clear_db_class():
       yield
       db.clear_db_dags()
       db.clear_db_jobs()
       db.clear_db_runs()
       db.clear_db_task_fail()
   ```
   
   to handle that?







[GitHub] [airflow] uranusjr commented on a change in pull request #16889: Add fixture to create dag, task and dagrun in tests and modify /job tests

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #16889:
URL: https://github.com/apache/airflow/pull/16889#discussion_r671593548



##########
File path: tests/conftest.py
##########
@@ -424,3 +429,54 @@ def app():
     from airflow.www import app
 
     return app.create_app(testing=True)
+
+
+@pytest.fixture()
+def clear_db():
+    def _clean_up():
+        db.clear_db_runs()
+        db.clear_db_dags()
+        db.clear_db_serialized_dags()
+        db.clear_db_jobs()
+        db.clear_db_task_fail()
+        db.clear_db_task_reschedule

Review comment:
       ```suggestion
           db.clear_db_task_reschedule()
   ```







[GitHub] [airflow] uranusjr commented on a change in pull request #16889: Add Pytest fixture to create dag and dagrun and use it on local task job tests

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #16889:
URL: https://github.com/apache/airflow/pull/16889#discussion_r672065423



##########
File path: tests/jobs/test_local_task_job.py
##########
@@ -55,45 +54,35 @@
 TEST_DAG_FOLDER = os.environ['AIRFLOW__CORE__DAGS_FOLDER']
 
 
-class TestLocalTaskJob(unittest.TestCase):
-    def setUp(self):
-        db.clear_db_dags()
-        db.clear_db_jobs()
-        db.clear_db_runs()
-        db.clear_db_task_fail()
-        patcher = patch('airflow.jobs.base_job.sleep')
-        self.addCleanup(patcher.stop)
-        self.mock_base_job_sleep = patcher.start()
-
-    def tearDown(self) -> None:
+class TestLocalTaskJob:
+    @pytest.fixture(autouse=True)
+    def clean_db_and_set_instance_attrs(self):
         db.clear_db_dags()
         db.clear_db_jobs()
         db.clear_db_runs()
         db.clear_db_task_fail()

Review comment:
       I want a class-level fixture to clear the database *after* all the tests run; this prevents side effects from leaking into other tests, which has been problematic in a lot of tests.







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16889: Add fixture to create dag, task and dagrun in tests and modify /job tests

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16889:
URL: https://github.com/apache/airflow/pull/16889#discussion_r666540701



##########
File path: tests/jobs/conftest.py
##########
@@ -0,0 +1,83 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+
+import pytest
+
+from airflow.models.dag import DAG
+from airflow.models.dagbag import DagBag
+from airflow.operators.dummy import DummyOperator
+from airflow.operators.python import PythonOperator
+from airflow.utils import timezone
+from airflow.utils.state import State
+from tests.test_utils import db
+
+TEST_DAG_FOLDER = os.environ['AIRFLOW__CORE__DAGS_FOLDER']
+
+
+@pytest.fixture()
+def clear_db():
+    def _clean_up():
+        db.clear_db_dags()
+        db.clear_db_jobs()
+        db.clear_db_runs()
+        db.clear_db_task_fail()
+
+    _clean_up()
+    yield
+
+
+@pytest.fixture
+def dag_maker(request):
+
+    DEFAULT_DATE = timezone.datetime(2016, 1, 1)
+
+    def create_dag_ti_dr(
+        dag_id='test_dag', task_id='op1', python_callable=None, py_kwargs=None, with_dagbag=False, **kwargs
+    ):
+        # If python_callable we create a PythonOperator task
+        # For ease of use, set start date to "DEFAULT_DATE" module variable from request
+        if "start_date" not in kwargs and hasattr(request.module, 'DEFAULT_DATE'):
+            kwargs['start_date'] = getattr(request.module, 'DEFAULT_DATE')
+        else:
+            kwargs['start_date'] = DEFAULT_DATE
+        if with_dagbag:
+            dagbag = DagBag(
+                dag_folder=TEST_DAG_FOLDER,
+                include_examples=False,
+            )
+            dag = dagbag.get_dag(dag_id)
+            task = dag.get_task(task_id)
+        else:
+            dag = DAG(dag_id, start_date=kwargs['start_date'])
+            if python_callable:
+                task = PythonOperator(task_id=task_id, python_callable=python_callable, dag=dag, **py_kwargs)
+            else:
+                task = DummyOperator(task_id=task_id, run_as_user=kwargs.get("run_as_user", None), dag=dag)

Review comment:
       Good. Thanks for all the reviews!



