You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/04/05 08:02:47 UTC
incubator-airflow git commit: [AIRFLOW-970] Load latest_runs on homepage async
Repository: incubator-airflow
Updated Branches:
refs/heads/master 3ff5abee3 -> 0f7ddbbed
[AIRFLOW-970] Load latest_runs on homepage async
The latest_runs column on the homepage loads synchronously with an n+1
query (one last-run query per DAG row). Homepage loads will be
significantly faster if this happens asynchronously and as a batch.
Closes #2144 from saguziel/aguziel-latest-run-async
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/0f7ddbbe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/0f7ddbbe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/0f7ddbbe
Branch: refs/heads/master
Commit: 0f7ddbbedb05f2f11500250db4989edcb27bc164
Parents: 3ff5abe
Author: Alex Guziel <al...@airbnb.com>
Authored: Wed Apr 5 10:02:42 2017 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Wed Apr 5 10:02:42 2017 +0200
----------------------------------------------------------------------
airflow/models.py | 23 ++++++++++++++++++++
airflow/www/api/experimental/endpoints.py | 23 +++++++++++++++++++-
airflow/www/templates/airflow/dags.html | 29 ++++++++++++++------------
tests/dags/test_latest_runs.py | 27 ++++++++++++++++++++++++
tests/models.py | 21 +++++++++++++++++--
5 files changed, 107 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0f7ddbbe/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 8a91cc2..95e2255 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -4218,6 +4218,29 @@ class DagRun(Base):
return False
+    @classmethod
+    @provide_session
+    def get_latest_runs(cls, session):
+        """Returns the DagRun with the greatest execution_date for each DAG.
+
+        Runs are not filtered by state: the homepage "Last Run" column is
+        populated only from these results, so restricting to running runs
+        would leave that column empty for every DAG not currently running.
+        """
+        # Newest execution_date per dag_id, computed in one grouped query.
+        subquery = (
+            session
+            .query(
+                cls.dag_id,
+                func.max(cls.execution_date).label('execution_date'))
+            .group_by(cls.dag_id)
+            .subquery()
+        )
+        # Join back to load the full DagRun rows matching those maxima.
+        dagruns = (
+            session
+            .query(cls)
+            .join(subquery,
+                  and_(cls.dag_id == subquery.c.dag_id,
+                       cls.execution_date == subquery.c.execution_date))
+            .all()
+        )
+        return dagruns
+
class Pool(Base):
__tablename__ = "slot_pool"
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0f7ddbbe/airflow/www/api/experimental/endpoints.py
----------------------------------------------------------------------
diff --git a/airflow/www/api/experimental/endpoints.py b/airflow/www/api/experimental/endpoints.py
index 56b9d79..63355c7 100644
--- a/airflow/www/api/experimental/endpoints.py
+++ b/airflow/www/api/experimental/endpoints.py
@@ -20,7 +20,8 @@ from airflow.exceptions import AirflowException
from airflow.www.app import csrf
from flask import (
- g, Markup, Blueprint, redirect, jsonify, abort, request, current_app, send_file
+ g, Markup, Blueprint, redirect, jsonify, abort,
+ request, current_app, send_file, url_for
)
from datetime import datetime
@@ -110,3 +111,23 @@ def task_info(dag_id, task_id):
task = dag.get_task(task_id)
fields = {k: str(v) for k, v in vars(task).items() if not k.startswith('_')}
return jsonify(fields)
+
+
+@api_experimental.route('/latest_runs', methods=['GET'])
+@requires_authentication
+def latest_dag_runs():
+    """Returns, for each DAG, the run chosen by DagRun.get_latest_runs,
+    formatted for the UI.
+
+    Each JSON item has dag_id, execution_date and start_date (both dates as
+    "%Y-%m-%d %H:%M" strings; start_date is '' when unset) and dag_run_url,
+    a link to the graph view of that run.
+    """
+    from airflow.models import DagRun
+    date_format = "%Y-%m-%d %H:%M"
+    dagruns = DagRun.get_latest_runs()
+    payload = []
+    for dagrun in dagruns:
+        # execution_date is both formatted and used in the URL below.
+        if not dagrun.execution_date:
+            continue
+        payload.append({
+            'dag_id': dagrun.dag_id,
+            'execution_date': dagrun.execution_date.strftime(date_format),
+            'start_date': (dagrun.start_date.strftime(date_format)
+                           if dagrun.start_date else ''),
+            'dag_run_url': url_for('airflow.graph', dag_id=dagrun.dag_id,
+                                   execution_date=dagrun.execution_date)
+        })
+    return jsonify(payload)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0f7ddbbe/airflow/www/templates/airflow/dags.html
----------------------------------------------------------------------
diff --git a/airflow/www/templates/airflow/dags.html b/airflow/www/templates/airflow/dags.html
index 8a5a346..5abbc4b 100644
--- a/airflow/www/templates/airflow/dags.html
+++ b/airflow/www/templates/airflow/dags.html
@@ -105,19 +105,7 @@
</td>
<!-- Column 7: Last Run -->
- <td class="text-nowrap">
- {% if dag %}
- {% set last_run = dag.get_last_dagrun(include_externally_triggered=True) %}
- {% if last_run and last_run.start_date %}
- <a href="{{ url_for('airflow.graph', dag_id=last_run.dag_id, execution_date=last_run.execution_date ) }}">
- {{ last_run.execution_date.strftime("%Y-%m-%d %H:%M") }}
- </a> <span id="statuses_info" class="glyphicon glyphicon-info-sign" aria-hidden="true" title="Start Date: {{last_run.start_date.strftime('%Y-%m-%d %H:%M')}}"></span>
- {% else %}
- <!--No DAG Runs-->
- {% endif %}
- {% else %}
- <!--No DAG Runs-->
- {% endif %}
+ <td class="text-nowrap latest_dag_run {{ dag.dag_id }}">
</td>
<!-- Column 8: Dag Runs -->
@@ -237,6 +225,21 @@
}
});
});
+    // Fill each row's "Last Run" cell from the batched latest-runs endpoint.
+    $.getJSON("{{ url_for('api_experimental.latest_dag_runs') }}", function(data) {
+      $.each(data, function() {
+        var dag_id = this.dag_id;
+        var link = $("<a>", {href: this.dag_run_url, text: this.execution_date});
+        var info_icon = $('<span>', {
+          "aria-hidden": "true",
+          id: "statuses_info",
+          title: "Start Date: " + this.start_date,
+          "class": "glyphicon glyphicon-info-sign"
+        });
+        // Match the dag_id class by name instead of concatenating it into a
+        // selector: an id containing '.' or other CSS meta-characters would be
+        // parsed as a different selector and miss (or break on) its row.
+        $('.latest_dag_run').filter(function() { return $(this).hasClass(dag_id); })
+          .append(link).append(info_icon);
+      });
+    });
d3.json("{{ url_for('airflow.dag_stats') }}", function(error, json) {
for(var dag_id in json) {
states = json[dag_id];
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0f7ddbbe/tests/dags/test_latest_runs.py
----------------------------------------------------------------------
diff --git a/tests/dags/test_latest_runs.py b/tests/dags/test_latest_runs.py
new file mode 100644
index 0000000..dd04c0e
--- /dev/null
+++ b/tests/dags/test_latest_runs.py
@@ -0,0 +1,27 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from datetime import datetime
+
+from airflow.models import DAG
+from airflow.operators.dummy_operator import DummyOperator
+
+# range(1, 2) iterates once, so this module defines a single DAG,
+# 'test_latest_runs_1', holding one DummyOperator task.
+for i in range(1, 2):
+ dag = DAG(dag_id='test_latest_runs_{}'.format(i))
+ task = DummyOperator(
+ task_id='dummy_task',
+ dag=dag,
+ owner='airflow',
+ start_date=datetime(2016, 2, 1))
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0f7ddbbe/tests/models.py
----------------------------------------------------------------------
diff --git a/tests/models.py b/tests/models.py
index 15450dd..20da4d4 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -236,11 +236,13 @@ class DagTest(unittest.TestCase):
class DagRunTest(unittest.TestCase):
- def create_dag_run(self, dag, state=State.RUNNING, task_states=None):
+ def create_dag_run(self, dag, state=State.RUNNING, task_states=None, execution_date=None):
now = datetime.datetime.now()
+ if execution_date is None:
+ execution_date = now
dag_run = dag.create_dagrun(
run_id='manual__' + now.isoformat(),
- execution_date=now,
+ execution_date=execution_date,
start_date=now,
state=state,
external_trigger=False,
@@ -412,6 +414,21 @@ class DagRunTest(unittest.TestCase):
ti = dag_run.get_task_instance('test_short_circuit_false')
self.assertEqual(None, ti)
+    def test_get_latest_runs(self):
+        """get_latest_runs returns the newest run of each DAG."""
+        session = settings.Session()
+        dag = DAG(
+            dag_id='test_latest_runs_1',
+            start_date=DEFAULT_DATE)
+        self.create_dag_run(dag, execution_date=datetime.datetime(2015, 1, 1))
+        self.create_dag_run(dag, execution_date=datetime.datetime(2015, 1, 2))
+        try:
+            dagruns = models.DagRun.get_latest_runs(session)
+        finally:
+            session.close()
+        latest = {dagrun.dag_id: dagrun for dagrun in dagruns}
+        # Assert presence first so a missing DAG fails the test instead of
+        # the per-DAG check being silently skipped.
+        self.assertIn('test_latest_runs_1', latest)
+        self.assertEqual(latest['test_latest_runs_1'].execution_date,
+                         datetime.datetime(2015, 1, 2))
+
class DagBagTest(unittest.TestCase):