You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by "Ash Berlin-Taylor (JIRA)" <ji...@apache.org> on 2019/04/29 21:58:00 UTC
[jira] [Resolved] (AIRFLOW-4358) Decouple running actual tasks from
test_jobs.py
[ https://issues.apache.org/jira/browse/AIRFLOW-4358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ash Berlin-Taylor resolved AIRFLOW-4358.
----------------------------------------
Resolution: Fixed
Fix Version/s: 1.10.4
> Decouple running actual tasks from test_jobs.py
> -----------------------------------------------
>
> Key: AIRFLOW-4358
> URL: https://issues.apache.org/jira/browse/AIRFLOW-4358
> Project: Apache Airflow
> Issue Type: Sub-task
> Components: tests
> Reporter: Ash Berlin-Taylor
> Assignee: Ash Berlin-Taylor
> Priority: Major
> Fix For: 1.10.4
>
>
> In tests/test_jobs.py we have a lot of slow, fragile tests that test the behaviour of the "Jobs" - BackfillJob and SchedulerJob being the main two.
> Since we have tests that check that the executors will actually _run_ tasks we should "stub" out the executor and replace the executor in those tests with one that just records what tasks it was asked to run.
> This will hopefully speed up the tests and make them less fragile.
> A possible starting point:
> {code:title=test_jobs.diff}
> diff --git a/tests/test_jobs.py b/tests/test_jobs.py
> index e3ae8be1..aa919f90 100644
> --- a/tests/test_jobs.py
> +++ b/tests/test_jobs.py
> @@ -1384,6 +1384,14 @@ class LocalTaskJobTest(unittest.TestCase):
> class SchedulerJobTest(unittest.TestCase):
> + class NullExecutor(airflow.executors.base_executor.BaseExecutor):
> + def heartbeat(self, *args, **kwargs):
> + # This would normally pop tasks of the queue and run them.
> + pass
> +
> + def end():
> + pass
> +
> def setUp(self):
> clear_db_runs()
> clear_db_pools()
> @@ -1391,6 +1399,10 @@ class SchedulerJobTest(unittest.TestCase):
> clear_db_sla_miss()
> clear_db_errors()
> + # Speed up some tests by not running the tasks, just look at what we
> + # enqueue!
> + self.null_exec = self.NullExecutor()
> +
> @classmethod
> def setUpClass(cls):
> cls.dagbag = DagBag()
> @@ -1409,8 +1421,7 @@ class SchedulerJobTest(unittest.TestCase):
> def tearDownClass(cls):
> cls.patcher.stop()
> - @staticmethod
> - def run_single_scheduler_loop_with_no_dags(dags_folder):
> + def run_single_scheduler_loop_with_no_dags(self, dags_folder):
> """
> Utility function that runs a single scheduler loop without actually
> changing/scheduling any dags. This is useful to simulate the other side effects of
> @@ -1421,6 +1432,7 @@ class SchedulerJobTest(unittest.TestCase):
> :type directory: str
> """
> scheduler = SchedulerJob(
> + executor=self.null_exec,
> dag_id='this_dag_doesnt_exist', # We don't want to actually run anything
> num_runs=1,
> subdir=os.path.join(dags_folder))
> @@ -2483,6 +2495,7 @@ class SchedulerJobTest(unittest.TestCase):
> self.assertTrue(dag.start_date > datetime.datetime.utcnow())
> scheduler = SchedulerJob(dag_id,
> + executor=self.null_exec,
> subdir=os.path.join(TEST_DAG_FOLDER, 'test_scheduler_dags.py'),
> num_runs=1)
> scheduler.run()
> @@ -2491,6 +2504,7 @@ class SchedulerJobTest(unittest.TestCase):
> self.assertEqual(
> len(session.query(TI).filter(TI.dag_id == dag_id).all()), 0)
> session.commit()
> + self.assertEqual({}, self.null_exec.queued_tasks)
> # previously, running this backfill would kick off the Scheduler
> # because it would take the most recent run and start from there
> {code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)