You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by sa...@apache.org on 2016/11/22 02:30:23 UTC

incubator-airflow git commit: [AIRFLOW-510] Filter Paused Dags, show Last Run & Trigger Dag

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 935ede22a -> 7c94d81c3


[AIRFLOW-510] Filter Paused Dags, show Last Run & Trigger Dag

Modify the HomeView to filter out paused dags if
wanted (and a config value to set
the default), display the last run datetime on
each dag, and allow externally triggering
the dag from an icon

Closes #1833 from
btallman/HomeView_Improvements_feature


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/7c94d81c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/7c94d81c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/7c94d81c

Branch: refs/heads/master
Commit: 7c94d81c390881643f94d5e3d7d6fb351a445b72
Parents: 935ede2
Author: Benjamin Tallman <bt...@gmail.com>
Authored: Mon Nov 21 18:28:04 2016 -0800
Committer: Siddharth Anand <si...@yahoo.com>
Committed: Mon Nov 21 18:28:11 2016 -0800

----------------------------------------------------------------------
 airflow/configuration.py                |  5 ++
 airflow/models.py                       | 13 ++++--
 airflow/www/templates/airflow/dags.html | 59 ++++++++++++++++++------
 airflow/www/views.py                    | 68 ++++++++++++++++++++++++++--
 4 files changed, 124 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c94d81c/airflow/configuration.py
----------------------------------------------------------------------
diff --git a/airflow/configuration.py b/airflow/configuration.py
index ad8b3e3..53d993c 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -255,6 +255,10 @@ demo_mode = False
 # while fetching logs from other worker machine
 log_fetch_timeout_sec = 5
 
+# By default, the webserver shows paused DAGs. Flip this to hide paused
+# DAGs by default
+hide_paused_dags_by_default = False
+
 [email]
 email_backend = airflow.utils.email.send_email_smtp
 
@@ -432,6 +436,7 @@ web_server_host = 0.0.0.0
 web_server_port = 8080
 dag_orientation = LR
 log_fetch_timeout_sec = 5
+hide_paused_dags_by_default = False
 
 [email]
 email_backend = airflow.utils.email.send_email_smtp

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c94d81c/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 0e30357..61ea359 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -2682,17 +2682,22 @@ class DAG(BaseDag, LoggingMixin):
         return dttm
 
     @provide_session
-    def get_last_dagrun(self, session=None):
+    def get_last_dagrun(self, session=None, include_externally_triggered=False):
         """
         Returns the last dag run for this dag, None if there was none.
         Last dag run can be any type of run eg. scheduled or backfilled.
         Overriden DagRuns are ignored
         """
         DR = DagRun
-        last = session.query(DR).filter(
+        qry = session.query(DR).filter(
             DR.dag_id == self.dag_id,
-            DR.external_trigger == False
-        ).order_by(DR.execution_date.desc()).first()
+        )
+        if not include_externally_triggered:
+            qry = qry.filter(DR.external_trigger.is_(False))
+
+        qry = qry.order_by(DR.execution_date.desc())
+
+        last = qry.first()
 
         return last
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c94d81c/airflow/www/templates/airflow/dags.html
----------------------------------------------------------------------
diff --git a/airflow/www/templates/airflow/dags.html b/airflow/www/templates/airflow/dags.html
index a5182e4..347015d 100644
--- a/airflow/www/templates/airflow/dags.html
+++ b/airflow/www/templates/airflow/dags.html
@@ -41,6 +41,8 @@
                   <span id="statuses_info" class="glyphicon glyphicon-info-sign" aria-hidden="true" title="Status of tasks from all active DAG runs or, if not currently active, from most recent run."></span>
                   <img id="loading" width="15" src="{{ url_for("static", filename="loading.gif") }}">
                 </th>
+                <th style="padding-left: 5px;">Last Run <span id="statuses_info" class="glyphicon glyphicon-info-sign" aria-hidden="true" title="Execution Date/Time of Highest Dag Run."></span>
+                </th>
                 <th style="padding-left: 5px;">DAG Runs
                   <span id="statuses_info" class="glyphicon glyphicon-info-sign" aria-hidden="true" title="Status of all previous DAG runs."></span>
                   <img id="loading" width="15" src="{{ url_for("static", filename="loading.gif") }}">
@@ -102,59 +104,81 @@
                     <svg height="10" width="10" id='task-run-{{ dag.safe_dag_id }}' style="display: block;"></svg>
                 </td>
 
-                <!-- Column 7: Dag Runs -->
+                <!-- Column 7: Last Run -->
+                <td class="text-nowrap">
+                    {% if dag %}
+                        {% set last_run = dag.get_last_dagrun(include_externally_triggered=True) %}
+                        {% if last_run %}
+                            <a href="{{ url_for('airflow.graph', dag_id=last_run.dag_id, execution_date=last_run.execution_date ) }}">
+                                {{ last_run.execution_date.strftime("%Y-%m-%d %H:%M") }}
+                            </a> <span id="statuses_info" class="glyphicon glyphicon-info-sign" aria-hidden="true" title="Start Date: {{last_run.start_date.strftime('%Y-%m-%d %H:%M')}}"></span>
+                        {% else %}
+                            <!--No DAG Runs-->
+                        {% endif %}
+                    {% else %}
+                        <!--No DAG Runs-->
+                    {% endif %}
+                </td>
+
+                <!-- Column 8: Dag Runs -->
                 <td style="padding:0px; width:120px; height:10px;">
                     <svg height="10" width="10" id='dag-run-{{ dag.safe_dag_id }}' style="display: block;"></svg>
                 </td>
 
-                <!-- Column 8: Links -->
+                <!-- Column 9: Links -->
                 <td class="text-center" style="display:flex; flex-direction:row; justify-content:space-around;">
                 {% if dag %}
 
-                <!-- Graph -->
-                <a href="{{ url_for('airflow.graph', dag_id=dag.dag_id) }}" title="Graph View">
-                    <span class="glyphicon glyphicon-certificate" aria-hidden="true" title="Graph View"></span>
+                <!-- Trigger Dag -->
+                <a href="{{ url_for('airflow.trigger', dag_id=dag.dag_id) }}"
+                   onclick="return confirmTriggerDag('{{ dag.safe_dag_id }}')" title="Trigger Dag">
+                    <span class="glyphicon glyphicon-play-circle" aria-hidden="true"></span>
                 </a>
 
                 <!-- Tree -->
                 <a href="{{ url_for('airflow.tree', dag_id=dag.dag_id, num_runs=25) }}" title="Tree View">
-                    <span class="glyphicon glyphicon-tree-deciduous" aria-hidden="true" title="Tree View"></span>
+                    <span class="glyphicon glyphicon-tree-deciduous" aria-hidden="true"></span>
+                </a>
+
+                <!-- Graph -->
+                <a href="{{ url_for('airflow.graph', dag_id=dag.dag_id) }}" title="Graph View">
+                    <span class="glyphicon glyphicon-certificate" aria-hidden="true"></span>
                 </a>
 
                 <!-- Duration -->
                 <a href="{{ url_for('airflow.duration', dag_id=dag.dag_id) }}" title="Tasks Duration">
-                    <span class="glyphicon glyphicon-stats" aria-hidden="true" title="Tasks Duration"></span>
+                    <span class="glyphicon glyphicon-stats" aria-hidden="true"></span>
                 </a>
 
                 <!-- Retries -->
                 <a href="{{ url_for('airflow.tries', dag_id=dag.dag_id) }}" title="Task Tries">
-                    <span class="glyphicon glyphicon-duplicate" aria-hidden="true" title="Task Tries"></span>
+                    <span class="glyphicon glyphicon-duplicate" aria-hidden="true"></span>
                 </a>
 
                 <!-- Landing Times -->
                 <a href="{{ url_for("airflow.landing_times", dag_id=dag.dag_id) }}" title="Landing Times">
-                    <span class="glyphicon glyphicon-plane" aria-hidden="true" title="Landing Times"></span>
+                    <span class="glyphicon glyphicon-plane" aria-hidden="true"></span>
                 </a>
 
                 <!-- Gantt -->
                 <a href="{{ url_for("airflow.gantt", dag_id=dag.dag_id) }}" title="Gantt View">
-                    <span class="glyphicon glyphicon-align-left" aria-hidden="true" title="Gantt View"></span>
+                    <span class="glyphicon glyphicon-align-left" aria-hidden="true"></span>
                 </a>
 
                 <!-- Code -->
                 <a href="{{ url_for("airflow.code", dag_id=dag.dag_id) }}" title="Code View">
-                    <span class="glyphicon glyphicon-flash" aria-hidden="true" title="Code View"></span>
+                    <span class="glyphicon glyphicon-flash" aria-hidden="true"></span>
                 </a>
 
                 <!-- Logs -->
                 <a href="/admin/log/?sort=1&amp;desc=1&amp;flt1_dag_id_equals={{ dag.dag_id }}" title="Logs">
-                    <span class="glyphicon glyphicon-align-justify" aria-hidden="true" title="Logs"></span>
+                    <span class="glyphicon glyphicon-align-justify" aria-hidden="true"></span>
                 </a>
                 {% endif %}
 
                 <!-- Refresh -->
                 <a href="{{ url_for("airflow.refresh", dag_id=dag_id) }}" title="Refresh">
-                  <span class="glyphicon glyphicon-refresh" aria-hidden="true" title="Refresh"></span>
+                  <span class="glyphicon glyphicon-refresh" aria-hidden="true"></span>
                 </a>
 
                 </td>
@@ -162,6 +186,11 @@
         {% endfor %}
         </tbody>
     </table>
+    {% if not hide_paused %}
+    <a href="/admin/?showPaused=False">Hide Paused DAGs</a>
+    {% else %}
+    <a href="/admin/?showPaused=True">Show Paused DAGs</a>
+    {% endif %}
   </div>
 {% endblock %}
 
@@ -171,6 +200,10 @@
   <script src="{{ url_for('static', filename='jquery.dataTables.min.js') }}"></script>
   <script src="{{ url_for('static', filename='bootstrap-toggle.min.js') }}"></script>
   <script>
+
+      function confirmTriggerDag(dag_id){
+          return confirm("Are you sure you want to run '"+dag_id+"' now?");
+      }
       all_dags = $("[id^=toggle]");
       $.each(all_dags, function(i,v) {
         $(v).change (function() {

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c94d81c/airflow/www/views.py
----------------------------------------------------------------------
diff --git a/airflow/www/views.py b/airflow/www/views.py
index c4f4bd7..d4c69dc 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -61,7 +61,7 @@ from airflow import models
 from airflow import settings
 from airflow.exceptions import AirflowException
 from airflow.settings import Session
-from airflow.models import XCom
+from airflow.models import XCom, DagRun
 from airflow.ti_deps.dep_context import DepContext, QUEUE_DEPS, SCHEDULER_DEPS
 
 from airflow.models import BaseOperator
@@ -979,6 +979,42 @@ class Airflow(BaseView):
             "it should start any moment now.".format(ti))
         return redirect(origin)
 
+    @expose('/trigger')
+    @login_required
+    @wwwutils.action_logging
+    @wwwutils.notify_owner
+    def trigger(self):
+        dag_id = request.args.get('dag_id')
+        origin = request.args.get('origin') or "/admin/"
+        dag = dagbag.get_dag(dag_id)
+
+        if not dag:
+            flash("Cannot find dag {}".format(dag_id))
+            return redirect(origin)
+
+        execution_date = datetime.now()
+        run_id = "manual__{0}".format(execution_date.isoformat())
+
+        dr = DagRun.find(dag_id=dag_id, run_id=run_id)
+        if dr:
+            flash("This run_id {} already exists".format(run_id))
+            return redirect(origin)
+
+        run_conf = {}
+
+        dag.create_dagrun(
+            run_id=run_id,
+            execution_date=execution_date,
+            state=State.RUNNING,
+            conf=run_conf,
+            external_trigger=True
+        )
+
+        flash(
+            "Triggered {}, "
+            "it should start any moment now.".format(dag_id))
+        return redirect(origin)
+
     @expose('/clear')
     @login_required
     @wwwutils.action_logging
@@ -1804,8 +1840,19 @@ class HomeView(AdminIndexView):
         do_filter = FILTER_BY_OWNER and (not current_user.is_superuser())
         owner_mode = conf.get('webserver', 'OWNER_MODE').strip().lower()
 
-        # read orm_dags from the db
+        hide_paused_dags_by_default = conf.getboolean('webserver',
+                                                      'hide_paused_dags_by_default')
+        show_paused_arg = request.args.get('showPaused', 'None')
+        if show_paused_arg.strip().lower() == 'false':
+            hide_paused = True
+
+        elif show_paused_arg.strip().lower() == 'true':
+            hide_paused = False
 
+        else:
+            hide_paused = hide_paused_dags_by_default
+
+        # read orm_dags from the db
         qry = session.query(DM)
         qry_fltr = []
 
@@ -1824,7 +1871,12 @@ class HomeView(AdminIndexView):
                 ~DM.is_subdag, DM.is_active
             ).all()
 
-        orm_dags = {dag.dag_id: dag for dag in qry_fltr}
+        # optionally filter out "paused" dags
+        if hide_paused:
+            orm_dags = {dag.dag_id: dag for dag in qry_fltr if not dag.is_paused}
+
+        else:
+            orm_dags = {dag.dag_id: dag for dag in qry_fltr}
 
         import_errors = session.query(models.ImportError).all()
         for ie in import_errors:
@@ -1836,7 +1888,14 @@ class HomeView(AdminIndexView):
         session.close()
 
         # get a list of all non-subdag dags visible to everyone
-        unfiltered_webserver_dags = [dag for dag in dagbag.dags.values() if not dag.parent_dag]
+        # optionally filter out "paused" dags
+        if hide_paused:
+            unfiltered_webserver_dags = [dag for dag in dagbag.dags.values() if
+                                         not dag.parent_dag and not dag.is_paused]
+
+        else:
+            unfiltered_webserver_dags = [dag for dag in dagbag.dags.values() if
+                                         not dag.parent_dag]
 
         # optionally filter to get only dags that the user should see
         if do_filter and owner_mode == 'ldapgroup':
@@ -1864,6 +1923,7 @@ class HomeView(AdminIndexView):
             'airflow/dags.html',
             webserver_dags=webserver_dags,
             orm_dags=orm_dags,
+            hide_paused=hide_paused,
             all_dag_ids=all_dag_ids)