You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/01/25 22:00:10 UTC

[5/6] incubator-airflow git commit: [AIRFLOW-807] Improve scheduler performance for large DAGs

[AIRFLOW-807] Improve scheduler performance for large DAGs

MySQL's query optimizer selects the wrong index, this
has a significant impact on the performance of the
scheduler.

Closes #2021 from criccomini/AIRFLOW-807


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/6b2a3ca2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/6b2a3ca2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/6b2a3ca2

Branch: refs/heads/v1-8-test
Commit: 6b2a3ca2ee4ee3415ef72ea1fa3fc694350e9efc
Parents: 5479ac8
Author: Chris Riccomini <ch...@wepay.com>
Authored: Wed Jan 25 22:54:09 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Wed Jan 25 22:54:18 2017 +0100

----------------------------------------------------------------------
 airflow/jobs.py                                 |  6 ++++
 ...7_add_dag_id_state_index_on_dag_run_table.py | 37 ++++++++++++++++++++
 2 files changed, 43 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6b2a3ca2/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 0ac3607..201d87f 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -538,12 +538,18 @@ class SchedulerJob(BaseJob):
         Where assuming that the scheduler runs often, so we only check for
         tasks that should have succeeded in the past hour.
         """
+        if not any([ti.sla for ti in dag.tasks]):
+            self.logger.info("Skipping SLA check for {} because "
+              "no tasks in DAG have SLAs".format(dag))
+            return
+
         TI = models.TaskInstance
         sq = (
             session
             .query(
                 TI.task_id,
                 func.max(TI.execution_date).label('max_ti'))
+            .with_hint(TI, 'USE INDEX (PRIMARY)', dialect_name='mysql')
             .filter(TI.dag_id == dag.dag_id)
             .filter(TI.state == State.SUCCESS)
             .filter(TI.task_id.in_(dag.task_ids))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6b2a3ca2/airflow/migrations/versions/127d2bf2dfa7_add_dag_id_state_index_on_dag_run_table.py
----------------------------------------------------------------------
diff --git a/airflow/migrations/versions/127d2bf2dfa7_add_dag_id_state_index_on_dag_run_table.py b/airflow/migrations/versions/127d2bf2dfa7_add_dag_id_state_index_on_dag_run_table.py
new file mode 100644
index 0000000..c500966
--- /dev/null
+++ b/airflow/migrations/versions/127d2bf2dfa7_add_dag_id_state_index_on_dag_run_table.py
@@ -0,0 +1,37 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Add dag_id/state index on dag_run table
+
+Revision ID: 127d2bf2dfa7
+Revises: 1a5a9e6bf2b5
+Create Date: 2017-01-25 11:43:51.635667
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '127d2bf2dfa7'
+down_revision = '1a5a9e6bf2b5'
+branch_labels = None
+depends_on = None
+
+from alembic import op
+import sqlalchemy as sa
+
+def upgrade():
+    op.create_index('dag_id_state', 'dag_run', ['dag_id', 'state'], unique=False)
+
+
+def downgrade():
+    op.drop_index('dag_id_state', table_name='dag_run')
+