Posted to commits@airflow.apache.org by po...@apache.org on 2020/08/22 19:53:45 UTC

[airflow] branch master updated: Move perf_kit to tests.utils (#10470)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ee7d7c  Move perf_kit to tests.utils (#10470)
7ee7d7c is described below

commit 7ee7d7cf3f276e578463f0213b6c832027dbb9c3
Author: Jarek Potiuk <ja...@polidea.com>
AuthorDate: Sat Aug 22 21:53:07 2020 +0200

    Move perf_kit to tests.utils (#10470)
    
    Perf_kit was a separate folder, which was a problem when we tried to
    build it from Docker-embedded sources, because there was a hidden,
    implicit dependency between tests (conftest) and perf.

    Perf_kit is now moved to tests so that it is also available in the CI
    image when we run tests without the sources mounted.
    This is changing back in #10441, and we need to move perf_kit
    for it to work.
---
 scripts/ci/docker-compose/local.yml                |  1 -
 scripts/ci/libraries/_local_mounts.sh              |  1 -
 tests/conftest.py                                  |  7 +--
 tests/jobs/test_scheduler_job.py                   |  2 +-
 {scripts => tests/utils}/perf/dags/elastic_dag.py  |  0
 {scripts => tests/utils}/perf/dags/perf_dag_1.py   |  0
 {scripts => tests/utils}/perf/dags/perf_dag_2.py   |  0
 {scripts => tests/utils}/perf/dags/sql_perf_dag.py |  0
 {scripts => tests/utils}/perf/perf_kit/__init__.py | 23 ++++----
 {scripts => tests/utils}/perf/perf_kit/memory.py   |  1 +
 {scripts => tests/utils}/perf/perf_kit/python.py   | 22 ++++---
 .../utils}/perf/perf_kit/repeat_and_time.py        | 26 +++++----
 .../utils}/perf/perf_kit/sqlalchemy.py             | 68 ++++++++++++++++++++--
 .../utils}/perf/scheduler_dag_execution_timing.py  | 14 ++---
 .../utils}/perf/scheduler_ops_metrics.py           |  0
 {scripts => tests/utils}/perf/sql_queries.py       |  0
 16 files changed, 112 insertions(+), 53 deletions(-)

diff --git a/scripts/ci/docker-compose/local.yml b/scripts/ci/docker-compose/local.yml
index a6c8d7e..619156f 100644
--- a/scripts/ci/docker-compose/local.yml
+++ b/scripts/ci/docker-compose/local.yml
@@ -50,7 +50,6 @@ services:
       - ../../../pylintrc:/opt/airflow/pylintrc:cached
       - ../../../pytest.ini:/opt/airflow/pytest.ini:cached
       - ../../../scripts:/opt/airflow/scripts:cached
-      - ../../../scripts/perf:/opt/airflow/scripts/perf:cached
       - ../../../scripts/in_container/entrypoint_ci.sh:/entrypoint:cached
       - ../../../setup.cfg:/opt/airflow/setup.cfg:cached
       - ../../../setup.py:/opt/airflow/setup.py:cached
diff --git a/scripts/ci/libraries/_local_mounts.sh b/scripts/ci/libraries/_local_mounts.sh
index 6398902..4300af9 100644
--- a/scripts/ci/libraries/_local_mounts.sh
+++ b/scripts/ci/libraries/_local_mounts.sh
@@ -46,7 +46,6 @@ function generate_local_mounts_list {
         "$prefix"pylintrc:/opt/airflow/pylintrc:cached
         "$prefix"pytest.ini:/opt/airflow/pytest.ini:cached
         "$prefix"scripts:/opt/airflow/scripts:cached
-        "$prefix"scripts/perf:/opt/airflow/scripts/perf:cached
         "$prefix"scripts/in_container/entrypoint_ci.sh:/entrypoint:cached
         "$prefix"setup.cfg:/opt/airflow/setup.cfg:cached
         "$prefix"setup.py:/opt/airflow/setup.py:cached
diff --git a/tests/conftest.py b/tests/conftest.py
index 5414e84..eac88d6 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -33,12 +33,7 @@ os.environ["AIRFLOW__CORE__UNIT_TEST_MODE"] = "True"
 os.environ["AWS_DEFAULT_REGION"] = (os.environ.get("AWS_DEFAULT_REGION") or "us-east-1")
 os.environ["CREDENTIALS_DIR"] = (os.environ.get('CREDENTIALS_DIR') or "/files/airflow-breeze-config/keys")
 
-perf_directory = os.path.abspath(os.path.join(tests_directory, os.pardir, 'scripts', 'perf'))
-if perf_directory not in sys.path:
-    sys.path.append(perf_directory)
-
-
-from perf_kit.sqlalchemy import (  # noqa: E402 isort:skip # pylint: disable=wrong-import-position
+from tests.utils.perf.perf_kit.sqlalchemy import (  # noqa isort:skip # pylint: disable=wrong-import-position
     count_queries, trace_queries
 )
 
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 0020ead..49e0ee8 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -63,7 +63,7 @@ from tests.test_utils.mock_executor import MockExecutor
 ROOT_FOLDER = os.path.realpath(
     os.path.join(os.path.dirname(os.path.realpath(__file__)), os.pardir, os.pardir)
 )
-PERF_DAGS_FOLDER = os.path.join(ROOT_FOLDER, "scripts", "perf", "dags")
+PERF_DAGS_FOLDER = os.path.join(ROOT_FOLDER, "tests", "utils", "perf", "dags")
 ELASTIC_DAG_FILE = os.path.join(PERF_DAGS_FOLDER, "elastic_dag.py")
 
 TEST_DAG_FOLDER = os.environ['AIRFLOW__CORE__DAGS_FOLDER']
diff --git a/scripts/perf/dags/elastic_dag.py b/tests/utils/perf/dags/elastic_dag.py
similarity index 100%
rename from scripts/perf/dags/elastic_dag.py
rename to tests/utils/perf/dags/elastic_dag.py
diff --git a/scripts/perf/dags/perf_dag_1.py b/tests/utils/perf/dags/perf_dag_1.py
similarity index 100%
rename from scripts/perf/dags/perf_dag_1.py
rename to tests/utils/perf/dags/perf_dag_1.py
diff --git a/scripts/perf/dags/perf_dag_2.py b/tests/utils/perf/dags/perf_dag_2.py
similarity index 100%
rename from scripts/perf/dags/perf_dag_2.py
rename to tests/utils/perf/dags/perf_dag_2.py
diff --git a/scripts/perf/dags/sql_perf_dag.py b/tests/utils/perf/dags/sql_perf_dag.py
similarity index 100%
rename from scripts/perf/dags/sql_perf_dag.py
rename to tests/utils/perf/dags/sql_perf_dag.py
diff --git a/scripts/perf/perf_kit/__init__.py b/tests/utils/perf/perf_kit/__init__.py
similarity index 81%
rename from scripts/perf/perf_kit/__init__.py
rename to tests/utils/perf/perf_kit/__init__.py
index 7c79c35..d9a2c48 100644
--- a/scripts/perf/perf_kit/__init__.py
+++ b/tests/utils/perf/perf_kit/__init__.py
@@ -32,21 +32,21 @@ Content
 =======
 The following decorators and context managers are included.
 
-.. autofunction:: perf_kit.memory.trace_memory
+.. autofunction:: tests.utils.perf.perf_kit.memory.trace_memory
 
-.. autofunction:: perf_kit.python.pyspy
+.. autofunction:: tests.utils.perf.perf_kit.python.pyspy
 
-.. autofunction:: perf_kit.python.profiled
+.. autofunction:: tests.utils.perf.perf_kit.python.profiled
 
-.. autofunction:: perf_kit.repeat_and_time.timing
+.. autofunction:: tests.utils.perf.perf_kit.repeat_and_time.timing
 
-.. autofunction:: perf_kit.repeat_and_time.repeat
+.. autofunction:: tests.utils.perf.perf_kit.repeat_and_time.repeat
 
-.. autofunction:: perf_kit.repeat_and_time.timeout
+.. autofunction:: tests.utils.perf.perf_kit.repeat_and_time.timeout
 
-.. autofunction:: perf_kit.sqlalchemy.trace_queries
+.. autofunction:: tests.utils.perf.perf_kit.sqlalchemy.trace_queries
 
-.. autofunction:: perf_kit.sqlalchemy.count_queries
+.. autofunction:: tests.utils.perf.perf_kit.sqlalchemy.count_queries
 
 Documentation for each function is provided in the function docstrings. Each module also has an example in
 the main section of the module.
@@ -54,11 +54,12 @@ the main section of the module.
 Examples
 ========
 
-If you want to run an all example for ``perf_kit.sqlalchemy``, you can run the following command.
+If you want to run the examples for ``tests.utils.perf.perf_kit.sqlalchemy``, you can run the
+following command.
 
 .. code-block:: bash
 
-    python -m perf_kit.sqlalchemy
+    python -m tests.utils.perf.perf_kit.sqlalchemy
 
 If you want to know how to use these functions, it is worth familiarizing yourself with these examples.
 
@@ -98,7 +99,7 @@ queries in it.
         self.assertEqual(prev_local.isoformat(), "2018-03-24T03:00:00+01:00")
         self.assertEqual(prev.isoformat(), "2018-03-24T02:00:00+00:00")
 
-    from perf_kit.sqlalchemy import trace_queries
+    from tests.utils.perf.perf_kit.sqlalchemy import trace_queries
 
     @trace_queries
     def test_bulk_sync_to_db(self):
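
For completeness, the helpers documented in the docstring above also work as plain
context managers rather than decorators. A minimal sketch at the new import path,
assuming ``count_queries`` can be constructed without arguments (the query inside
the block is illustrative and not part of this commit):

    # Minimal sketch, not part of this commit: count_queries as a context manager.
    from airflow.models import DagModel
    from airflow.utils.session import create_session

    from tests.utils.perf.perf_kit.sqlalchemy import count_queries

    # The context manager yields a result object whose ``count`` attribute holds
    # the number of SQL queries issued inside the block; a summary line is also
    # printed when the block exits.
    with count_queries() as result, create_session() as session:
        session.query(DagModel).count()

    print(f"Queries issued: {result.count}")
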
diff --git a/scripts/perf/perf_kit/memory.py b/tests/utils/perf/perf_kit/memory.py
similarity index 98%
rename from scripts/perf/perf_kit/memory.py
rename to tests/utils/perf/perf_kit/memory.py
index ba3576e..f84c505 100644
--- a/scripts/perf/perf_kit/memory.py
+++ b/tests/utils/perf/perf_kit/memory.py
@@ -35,6 +35,7 @@ def _human_readable_size(size, decimal_places=3):
 
 
 class TraceMemoryResult:
+    """Trace results of memory,"""
     def __init__(self):
         self.before = 0
         self.after = 0
diff --git a/scripts/perf/perf_kit/python.py b/tests/utils/perf/perf_kit/python.py
similarity index 86%
rename from scripts/perf/perf_kit/python.py
rename to tests/utils/perf/perf_kit/python.py
index 36a0b11..3169e9c 100644
--- a/scripts/perf/perf_kit/python.py
+++ b/tests/utils/perf/perf_kit/python.py
@@ -45,7 +45,7 @@ def pyspy():
           cap_add:
           - SYS_PTRACE
 
-    In the case of Airflow Breeze, you should modify the ``scripts/perf/perf_kit/python.py`` file.
+    In the case of Airflow Breeze, you should modify the ``tests/utils/perf/perf_kit/python.py`` file.
     """
     pid = str(os.getpid())
     suffix = datetime.datetime.now().isoformat()
@@ -66,24 +66,28 @@ def profiled(print_callers=False):
 This decorator provides deterministic profiling. It uses ``cProfile`` internally. It generates statistics
 and prints them on the screen.
     """
-    pr = cProfile.Profile()
-    pr.enable()
+    profile = cProfile.Profile()
+    profile.enable()
     try:
         yield
     finally:
-        pr.disable()
-        s = io.StringIO()
-        ps = pstats.Stats(pr, stream=s).sort_stats("cumulative")
+        profile.disable()
+        stat = io.StringIO()
+        pstatistics = pstats.Stats(profile, stream=stat).sort_stats("cumulative")
         if print_callers:
-            ps.print_callers()
+            pstatistics.print_callers()
         else:
-            ps.print_stats()
-        print(s.getvalue())
+            pstatistics.print_stats()
+        print(stat.getvalue())
 
 
 if __name__ == "__main__":
 
     def case():
+        """
+        Load modules.
+        :return:
+        """
         import logging
 
         import airflow
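
The ``profiled`` helper rewritten above wraps ``cProfile`` around an arbitrary block
and prints the collected statistics. A minimal usage sketch at the new module path
(the workload function is illustrative and not part of this commit):

    # Minimal sketch, not part of this commit: deterministic profiling of a block.
    from tests.utils.perf.perf_kit.python import profiled

    def workload():
        # Illustrative workload: importing airflow does a measurable amount
        # of module-level work.
        import airflow  # noqa: F401

    # Prints cumulative cProfile statistics for everything run inside the block;
    # pass print_callers=True to list the callers of each function instead.
    with profiled():
        workload()
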
diff --git a/scripts/perf/perf_kit/repeat_and_time.py b/tests/utils/perf/perf_kit/repeat_and_time.py
similarity index 88%
rename from scripts/perf/perf_kit/repeat_and_time.py
rename to tests/utils/perf/perf_kit/repeat_and_time.py
index 519f802..8efd7f1 100644
--- a/scripts/perf/perf_kit/repeat_and_time.py
+++ b/tests/utils/perf/perf_kit/repeat_and_time.py
@@ -23,6 +23,7 @@ import time
 
 
 class TimingResult:
+    """Timing result."""
     def __init__(self):
         self.start_time = 0
         self.end_time = 0
@@ -65,7 +66,7 @@ def repeat(repeat_count=5):
         @functools.wraps(f)
         def wrap(*args, **kwargs):
             last_result = None
-            for i in range(repeat_count):
+            for _ in range(repeat_count):
                 last_result = f(*args, **kwargs)
             return last_result
 
@@ -75,7 +76,7 @@ def repeat(repeat_count=5):
 
 
 class TimeoutException(Exception):
-    pass
+    """Exception when the test timeo uts"""
 
 
 @contextlib.contextmanager
@@ -109,13 +110,13 @@ def timeout(seconds=1):
 if __name__ == "__main__":
 
     def monte_carlo(total=10000):
-        # Monte Carlo
+        """Monte Carlo"""
         inside = 0
 
-        for i in range(0, total):
-            x2 = random.random() ** 2
-            y2 = random.random() ** 2
-            if math.sqrt(x2 + y2) < 1.0:
+        for _ in range(0, total):
+            x_val = random.random() ** 2
+            y_val = random.random() ** 2
+            if math.sqrt(x_val + y_val) < 1.0:
                 inside += 1
 
         return (float(inside) / total) * 4
@@ -134,15 +135,16 @@ if __name__ == "__main__":
     @timing(REPEAT_COUNT)
     @repeat(REPEAT_COUNT)
     @timing()
-    def pi():
+    def get_pi():
+        """Returns PI value:"""
         return monte_carlo()
 
-    result = pi()
-    print("PI: ", result)
+    res = get_pi()
+    print("PI: ", res)
     print()
 
     # Example 3:
     with timing():
-        result = monte_carlo()
+        res = monte_carlo()
 
-    print("PI: ", result)
+    print("PI: ", res)
diff --git a/scripts/perf/perf_kit/sqlalchemy.py b/tests/utils/perf/perf_kit/sqlalchemy.py
similarity index 69%
rename from scripts/perf/perf_kit/sqlalchemy.py
rename to tests/utils/perf/perf_kit/sqlalchemy.py
index 72ab7a3..eb7f72b 100644
--- a/scripts/perf/perf_kit/sqlalchemy.py
+++ b/tests/utils/perf/perf_kit/sqlalchemy.py
@@ -32,6 +32,7 @@ def _pretty_format_sql(text: str):
     return text
 
 
+# noinspection PyUnusedLocal
 class TraceQueries:
     """
     Tracking SQL queries in a code block.
@@ -61,11 +62,46 @@ class TraceQueries:
         self.print_fn = print_fn
         self.query_count = 0
 
-    def before_cursor_execute(self, conn, cursor, statement, parameters, context, executemany):
+    def before_cursor_execute(self,
+                              conn,
+                              cursor,  # pylint: disable=unused-argument
+                              statement,  # pylint: disable=unused-argument
+                              parameters,  # pylint: disable=unused-argument
+                              context,  # pylint: disable=unused-argument
+                              executemany):  # pylint: disable=unused-argument
+        """
+        Executed before cursor.
+
+        :param conn:  connection
+        :param cursor:  cursor
+        :param statement: statement
+        :param parameters: parameters
+        :param context: context
+        :param executemany: whether many statements executed
+        :return:
+        """
+
         conn.info.setdefault("query_start_time", []).append(time.monotonic())
         self.query_count += 1
 
-    def after_cursor_execute(self, conn, cursor, statement, parameters, context, executemany):
+    def after_cursor_execute(self,
+                             conn,
+                             cursor,  # pylint: disable=unused-argument
+                             statement,
+                             parameters,
+                             context,  # pylint: disable=unused-argument
+                             executemany):  # pylint: disable=unused-argument
+        """
+        Executed after cursor.
+
+        :param conn:  connection
+        :param cursor:  cursor
+        :param statement: statement
+        :param parameters: parameters
+        :param context: context
+        :param executemany: whether many statements executed
+        :return:
+        """
         total = time.monotonic() - conn.info["query_start_time"].pop()
         file_names = [
             f"{f.filename}:{f.name}:{f.lineno}"
@@ -102,7 +138,8 @@ class TraceQueries:
         event.listen(airflow.settings.engine, "before_cursor_execute", self.before_cursor_execute)
         event.listen(airflow.settings.engine, "after_cursor_execute", self.after_cursor_execute)
 
-    def __exit__(self, type_, value, traceback):
+    # noinspection PyShadowingNames
+    def __exit__(self, type_, value, traceback):  # pylint: disable=redefined-outer-name
         import airflow.settings
         event.remove(airflow.settings.engine, "before_cursor_execute", self.before_cursor_execute)
         event.remove(airflow.settings.engine, "after_cursor_execute", self.after_cursor_execute)
@@ -112,6 +149,9 @@ trace_queries = TraceQueries  # pylint: disable=invalid-name
 
 
 class CountQueriesResult:
+    """
+    Counter for number of queries.
+    """
     def __init__(self):
         self.count = 0
 
@@ -136,13 +176,30 @@ class CountQueries:
         event.listen(airflow.settings.engine, "after_cursor_execute", self.after_cursor_execute)
         return self.result
 
-    def __exit__(self, type_, value, traceback):
+    # noinspection PyShadowingNames
+    def __exit__(self, type_, value, traceback):  # pylint: disable=redefined-outer-name
         import airflow.settings
 
         event.remove(airflow.settings.engine, "after_cursor_execute", self.after_cursor_execute)
         self.print_fn(f"Count SQL queries: {self.result.count}")
 
-    def after_cursor_execute(self, *args, **kwargs):
+    def after_cursor_execute(self,
+                             conn,  # pylint: disable=unused-argument
+                             cursor,  # pylint: disable=unused-argument
+                             statement,  # pylint: disable=unused-argument
+                             parameters,  # pylint: disable=unused-argument
+                             context,  # pylint: disable=unused-argument
+                             executemany):  # pylint: disable=unused-argument
+        """
+        Executed after cursor.
+
+        :param conn:  connection
+        :param cursor:  cursor
+        :param statement: statement
+        :param parameters: parameters
+        :param context: context
+        :param executemany: whether many statements executed
+        """
         self.result.count += 1
 
 
@@ -152,6 +209,7 @@ if __name__ == "__main__":
 
     # Example:
     def case():
+        "Case of logging om/"
         import logging
         from unittest import mock
 
diff --git a/scripts/perf/scheduler_dag_execution_timing.py b/tests/utils/perf/scheduler_dag_execution_timing.py
similarity index 97%
rename from scripts/perf/scheduler_dag_execution_timing.py
rename to tests/utils/perf/scheduler_dag_execution_timing.py
index ee177a2..6756366 100755
--- a/scripts/perf/scheduler_dag_execution_timing.py
+++ b/tests/utils/perf/scheduler_dag_execution_timing.py
@@ -101,15 +101,15 @@ def get_executor_under_test(dotted_path):
     if dotted_path == "MockExecutor":
         try:
             # Run against master and 1.10.x releases
-            from tests.test_utils.mock_executor import MockExecutor as Executor
+            from tests.test_utils.mock_executor import MockExecutor as executor
         except ImportError:
-            from tests.executors.test_executor import TestExecutor as Executor
+            from tests.executors.test_executor import TestExecutor as executor
 
     else:
-        Executor = ExecutorLoader.load_executor(dotted_path)
+        executor = ExecutorLoader.load_executor(dotted_path)
 
     # Change this to try other executors
-    class ShortCircuitExecutor(ShortCircuitExecutorMixin, Executor):
+    class ShortCircuitExecutor(ShortCircuitExecutorMixin, executor):
         """
         Placeholder class that implements the inheritance hierarchy
         """
@@ -153,16 +153,16 @@ def create_dag_runs(dag, num_runs, session):
 
     try:
         from airflow.utils.types import DagRunType
-        ID_PREFIX = f'{DagRunType.SCHEDULED.value}__'
+        id_prefix = f'{DagRunType.SCHEDULED.value}__'
     except ImportError:
         from airflow.models.dagrun import DagRun
-        ID_PREFIX = DagRun.ID_PREFIX
+        id_prefix = DagRun.ID_PREFIX  # pylint: disable=no-member
 
     next_run_date = dag.normalize_schedule(dag.start_date or min(t.start_date for t in dag.tasks))
 
     for _ in range(num_runs):
         dag.create_dagrun(
-            run_id=ID_PREFIX + next_run_date.isoformat(),
+            run_id=id_prefix + next_run_date.isoformat(),
             execution_date=next_run_date,
             start_date=timezone.utcnow(),
             state=State.RUNNING,
diff --git a/scripts/perf/scheduler_ops_metrics.py b/tests/utils/perf/scheduler_ops_metrics.py
similarity index 100%
rename from scripts/perf/scheduler_ops_metrics.py
rename to tests/utils/perf/scheduler_ops_metrics.py
diff --git a/scripts/perf/sql_queries.py b/tests/utils/perf/sql_queries.py
similarity index 100%
rename from scripts/perf/sql_queries.py
rename to tests/utils/perf/sql_queries.py