You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by tu...@apache.org on 2020/05/28 05:25:22 UTC
[airflow] branch master updated: Add query count test for LocalTaskJob (#8922)
This is an automated email from the ASF dual-hosted git repository.
turbaszek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 369e637 Add query count test for LocalTaskJob (#8922)
369e637 is described below
commit 369e6377b4c9a267c85c82211fd72282092b6fa8
Author: Tomek Urbaszek <tu...@gmail.com>
AuthorDate: Thu May 28 07:24:46 2020 +0200
Add query count test for LocalTaskJob (#8922)
* Add query count test for LocalTaskJob
* fixup! Add query count test for LocalTaskJob
---
tests/jobs/test_local_task_job.py | 30 ++++++++++++++++++++++++++++++
1 file changed, 30 insertions(+)
diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py
index 25a181a..9a706ed 100644
--- a/tests/jobs/test_local_task_job.py
+++ b/tests/jobs/test_local_task_job.py
@@ -20,6 +20,8 @@ import multiprocessing
import os
import time
import unittest
+import uuid
+from unittest import mock
import pytest
from mock import patch
@@ -38,6 +40,7 @@ from airflow.utils.net import get_hostname
from airflow.utils.session import create_session
from airflow.utils.state import State
from airflow.utils.timeout import timeout
+from tests.test_utils.asserts import assert_queries_count
from tests.test_utils.db import clear_db_runs
from tests.test_utils.mock_executor import MockExecutor
@@ -406,3 +409,30 @@ class TestLocalTaskJob(unittest.TestCase):
self.assertTrue(data['called'])
process.join(timeout=10)
self.assertFalse(process.is_alive())
+
+
+@pytest.fixture()
+def clean_db_helper():
+ yield  # hand control to the test; the call below runs afterwards as fixture teardown
+ clear_db_runs()
+
+
+@pytest.mark.usefixtures("clean_db_helper")
+class TestLocalTaskJobPerformance:  # checks the number of DB queries issued by LocalTaskJob.run()
+ @pytest.mark.parametrize("return_codes", [[0], 9 * [None] + [0]]) # type: ignore
+ @mock.patch("airflow.jobs.local_task_job.get_task_runner")
+ def test_number_of_queries_single_loop(self, mock_get_task_runner, return_codes):
+ unique_prefix = str(uuid.uuid4())  # random prefix reused below for both dag_id and run_id
+ dag = DAG(dag_id=f'{unique_prefix}_test_number_of_queries', start_date=DEFAULT_DATE)
+ task = DummyOperator(task_id='test_state_succeeded1', dag=dag)
+
+ dag.clear()
+ dag.create_dagrun(run_id=unique_prefix, state=State.NONE)
+
+ ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
+
+ mock_get_task_runner.return_value.return_code.side_effects = return_codes  # NOTE(review): the mock attribute is `side_effect`; `side_effects` only sets a plain attribute, so return_codes is likely never consumed - confirm the 13-query budget before correcting
+
+ job = LocalTaskJob(task_instance=ti, executor=MockExecutor())
+ with assert_queries_count(13):  # presumably fails when job.run() issues a different number of queries - confirm in tests.test_utils.asserts
+ job.run()