You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/01/25 22:00:10 UTC
[5/6] incubator-airflow git commit: [AIRFLOW-807] Improve scheduler
performance for large DAGs
[AIRFLOW-807] Improve scheduler performance for large DAGs
MySQL's query optimizer selects the wrong index, which
has a significant impact on the performance of the
scheduler.
Closes #2021 from criccomini/AIRFLOW-807
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/6b2a3ca2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/6b2a3ca2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/6b2a3ca2
Branch: refs/heads/v1-8-test
Commit: 6b2a3ca2ee4ee3415ef72ea1fa3fc694350e9efc
Parents: 5479ac8
Author: Chris Riccomini <ch...@wepay.com>
Authored: Wed Jan 25 22:54:09 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Wed Jan 25 22:54:18 2017 +0100
----------------------------------------------------------------------
airflow/jobs.py | 6 ++++
...7_add_dag_id_state_index_on_dag_run_table.py | 37 ++++++++++++++++++++
2 files changed, 43 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6b2a3ca2/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 0ac3607..201d87f 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -538,12 +538,18 @@ class SchedulerJob(BaseJob):
Where assuming that the scheduler runs often, so we only check for
tasks that should have succeeded in the past hour.
"""
+ if not any([ti.sla for ti in dag.tasks]):
+ self.logger.info("Skipping SLA check for {} because "
+ "no tasks in DAG have SLAs".format(dag))
+ return
+
TI = models.TaskInstance
sq = (
session
.query(
TI.task_id,
func.max(TI.execution_date).label('max_ti'))
+ .with_hint(TI, 'USE INDEX (PRIMARY)', dialect_name='mysql')
.filter(TI.dag_id == dag.dag_id)
.filter(TI.state == State.SUCCESS)
.filter(TI.task_id.in_(dag.task_ids))
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6b2a3ca2/airflow/migrations/versions/127d2bf2dfa7_add_dag_id_state_index_on_dag_run_table.py
----------------------------------------------------------------------
diff --git a/airflow/migrations/versions/127d2bf2dfa7_add_dag_id_state_index_on_dag_run_table.py b/airflow/migrations/versions/127d2bf2dfa7_add_dag_id_state_index_on_dag_run_table.py
new file mode 100644
index 0000000..c500966
--- /dev/null
+++ b/airflow/migrations/versions/127d2bf2dfa7_add_dag_id_state_index_on_dag_run_table.py
@@ -0,0 +1,37 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Add dag_id/state index on dag_run table
+
+Revision ID: 127d2bf2dfa7
+Revises: 1a5a9e6bf2b5
+Create Date: 2017-01-25 11:43:51.635667
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '127d2bf2dfa7'
+down_revision = '1a5a9e6bf2b5'
+branch_labels = None
+depends_on = None
+
+from alembic import op
+import sqlalchemy as sa
+
+def upgrade():
+ op.create_index('dag_id_state', 'dag_run', ['dag_id', 'state'], unique=False)
+
+
+def downgrade():
+ op.drop_index('dag_id_state', table_name='dag_run')
+