You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by tu...@apache.org on 2020/05/28 05:25:22 UTC

[airflow] branch master updated: Add query count test for LocalTaskJob (#8922)

This is an automated email from the ASF dual-hosted git repository.

turbaszek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 369e637  Add query count test for LocalTaskJob (#8922)
369e637 is described below

commit 369e6377b4c9a267c85c82211fd72282092b6fa8
Author: Tomek Urbaszek <tu...@gmail.com>
AuthorDate: Thu May 28 07:24:46 2020 +0200

    Add query count test for LocalTaskJob (#8922)
    
    * Add query count test for LocalTaskJob
    
    * fixup! Add query count test for LocalTaskJob
---
 tests/jobs/test_local_task_job.py | 30 ++++++++++++++++++++++++++++++
 1 file changed, 30 insertions(+)

diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py
index 25a181a..9a706ed 100644
--- a/tests/jobs/test_local_task_job.py
+++ b/tests/jobs/test_local_task_job.py
@@ -20,6 +20,8 @@ import multiprocessing
 import os
 import time
 import unittest
+import uuid
+from unittest import mock
 
 import pytest
 from mock import patch
@@ -38,6 +40,7 @@ from airflow.utils.net import get_hostname
 from airflow.utils.session import create_session
 from airflow.utils.state import State
 from airflow.utils.timeout import timeout
+from tests.test_utils.asserts import assert_queries_count
 from tests.test_utils.db import clear_db_runs
 from tests.test_utils.mock_executor import MockExecutor
 
@@ -406,3 +409,30 @@ class TestLocalTaskJob(unittest.TestCase):
         self.assertTrue(data['called'])
         process.join(timeout=10)
         self.assertFalse(process.is_alive())
+
+
+@pytest.fixture()
+def clean_db_helper():
+    yield  # hand control to the test; the statement below runs afterwards as fixture teardown
+    clear_db_runs()  # helper from tests.test_utils.db; presumably wipes run records -- confirm
+
+
+@pytest.mark.usefixtures("clean_db_helper")
+class TestLocalTaskJobPerformance:  # query-count check around LocalTaskJob.run()
+    @pytest.mark.parametrize("return_codes", [[0], 9 * [None] + [0]])  # type: ignore
+    @mock.patch("airflow.jobs.local_task_job.get_task_runner")
+    def test_number_of_queries_single_loop(self, mock_get_task_runner, return_codes):
+        unique_prefix = str(uuid.uuid4())
+        dag = DAG(dag_id=f'{unique_prefix}_test_number_of_queries', start_date=DEFAULT_DATE)
+        task = DummyOperator(task_id='test_state_succeeded1', dag=dag)
+        # Clear the DAG, then create a run whose run_id reuses the UUID generated above.
+        dag.clear()
+        dag.create_dagrun(run_id=unique_prefix, state=State.NONE)
+        # Task instance for the single task, at the same DEFAULT_DATE the DAG starts on.
+        ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
+        # NOTE(review): Mock only honours `side_effect` (singular); `side_effects` sets an inert attribute.
+        mock_get_task_runner.return_value.return_code.side_effects = return_codes
+        # Hence return_code() never yields return_codes; confirm intent before changing the expected 13.
+        job = LocalTaskJob(task_instance=ti, executor=MockExecutor())
+        with assert_queries_count(13):  # presumably fails unless the block issues 13 queries -- confirm
+            job.run()