You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/05/31 17:40:10 UTC

[airflow] branch master updated: Test queries when number of active DAG Run is not zero (#9082)

This is an automated email from the ASF dual-hosted git repository.

kamilbregula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 93b8f3e  Test queries when number of active DAG Run is not zero (#9082)
93b8f3e is described below

commit 93b8f3e48df0a1aaa7be4f83904f4208496ec7db
Author: Kamil BreguĊ‚a <mi...@users.noreply.github.com>
AuthorDate: Sun May 31 19:39:22 2020 +0200

    Test queries when number of active DAG Run is not zero (#9082)
---
 tests/jobs/test_scheduler_job.py | 107 ++++++++++++++++++++-------------------
 1 file changed, 55 insertions(+), 52 deletions(-)

diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index a8bfd69..ea60f83 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -1255,40 +1255,40 @@ class TestDagFileProcessorQueriesCount(unittest.TestCase):
             # pylint: disable=bad-whitespace
             # expected, dag_count, task_count, start_ago, schedule_interval, shape
             # One DAG with one task per DAG file
-            ( 1,  1,  1, "1d",  "None",  "no_structure"),  # noqa
-            ( 1,  1,  1, "1d",  "None",        "linear"),  # noqa
-            ( 9,  1,  1, "1d", "@once",  "no_structure"),  # noqa
-            ( 9,  1,  1, "1d", "@once",        "linear"),  # noqa
-            ( 9,  1,  1, "1d",   "30m",  "no_structure"),  # noqa
-            ( 9,  1,  1, "1d",   "30m",        "linear"),  # noqa
-            ( 9,  1,  1, "1d",   "30m",   "binary_tree"),  # noqa
-            ( 9,  1,  1, "1d",   "30m",          "star"),  # noqa
-            ( 9,  1,  1, "1d",   "30m",          "grid"),  # noqa
+            ([ 1,   1,   1,  1],  1,  1, "1d",  "None",  "no_structure"),  # noqa
+            ([ 1,   1,   1,  1],  1,  1, "1d",  "None",        "linear"),  # noqa
+            ([ 9,   5,   5,  5],  1,  1, "1d", "@once",  "no_structure"),  # noqa
+            ([ 9,   5,   5,  5],  1,  1, "1d", "@once",        "linear"),  # noqa
+            ([ 9,  12,  15, 18],  1,  1, "1d",   "30m",  "no_structure"),  # noqa
+            ([ 9,  12,  15, 18],  1,  1, "1d",   "30m",        "linear"),  # noqa
+            ([ 9,  12,  15, 18],  1,  1, "1d",   "30m",   "binary_tree"),  # noqa
+            ([ 9,  12,  15, 18],  1,  1, "1d",   "30m",          "star"),  # noqa
+            ([ 9,  12,  15, 18],  1,  1, "1d",   "30m",          "grid"),  # noqa
             # One DAG with five tasks per DAG  file
-            ( 1,  1,  5, "1d",  "None",  "no_structure"),  # noqa
-            ( 1,  1,  5, "1d",  "None",        "linear"),  # noqa
-            ( 9,  1,  5, "1d", "@once",  "no_structure"),  # noqa
-            (10,  1,  5, "1d", "@once",        "linear"),  # noqa
-            ( 9,  1,  5, "1d",   "30m",  "no_structure"),  # noqa
-            (10,  1,  5, "1d",   "30m",        "linear"),  # noqa
-            (10,  1,  5, "1d",   "30m",   "binary_tree"),  # noqa
-            (10,  1,  5, "1d",   "30m",          "star"),  # noqa
-            (10,  1,  5, "1d",   "30m",          "grid"),  # noqa
+            ([ 1,   1,   1,  1],  1,  5, "1d",  "None",  "no_structure"),  # noqa
+            ([ 1,   1,   1,  1],  1,  5, "1d",  "None",        "linear"),  # noqa
+            ([ 9,   5,   5,  5],  1,  5, "1d", "@once",  "no_structure"),  # noqa
+            ([10,   6,   6,  6],  1,  5, "1d", "@once",        "linear"),  # noqa
+            ([ 9,  12,  15, 18],  1,  5, "1d",   "30m",  "no_structure"),  # noqa
+            ([10,  14,  18, 22],  1,  5, "1d",   "30m",        "linear"),  # noqa
+            ([10,  14,  18, 22],  1,  5, "1d",   "30m",   "binary_tree"),  # noqa
+            ([10,  14,  18, 22],  1,  5, "1d",   "30m",          "star"),  # noqa
+            ([10,  14,  18, 22],  1,  5, "1d",   "30m",          "grid"),  # noqa
             # 10 DAGs with 10 tasks per DAG file
-            ( 1, 10, 10, "1d",  "None",  "no_structure"),  # noqa
-            ( 1, 10, 10, "1d",  "None",        "linear"),  # noqa
-            (81, 10, 10, "1d", "@once",  "no_structure"),  # noqa
-            (91, 10, 10, "1d", "@once",        "linear"),  # noqa
-            (81, 10, 10, "1d",   "30m",  "no_structure"),  # noqa
-            (91, 10, 10, "1d",   "30m",        "linear"),  # noqa
-            (91, 10, 10, "1d",   "30m",   "binary_tree"),  # noqa
-            (91, 10, 10, "1d",   "30m",          "star"),  # noqa
-            (91, 10, 10, "1d",   "30m",          "grid"),  # noqa
+            ([ 1,   1,   1,   1], 10, 10, "1d",  "None",  "no_structure"),  # noqa
+            ([ 1,   1,   1,   1], 10, 10, "1d",  "None",        "linear"),  # noqa
+            ([81,  41,  41,  41], 10, 10, "1d", "@once",  "no_structure"),  # noqa
+            ([91,  51,  51,  51], 10, 10, "1d", "@once",        "linear"),  # noqa
+            ([81, 111, 111, 111], 10, 10, "1d",   "30m",  "no_structure"),  # noqa
+            ([91, 131, 131, 131], 10, 10, "1d",   "30m",        "linear"),  # noqa
+            ([91, 131, 131, 131], 10, 10, "1d",   "30m",   "binary_tree"),  # noqa
+            ([91, 131, 131, 131], 10, 10, "1d",   "30m",          "star"),  # noqa
+            ([91, 131, 131, 131], 10, 10, "1d",   "30m",          "grid"),  # noqa
             # pylint: enable=bad-whitespace
         ]
     )
     def test_process_dags_queries_count(
-        self, expected_query_count, dag_count, task_count, start_ago, schedule_interval, shape
+        self, expected_query_counts, dag_count, task_count, start_ago, schedule_interval, shape
     ):
         with mock.patch.dict("os.environ", {
             "PERF_DAGS_COUNT": str(dag_count),
@@ -1300,42 +1300,43 @@ class TestDagFileProcessorQueriesCount(unittest.TestCase):
             ('scheduler', 'use_job_schedule'): 'True',
         }):
             dagbag = DagBag(dag_folder=ELASTIC_DAG_FILE, include_examples=False)
-            with assert_queries_count(expected_query_count):
-                processor = DagFileProcessor([], mock.MagicMock())
-                processor._process_dags(dagbag.dags.values())
+            processor = DagFileProcessor([], mock.MagicMock())
+            for expected_query_count in expected_query_counts:
+                with assert_queries_count(expected_query_count):
+                    processor._process_dags(dagbag.dags.values())
 
     @parameterized.expand(
         [
             # pylint: disable=bad-whitespace
             # expected, dag_count, task_count, start_ago, schedule_interval, shape
             # One DAG with two tasks per DAG file
-            ( 5,  1,  1, "1d",   "None", "no_structure"),  # noqa
-            ( 5,  1,  1, "1d",   "None",       "linear"),  # noqa
-            (15,  1,  1, "1d",  "@once", "no_structure"),  # noqa
-            (15,  1,  1, "1d",  "@once",       "linear"),  # noqa
-            (15,  1,  1, "1d",    "30m", "no_structure"),  # noqa
-            (15,  1,  1, "1d",    "30m",       "linear"),  # noqa
+            ([ 5,   5,   5,   5],  1,  1, "1d",   "None", "no_structure"),  # noqa
+            ([ 5,   5,   5,   5],  1,  1, "1d",   "None",       "linear"),  # noqa
+            ([15,   9,   9,   9],  1,  1, "1d",  "@once", "no_structure"),  # noqa
+            ([15,   9,   9,   9],  1,  1, "1d",  "@once",       "linear"),  # noqa
+            ([15,  18,  21,  24],  1,  1, "1d",    "30m", "no_structure"),  # noqa
+            ([15,  18,  21,  24],  1,  1, "1d",    "30m",       "linear"),  # noqa
             # One DAG with five tasks per DAG file
-            ( 5,  1,  5, "1d",   "None", "no_structure"),  # noqa
-            ( 5,  1,  5, "1d",   "None",       "linear"),  # noqa
-            (15,  1,  5, "1d",  "@once", "no_structure"),  # noqa
-            (16,  1,  5, "1d",  "@once",       "linear"),  # noqa
-            (15,  1,  5, "1d",    "30m", "no_structure"),  # noqa
-            (16,  1,  5, "1d",    "30m",       "linear"),  # noqa
+            ([ 5,   5,   5,   5],  1,  5, "1d",   "None", "no_structure"),  # noqa
+            ([ 5,   5,   5,   5],  1,  5, "1d",   "None",       "linear"),  # noqa
+            ([15,   9,   9,   9],  1,  5, "1d",  "@once", "no_structure"),  # noqa
+            ([16,  10,  10,  10],  1,  5, "1d",  "@once",       "linear"),  # noqa
+            ([15,  18,  21,  24],  1,  5, "1d",    "30m", "no_structure"),  # noqa
+            ([16,  20,  24,  28],  1,  5, "1d",    "30m",       "linear"),  # noqa
             # 10 DAGs with 10 tasks per DAG file
-            ( 5, 10, 10, "1d",  "None",  "no_structure"),  # noqa
-            ( 5, 10, 10, "1d",  "None",        "linear"),  # noqa
-            (87, 10, 10, "1d", "@once",  "no_structure"),  # noqa
-            (97, 10, 10, "1d", "@once",        "linear"),  # noqa
-            (87, 10, 10, "1d",   "30m",  "no_structure"),  # noqa
-            (97, 10, 10, "1d",   "30m",        "linear"),  # noqa
+            ([ 5,   5,   5,   5], 10, 10, "1d",  "None",  "no_structure"),  # noqa
+            ([ 5,   5,   5,   5], 10, 10, "1d",  "None",        "linear"),  # noqa
+            ([87,  45,  45,  45], 10, 10, "1d", "@once",  "no_structure"),  # noqa
+            ([97,  55,  55,  55], 10, 10, "1d", "@once",        "linear"),  # noqa
+            ([87, 117, 117, 117], 10, 10, "1d",   "30m",  "no_structure"),  # noqa
+            ([97, 137, 137, 137], 10, 10, "1d",   "30m",        "linear"),  # noqa
             # pylint: enable=bad-whitespace
         ]
     )
     def test_process_file_queries_count(
-        self, expected_query_count, dag_count, task_count, start_ago, schedule_interval, shape
+        self, expected_query_counts, dag_count, task_count, start_ago, schedule_interval, shape
     ):
-        with assert_queries_count(expected_query_count), mock.patch.dict("os.environ", {
+        with mock.patch.dict("os.environ", {
             "PERF_DAGS_COUNT": str(dag_count),
             "PERF_TASKS_COUNT": str(task_count),
             "PERF_START_AGO": start_ago,
@@ -1345,7 +1346,9 @@ class TestDagFileProcessorQueriesCount(unittest.TestCase):
             ('scheduler', 'use_job_schedule'): 'True'
         }):
             processor = DagFileProcessor([], mock.MagicMock())
-            processor.process_file(ELASTIC_DAG_FILE, [])
+            for expected_query_count in expected_query_counts:
+                with assert_queries_count(expected_query_count):
+                    processor.process_file(ELASTIC_DAG_FILE, [])
 
 
 @pytest.mark.usefixtures("disable_load_example")