You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/04/05 08:02:47 UTC

incubator-airflow git commit: [AIRFLOW-970] Load latest_runs on homepage async

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 3ff5abee3 -> 0f7ddbbed


[AIRFLOW-970] Load latest_runs on homepage async

The latest_runs column on the homepage loads synchronously with an n+1
query. Homepage loads will be significantly faster if this happens
asynchronously and as a batch.

Closes #2144 from saguziel/aguziel-latest-run-async


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/0f7ddbbe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/0f7ddbbe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/0f7ddbbe

Branch: refs/heads/master
Commit: 0f7ddbbedb05f2f11500250db4989edcb27bc164
Parents: 3ff5abe
Author: Alex Guziel <al...@airbnb.com>
Authored: Wed Apr 5 10:02:42 2017 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Wed Apr 5 10:02:42 2017 +0200

----------------------------------------------------------------------
 airflow/models.py                         | 23 ++++++++++++++++++++
 airflow/www/api/experimental/endpoints.py | 23 +++++++++++++++++++-
 airflow/www/templates/airflow/dags.html   | 29 ++++++++++++++------------
 tests/dags/test_latest_runs.py            | 27 ++++++++++++++++++++++++
 tests/models.py                           | 21 +++++++++++++++++--
 5 files changed, 107 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0f7ddbbe/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 8a91cc2..95e2255 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -4218,6 +4218,29 @@ class DagRun(Base):
 
         return False
 
+    @classmethod
+    @provide_session
+    def get_latest_runs(cls, session=None):
+        """Returns the DagRun with the latest execution_date for each dag_id."""
+        subquery = (
+            session
+            .query(
+                cls.dag_id,
+                func.max(cls.execution_date).label('execution_date'))
+            # No state filter: the homepage "Last Run" column needs finished runs too.
+            .group_by(cls.dag_id)
+            .subquery()
+        )
+        dagruns = (
+            session
+            .query(cls)
+            .join(subquery,
+                  and_(cls.dag_id == subquery.c.dag_id,
+                       cls.execution_date == subquery.c.execution_date))
+            .all()
+        )
+        return dagruns
+
 
 class Pool(Base):
     __tablename__ = "slot_pool"

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0f7ddbbe/airflow/www/api/experimental/endpoints.py
----------------------------------------------------------------------
diff --git a/airflow/www/api/experimental/endpoints.py b/airflow/www/api/experimental/endpoints.py
index 56b9d79..63355c7 100644
--- a/airflow/www/api/experimental/endpoints.py
+++ b/airflow/www/api/experimental/endpoints.py
@@ -20,7 +20,8 @@ from airflow.exceptions import AirflowException
 from airflow.www.app import csrf
 
 from flask import (
-    g, Markup, Blueprint, redirect, jsonify, abort, request, current_app, send_file
+    g, Markup, Blueprint, redirect, jsonify, abort,
+    request, current_app, send_file, url_for
 )
 from datetime import datetime
 
@@ -110,3 +111,23 @@ def task_info(dag_id, task_id):
     task = dag.get_task(task_id)
     fields = {k: str(v) for k, v in vars(task).items() if not k.startswith('_')}
     return jsonify(fields)
+
+
+@api_experimental.route('/latest_runs', methods=['GET'])
+@requires_authentication
+def latest_dag_runs():
+    """Serve DagRun.get_latest_runs() as JSON for the UI's "Last Run" column."""
+    from airflow.models import DagRun
+    fmt = "%Y-%m-%d %H:%M"
+
+    def _serialize(run):
+        return {
+            'dag_id': run.dag_id,
+            'execution_date': run.execution_date.strftime(fmt),
+            'start_date': run.start_date.strftime(fmt) if run.start_date else '',
+            'dag_run_url': url_for('airflow.graph', dag_id=run.dag_id,
+                                   execution_date=run.execution_date),
+        }
+
+    return jsonify([_serialize(run) for run in DagRun.get_latest_runs()
+                    if run.execution_date])

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0f7ddbbe/airflow/www/templates/airflow/dags.html
----------------------------------------------------------------------
diff --git a/airflow/www/templates/airflow/dags.html b/airflow/www/templates/airflow/dags.html
index 8a5a346..5abbc4b 100644
--- a/airflow/www/templates/airflow/dags.html
+++ b/airflow/www/templates/airflow/dags.html
@@ -105,19 +105,7 @@
                 </td>
 
                 <!-- Column 7: Last Run -->
-                <td class="text-nowrap">
-                    {% if dag %}
-                        {% set last_run = dag.get_last_dagrun(include_externally_triggered=True) %}
-                        {% if last_run and last_run.start_date %}
-                            <a href="{{ url_for('airflow.graph', dag_id=last_run.dag_id, execution_date=last_run.execution_date ) }}">
-                                {{ last_run.execution_date.strftime("%Y-%m-%d %H:%M") }}
-                            </a> <span id="statuses_info" class="glyphicon glyphicon-info-sign" aria-hidden="true" title="Start Date: {{last_run.start_date.strftime('%Y-%m-%d %H:%M')}}"></span>
-                        {% else %}
-                            <!--No DAG Runs-->
-                        {% endif %}
-                    {% else %}
-                        <!--No DAG Runs-->
-                    {% endif %}
+                <td class="text-nowrap latest_dag_run" data-dag-id="{{ dag.dag_id }}">
                 </td>
 
                 <!-- Column 8: Dag Runs -->
@@ -237,6 +225,21 @@
           }
         });
       });
+      $.getJSON("{{ url_for('api_experimental.latest_dag_runs') }}", function(data) {
+        $.each(data, function(i, run) {
+          // Match on data-dag-id, not a CSS class: dag_ids may contain '.', which a class selector misreads.
+          var cell = $('.latest_dag_run').filter(function() {
+            return $(this).attr('data-dag-id') === run.dag_id;
+          });
+          var link = $("<a>", {href: run.dag_run_url, text: run.execution_date});
+          var info_icon = $('<span>', {
+            "aria-hidden": "true", id: "statuses_info",
+            title: "Start Date: " + run.start_date,
+            "class": "glyphicon glyphicon-info-sign"
+          });
+          cell.append(link).append(info_icon);
+        });
+      });
       d3.json("{{ url_for('airflow.dag_stats') }}", function(error, json) {
         for(var dag_id in json) {
             states = json[dag_id];

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0f7ddbbe/tests/dags/test_latest_runs.py
----------------------------------------------------------------------
diff --git a/tests/dags/test_latest_runs.py b/tests/dags/test_latest_runs.py
new file mode 100644
index 0000000..dd04c0e
--- /dev/null
+++ b/tests/dags/test_latest_runs.py
@@ -0,0 +1,27 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Fixture: defines one DAG, 'test_latest_runs_1' (range(1, 2) yields only i=1).
+from datetime import datetime
+
+from airflow.models import DAG
+from airflow.operators.dummy_operator import DummyOperator
+
+for i in range(1, 2):
+    dag = DAG(dag_id='test_latest_runs_{}'.format(i))
+    task = DummyOperator(
+        task_id='dummy_task',
+        dag=dag,
+        owner='airflow',
+        start_date=datetime(2016, 2, 1))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0f7ddbbe/tests/models.py
----------------------------------------------------------------------
diff --git a/tests/models.py b/tests/models.py
index 15450dd..20da4d4 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -236,11 +236,13 @@ class DagTest(unittest.TestCase):
 
 class DagRunTest(unittest.TestCase):
 
-    def create_dag_run(self, dag, state=State.RUNNING, task_states=None):
+    def create_dag_run(self, dag, state=State.RUNNING, task_states=None, execution_date=None):
         now = datetime.datetime.now()
+        if execution_date is None:
+            execution_date = now
         dag_run = dag.create_dagrun(
             run_id='manual__' + now.isoformat(),
-            execution_date=now,
+            execution_date=execution_date,
             start_date=now,
             state=state,
             external_trigger=False,
@@ -412,6 +414,21 @@ class DagRunTest(unittest.TestCase):
         ti = dag_run.get_task_instance('test_short_circuit_false')
         self.assertEqual(None, ti)
 
+    def test_get_latest_runs(self):
+        session = settings.Session()
+        dag = DAG(
+            dag_id='test_latest_runs_1',
+            start_date=DEFAULT_DATE)
+        self.create_dag_run(dag, execution_date=datetime.datetime(2015, 1, 1))
+        self.create_dag_run(dag, execution_date=datetime.datetime(2015, 1, 2))
+        dagruns = models.DagRun.get_latest_runs(session)
+        session.close()
+        # Require exactly one match so the date check cannot pass vacuously.
+        matching = [run for run in dagruns if run.dag_id == 'test_latest_runs_1']
+        self.assertEqual(len(matching), 1)
+        self.assertEqual(matching[0].execution_date,
+                         datetime.datetime(2015, 1, 2))
+
 
 class DagBagTest(unittest.TestCase):