You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/02/25 22:03:17 UTC

[GitHub] [airflow] houqp commented on a change in pull request #7537: [AIRFLOW-XXXX] Add script to benchmark scheduler dag-run time

houqp commented on a change in pull request #7537: [AIRFLOW-XXXX] Add script to benchmark scheduler dag-run time
URL: https://github.com/apache/airflow/pull/7537#discussion_r384154018
 
 

 ##########
 File path: scripts/perf/scheduler_dag_execution_timing.py
 ##########
 @@ -0,0 +1,220 @@
+#!/usr/bin/env python3
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import gc
+import os
+import statistics
+import time
+
+import click
+
+
+class ShortCircutExecutorMixin:
+    def __init__(self, stop_when_these_completed):
+        super().__init__()
+        self.reset(stop_when_these_completed)
+
+    def reset(self, stop_when_these_completed):
+        self.stop_when_these_completed = {
+            # Store the date as a timestamp, as sometimes this is a Pendulum
+            # object, others it is a datetime object.
+            (run.dag_id, run.execution_date.timestamp()): run for run in stop_when_these_completed
+        }
+
+    def change_state(self, key, state):
+        from airflow.utils.state import State
+        super().change_state(key, state)
+
+        dag_id, task_id, execution_date, __ = key
+        run_key = (dag_id, execution_date.timestamp())
+        run = self.stop_when_these_completed.get(run_key, None)
+        if run and all(t.state == State.SUCCESS for t in run.get_task_instances()):
+            self.stop_when_these_completed.pop(run_key)
+
+            if not self.stop_when_these_completed:
+                self.log.warning("STOPPING SCHEDULER -- all runs complete")
+                self.scheduler_job.processor_agent._done = True
+            else:
+                self.log.warning("WAITING ON %d RUNS", len(self.stop_when_these_completed))
+        elif state == State.SUCCESS:
+            self.log.warning("WAITING ON %d RUNS", len(self.stop_when_these_completed))
+
+
+def get_executor_under_test():
+    try:
+        # Run against master and 1.10.x releases
+        from tests.test_utils.mock_executor import MockExecutor
+    except ImportError:
+        from tests.executors.test_executor import TestExecutor as MockExecutor
+
+    # from airflow.executors.local_executor import LocalExecutor
+
+    # Change this to try other executors
+    Executor = MockExecutor
+
+    class ShortCircutExecutor(ShortCircutExecutorMixin, Executor):
+        pass
+
+    return ShortCircutExecutor
+
+
+def reset_dag(dag, num_runs, session):
+    import airflow.models
+    from airflow.utils import timezone
+    from airflow.utils.state import State
+
+    DR = airflow.models.DagRun
+    DM = airflow.models.DagModel
+    TI = airflow.models.TaskInstance
+    TF = airflow.models.TaskFail
+    dag_id = dag.dag_id
+
+    session.query(DM).filter(DM.dag_id == dag_id).update({'is_paused': False})
+    session.query(DR).filter(DR.dag_id == dag_id).delete()
+    session.query(TI).filter(TI.dag_id == dag_id).delete()
+    session.query(TF).filter(TF.dag_id == dag_id).delete()
+
+    next_run_date = dag.normalize_schedule(dag.start_date or min(t.start_date for t in dag.tasks))
+
+    for _ in range(num_runs):
+        next_run = dag.create_dagrun(
+            run_id=DR.ID_PREFIX + next_run_date.isoformat(),
+            execution_date=next_run_date,
+            start_date=timezone.utcnow(),
+            state=State.RUNNING,
+            external_trigger=False,
+            session=session,
+        )
+        next_run_date = dag.following_schedule(next_run_date)
+    return next_run
+
+
+def pause_all_dags(session):
+    from airflow.models.dag import DagModel
+    session.query(DagModel).update({'is_paused': True})
+
+
+@click.command()
+@click.option('--num-runs', default=1, help='number of DagRun, to run for each DAG')
+@click.option('--repeat', default=3, help='number of times to run test, to reduce variance')
+@click.argument('dag_ids', required=True, nargs=-1)
+def main(num_runs, repeat, dag_ids):
+    """
+    This script will run the SchedulerJob for the specified dags "to completion".
+
+    That is, it creates a fixed number of DAG runs for the specified DAGs (from
+    the configured dag path/example dags etc), disables the scheduler from
+    creating more, and then monitors them for completion. When the final task of
+    the final dag run is completed the scheduler will be terminated.
+
+    The aim of this script is to have a benchmark for real-world scheduler
+    performance -- i.e. total time taken to run N dag runs to completion.
+
+    Care should be taken that other limits (DAG concurrency, pool size etc) are
+    not the bottleneck. This script doesn't help you in that regard.
+
+    It is recommended to repeat the test at least 3 times so that you can get
+    somewhat-accurate variance on the reported timing numbers.
 
 Review comment:
   Is this something we should automate within this script as well?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services