You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/12/27 07:32:03 UTC

[GitHub] Fokko closed pull request #4368: [AIRFLOW-3561] Improve queries

Fokko closed pull request #4368: [AIRFLOW-3561] Improve queries
URL: https://github.com/apache/incubator-airflow/pull/4368
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/airflow/migrations/versions/c8ffec048a3b_add_fields_to_dag.py b/airflow/migrations/versions/c8ffec048a3b_add_fields_to_dag.py
new file mode 100644
index 0000000000..a74e0b50a4
--- /dev/null
+++ b/airflow/migrations/versions/c8ffec048a3b_add_fields_to_dag.py
@@ -0,0 +1,44 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""add fields to dag
+
+Revision ID: c8ffec048a3b
+Revises: 41f5f12752f8
+Create Date: 2018-12-23 21:55:46.463634
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = 'c8ffec048a3b'
+down_revision = '41f5f12752f8'
+branch_labels = None
+depends_on = None
+
+from alembic import op
+import sqlalchemy as sa
+
+
+def upgrade():
+    op.add_column('dag', sa.Column('description', sa.Text(), nullable=True))
+    op.add_column('dag', sa.Column('default_view', sa.String(25), nullable=True))
+
+
+def downgrade():
+    op.drop_column('dag', 'description')
+    op.drop_column('dag', 'default_view')
diff --git a/airflow/models/__init__.py b/airflow/models/__init__.py
index aa93f5cb88..4bff05b721 100755
--- a/airflow/models/__init__.py
+++ b/airflow/models/__init__.py
@@ -2999,15 +2999,29 @@ class DagModel(Base):
     fileloc = Column(String(2000))
     # String representing the owners
     owners = Column(String(2000))
+    # Description of the dag
+    description = Column(Text)
+    # Default view of the DAG inside the webserver
+    default_view = Column(String(25))
 
     def __repr__(self):
         return "<DAG: {self.dag_id}>".format(self=self)
 
+    @property
+    def timezone(self):
+        return settings.TIMEZONE
+
     @classmethod
     @provide_session
     def get_current(cls, dag_id, session=None):
         return session.query(cls).filter(cls.dag_id == dag_id).first()
 
+    def get_default_view(self):
+        if self.default_view is None:
+            return configuration.conf.get('webserver', 'dag_default_view').lower()
+        else:
+            return self.default_view
+
 
 @functools.total_ordering
 class DAG(BaseDag, LoggingMixin):
@@ -3109,7 +3123,7 @@ def __init__(
                 'core', 'max_active_runs_per_dag'),
             dagrun_timeout=None,
             sla_miss_callback=None,
-            default_view=configuration.conf.get('webserver', 'dag_default_view').lower(),
+            default_view=None,
             orientation=configuration.conf.get('webserver', 'dag_orientation'),
             catchup=configuration.conf.getboolean('scheduler', 'catchup_by_default'),
             on_success_callback=None, on_failure_callback=None,
@@ -3180,7 +3194,7 @@ def __init__(
         self.max_active_runs = max_active_runs
         self.dagrun_timeout = dagrun_timeout
         self.sla_miss_callback = sla_miss_callback
-        self.default_view = default_view
+        self._default_view = default_view
         self.orientation = orientation
         self.catchup = catchup
         self.is_subdag = False  # DagBag.bag_dag() will set this to True if appropriate
@@ -3249,6 +3263,13 @@ def __exit__(self, _type, _value, _tb):
 
     # /Context Manager ----------------------------------------------
 
+    def get_default_view(self):
+        """This is only there for backward compatible jinja2 templates"""
+        if self._default_view is None:
+            return configuration.conf.get('webserver', 'dag_default_view').lower()
+        else:
+            return self._default_view
+
     def date_range(self, start_date, num=None, end_date=timezone.utcnow()):
         if num:
             end_date = None
@@ -4201,6 +4222,8 @@ def sync_to_db(self, owner=None, sync_time=None, session=None):
         orm_dag.owners = owner
         orm_dag.is_active = True
         orm_dag.last_scheduler_run = sync_time
+        orm_dag.default_view = self._default_view
+        orm_dag.description = self.description
         session.merge(orm_dag)
         session.commit()
 
diff --git a/airflow/www/templates/airflow/dag.html b/airflow/www/templates/airflow/dag.html
index c34ec375b6..3d171c8893 100644
--- a/airflow/www/templates/airflow/dag.html
+++ b/airflow/www/templates/airflow/dag.html
@@ -49,7 +49,7 @@ <h4 class="pull-right">
   <div>
     <ul class="nav nav-pills">
       {% if dag.parent_dag %}
-          <li class="never_active"><a href="{{ url_for('airflow.' + dag.default_view, dag_id=dag.parent_dag.dag_id) }}">
+          <li class="never_active"><a href="{{ url_for('airflow.' + dag.get_default_view(), dag_id=dag.parent_dag.dag_id) }}">
               <span class="glyphicon glyphicon-arrow-left" aria-hidden="true"></span>
               Back to {{ dag.parent_dag.dag_id }}</a>
           </li>
@@ -345,7 +345,7 @@ <h4 class="modal-title" id="dagModalLabel">
     });
 
     $("#btn_subdag").click(function(){
-      url = "{{ url_for( 'airflow.' + dag.default_view ) }}" +
+      url = "{{ url_for( 'airflow.' + dag.get_default_view() ) }}" +
         "?dag_id=" + encodeURIComponent(subdag_id) +
         "&execution_date=" + encodeURIComponent(execution_date);
       window.location = url;
diff --git a/airflow/www/templates/airflow/dags.html b/airflow/www/templates/airflow/dags.html
index 1c4e0227e6..5b3c0b7919 100644
--- a/airflow/www/templates/airflow/dags.html
+++ b/airflow/www/templates/airflow/dags.html
@@ -85,7 +85,7 @@ <h2>DAGs</h2>
                 <!-- Column 3: Name -->
                 <td>
                     {% if dag_id in webserver_dags %}
-                    <a href="{{ url_for('airflow.'+dag.default_view, dag_id=dag.dag_id) }}" title="{{ dag.description }}">
+                    <a href="{{ url_for('airflow.'+ dag.get_default_view(), dag_id=dag.dag_id) }}" title="{{ dag.description }}">
                         {{ dag_id }}
                     </a>
                     {% else %}
diff --git a/airflow/www/views.py b/airflow/www/views.py
index d9edbefabc..642a63a2e2 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -550,31 +550,29 @@ def chart(self):
     @login_required
     @provide_session
     def dag_stats(self, session=None):
-        ds = models.DagStat
+        dr = models.DagRun
+        dm = models.DagModel
+        dag_ids = session.query(dm.dag_id)
 
-        ds.update(
-            dag_ids=[dag.dag_id for dag in dagbag.dags.values() if not dag.is_subdag]
-        )
-
-        qry = (
-            session.query(ds.dag_id, ds.state, ds.count)
-        )
+        dag_state_stats = session.query(dr.dag_id, dr.state, sqla.func.count(dr.state)).group_by(dr.dag_id, dr.state)
 
         data = {}
-        for dag_id, state, count in qry:
+        for (dag_id, ) in dag_ids:
+            data[dag_id] = {}
+        for dag_id, state, count in dag_state_stats:
             if dag_id not in data:
                 data[dag_id] = {}
             data[dag_id][state] = count
 
         payload = {}
-        for dag in dagbag.dags.values():
-            payload[dag.safe_dag_id] = []
+        for dag_id, d in data.items():
+            payload[dag_id] = []
             for state in State.dag_states:
-                count = data.get(dag.dag_id, {}).get(state, 0)
-                payload[dag.safe_dag_id].append({
+                count = d.get(state, 0)
+                payload[dag_id].append({
                     'state': state,
                     'count': count,
-                    'dag_id': dag.dag_id,
+                    'dag_id': dag_id,
                     'color': State.color(state)
                 })
         return wwwutils.json_response(payload)
@@ -587,6 +585,8 @@ def task_stats(self, session=None):
         DagRun = models.DagRun
         Dag = models.DagModel
 
+        dag_ids = session.query(Dag.dag_id)
+
         LastDagRun = (
             session.query(DagRun.dag_id, sqla.func.max(DagRun.execution_date).label('execution_date'))
                 .join(Dag, Dag.dag_id == DagRun.dag_id)
@@ -634,24 +634,25 @@ def task_stats(self, session=None):
         session.commit()
 
         payload = {}
-        for dag in dagbag.dags.values():
-            payload[dag.safe_dag_id] = []
+        for (dag_id, ) in dag_ids:
+            payload[dag_id] = []
             for state in State.task_states:
-                count = data.get(dag.dag_id, {}).get(state, 0)
-                payload[dag.safe_dag_id].append({
+                count = data.get(dag_id, {}).get(state, 0)
+                payload[dag_id].append({
                     'state': state,
                     'count': count,
-                    'dag_id': dag.dag_id,
+                    'dag_id': dag_id,
                     'color': State.color(state)
                 })
         return wwwutils.json_response(payload)
 
     @expose('/code')
     @login_required
-    def code(self):
+    @provide_session
+    def code(self, session=None):
         dag_id = request.args.get('dag_id')
-        dag = dagbag.get_dag(dag_id)
-        title = dag_id
+        dm = models.DagModel
+        dag = session.query(dm).filter(dm.dag_id == dag_id).first()
         try:
             with wwwutils.open_maybe_zipped(dag.fileloc, 'r') as f:
                 code = f.read()
@@ -661,7 +662,7 @@ def code(self):
             html_code = str(e)
 
         return self.render(
-            'airflow/dag_code.html', html_code=html_code, dag=dag, title=title,
+            'airflow/dag_code.html', html_code=html_code, dag=dag, title=dag_id,
             root=request.args.get('root'),
             demo_mode=conf.getboolean('webserver', 'demo_mode'))
 
@@ -941,8 +942,12 @@ def xcom(self, session=None):
         execution_date = request.args.get('execution_date')
         dttm = pendulum.parse(execution_date)
         form = DateTimeForm(data={'execution_date': dttm})
-        dag = dagbag.get_dag(dag_id)
-        if not dag or task_id not in dag.task_ids:
+        dm_db = models.DagModel
+        ti_db = models.TaskInstance
+        dag = session.query(dm_db).filter(dm_db.dag_id == dag_id).first()
+        # Separate filter() arguments are ANDed in SQL; Python's `and` cannot combine column expressions.
+        ti = session.query(ti_db).filter(ti_db.dag_id == dag_id, ti_db.task_id == task_id).first()
+        if not ti:
             flash(
                 "Task [{}.{}] doesn't seem to exist"
                 " at the moment".format(dag_id, task_id),
diff --git a/airflow/www_rbac/templates/airflow/dag.html b/airflow/www_rbac/templates/airflow/dag.html
index aa1df63dce..ed545c09cf 100644
--- a/airflow/www_rbac/templates/airflow/dag.html
+++ b/airflow/www_rbac/templates/airflow/dag.html
@@ -49,7 +49,7 @@ <h4 class="pull-right">
     {% set num_runs_arg = request.args.get('num_runs') %}
     <ul class="nav nav-pills">
       {% if dag.parent_dag %}
-          <li class="never_active"><a href="{{ url_for('Airflow.' + dag.default_view, dag_id=dag.parent_dag.dag_id) }}">
+          <li class="never_active"><a href="{{ url_for('Airflow.' + dag.get_default_view(), dag_id=dag.parent_dag.dag_id) }}">
               <span class="glyphicon glyphicon-arrow-left" aria-hidden="true"></span>
               Back to {{ dag.parent_dag.dag_id }}</a>
           </li>
@@ -345,7 +345,7 @@ <h4 class="modal-title" id="dagModalLabel">
     });
 
     $("#btn_subdag").click(function(){
-      url = "{{ url_for( 'Airflow.' + dag.default_view ) }}" +
+      url = "{{ url_for( 'Airflow.' + dag.get_default_view() ) }}" +
         "?dag_id=" + encodeURIComponent(subdag_id) +
         "&execution_date=" + encodeURIComponent(execution_date);
       window.location = url;
diff --git a/airflow/www_rbac/templates/airflow/dags.html b/airflow/www_rbac/templates/airflow/dags.html
index c6b59e98a0..64883ba0ed 100644
--- a/airflow/www_rbac/templates/airflow/dags.html
+++ b/airflow/www_rbac/templates/airflow/dags.html
@@ -86,7 +86,7 @@ <h2>DAGs</h2>
                 <!-- Column 3: Name -->
                 <td>
                     {% if dag_id in webserver_dags %}
-                    <a href="{{ url_for('Airflow.'+dag.default_view, dag_id=dag.dag_id) }}" title="{{ dag.description }}">
+                    <a href="{{ url_for('Airflow.'+ dag.get_default_view(), dag_id=dag.dag_id) }}" title="{{ dag.description }}">
                         {{ dag_id }}
                     </a>
                     {% else %}
diff --git a/airflow/www_rbac/views.py b/airflow/www_rbac/views.py
index 31efc4459b..42328279a3 100644
--- a/airflow/www_rbac/views.py
+++ b/airflow/www_rbac/views.py
@@ -277,35 +277,34 @@ def get_int_arg(value, default=0):
     @has_access
     @provide_session
     def dag_stats(self, session=None):
-        ds = models.DagStat
-
-        ds.update()
-
-        qry = (
-            session.query(ds.dag_id, ds.state, ds.count)
-        )
+        dr = models.DagRun
 
         filter_dag_ids = appbuilder.sm.get_accessible_dag_ids()
 
+        dag_state_stats = session.query(dr.dag_id, dr.state, sqla.func.count(dr.state)).group_by(dr.dag_id, dr.state)
+
         payload = {}
         if filter_dag_ids:
             if 'all_dags' not in filter_dag_ids:
-                qry = qry.filter(ds.dag_id.in_(filter_dag_ids))
+                dag_state_stats = dag_state_stats.filter(dr.dag_id.in_(filter_dag_ids))
             data = {}
-            for dag_id, state, count in qry:
+            for dag_id, state, count in dag_state_stats:
                 if dag_id not in data:
                     data[dag_id] = {}
                 data[dag_id][state] = count
 
-            for dag in dagbag.dags.values():
-                if 'all_dags' in filter_dag_ids or dag.dag_id in filter_dag_ids:
-                    payload[dag.safe_dag_id] = []
+            if 'all_dags' in filter_dag_ids:
+                filter_dag_ids = [dag_id for dag_id, in session.query(models.DagModel.dag_id)]
+
+            for dag_id in filter_dag_ids:
+                if 'all_dags' in filter_dag_ids or dag_id in filter_dag_ids:
+                    payload[dag_id] = []
                     for state in State.dag_states:
-                        count = data.get(dag.dag_id, {}).get(state, 0)
-                        payload[dag.safe_dag_id].append({
+                        count = data.get(dag_id, {}).get(state, 0)
+                        payload[dag_id].append({
                             'state': state,
                             'count': count,
-                            'dag_id': dag.dag_id,
+                            'dag_id': dag_id,
                             'color': State.color(state)
                         })
         return wwwutils.json_response(payload)
@@ -371,15 +370,17 @@ def task_stats(self, session=None):
                 data[dag_id][state] = count
         session.commit()
 
-        for dag in dagbag.dags.values():
-            if 'all_dags' in filter_dag_ids or dag.dag_id in filter_dag_ids:
-                payload[dag.safe_dag_id] = []
+        if 'all_dags' in filter_dag_ids:
+            filter_dag_ids = [dag_id for dag_id, in session.query(models.DagModel.dag_id)]
+        for dag_id in filter_dag_ids:
+            if 'all_dags' in filter_dag_ids or dag_id in filter_dag_ids:
+                payload[dag_id] = []
                 for state in State.task_states:
-                    count = data.get(dag.dag_id, {}).get(state, 0)
-                    payload[dag.safe_dag_id].append({
+                    count = data.get(dag_id, {}).get(state, 0)
+                    payload[dag_id].append({
                         'state': state,
                         'count': count,
-                        'dag_id': dag.dag_id,
+                        'dag_id': dag_id,
                         'color': State.color(state)
                     })
         return wwwutils.json_response(payload)
@@ -387,10 +388,11 @@ def task_stats(self, session=None):
     @expose('/code')
     @has_dag_access(can_dag_read=True)
     @has_access
-    def code(self):
+    @provide_session
+    def code(self, session=None):
+        dm = models.DagModel
         dag_id = request.args.get('dag_id')
-        dag = dagbag.get_dag(dag_id)
-        title = dag_id
+        dag = session.query(dm).filter(dm.dag_id == dag_id).first()
         try:
             with wwwutils.open_maybe_zipped(dag.fileloc, 'r') as f:
                 code = f.read()
@@ -400,7 +402,7 @@ def code(self):
             html_code = str(e)
 
         return self.render(
-            'airflow/dag_code.html', html_code=html_code, dag=dag, title=title,
+            'airflow/dag_code.html', html_code=html_code, dag=dag, title=dag_id,
             root=request.args.get('root'),
             demo_mode=conf.getboolean('webserver', 'demo_mode'))
 
@@ -666,8 +668,13 @@ def xcom(self, session=None):
         execution_date = request.args.get('execution_date')
         dttm = pendulum.parse(execution_date)
         form = DateTimeForm(data={'execution_date': dttm})
-        dag = dagbag.get_dag(dag_id)
-        if not dag or task_id not in dag.task_ids:
+        dm_db = models.DagModel
+        ti_db = models.TaskInstance
+        dag = session.query(dm_db).filter(dm_db.dag_id == dag_id).first()
+        # Separate filter() arguments are ANDed in SQL; Python's `and` cannot combine column expressions.
+        ti = session.query(ti_db).filter(ti_db.dag_id == dag_id, ti_db.task_id == task_id).first()
+
+        if not ti:
             flash(
                 "Task [{}.{}] doesn't seem to exist"
                 " at the moment".format(dag_id, task_id),
diff --git a/tests/models.py b/tests/models.py
index 2e8a139eb0..2da84e6520 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -746,6 +746,8 @@ def test_sync_to_db(self, mock_now):
         self.assertEqual(set(orm_dag.owners.split(', ')), {'owner1', 'owner2'})
         self.assertEqual(orm_dag.last_scheduler_run, now)
         self.assertTrue(orm_dag.is_active)
+        self.assertIsNone(orm_dag.default_view)
+        self.assertEqual(orm_dag.get_default_view(), configuration.conf.get('webserver', 'dag_default_view').lower())
 
         orm_subdag = session.query(DagModel).filter(
             DagModel.dag_id == 'dag.subtask').one()
@@ -753,6 +755,32 @@ def test_sync_to_db(self, mock_now):
         self.assertEqual(orm_subdag.last_scheduler_run, now)
         self.assertTrue(orm_subdag.is_active)
 
+    @patch('airflow.models.timezone.utcnow')
+    def test_sync_to_db_default_view(self, mock_now):
+        dag = DAG(
+            'dag',
+            start_date=DEFAULT_DATE,
+            default_view="graph",
+        )
+        with dag:
+            DummyOperator(task_id='task', owner='owner1')
+            SubDagOperator(
+                task_id='subtask',
+                owner='owner2',
+                subdag=DAG(
+                    'dag.subtask',
+                    start_date=DEFAULT_DATE,
+                )
+            )
+        now = datetime.datetime.utcnow().replace(tzinfo=pendulum.timezone('UTC'))
+        mock_now.return_value = now
+        session = settings.Session()
+        dag.sync_to_db(session=session)
+
+        orm_dag = session.query(DagModel).filter(DagModel.dag_id == 'dag').one()
+        self.assertIsNotNone(orm_dag.default_view)
+        self.assertEqual(orm_dag.get_default_view(), "graph")
+
 
 class DagStatTest(unittest.TestCase):
     def test_dagstats_crud(self):


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services