You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2020/08/22 19:53:45 UTC
[airflow] branch master updated: Move perf_kit to tests.utils
(#10470)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 7ee7d7c Move perf_kit to tests.utils (#10470)
7ee7d7c is described below
commit 7ee7d7cf3f276e578463f0213b6c832027dbb9c3
Author: Jarek Potiuk <ja...@polidea.com>
AuthorDate: Sat Aug 22 21:53:07 2020 +0200
Move perf_kit to tests.utils (#10470)
Perf_kit was a separate folder and it was a problem when we tried to
build it from Docker-embedded sources, because there was a hidden,
implicit dependency between tests (conftest) and perf.
Perf_kit is now moved to tests so that it is available in the CI image
also when we run tests without the sources mounted.
This behaviour changes back in #10441, and perf_kit has to be moved
for that change to work.
---
scripts/ci/docker-compose/local.yml | 1 -
scripts/ci/libraries/_local_mounts.sh | 1 -
tests/conftest.py | 7 +--
tests/jobs/test_scheduler_job.py | 2 +-
{scripts => tests/utils}/perf/dags/elastic_dag.py | 0
{scripts => tests/utils}/perf/dags/perf_dag_1.py | 0
{scripts => tests/utils}/perf/dags/perf_dag_2.py | 0
{scripts => tests/utils}/perf/dags/sql_perf_dag.py | 0
{scripts => tests/utils}/perf/perf_kit/__init__.py | 23 ++++----
{scripts => tests/utils}/perf/perf_kit/memory.py | 1 +
{scripts => tests/utils}/perf/perf_kit/python.py | 22 ++++---
.../utils}/perf/perf_kit/repeat_and_time.py | 26 +++++----
.../utils}/perf/perf_kit/sqlalchemy.py | 68 ++++++++++++++++++++--
.../utils}/perf/scheduler_dag_execution_timing.py | 14 ++---
.../utils}/perf/scheduler_ops_metrics.py | 0
{scripts => tests/utils}/perf/sql_queries.py | 0
16 files changed, 112 insertions(+), 53 deletions(-)
diff --git a/scripts/ci/docker-compose/local.yml b/scripts/ci/docker-compose/local.yml
index a6c8d7e..619156f 100644
--- a/scripts/ci/docker-compose/local.yml
+++ b/scripts/ci/docker-compose/local.yml
@@ -50,7 +50,6 @@ services:
- ../../../pylintrc:/opt/airflow/pylintrc:cached
- ../../../pytest.ini:/opt/airflow/pytest.ini:cached
- ../../../scripts:/opt/airflow/scripts:cached
- - ../../../scripts/perf:/opt/airflow/scripts/perf:cached
- ../../../scripts/in_container/entrypoint_ci.sh:/entrypoint:cached
- ../../../setup.cfg:/opt/airflow/setup.cfg:cached
- ../../../setup.py:/opt/airflow/setup.py:cached
diff --git a/scripts/ci/libraries/_local_mounts.sh b/scripts/ci/libraries/_local_mounts.sh
index 6398902..4300af9 100644
--- a/scripts/ci/libraries/_local_mounts.sh
+++ b/scripts/ci/libraries/_local_mounts.sh
@@ -46,7 +46,6 @@ function generate_local_mounts_list {
"$prefix"pylintrc:/opt/airflow/pylintrc:cached
"$prefix"pytest.ini:/opt/airflow/pytest.ini:cached
"$prefix"scripts:/opt/airflow/scripts:cached
- "$prefix"scripts/perf:/opt/airflow/scripts/perf:cached
"$prefix"scripts/in_container/entrypoint_ci.sh:/entrypoint:cached
"$prefix"setup.cfg:/opt/airflow/setup.cfg:cached
"$prefix"setup.py:/opt/airflow/setup.py:cached
diff --git a/tests/conftest.py b/tests/conftest.py
index 5414e84..eac88d6 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -33,12 +33,7 @@ os.environ["AIRFLOW__CORE__UNIT_TEST_MODE"] = "True"
os.environ["AWS_DEFAULT_REGION"] = (os.environ.get("AWS_DEFAULT_REGION") or "us-east-1")
os.environ["CREDENTIALS_DIR"] = (os.environ.get('CREDENTIALS_DIR') or "/files/airflow-breeze-config/keys")
-perf_directory = os.path.abspath(os.path.join(tests_directory, os.pardir, 'scripts', 'perf'))
-if perf_directory not in sys.path:
- sys.path.append(perf_directory)
-
-
-from perf_kit.sqlalchemy import ( # noqa: E402 isort:skip # pylint: disable=wrong-import-position
+from tests.utils.perf.perf_kit.sqlalchemy import ( # noqa isort:skip # pylint: disable=wrong-import-position
count_queries, trace_queries
)
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 0020ead..49e0ee8 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -63,7 +63,7 @@ from tests.test_utils.mock_executor import MockExecutor
ROOT_FOLDER = os.path.realpath(
os.path.join(os.path.dirname(os.path.realpath(__file__)), os.pardir, os.pardir)
)
-PERF_DAGS_FOLDER = os.path.join(ROOT_FOLDER, "scripts", "perf", "dags")
+PERF_DAGS_FOLDER = os.path.join(ROOT_FOLDER, "tests", "utils", "perf", "dags")
ELASTIC_DAG_FILE = os.path.join(PERF_DAGS_FOLDER, "elastic_dag.py")
TEST_DAG_FOLDER = os.environ['AIRFLOW__CORE__DAGS_FOLDER']
diff --git a/scripts/perf/dags/elastic_dag.py b/tests/utils/perf/dags/elastic_dag.py
similarity index 100%
rename from scripts/perf/dags/elastic_dag.py
rename to tests/utils/perf/dags/elastic_dag.py
diff --git a/scripts/perf/dags/perf_dag_1.py b/tests/utils/perf/dags/perf_dag_1.py
similarity index 100%
rename from scripts/perf/dags/perf_dag_1.py
rename to tests/utils/perf/dags/perf_dag_1.py
diff --git a/scripts/perf/dags/perf_dag_2.py b/tests/utils/perf/dags/perf_dag_2.py
similarity index 100%
rename from scripts/perf/dags/perf_dag_2.py
rename to tests/utils/perf/dags/perf_dag_2.py
diff --git a/scripts/perf/dags/sql_perf_dag.py b/tests/utils/perf/dags/sql_perf_dag.py
similarity index 100%
rename from scripts/perf/dags/sql_perf_dag.py
rename to tests/utils/perf/dags/sql_perf_dag.py
diff --git a/scripts/perf/perf_kit/__init__.py b/tests/utils/perf/perf_kit/__init__.py
similarity index 81%
rename from scripts/perf/perf_kit/__init__.py
rename to tests/utils/perf/perf_kit/__init__.py
index 7c79c35..d9a2c48 100644
--- a/scripts/perf/perf_kit/__init__.py
+++ b/tests/utils/perf/perf_kit/__init__.py
@@ -32,21 +32,21 @@ Content
=======
The following decorators and context managers are included.
-.. autofunction:: perf_kit.memory.trace_memory
+.. autofunction:: tests.utils.perf.perf_kit.memory.trace_memory
-.. autofunction:: perf_kit.python.pyspy
+.. autofunction:: tests.utils.perf.perf_kit.python.pyspy
-.. autofunction:: perf_kit.python.profiled
+.. autofunction:: tests.utils.perf.perf_kit.python.profiled
-.. autofunction:: perf_kit.repeat_and_time.timing
+.. autofunction:: tests.utils.perf.perf_kit.repeat_and_time.timing
-.. autofunction:: perf_kit.repeat_and_time.repeat
+.. autofunction:: tests.utils.perf.perf_kit.repeat_and_time.repeat
-.. autofunction:: perf_kit.repeat_and_time.timeout
+.. autofunction:: tests.utils.perf.perf_kit.repeat_and_time.timeout
-.. autofunction:: perf_kit.sqlalchemy.trace_queries
+.. autofunction:: tests.utils.perf.perf_kit.sqlalchemy.trace_queries
-.. autofunction:: perf_kit.sqlalchemy.count_queries
+.. autofunction:: tests.utils.perf.perf_kit.sqlalchemy.count_queries
Documentation for each function is provided in the function docstrings. Each module also has an example in
the main section of the module.
@@ -54,11 +54,12 @@ the main section of the module.
Examples
========
-If you want to run an all example for ``perf_kit.sqlalchemy``, you can run the following command.
+If you want to run the example for ``tests.utils.perf.perf_kit.sqlalchemy``, you can run the
+following command.
.. code-block:: bash
- python -m perf_kit.sqlalchemy
+ python -m tests.utils.perf.perf_kit.sqlalchemy
If you want to know how to use these functions, it is worth to familiarize yourself with these examples.
@@ -98,7 +99,7 @@ queries in it.
self.assertEqual(prev_local.isoformat(), "2018-03-24T03:00:00+01:00")
self.assertEqual(prev.isoformat(), "2018-03-24T02:00:00+00:00")
- from perf_kit.sqlalchemy import trace_queries
+ from tests.utils.perf.perf_kit.sqlalchemy import trace_queries
@trace_queries
def test_bulk_sync_to_db(self):
diff --git a/scripts/perf/perf_kit/memory.py b/tests/utils/perf/perf_kit/memory.py
similarity index 98%
rename from scripts/perf/perf_kit/memory.py
rename to tests/utils/perf/perf_kit/memory.py
index ba3576e..f84c505 100644
--- a/scripts/perf/perf_kit/memory.py
+++ b/tests/utils/perf/perf_kit/memory.py
@@ -35,6 +35,7 @@ def _human_readable_size(size, decimal_places=3):
class TraceMemoryResult:
+ """Result of memory tracing."""
def __init__(self):
self.before = 0
self.after = 0
diff --git a/scripts/perf/perf_kit/python.py b/tests/utils/perf/perf_kit/python.py
similarity index 86%
rename from scripts/perf/perf_kit/python.py
rename to tests/utils/perf/perf_kit/python.py
index 36a0b11..3169e9c 100644
--- a/scripts/perf/perf_kit/python.py
+++ b/tests/utils/perf/perf_kit/python.py
@@ -45,7 +45,7 @@ def pyspy():
cap_add:
- SYS_PTRACE
- In the case of Airflow Breeze, you should modify the ``scripts/perf/perf_kit/python.py`` file.
+ In the case of Airflow Breeze, you should modify the ``tests/utils/perf/perf_kit/python.py`` file.
"""
pid = str(os.getpid())
suffix = datetime.datetime.now().isoformat()
@@ -66,24 +66,28 @@ def profiled(print_callers=False):
This decorator provide deterministic profiling. It uses ``cProfile`` internally. It generates statistic
and print on the screen.
"""
- pr = cProfile.Profile()
- pr.enable()
+ profile = cProfile.Profile()
+ profile.enable()
try:
yield
finally:
- pr.disable()
- s = io.StringIO()
- ps = pstats.Stats(pr, stream=s).sort_stats("cumulative")
+ profile.disable()
+ stat = io.StringIO()
+ pstatistics = pstats.Stats(profile, stream=stat).sort_stats("cumulative")
if print_callers:
- ps.print_callers()
+ pstatistics.print_callers()
else:
- ps.print_stats()
- print(s.getvalue())
+ pstatistics.print_stats()
+ print(stat.getvalue())
if __name__ == "__main__":
def case():
+ """
+ Load modules.
+ :return:
+ """
import logging
import airflow
diff --git a/scripts/perf/perf_kit/repeat_and_time.py b/tests/utils/perf/perf_kit/repeat_and_time.py
similarity index 88%
rename from scripts/perf/perf_kit/repeat_and_time.py
rename to tests/utils/perf/perf_kit/repeat_and_time.py
index 519f802..8efd7f1 100644
--- a/scripts/perf/perf_kit/repeat_and_time.py
+++ b/tests/utils/perf/perf_kit/repeat_and_time.py
@@ -23,6 +23,7 @@ import time
class TimingResult:
+ """Timing result."""
def __init__(self):
self.start_time = 0
self.end_time = 0
@@ -65,7 +66,7 @@ def repeat(repeat_count=5):
@functools.wraps(f)
def wrap(*args, **kwargs):
last_result = None
- for i in range(repeat_count):
+ for _ in range(repeat_count):
last_result = f(*args, **kwargs)
return last_result
@@ -75,7 +76,7 @@ def repeat(repeat_count=5):
class TimeoutException(Exception):
- pass
+ """Exception raised when the test times out."""
@contextlib.contextmanager
@@ -109,13 +110,13 @@ def timeout(seconds=1):
if __name__ == "__main__":
def monte_carlo(total=10000):
- # Monte Carlo
+ """Estimate PI using the Monte Carlo method."""
inside = 0
- for i in range(0, total):
- x2 = random.random() ** 2
- y2 = random.random() ** 2
- if math.sqrt(x2 + y2) < 1.0:
+ for _ in range(0, total):
+ x_val = random.random() ** 2
+ y_val = random.random() ** 2
+ if math.sqrt(x_val + y_val) < 1.0:
inside += 1
return (float(inside) / total) * 4
@@ -134,15 +135,16 @@ if __name__ == "__main__":
@timing(REPEAT_COUNT)
@repeat(REPEAT_COUNT)
@timing()
- def pi():
+ def get_pi():
+ """Return a Monte Carlo estimate of PI."""
return monte_carlo()
- result = pi()
- print("PI: ", result)
+ res = get_pi()
+ print("PI: ", res)
print()
# Example 3:
with timing():
- result = monte_carlo()
+ res = monte_carlo()
- print("PI: ", result)
+ print("PI: ", res)
diff --git a/scripts/perf/perf_kit/sqlalchemy.py b/tests/utils/perf/perf_kit/sqlalchemy.py
similarity index 69%
rename from scripts/perf/perf_kit/sqlalchemy.py
rename to tests/utils/perf/perf_kit/sqlalchemy.py
index 72ab7a3..eb7f72b 100644
--- a/scripts/perf/perf_kit/sqlalchemy.py
+++ b/tests/utils/perf/perf_kit/sqlalchemy.py
@@ -32,6 +32,7 @@ def _pretty_format_sql(text: str):
return text
+# noinspection PyUnusedLocal
class TraceQueries:
"""
Tracking SQL queries in a code block.
@@ -61,11 +62,46 @@ class TraceQueries:
self.print_fn = print_fn
self.query_count = 0
- def before_cursor_execute(self, conn, cursor, statement, parameters, context, executemany):
+ def before_cursor_execute(self,
+ conn,
+ cursor, # pylint: disable=unused-argument
+ statement, # pylint: disable=unused-argument
+ parameters, # pylint: disable=unused-argument
+ context, # pylint: disable=unused-argument
+ executemany): # pylint: disable=unused-argument
+ """
+ Executed before cursor.
+
+ :param conn: connection
+ :param cursor: cursor
+ :param statement: statement
+ :param parameters: parameters
+ :param context: context
+ :param executemany: whether many statements executed
+ :return:
+ """
+
conn.info.setdefault("query_start_time", []).append(time.monotonic())
self.query_count += 1
- def after_cursor_execute(self, conn, cursor, statement, parameters, context, executemany):
+ def after_cursor_execute(self,
+ conn,
+ cursor, # pylint: disable=unused-argument
+ statement,
+ parameters,
+ context, # pylint: disable=unused-argument
+ executemany): # pylint: disable=unused-argument
+ """
+ Executed after cursor.
+
+ :param conn: connection
+ :param cursor: cursor
+ :param statement: statement
+ :param parameters: parameters
+ :param context: context
+ :param executemany: whether many statements executed
+ :return:
+ """
total = time.monotonic() - conn.info["query_start_time"].pop()
file_names = [
f"{f.filename}:{f.name}:{f.lineno}"
@@ -102,7 +138,8 @@ class TraceQueries:
event.listen(airflow.settings.engine, "before_cursor_execute", self.before_cursor_execute)
event.listen(airflow.settings.engine, "after_cursor_execute", self.after_cursor_execute)
- def __exit__(self, type_, value, traceback):
+ # noinspection PyShadowingNames
+ def __exit__(self, type_, value, traceback): # pylint: disable=redefined-outer-name
import airflow.settings
event.remove(airflow.settings.engine, "before_cursor_execute", self.before_cursor_execute)
event.remove(airflow.settings.engine, "after_cursor_execute", self.after_cursor_execute)
@@ -112,6 +149,9 @@ trace_queries = TraceQueries # pylint: disable=invalid-name
class CountQueriesResult:
+ """
+ Counter for number of queries.
+ """
def __init__(self):
self.count = 0
@@ -136,13 +176,30 @@ class CountQueries:
event.listen(airflow.settings.engine, "after_cursor_execute", self.after_cursor_execute)
return self.result
- def __exit__(self, type_, value, traceback):
+ # noinspection PyShadowingNames
+ def __exit__(self, type_, value, traceback): # pylint: disable=redefined-outer-name
import airflow.settings
event.remove(airflow.settings.engine, "after_cursor_execute", self.after_cursor_execute)
self.print_fn(f"Count SQL queries: {self.result.count}")
- def after_cursor_execute(self, *args, **kwargs):
+ def after_cursor_execute(self,
+ conn, # pylint: disable=unused-argument
+ cursor, # pylint: disable=unused-argument
+ statement, # pylint: disable=unused-argument
+ parameters, # pylint: disable=unused-argument
+ context, # pylint: disable=unused-argument
+ executemany): # pylint: disable=unused-argument
+ """
+ Executed after cursor.
+
+ :param conn: connection
+ :param cursor: cursor
+ :param statement: statement
+ :param parameters: parameters
+ :param context: context
+ :param executemany: whether many statements executed
+ """
self.result.count += 1
@@ -152,6 +209,7 @@ if __name__ == "__main__":
# Example:
def case():
+ """Example case run when this module is executed directly."""
import logging
from unittest import mock
diff --git a/scripts/perf/scheduler_dag_execution_timing.py b/tests/utils/perf/scheduler_dag_execution_timing.py
similarity index 97%
rename from scripts/perf/scheduler_dag_execution_timing.py
rename to tests/utils/perf/scheduler_dag_execution_timing.py
index ee177a2..6756366 100755
--- a/scripts/perf/scheduler_dag_execution_timing.py
+++ b/tests/utils/perf/scheduler_dag_execution_timing.py
@@ -101,15 +101,15 @@ def get_executor_under_test(dotted_path):
if dotted_path == "MockExecutor":
try:
# Run against master and 1.10.x releases
- from tests.test_utils.mock_executor import MockExecutor as Executor
+ from tests.test_utils.mock_executor import MockExecutor as executor
except ImportError:
- from tests.executors.test_executor import TestExecutor as Executor
+ from tests.executors.test_executor import TestExecutor as executor
else:
- Executor = ExecutorLoader.load_executor(dotted_path)
+ executor = ExecutorLoader.load_executor(dotted_path)
# Change this to try other executors
- class ShortCircuitExecutor(ShortCircuitExecutorMixin, Executor):
+ class ShortCircuitExecutor(ShortCircuitExecutorMixin, executor):
"""
Placeholder class that implements the inheritance hierarchy
"""
@@ -153,16 +153,16 @@ def create_dag_runs(dag, num_runs, session):
try:
from airflow.utils.types import DagRunType
- ID_PREFIX = f'{DagRunType.SCHEDULED.value}__'
+ id_prefix = f'{DagRunType.SCHEDULED.value}__'
except ImportError:
from airflow.models.dagrun import DagRun
- ID_PREFIX = DagRun.ID_PREFIX
+ id_prefix = DagRun.ID_PREFIX # pylint: disable=no-member
next_run_date = dag.normalize_schedule(dag.start_date or min(t.start_date for t in dag.tasks))
for _ in range(num_runs):
dag.create_dagrun(
- run_id=ID_PREFIX + next_run_date.isoformat(),
+ run_id=id_prefix + next_run_date.isoformat(),
execution_date=next_run_date,
start_date=timezone.utcnow(),
state=State.RUNNING,
diff --git a/scripts/perf/scheduler_ops_metrics.py b/tests/utils/perf/scheduler_ops_metrics.py
similarity index 100%
rename from scripts/perf/scheduler_ops_metrics.py
rename to tests/utils/perf/scheduler_ops_metrics.py
diff --git a/scripts/perf/sql_queries.py b/tests/utils/perf/sql_queries.py
similarity index 100%
rename from scripts/perf/sql_queries.py
rename to tests/utils/perf/sql_queries.py