You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2019/01/11 10:18:02 UTC

[GitHub] Fokko closed pull request #4390: [AIRFLOW-3584] Use ORM DAGs for index view.

Fokko closed pull request #4390: [AIRFLOW-3584] Use ORM DAGs for index view.
URL: https://github.com/apache/airflow/pull/4390
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub will not display its
diff after the merge, so the full diff is supplied below:

diff --git a/airflow/migrations/versions/dd4ecb8fbee3_add_schedule_interval_to_dag.py b/airflow/migrations/versions/dd4ecb8fbee3_add_schedule_interval_to_dag.py
new file mode 100644
index 0000000000..3b2e6d577a
--- /dev/null
+++ b/airflow/migrations/versions/dd4ecb8fbee3_add_schedule_interval_to_dag.py
@@ -0,0 +1,42 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add schedule interval to dag
+
+Revision ID: dd4ecb8fbee3
+Revises: c8ffec048a3b
+Create Date: 2018-12-27 18:39:25.748032
+
+"""
+
+from alembic import op
+import sqlalchemy as sa
+
+# revision identifiers, used by Alembic.
+revision = 'dd4ecb8fbee3'
+down_revision = 'c8ffec048a3b'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    op.add_column('dag', sa.Column('schedule_interval', sa.Text(), nullable=True))
+
+
+def downgrade():
+    op.drop_column('dag', 'schedule_interval')  # drop_column() takes the column name, not a Column
diff --git a/airflow/models/__init__.py b/airflow/models/__init__.py
index b2561e2fdb..6d1adf72ff 100755
--- a/airflow/models/__init__.py
+++ b/airflow/models/__init__.py
@@ -101,7 +101,7 @@
     as_tuple, is_container, validate_key, pprinttable)
 from airflow.utils.operator_resources import Resources
 from airflow.utils.state import State
-from airflow.utils.sqlalchemy import UtcDateTime
+from airflow.utils.sqlalchemy import UtcDateTime, Interval
 from airflow.utils.timeout import timeout
 from airflow.utils.trigger_rule import TriggerRule
 from airflow.utils.weight_rule import WeightRule
@@ -240,6 +240,20 @@ def clear_task_instances(tis,
             dr.start_date = timezone.utcnow()
 
 
+def get_last_dagrun(dag_id, session, include_externally_triggered=False):
+    """
+    Returns the last dag run for a dag, None if there was none.
+    Last dag run can be any type of run eg. scheduled or backfilled.
+    Overridden DagRuns are ignored.
+    """
+    DR = DagRun
+    query = session.query(DR).filter(DR.dag_id == dag_id)
+    if not include_externally_triggered:
+        query = query.filter(DR.external_trigger == False)  # noqa
+    query = query.order_by(DR.execution_date.desc())
+    return query.first()
+
+
 class DagBag(BaseDagBag, LoggingMixin):
     """
     A dagbag is a collection of dags, parsed out of a folder tree and has high
@@ -2980,6 +2994,8 @@ class DagModel(Base):
     description = Column(Text)
     # Default view of the inside the webserver
     default_view = Column(String(25))
+    # Schedule interval
+    schedule_interval = Column(Interval)
 
     def __repr__(self):
         return "<DAG: {self.dag_id}>".format(self=self)
@@ -2999,6 +3015,15 @@ def get_default_view(self):
         else:
             return self.default_view
 
+    @provide_session
+    def get_last_dagrun(self, session=None, include_externally_triggered=False):
+        return get_last_dagrun(self.dag_id, session=session,
+                               include_externally_triggered=include_externally_triggered)
+
+    @property
+    def safe_dag_id(self):
+        return self.dag_id.replace('.', '__dot__')
+
     def get_dag(self):
         return DagBag(dag_folder=self.fileloc).get_dag(self.dag_id)
 
@@ -3413,23 +3438,8 @@ def normalize_schedule(self, dttm):
 
     @provide_session
     def get_last_dagrun(self, session=None, include_externally_triggered=False):
-        """
-        Returns the last dag run for this dag, None if there was none.
-        Last dag run can be any type of run eg. scheduled or backfilled.
-        Overridden DagRuns are ignored
-        """
-        DR = DagRun
-        qry = session.query(DR).filter(
-            DR.dag_id == self.dag_id,
-        )
-        if not include_externally_triggered:
-            qry = qry.filter(DR.external_trigger.__eq__(False))
-
-        qry = qry.order_by(DR.execution_date.desc())
-
-        last = qry.first()
-
-        return last
+        return get_last_dagrun(self.dag_id, session=session,
+                               include_externally_triggered=include_externally_triggered)
 
     @property
     def dag_id(self):
@@ -4236,6 +4246,7 @@ def sync_to_db(self, owner=None, sync_time=None, session=None):
         orm_dag.last_scheduler_run = sync_time
         orm_dag.default_view = self._default_view
         orm_dag.description = self.description
+        orm_dag.schedule_interval = self.schedule_interval
         session.merge(orm_dag)
         session.commit()
 
diff --git a/airflow/utils/sqlalchemy.py b/airflow/utils/sqlalchemy.py
index 7e97371319..e869692b82 100644
--- a/airflow/utils/sqlalchemy.py
+++ b/airflow/utils/sqlalchemy.py
@@ -24,12 +24,14 @@
 
 import datetime
 import os
+import json
 import pendulum
 import time
 import random
 
+from dateutil import relativedelta
 from sqlalchemy import event, exc, select
-from sqlalchemy.types import DateTime, TypeDecorator
+from sqlalchemy.types import Text, DateTime, TypeDecorator
 
 from airflow.utils.log.logging_mixin import LoggingMixin
 
@@ -169,3 +171,34 @@ def process_result_value(self, value, dialect):
                 value = value.astimezone(utc)
 
         return value
+
+
+class Interval(TypeDecorator):
+
+    impl = Text
+
+    attr_keys = {
+        datetime.timedelta: ('days', 'seconds', 'microseconds'),
+        relativedelta.relativedelta: (
+            'years', 'months', 'days', 'leapdays', 'hours', 'minutes', 'seconds', 'microseconds',
+            'year', 'month', 'day', 'hour', 'minute', 'second', 'microsecond',
+        ),
+    }
+
+    def process_bind_param(self, value, dialect):
+        if type(value) in self.attr_keys:
+            attrs = {
+                key: getattr(value, key)
+                for key in self.attr_keys[type(value)]
+            }
+            return json.dumps({'type': type(value).__name__, 'attrs': attrs})
+        return json.dumps(value)
+
+    def process_result_value(self, value, dialect):
+        if not value:
+            return value
+        data = json.loads(value)
+        if isinstance(data, dict):
+            type_map = {key.__name__: key for key in self.attr_keys}
+            return type_map[data['type']](**data['attrs'])
+        return data
diff --git a/airflow/www/templates/airflow/dags.html b/airflow/www/templates/airflow/dags.html
index 5b3c0b7919..5e45aa5735 100644
--- a/airflow/www/templates/airflow/dags.html
+++ b/airflow/www/templates/airflow/dags.html
@@ -63,52 +63,37 @@ <h2>DAGs</h2>
             </tr>
         </thead>
         <tbody>
-        {% for dag_id in dag_ids_in_page %}
-            {% set dag = webserver_dags[dag_id] if dag_id in webserver_dags else None %}
+        {% for dag in dags %}
             <tr>
                 <!-- Column 1: Edit dag -->
                 <td class="text-center" style="width:10px;">
-                    {% if dag_id in orm_dags %}
-                  <a href="{{ url_for('dagmodel.edit_view') }}?id={{ dag_id }}" title="Info">
+                  <a href="{{ url_for('dagmodel.edit_view') }}?id={{ dag.dag_id }}" title="Info">
                         <span class="glyphicon glyphicon-edit" aria-hidden="true"></span>
                     </a>
-                    {% endif %}
                 </td>
 
                 <!-- Column 2: Turn dag on/off -->
                 <td>
-                  {% if dag_id in orm_dags %}
-                    <input id="toggle-{{ dag_id }}" dag_id="{{ dag_id }}" type="checkbox" {{ "checked" if not orm_dags[dag_id].is_paused else "" }} data-toggle="toggle" data-size="mini" method="post">
-                  {% endif %}
+                  <input id="toggle-{{ dag.dag_id }}" dag_id="{{ dag.dag_id }}" type="checkbox" {{ "checked" if not dag.is_paused else "" }} data-toggle="toggle" data-size="mini" method="post">
                 </td>
 
                 <!-- Column 3: Name -->
                 <td>
-                    {% if dag_id in webserver_dags %}
-                    <a href="{{ url_for('airflow.'+ dag.get_default_view(), dag_id=dag.dag_id) }}" title="{{ dag.description }}">
-                        {{ dag_id }}
-                    </a>
-                    {% else %}
-                        {{ dag_id }}
-                        <span class="glyphicon glyphicon-info-sign" class="info" aria-hidden="true" title="This DAG isn't available in the web server's DagBag object. It shows up in this list because the scheduler marked it as active in the metadata database."></span>
-                    {% endif %}
-                    {% if dag_id not in orm_dags %}
-                        <span class="glyphicon glyphicon-info-sign" class="info" aria-hidden="true" title="This DAG seems to be existing only locally. The master scheduler doesn't seem to be aware of its existence."></span>
-                    {% endif %}
+                  <a href="{{ url_for('airflow.'+ dag.get_default_view(), dag_id=dag.dag_id) }}" title="{{ dag.description }}">
+                      {{ dag.dag_id }}
+                  </a>
                 </td>
 
                 <!-- Column 4: Dag Schedule -->
                 <td>
-                    {% if dag_id in webserver_dags %}
                   <a class="label label-default schedule {{ dag.dag_id }}" href="{{ url_for('dagrun.index_view') }}?flt2_dag_id_equals={{ dag.dag_id }}">
-                        {{ dag.schedule_interval }}
+                        {{ dag.schedule_interval | string }}
                     </a>
-                    {% endif %}
                 </td>
 
                 <!-- Column 5: Dag Owners -->
                 <td>
-                  {{ dag.owner if dag else orm_dags[dag_id].owners }}
+                  {{ dag.owners }}
                 </td>
 
                 <!-- Column 6: Recent Tasks -->
@@ -118,14 +103,12 @@ <h2>DAGs</h2>
 
                 <!-- Column 7: Last Run -->
                 <td class="text-nowrap latest_dag_run {{ dag.dag_id }}">
-                  {% if dag %}
-                    {% set last_run = dag.get_last_dagrun(include_externally_triggered=True) %}
-                    {% if last_run and last_run.execution_date %}
-                      <a href="{{ url_for('airflow.graph', dag_id=dag.dag_id, execution_date=last_run.execution_date) }}">
-                        {{ last_run.execution_date.strftime("%Y-%m-%d %H:%M") }}
-                      </a>
-                      <span aria-hidden="true" id="statuses_info" title="Start Date: {{ last_run.start_date.strftime("%Y-%m-%d %H:%M") }}" class="glyphicon glyphicon-info-sign"></span>
-                    {% endif %}
+                  {% set last_run = dag.get_last_dagrun(include_externally_triggered=True) %}
+                  {% if last_run and last_run.execution_date %}
+                    <a href="{{ url_for('airflow.graph', dag_id=dag.dag_id, execution_date=last_run.execution_date) }}">
+                      {{ last_run.execution_date.strftime("%Y-%m-%d %H:%M") }}
+                    </a>
+                    <span aria-hidden="true" id="statuses_info" title="Start Date: {{ last_run.start_date.strftime("%Y-%m-%d %H:%M") }}" class="glyphicon glyphicon-info-sign"></span>
                   {% endif %}
                 </td>
 
@@ -186,14 +169,14 @@ <h2>DAGs</h2>
                 {% endif %}
 
                 <!-- Refresh -->
-                <a href="{{ url_for("airflow.refresh", dag_id=dag_id) }}">
+                <a href="{{ url_for("airflow.refresh", dag_id=dag.dag_id) }}">
                   <span class="glyphicon glyphicon-refresh" aria-hidden="true" data-original-title="Refresh"></span>
                 </a>
 
                 <!-- Delete -->
                 <!-- Use dag_id instead of dag.dag_id, because the DAG might not exist in the webserver's DagBag -->
-                <a href="{{ url_for('airflow.delete', dag_id=dag_id) }}"
-                  onclick="return confirmDeleteDag('{{ dag_id }}')">
+                <a href="{{ url_for('airflow.delete', dag_id=dag.dag_id) }}"
+                  onclick="return confirmDeleteDag('{{ dag.dag_id }}')">
                    <span class="glyphicon glyphicon-remove-circle" style="color:red" aria-hidden="true" data-original-title="Delete Dag"></span>
                 </a>
                 </td>
diff --git a/airflow/www/views.py b/airflow/www/views.py
index d7270d03c8..8d6618ccf6 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -2138,31 +2138,37 @@ def get_int_arg(value, default=0):
             hide_paused = hide_paused_dags_by_default
 
         # read orm_dags from the db
-        sql_query = session.query(DM)
+        query = session.query(DM)
 
         if do_filter and owner_mode == 'ldapgroup':
-            sql_query = sql_query.filter(
+            query = query.filter(
                 ~DM.is_subdag,
                 DM.is_active,
                 DM.owners.in_(current_user.ldap_groups)
             )
         elif do_filter and owner_mode == 'user':
-            sql_query = sql_query.filter(
+            query = query.filter(
                 ~DM.is_subdag, DM.is_active,
                 DM.owners == current_user.user.username
             )
         else:
-            sql_query = sql_query.filter(
+            query = query.filter(
                 ~DM.is_subdag, DM.is_active
             )
 
         # optionally filter out "paused" dags
         if hide_paused:
-            sql_query = sql_query.filter(~DM.is_paused)
+            query = query.filter(~DM.is_paused)
 
-        orm_dags = {dag.dag_id: dag for dag
-                    in sql_query
-                    .all()}
+        if arg_search_query:
+            query = query.filter(sqla.func.lower(DM.dag_id) == arg_search_query.lower())
+
+        query = query.order_by(DM.dag_id)
+
+        start = current_page * dags_per_page
+        end = start + dags_per_page
+
+        dags = query.offset(start).limit(dags_per_page).all()
 
         import_errors = session.query(models.ImportError).all()
         for ie in import_errors:
@@ -2178,76 +2184,17 @@ def get_int_arg(value, default=0):
                     filename=filename),
                 "error")
 
-        # get a list of all non-subdag dags visible to everyone
-        # optionally filter out "paused" dags
-        if hide_paused:
-            unfiltered_webserver_dags = [dag for dag in dagbag.dags.values() if
-                                         not dag.parent_dag and not dag.is_paused]
-
-        else:
-            unfiltered_webserver_dags = [dag for dag in dagbag.dags.values() if
-                                         not dag.parent_dag]
-
-        # optionally filter to get only dags that the user should see
-        if do_filter and owner_mode == 'ldapgroup':
-            # only show dags owned by someone in @current_user.ldap_groups
-            webserver_dags = {
-                dag.dag_id: dag
-                for dag in unfiltered_webserver_dags
-                if dag.owner in current_user.ldap_groups
-            }
-        elif do_filter and owner_mode == 'user':
-            # only show dags owned by @current_user.user.username
-            webserver_dags = {
-                dag.dag_id: dag
-                for dag in unfiltered_webserver_dags
-                if dag.owner == current_user.user.username
-            }
-        else:
-            webserver_dags = {
-                dag.dag_id: dag
-                for dag in unfiltered_webserver_dags
-            }
-
-        if arg_search_query:
-            lower_search_query = arg_search_query.lower()
-            # filter by dag_id
-            webserver_dags_filtered = {
-                dag_id: dag
-                for dag_id, dag in webserver_dags.items()
-                if (lower_search_query in dag_id.lower() or
-                    lower_search_query in dag.owner.lower())
-            }
-
-            all_dag_ids = (set([dag.dag_id for dag in orm_dags.values()
-                                if lower_search_query in dag.dag_id.lower() or
-                                lower_search_query in dag.owners.lower()]) |
-                           set(webserver_dags_filtered.keys()))
-
-            sorted_dag_ids = sorted(all_dag_ids)
-        else:
-            webserver_dags_filtered = webserver_dags
-            sorted_dag_ids = sorted(set(orm_dags.keys()) | set(webserver_dags.keys()))
-
-        start = current_page * dags_per_page
-        end = start + dags_per_page
-
-        num_of_all_dags = len(sorted_dag_ids)
-        page_dag_ids = sorted_dag_ids[start:end]
+        num_of_all_dags = query.count()
         num_of_pages = int(math.ceil(num_of_all_dags / float(dags_per_page)))
 
         auto_complete_data = set()
-        for dag in webserver_dags_filtered.values():
-            auto_complete_data.add(dag.dag_id)
-            auto_complete_data.add(dag.owner)
-        for dag in orm_dags.values():
-            auto_complete_data.add(dag.dag_id)
-            auto_complete_data.add(dag.owners)
+        for row in query.with_entities(DM.dag_id, DM.owners):
+            auto_complete_data.add(row.dag_id)
+            auto_complete_data.add(row.owners)
 
         return self.render(
             'airflow/dags.html',
-            webserver_dags=webserver_dags_filtered,
-            orm_dags=orm_dags,
+            dags=dags,
             hide_paused=hide_paused,
             current_page=current_page,
             search_query=arg_search_query if arg_search_query else '',
@@ -2259,7 +2206,6 @@ def get_int_arg(value, default=0):
             paging=wwwutils.generate_pages(current_page, num_of_pages,
                                            search=arg_search_query,
                                            showPaused=not hide_paused),
-            dag_ids_in_page=page_dag_ids,
             auto_complete_data=auto_complete_data)
 
 
diff --git a/airflow/www_rbac/package-lock.json b/airflow/www_rbac/package-lock.json
index b595b467e5..d47f0839ac 100644
--- a/airflow/www_rbac/package-lock.json
+++ b/airflow/www_rbac/package-lock.json
@@ -2249,23 +2249,23 @@
       }
     },
     "dagre": {
-      "version": "0.8.2",
-      "resolved": "https://registry.npmjs.org/dagre/-/dagre-0.8.2.tgz",
-      "integrity": "sha512-TEOOGZOkCOgCG7AoUIq64sJ3d21SMv8tyoqteLpX+UsUsS9Qw8iap4hhogXY4oB3r0bbZuAjO0atAilgCmsE0Q==",
+      "version": "0.8.4",
+      "resolved": "https://registry.npmjs.org/dagre/-/dagre-0.8.4.tgz",
+      "integrity": "sha512-Dj0csFDrWYKdavwROb9FccHfTC4fJbyF/oJdL9LNZJ8WUvl968P6PAKEriGqfbdArVJEmmfA+UyumgWEwcHU6A==",
       "requires": {
-        "graphlib": "^2.1.5",
+        "graphlib": "^2.1.7",
         "lodash": "^4.17.4"
       }
     },
     "dagre-d3": {
-      "version": "0.6.1",
-      "resolved": "https://registry.npmjs.org/dagre-d3/-/dagre-d3-0.6.1.tgz",
-      "integrity": "sha512-KvF8GIxcsTBgB+Pgu6aqdkRGhZwyxMs1MUe2C5Mr8gFnQZtyeXfl08fqcS/EF5i9AAPWSKtnxJHXgMYS4WqB4w==",
+      "version": "0.6.3",
+      "resolved": "https://registry.npmjs.org/dagre-d3/-/dagre-d3-0.6.3.tgz",
+      "integrity": "sha512-1vAzNp7OR1370JtjNaVFiW04DBjPDq513cJnqNVWxIkZqB0HfIArsc5eriTY9RM9cVMUjxdCJ3z4of5f8HqbdA==",
       "requires": {
         "d3": "^4.12.2",
-        "dagre": "^0.8.1",
-        "graphlib": "^2.1.5",
-        "lodash": "^4.17.4"
+        "dagre": "^0.8.4",
+        "graphlib": "^2.1.7",
+        "lodash": "^4.17.10"
       },
       "dependencies": {
         "d3": {
@@ -4185,11 +4185,11 @@
       "dev": true
     },
     "graphlib": {
-      "version": "2.1.5",
-      "resolved": "https://registry.npmjs.org/graphlib/-/graphlib-2.1.5.tgz",
-      "integrity": "sha512-XvtbqCcw+EM5SqQrIetIKKD+uZVNQtDPD1goIg7K73RuRZtVI5rYMdcCVSHm/AS1sCBZ7vt0p5WgXouucHQaOA==",
+      "version": "2.1.7",
+      "resolved": "https://registry.npmjs.org/graphlib/-/graphlib-2.1.7.tgz",
+      "integrity": "sha512-TyI9jIy2J4j0qgPmOOrHTCtpPqJGN/aurBwc6ZT+bRii+di1I+Wv3obRhVrmBEXet+qkMaEX67dXrwsd3QQM6w==",
       "requires": {
-        "lodash": "^4.11.1"
+        "lodash": "^4.17.5"
       }
     },
     "has": {
diff --git a/airflow/www_rbac/package.json b/airflow/www_rbac/package.json
index 0e4d021447..6241e7a415 100644
--- a/airflow/www_rbac/package.json
+++ b/airflow/www_rbac/package.json
@@ -56,7 +56,7 @@
     "bootstrap-toggle": "^2.2.2",
     "d3": "^3.4.4",
     "d3-tip": "^0.9.1",
-    "dagre-d3": "^0.6.1",
+    "dagre-d3": "^0.6.3",
     "datatables.net": "^1.10.19",
     "datatables.net-bs": "^1.10.19",
     "moment-timezone": "^0.5.21",
diff --git a/airflow/www_rbac/templates/airflow/dags.html b/airflow/www_rbac/templates/airflow/dags.html
index 4e3aa8df2f..5663a16831 100644
--- a/airflow/www_rbac/templates/airflow/dags.html
+++ b/airflow/www_rbac/templates/airflow/dags.html
@@ -63,52 +63,37 @@ <h2>DAGs</h2>
             </tr>
         </thead>
         <tbody>
-        {% for dag_id in dag_ids_in_page %}
-            {% set dag = webserver_dags[dag_id] if dag_id in webserver_dags else None %}
+        {% for dag in dags %}
             <tr>
                 <!-- Column 1: Edit dag -->
                 <td class="text-center" style="width:10px;">
-                    {% if dag_id in orm_dags %}
-                    <a href="{{ url_for('DagModelView.show', pk=dag_id) }}" title="Info">
+                    <a href="{{ url_for('DagModelView.show', pk=dag.dag_id) }}" title="Info">
                         <span class="glyphicon glyphicon-edit" aria-hidden="true"></span>
                     </a>
-                    {% endif %}
                 </td>
 
                 <!-- Column 2: Turn dag on/off -->
                 <td>
-                  {% if dag_id in orm_dags %}
-                    <input id="toggle-{{ dag_id }}" dag_id="{{ dag_id }}" type="checkbox" {{ "checked" if not orm_dags[dag_id].is_paused else "" }} data-toggle="toggle" data-size="mini" method="post">
-                  {% endif %}
+                    <input id="toggle-{{ dag.dag_id }}" dag_id="{{ dag.dag_id }}" type="checkbox" {{ "checked" if not dag.is_paused else "" }} data-toggle="toggle" data-size="mini" method="post">
                 </td>
 
                 <!-- Column 3: Name -->
                 <td>
-                    {% if dag_id in webserver_dags %}
                     <a href="{{ url_for('Airflow.'+ dag.get_default_view(), dag_id=dag.dag_id) }}" title="{{ dag.description }}">
-                        {{ dag_id }}
+                        {{ dag.dag_id }}
                     </a>
-                    {% else %}
-                        {{ dag_id }}
-                        <span class="glyphicon glyphicon-info-sign" class="info" aria-hidden="true" title="This DAG isn't available in the web server's DagBag object. It shows up in this list because the scheduler marked it as active in the metadata database."></span>
-                    {% endif %}
-                    {% if dag_id not in orm_dags %}
-                        <span class="glyphicon glyphicon-info-sign" class="info" aria-hidden="true" title="This DAG seems to be existing only locally. The master scheduler doesn't seem to be aware of its existence."></span>
-                    {% endif %}
                 </td>
 
                 <!-- Column 4: Dag Schedule -->
                 <td>
-                    {% if dag_id in webserver_dags %}
                     <a class="label label-default schedule {{ dag.dag_id }}" href="{{ url_for('DagRunModelView.list') }}?_flt_3_dag_id={{ dag.dag_id }}">
                         {{ dag.schedule_interval }}
                     </a>
-                    {% endif %}
                 </td>
 
                 <!-- Column 5: Dag Owners -->
                 <td>
-                  {{ dag.owner if dag else orm_dags[dag_id].owners }}
+                  {{ dag.owners }}
                 </td>
 
                 <!-- Column 6: Recent Tasks -->
@@ -118,14 +103,12 @@ <h2>DAGs</h2>
 
                 <!-- Column 7: Last Run -->
                 <td class="text-nowrap latest_dag_run {{ dag.dag_id }}">
-                  {% if dag %}
-                    {% set last_run = dag.get_last_dagrun(include_externally_triggered=True) %}
-                    {% if last_run and last_run.execution_date %}
-                      <a href="{{ url_for('Airflow.graph', dag_id=dag.dag_id, execution_date=last_run.execution_date) }}">
-                        {{ last_run.execution_date.strftime("%Y-%m-%d %H:%M") }}
-                      </a>
-                      <span aria-hidden="true" id="statuses_info" title="Start Date: {{ last_run.start_date.strftime("%Y-%m-%d %H:%M") }}" class="glyphicon glyphicon-info-sign"></span>
-                    {% endif %}
+                  {% set last_run = dag.get_last_dagrun(include_externally_triggered=True) %}
+                  {% if last_run and last_run.execution_date %}
+                    <a href="{{ url_for('Airflow.graph', dag_id=dag.dag_id, execution_date=last_run.execution_date) }}">
+                      {{ last_run.execution_date.strftime("%Y-%m-%d %H:%M") }}
+                    </a>
+                    <span aria-hidden="true" id="statuses_info" title="Start Date: {{ last_run.start_date.strftime("%Y-%m-%d %H:%M") }}" class="glyphicon glyphicon-info-sign"></span>
                   {% endif %}
                 </td>
 
@@ -186,14 +169,14 @@ <h2>DAGs</h2>
                 {% endif %}
 
                 <!-- Refresh -->
-                <a href="{{ url_for("Airflow.refresh", dag_id=dag_id) }}">
+                <a href="{{ url_for("Airflow.refresh", dag_id=dag.dag_id) }}">
                   <span class="glyphicon glyphicon-refresh" aria-hidden="true" data-original-title="Refresh"></span>
                 </a>
 
                 <!-- Delete -->
                 <!-- Use dag_id instead of dag.dag_id, because the DAG might not exist in the webserver's DagBag -->
-                <a href="{{ url_for('Airflow.delete', dag_id=dag_id) }}"
-                  onclick="return confirmDeleteDag('{{ dag_id }}')">
+                <a href="{{ url_for('Airflow.delete', dag_id=dag.dag_id) }}"
+                  onclick="return confirmDeleteDag('{{ dag.dag_id }}')">
                    <span class="glyphicon glyphicon-remove-circle" style="color:red" aria-hidden="true" data-original-title="Delete Dag"></span>
                 </a>
                 </td>
diff --git a/airflow/www_rbac/views.py b/airflow/www_rbac/views.py
index a15982369f..6566631483 100644
--- a/airflow/www_rbac/views.py
+++ b/airflow/www_rbac/views.py
@@ -185,13 +185,16 @@ def get_int_arg(value, default=0):
             hide_paused = hide_paused_dags_by_default
 
         # read orm_dags from the db
-        sql_query = session.query(DM).filter(
+        query = session.query(DM).filter(
             ~DM.is_subdag, DM.is_active
         )
 
         # optionally filter out "paused" dags
         if hide_paused:
-            sql_query = sql_query.filter(~DM.is_paused)
+            query = query.filter(~DM.is_paused)
+
+        if arg_search_query:
+            query = query.filter(sqla.func.lower(DM.dag_id) == arg_search_query.lower())
 
         import_errors = session.query(models.ImportError).all()
         for ie in import_errors:
@@ -207,74 +210,30 @@ def get_int_arg(value, default=0):
                     filename=filename),
                 "error")
 
-        # get a list of all non-subdag dags visible to everyone
-        # optionally filter out "paused" dags
-        if hide_paused:
-            unfiltered_webserver_dags = [dag for dag in dagbag.dags.values() if
-                                         not dag.parent_dag and not dag.is_paused]
-
-        else:
-            unfiltered_webserver_dags = [dag for dag in dagbag.dags.values() if
-                                         not dag.parent_dag]
-
         # Get all the dag id the user could access
         filter_dag_ids = appbuilder.sm.get_accessible_dag_ids()
 
-        if 'all_dags' in filter_dag_ids:
-            orm_dags = {dag.dag_id: dag for dag
-                        in sql_query
-                        .all()}
-        else:
-            orm_dags = {dag.dag_id: dag for dag in
-                        sql_query.filter(DM.dag_id.in_(filter_dag_ids)).all()}
-            unfiltered_webserver_dags = [dag for dag in
-                                         unfiltered_webserver_dags
-                                         if dag.dag_id in filter_dag_ids]
-
-        webserver_dags = {
-            dag.dag_id: dag
-            for dag in unfiltered_webserver_dags
-        }
-
-        if arg_search_query:
-            lower_search_query = arg_search_query.lower()
-            # filter by dag_id
-            webserver_dags_filtered = {
-                dag_id: dag
-                for dag_id, dag in webserver_dags.items()
-                if (lower_search_query in dag_id.lower() or
-                    lower_search_query in dag.owner.lower())
-            }
-
-            all_dag_ids = (set([dag.dag_id for dag in orm_dags.values()
-                                if lower_search_query in dag.dag_id.lower() or
-                                lower_search_query in dag.owners.lower()]) |
-                           set(webserver_dags_filtered.keys()))
+        if 'all_dags' not in filter_dag_ids:
+            query = query.filter(DM.dag_id.in_(filter_dag_ids))
 
-            sorted_dag_ids = sorted(all_dag_ids)
-        else:
-            webserver_dags_filtered = webserver_dags
-            sorted_dag_ids = sorted(set(orm_dags.keys()) | set(webserver_dags.keys()))
+        query = query.order_by(DM.dag_id)
 
         start = current_page * dags_per_page
         end = start + dags_per_page
 
-        num_of_all_dags = len(sorted_dag_ids)
-        page_dag_ids = sorted_dag_ids[start:end]
+        dags = query.offset(start).limit(dags_per_page).all()
+
+        num_of_all_dags = query.count()
         num_of_pages = int(math.ceil(num_of_all_dags / float(dags_per_page)))
 
         auto_complete_data = set()
-        for dag in webserver_dags_filtered.values():
-            auto_complete_data.add(dag.dag_id)
-            auto_complete_data.add(dag.owner)
-        for dag in orm_dags.values():
-            auto_complete_data.add(dag.dag_id)
-            auto_complete_data.add(dag.owners)
+        for row in query.with_entities(DM.dag_id, DM.owners):
+            auto_complete_data.add(row.dag_id)
+            auto_complete_data.add(row.owners)
 
         return self.render(
             'airflow/dags.html',
-            webserver_dags=webserver_dags_filtered,
-            orm_dags=orm_dags,
+            dags=dags,
             hide_paused=hide_paused,
             current_page=current_page,
             search_query=arg_search_query if arg_search_query else '',
@@ -286,7 +245,6 @@ def get_int_arg(value, default=0):
             paging=wwwutils.generate_pages(current_page, num_of_pages,
                                            search=arg_search_query,
                                            showPaused=not hide_paused),
-            dag_ids_in_page=page_dag_ids,
             auto_complete_data=auto_complete_data,
             num_runs=num_runs)
 
diff --git a/tests/models.py b/tests/models.py
index a4f3aad367..edbc5a36c1 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -749,12 +749,14 @@ def test_sync_to_db(self, mock_now):
         self.assertIsNone(orm_dag.default_view)
         self.assertEqual(orm_dag.get_default_view(),
                          configuration.conf.get('webserver', 'dag_default_view').lower())
+        self.assertEqual(orm_dag.safe_dag_id, 'dag')
 
         orm_subdag = session.query(DagModel).filter(
             DagModel.dag_id == 'dag.subtask').one()
         self.assertEqual(set(orm_subdag.owners.split(', ')), {'owner1', 'owner2'})
         self.assertEqual(orm_subdag.last_scheduler_run, now)
         self.assertTrue(orm_subdag.is_active)
+        self.assertEqual(orm_subdag.safe_dag_id, 'dag__dot__subtask')
 
     @patch('airflow.models.timezone.utcnow')
     def test_sync_to_db_default_view(self, mock_now):


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services