You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/08/08 09:47:09 UTC

[GitHub] [airflow] uranusjr commented on a change in pull request #16475: Add the ability to rerun success state task_instance in the Backfill API(#16452)

uranusjr commented on a change in pull request #16475:
URL: https://github.com/apache/airflow/pull/16475#discussion_r684746710



##########
File path: tests/jobs/test_backfill_job.py
##########
@@ -56,47 +54,48 @@
 DEFAULT_DATE = timezone.datetime(2016, 1, 1)
 
 
-class TestBackfillJob(unittest.TestCase):
-    def _get_dummy_dag(self, dag_id, pool=Pool.DEFAULT_POOL_NAME, task_concurrency=None):
-        dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, schedule_interval='@daily')
-
-        with dag:
-            DummyOperator(task_id='op', pool=pool, task_concurrency=task_concurrency, dag=dag)
-
-        dag.clear()
-        return dag
-
-    def _times_called_with(self, method, class_):
-        count = 0
-        for args in method.call_args_list:
-            if isinstance(args[0][0], class_):
-                count += 1
-        return count
-
-    @classmethod
-    def setUpClass(cls):
-        cls.dagbag = DagBag(include_examples=True)
-
+@pytest.fixture(scope="module")
+def dag_bag():
+    return DagBag(include_examples=True)
+  
+  
+class TestBackfillJob:
     @staticmethod
     def clean_db():
         clear_db_dags()
         clear_db_runs()
         clear_db_pools()
 
-    def setUp(self):
+    @pytest.fixture(autouse=True)
+    def set_instance_attrs(self, dag_bag):
         self.clean_db()
         self.parser = cli_parser.get_parser()
+        self.dagbag = dag_bag
+        
+    def _get_dummy_dag(
+        self,
+        dag_maker_fixture,
+        dag_id='test_dag',
+        pool=Pool.DEFAULT_POOL_NAME,
+        task_concurrency=None,
+        task_id='op',
+        **kwargs,
+    ):
+        with dag_maker_fixture(dag_id=dag_id, schedule_interval='@daily', **kwargs) as dag:
+            DummyOperator(task_id=task_id, pool=pool, task_concurrency=task_concurrency)
 
-    def tearDown(self) -> None:
-        self.clean_db()
+        return dag
 
-    def test_unfinished_dag_runs_set_to_failed(self):
-        dag = self._get_dummy_dag('dummy_dag')
+    def _times_called_with(self, method, class_):
+        count = 0
+        for args in method.call_args_list:
+            if isinstance(args[0][0], class_):
+                count += 1
+        return count

Review comment:
   ```suggestion
           return sum(1 for args, _ in method.call_args_list if isinstance(args[0], class_))
   ```
   
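   A common trick :) Summing `1` over a generator expression replaces the manual counter loop, and each entry of `call_args_list` unpacks into `(args, kwargs)`, so `args[0]` is the first positional argument.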




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org