You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/05/31 17:40:10 UTC
[airflow] branch master updated: Test queries when number of active DAG Run is not zero (#9082)
This is an automated email from the ASF dual-hosted git repository.
kamilbregula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 93b8f3e Test queries when number of active DAG Run is not zero (#9082)
93b8f3e is described below
commit 93b8f3e48df0a1aaa7be4f83904f4208496ec7db
Author: Kamil Breguła <mi...@users.noreply.github.com>
AuthorDate: Sun May 31 19:39:22 2020 +0200
Test queries when number of active DAG Run is not zero (#9082)
---
tests/jobs/test_scheduler_job.py | 107 ++++++++++++++++++++-------------------
1 file changed, 55 insertions(+), 52 deletions(-)
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index a8bfd69..ea60f83 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -1255,40 +1255,40 @@ class TestDagFileProcessorQueriesCount(unittest.TestCase):
# pylint: disable=bad-whitespace
# expected, dag_count, task_count, start_ago, schedule_interval, shape
# One DAG with one task per DAG file
- ( 1, 1, 1, "1d", "None", "no_structure"), # noqa
- ( 1, 1, 1, "1d", "None", "linear"), # noqa
- ( 9, 1, 1, "1d", "@once", "no_structure"), # noqa
- ( 9, 1, 1, "1d", "@once", "linear"), # noqa
- ( 9, 1, 1, "1d", "30m", "no_structure"), # noqa
- ( 9, 1, 1, "1d", "30m", "linear"), # noqa
- ( 9, 1, 1, "1d", "30m", "binary_tree"), # noqa
- ( 9, 1, 1, "1d", "30m", "star"), # noqa
- ( 9, 1, 1, "1d", "30m", "grid"), # noqa
+ ([ 1, 1, 1, 1], 1, 1, "1d", "None", "no_structure"), # noqa
+ ([ 1, 1, 1, 1], 1, 1, "1d", "None", "linear"), # noqa
+ ([ 9, 5, 5, 5], 1, 1, "1d", "@once", "no_structure"), # noqa
+ ([ 9, 5, 5, 5], 1, 1, "1d", "@once", "linear"), # noqa
+ ([ 9, 12, 15, 18], 1, 1, "1d", "30m", "no_structure"), # noqa
+ ([ 9, 12, 15, 18], 1, 1, "1d", "30m", "linear"), # noqa
+ ([ 9, 12, 15, 18], 1, 1, "1d", "30m", "binary_tree"), # noqa
+ ([ 9, 12, 15, 18], 1, 1, "1d", "30m", "star"), # noqa
+ ([ 9, 12, 15, 18], 1, 1, "1d", "30m", "grid"), # noqa
# One DAG with five tasks per DAG file
- ( 1, 1, 5, "1d", "None", "no_structure"), # noqa
- ( 1, 1, 5, "1d", "None", "linear"), # noqa
- ( 9, 1, 5, "1d", "@once", "no_structure"), # noqa
- (10, 1, 5, "1d", "@once", "linear"), # noqa
- ( 9, 1, 5, "1d", "30m", "no_structure"), # noqa
- (10, 1, 5, "1d", "30m", "linear"), # noqa
- (10, 1, 5, "1d", "30m", "binary_tree"), # noqa
- (10, 1, 5, "1d", "30m", "star"), # noqa
- (10, 1, 5, "1d", "30m", "grid"), # noqa
+ ([ 1, 1, 1, 1], 1, 5, "1d", "None", "no_structure"), # noqa
+ ([ 1, 1, 1, 1], 1, 5, "1d", "None", "linear"), # noqa
+ ([ 9, 5, 5, 5], 1, 5, "1d", "@once", "no_structure"), # noqa
+ ([10, 6, 6, 6], 1, 5, "1d", "@once", "linear"), # noqa
+ ([ 9, 12, 15, 18], 1, 5, "1d", "30m", "no_structure"), # noqa
+ ([10, 14, 18, 22], 1, 5, "1d", "30m", "linear"), # noqa
+ ([10, 14, 18, 22], 1, 5, "1d", "30m", "binary_tree"), # noqa
+ ([10, 14, 18, 22], 1, 5, "1d", "30m", "star"), # noqa
+ ([10, 14, 18, 22], 1, 5, "1d", "30m", "grid"), # noqa
# 10 DAGs with 10 tasks per DAG file
- ( 1, 10, 10, "1d", "None", "no_structure"), # noqa
- ( 1, 10, 10, "1d", "None", "linear"), # noqa
- (81, 10, 10, "1d", "@once", "no_structure"), # noqa
- (91, 10, 10, "1d", "@once", "linear"), # noqa
- (81, 10, 10, "1d", "30m", "no_structure"), # noqa
- (91, 10, 10, "1d", "30m", "linear"), # noqa
- (91, 10, 10, "1d", "30m", "binary_tree"), # noqa
- (91, 10, 10, "1d", "30m", "star"), # noqa
- (91, 10, 10, "1d", "30m", "grid"), # noqa
+ ([ 1, 1, 1, 1], 10, 10, "1d", "None", "no_structure"), # noqa
+ ([ 1, 1, 1, 1], 10, 10, "1d", "None", "linear"), # noqa
+ ([81, 41, 41, 41], 10, 10, "1d", "@once", "no_structure"), # noqa
+ ([91, 51, 51, 51], 10, 10, "1d", "@once", "linear"), # noqa
+ ([81, 111, 111, 111], 10, 10, "1d", "30m", "no_structure"), # noqa
+ ([91, 131, 131, 131], 10, 10, "1d", "30m", "linear"), # noqa
+ ([91, 131, 131, 131], 10, 10, "1d", "30m", "binary_tree"), # noqa
+ ([91, 131, 131, 131], 10, 10, "1d", "30m", "star"), # noqa
+ ([91, 131, 131, 131], 10, 10, "1d", "30m", "grid"), # noqa
# pylint: enable=bad-whitespace
]
)
def test_process_dags_queries_count(
- self, expected_query_count, dag_count, task_count, start_ago, schedule_interval, shape
+ self, expected_query_counts, dag_count, task_count, start_ago, schedule_interval, shape
):
with mock.patch.dict("os.environ", {
"PERF_DAGS_COUNT": str(dag_count),
@@ -1300,42 +1300,43 @@ class TestDagFileProcessorQueriesCount(unittest.TestCase):
('scheduler', 'use_job_schedule'): 'True',
}):
dagbag = DagBag(dag_folder=ELASTIC_DAG_FILE, include_examples=False)
- with assert_queries_count(expected_query_count):
- processor = DagFileProcessor([], mock.MagicMock())
- processor._process_dags(dagbag.dags.values())
+ processor = DagFileProcessor([], mock.MagicMock())
+ for expected_query_count in expected_query_counts:
+ with assert_queries_count(expected_query_count):
+ processor._process_dags(dagbag.dags.values())
@parameterized.expand(
[
# pylint: disable=bad-whitespace
# expected, dag_count, task_count, start_ago, schedule_interval, shape
# One DAG with two tasks per DAG file
- ( 5, 1, 1, "1d", "None", "no_structure"), # noqa
- ( 5, 1, 1, "1d", "None", "linear"), # noqa
- (15, 1, 1, "1d", "@once", "no_structure"), # noqa
- (15, 1, 1, "1d", "@once", "linear"), # noqa
- (15, 1, 1, "1d", "30m", "no_structure"), # noqa
- (15, 1, 1, "1d", "30m", "linear"), # noqa
+ ([ 5, 5, 5, 5], 1, 1, "1d", "None", "no_structure"), # noqa
+ ([ 5, 5, 5, 5], 1, 1, "1d", "None", "linear"), # noqa
+ ([15, 9, 9, 9], 1, 1, "1d", "@once", "no_structure"), # noqa
+ ([15, 9, 9, 9], 1, 1, "1d", "@once", "linear"), # noqa
+ ([15, 18, 21, 24], 1, 1, "1d", "30m", "no_structure"), # noqa
+ ([15, 18, 21, 24], 1, 1, "1d", "30m", "linear"), # noqa
# One DAG with five tasks per DAG file
- ( 5, 1, 5, "1d", "None", "no_structure"), # noqa
- ( 5, 1, 5, "1d", "None", "linear"), # noqa
- (15, 1, 5, "1d", "@once", "no_structure"), # noqa
- (16, 1, 5, "1d", "@once", "linear"), # noqa
- (15, 1, 5, "1d", "30m", "no_structure"), # noqa
- (16, 1, 5, "1d", "30m", "linear"), # noqa
+ ([ 5, 5, 5, 5], 1, 5, "1d", "None", "no_structure"), # noqa
+ ([ 5, 5, 5, 5], 1, 5, "1d", "None", "linear"), # noqa
+ ([15, 9, 9, 9], 1, 5, "1d", "@once", "no_structure"), # noqa
+ ([16, 10, 10, 10], 1, 5, "1d", "@once", "linear"), # noqa
+ ([15, 18, 21, 24], 1, 5, "1d", "30m", "no_structure"), # noqa
+ ([16, 20, 24, 28], 1, 5, "1d", "30m", "linear"), # noqa
# 10 DAGs with 10 tasks per DAG file
- ( 5, 10, 10, "1d", "None", "no_structure"), # noqa
- ( 5, 10, 10, "1d", "None", "linear"), # noqa
- (87, 10, 10, "1d", "@once", "no_structure"), # noqa
- (97, 10, 10, "1d", "@once", "linear"), # noqa
- (87, 10, 10, "1d", "30m", "no_structure"), # noqa
- (97, 10, 10, "1d", "30m", "linear"), # noqa
+ ([ 5, 5, 5, 5], 10, 10, "1d", "None", "no_structure"), # noqa
+ ([ 5, 5, 5, 5], 10, 10, "1d", "None", "linear"), # noqa
+ ([87, 45, 45, 45], 10, 10, "1d", "@once", "no_structure"), # noqa
+ ([97, 55, 55, 55], 10, 10, "1d", "@once", "linear"), # noqa
+ ([87, 117, 117, 117], 10, 10, "1d", "30m", "no_structure"), # noqa
+ ([97, 137, 137, 137], 10, 10, "1d", "30m", "linear"), # noqa
# pylint: enable=bad-whitespace
]
)
def test_process_file_queries_count(
- self, expected_query_count, dag_count, task_count, start_ago, schedule_interval, shape
+ self, expected_query_counts, dag_count, task_count, start_ago, schedule_interval, shape
):
- with assert_queries_count(expected_query_count), mock.patch.dict("os.environ", {
+ with mock.patch.dict("os.environ", {
"PERF_DAGS_COUNT": str(dag_count),
"PERF_TASKS_COUNT": str(task_count),
"PERF_START_AGO": start_ago,
@@ -1345,7 +1346,9 @@ class TestDagFileProcessorQueriesCount(unittest.TestCase):
('scheduler', 'use_job_schedule'): 'True'
}):
processor = DagFileProcessor([], mock.MagicMock())
- processor.process_file(ELASTIC_DAG_FILE, [])
+ for expected_query_count in expected_query_counts:
+ with assert_queries_count(expected_query_count):
+ processor.process_file(ELASTIC_DAG_FILE, [])
@pytest.mark.usefixtures("disable_load_example")