You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/08/13 22:45:05 UTC

[airflow] branch v1-10-test updated (d636886 -> 4c383a5)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a change to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git.


    from d636886  Improve heading on Email Configuration page (#10175)
     new f426fac  Respect DAG Serialization setting when running sync_perm (#10321)
     new d5372d5  Add __repr__ to SerializedDagModel (#9862)
     new a35b283  Don't Update Serialized DAGs in DB if DAG didn't change (#9850)
     new 4c15f1d  Update Serialized DAGs in Webserver when DAGs are Updated (#9851)
     new 4c383a5  Use Hash of Serialized DAG to determine DAG is changed or not (#10227)

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 airflow/bin/cli.py                                 |  2 +-
 airflow/config_templates/config.yml                |  8 +++
 airflow/config_templates/default_airflow.cfg       |  4 ++
 ...3a5a_add_dag_hash_column_to_serialized_dag_.py} | 24 ++++---
 airflow/models/dagbag.py                           | 40 +++++++++---
 airflow/models/serialized_dag.py                   | 46 ++++++++++++--
 airflow/settings.py                                |  5 ++
 docs/dag-serialization.rst                         | 11 +++-
 tests/models/test_dagbag.py                        | 45 +++++++++++++
 tests/models/test_serialized_dag.py                | 29 +++++++++
 tests/test_core.py                                 |  6 +-
 tests/test_utils/asserts.py                        | 73 ++++++++++++++++++++++
 12 files changed, 262 insertions(+), 31 deletions(-)
 copy airflow/migrations/versions/{561833c1c74b_add_password_column_to_user.py => da3f683c3a5a_add_dag_hash_column_to_serialized_dag_.py} (65%)
 create mode 100644 tests/test_utils/asserts.py


[airflow] 01/05: Respect DAG Serialization setting when running sync_perm (#10321)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit f426fac0d3a8d88fccfb21ca62097c65bed1840d
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Thu Aug 13 20:38:49 2020 +0100

    Respect DAG Serialization setting when running sync_perm (#10321)
    
    We run this on Webserver startup, and when DAG Serialization is enabled we expect that no DAG files are required; however, because of this bug the files were still looked for.
    
    (cherry picked from commit 2d4e44c04e78adf887cf7ebe3decfca64e7c9158)
---
 airflow/bin/cli.py | 2 +-
 tests/test_core.py | 6 +++---
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 51cf49d..c621a3a 100644
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -1740,7 +1740,7 @@ def sync_perm(args): # noqa
         print('Updating permission, view-menu for all existing roles')
         appbuilder.sm.sync_roles()
         print('Updating permission on all DAG views')
-        dags = DagBag().dags.values()
+        dags = DagBag(store_serialized_dags=settings.STORE_SERIALIZED_DAGS).dags.values()
         for dag in dags:
             appbuilder.sm.sync_perm_for_dag(
                 dag.dag_id,
diff --git a/tests/test_core.py b/tests/test_core.py
index 5530f2a..3d8e688 100644
--- a/tests/test_core.py
+++ b/tests/test_core.py
@@ -1098,6 +1098,7 @@ class CliTests(unittest.TestCase):
             self.assertIn('user{}'.format(i), stdout)
 
     @mock.patch("airflow.settings.RBAC", True)
+    @mock.patch("airflow.settings.STORE_SERIALIZED_DAGS", True)
     @mock.patch("airflow.bin.cli.DagBag")
     def test_cli_sync_perm(self, dagbag_mock):
         self.expect_dagbag_contains([
@@ -1115,9 +1116,8 @@ class CliTests(unittest.TestCase):
         cli.sync_perm(args)
 
         self.appbuilder.sm.sync_roles.assert_called_once()
-
-        self.assertEqual(2,
-                         len(self.appbuilder.sm.sync_perm_for_dag.mock_calls))
+        dagbag_mock.assert_called_once_with(store_serialized_dags=True)
+        self.assertEqual(2, len(self.appbuilder.sm.sync_perm_for_dag.mock_calls))
         self.appbuilder.sm.sync_perm_for_dag.assert_any_call(
             'has_access_control',
             {'Public': {'can_dag_read'}}


[airflow] 05/05: Use Hash of Serialized DAG to determine DAG is changed or not (#10227)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 4c383a5139084704adbb4b0e622997b65f540ccb
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Tue Aug 11 22:31:55 2020 +0100

    Use Hash of Serialized DAG to determine DAG is changed or not (#10227)
    
    closes #10116
    
    (cherry picked from commit adce6f029609e89f3651a89df40700589ec16237)
---
 ...c3a5a_add_dag_hash_column_to_serialized_dag_.py | 46 ++++++++++++++++++++++
 airflow/models/serialized_dag.py                   | 11 ++++--
 tests/models/test_serialized_dag.py                | 17 ++++----
 3 files changed, 62 insertions(+), 12 deletions(-)

diff --git a/airflow/migrations/versions/da3f683c3a5a_add_dag_hash_column_to_serialized_dag_.py b/airflow/migrations/versions/da3f683c3a5a_add_dag_hash_column_to_serialized_dag_.py
new file mode 100644
index 0000000..4acda3b
--- /dev/null
+++ b/airflow/migrations/versions/da3f683c3a5a_add_dag_hash_column_to_serialized_dag_.py
@@ -0,0 +1,46 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add dag_hash Column to serialized_dag table
+
+Revision ID: da3f683c3a5a
+Revises: 8d48763f6d53
+Create Date: 2020-08-07 20:52:09.178296
+
+"""
+
+import sqlalchemy as sa
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision = 'da3f683c3a5a'
+down_revision = 'a66efa278eea'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    """Apply Add dag_hash Column to serialized_dag table"""
+    op.add_column(
+        'serialized_dag',
+        sa.Column('dag_hash', sa.String(32), nullable=False, server_default='Hash not calculated yet'))
+
+
+def downgrade():
+    """Unapply Add dag_hash Column to serialized_dag table"""
+    op.drop_column('serialized_dag', 'dag_hash')
diff --git a/airflow/models/serialized_dag.py b/airflow/models/serialized_dag.py
index d29e43c..f33e67b 100644
--- a/airflow/models/serialized_dag.py
+++ b/airflow/models/serialized_dag.py
@@ -19,6 +19,7 @@
 
 """Serialized DAG table in database."""
 
+import hashlib
 import logging
 from datetime import timedelta
 from typing import Any, Optional
@@ -53,7 +54,7 @@ class SerializedDagModel(Base):
       interval of deleting serialized DAGs in DB when the files are deleted, suggest
       to use a smaller interval such as 60
 
-    It is used by webserver to load dagbags when ``store_serialized_dags=True``.
+    It is used by webserver to load dags when ``store_serialized_dags=True``.
     Because reading from database is lightweight compared to importing from files,
     it solves the webserver scalability issue.
     """
@@ -65,6 +66,7 @@ class SerializedDagModel(Base):
     fileloc_hash = Column(BigInteger, nullable=False)
     data = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=False)
     last_updated = Column(UtcDateTime, nullable=False)
+    dag_hash = Column(String(32), nullable=False)
 
     __table_args__ = (
         Index('idx_fileloc_hash', fileloc_hash, unique=False),
@@ -76,6 +78,7 @@ class SerializedDagModel(Base):
         self.fileloc_hash = DagCode.dag_fileloc_hash(self.fileloc)
         self.data = SerializedDAG.to_dict(dag)
         self.last_updated = timezone.utcnow()
+        self.dag_hash = hashlib.md5(json.dumps(self.data, sort_keys=True).encode("utf-8")).hexdigest()
 
     def __repr__(self):
         return "<SerializedDag: {}>".format(self.dag_id)
@@ -105,9 +108,11 @@ class SerializedDagModel(Base):
                 return
 
         log.debug("Checking if DAG (%s) changed", dag.dag_id)
-        serialized_dag_from_db = session.query(cls).get(dag.dag_id)    # type: SerializedDagModel
         new_serialized_dag = cls(dag)
-        if serialized_dag_from_db and (serialized_dag_from_db.data == new_serialized_dag.data):
+        serialized_dag_hash_from_db = session.query(
+            cls.dag_hash).filter(cls.dag_id == dag.dag_id).scalar()
+
+        if serialized_dag_hash_from_db == new_serialized_dag.dag_hash:
             log.debug("Serialized DAG (%s) is unchanged. Skipping writing to DB", dag.dag_id)
             return
 
diff --git a/tests/models/test_serialized_dag.py b/tests/models/test_serialized_dag.py
index e25c7d7..283de65 100644
--- a/tests/models/test_serialized_dag.py
+++ b/tests/models/test_serialized_dag.py
@@ -83,27 +83,26 @@ class SerializedDagModelTest(unittest.TestCase):
         SDM.write_dag(dag=example_bash_op_dag)
 
         with db.create_session() as session:
-            last_updated = session.query(
-                SDM.last_updated).filter(SDM.dag_id == example_bash_op_dag.dag_id).one_or_none()
+            s_dag = session.query(SDM).get(example_bash_op_dag.dag_id)
 
             # Test that if DAG is not changed, Serialized DAG is not re-written and last_updated
             # column is not updated
             SDM.write_dag(dag=example_bash_op_dag)
-            last_updated_1 = session.query(
-                SDM.last_updated).filter(SDM.dag_id == example_bash_op_dag.dag_id).one_or_none()
+            s_dag_1 = session.query(SDM).get(example_bash_op_dag.dag_id)
 
-            self.assertEqual(last_updated, last_updated_1)
+            self.assertEqual(s_dag_1.dag_hash, s_dag.dag_hash)
+            self.assertEqual(s_dag.last_updated, s_dag_1.last_updated)
 
             # Update DAG
             example_bash_op_dag.tags += ["new_tag"]
             self.assertCountEqual(example_bash_op_dag.tags, ["example", "new_tag"])
 
             SDM.write_dag(dag=example_bash_op_dag)
-            new_s_dag = session.query(SDM.last_updated, SDM.data).filter(
-                SDM.dag_id == example_bash_op_dag.dag_id).one_or_none()
+            s_dag_2 = session.query(SDM).get(example_bash_op_dag.dag_id)
 
-            self.assertNotEqual(last_updated, new_s_dag.last_updated)
-            self.assertEqual(new_s_dag.data["dag"]["tags"], ["example", "new_tag"])
+            self.assertNotEqual(s_dag.last_updated, s_dag_2.last_updated)
+            self.assertNotEqual(s_dag.dag_hash, s_dag_2.dag_hash)
+            self.assertEqual(s_dag_2.data["dag"]["tags"], ["example", "new_tag"])
 
     def test_read_dags(self):
         """DAGs can be read from database."""


[airflow] 03/05: Don't Update Serialized DAGs in DB if DAG didn't change (#9850)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit a35b283099a6ddea34dad24856dfd100fef512d0
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Mon Jul 20 12:31:05 2020 +0100

    Don't Update Serialized DAGs in DB if DAG didn't change (#9850)
    
    We should not update the "last_updated" column unnecessarily. This is the first of a few optimizations to DAG Serialization that would also aid in DAG Versioning.
    
    (cherry picked from commit 1a32c45126f1086a52eeee52d4c19427af06274b)
---
 airflow/models/serialized_dag.py    | 22 ++++++++++++++++++----
 tests/models/test_serialized_dag.py | 30 ++++++++++++++++++++++++++++++
 2 files changed, 48 insertions(+), 4 deletions(-)

diff --git a/airflow/models/serialized_dag.py b/airflow/models/serialized_dag.py
index c655e34..1313cac 100644
--- a/airflow/models/serialized_dag.py
+++ b/airflow/models/serialized_dag.py
@@ -17,7 +17,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Serialzed DAG table in database."""
+"""Serialized DAG table in database."""
 
 import logging
 from datetime import timedelta
@@ -25,6 +25,7 @@ from typing import Any, Optional
 
 import sqlalchemy_jsonfield
 from sqlalchemy import BigInteger, Column, Index, String, and_
+from sqlalchemy.orm import Session  # noqa: F401
 from sqlalchemy.sql import exists
 
 from airflow.models.base import ID_LEN, Base
@@ -86,12 +87,13 @@ class SerializedDagModel(Base):
                   min_update_interval=None,   # type: Optional[int]
                   session=None):
         """Serializes a DAG and writes it into database.
+        If the record already exists, it checks if the Serialized DAG changed or not. If it is
+        changed, it updates the record, ignores otherwise.
 
         :param dag: a DAG to be written into database
         :param min_update_interval: minimal interval in seconds to update serialized DAG
         :param session: ORM Session
         """
-        log.debug("Writing DAG: %s to the DB", dag)
         # Checks if (Current Time - Time when the DAG was written to DB) < min_update_interval
         # If Yes, does nothing
         # If No or the DAG does not exists, updates / writes Serialized DAG to DB
@@ -102,8 +104,15 @@ class SerializedDagModel(Base):
             ).scalar():
                 return
 
-        log.debug("Writing DAG: %s to the DB", dag.dag_id)
-        session.merge(cls(dag))
+        log.debug("Checking if DAG (%s) changed", dag.dag_id)
+        serialized_dag_from_db = session.query(cls).get(dag.dag_id)    # type: SerializedDagModel
+        new_serialized_dag = cls(dag)
+        if serialized_dag_from_db and (serialized_dag_from_db.data == new_serialized_dag.data):
+            log.debug("Serialized DAG (%s) is unchanged. Skipping writing to DB", dag.dag_id)
+            return
+
+        log.debug("Writing Serialized DAG: %s to the DB", dag.dag_id)
+        session.merge(new_serialized_dag)
         log.debug("DAG: %s written to the DB", dag.dag_id)
 
     @classmethod
@@ -112,6 +121,7 @@ class SerializedDagModel(Base):
         """Reads all DAGs in serialized_dag table.
 
         :param session: ORM Session
+        :type session: Session
         :returns: a dict of DAGs read from database
         """
         serialized_dags = session.query(cls)
@@ -148,6 +158,7 @@ class SerializedDagModel(Base):
         :param dag_id: dag_id to be deleted
         :type dag_id: str
         :param session: ORM Session
+        :type session: Session
         """
         session.execute(cls.__table__.delete().where(cls.dag_id == dag_id))
 
@@ -159,6 +170,7 @@ class SerializedDagModel(Base):
         :param alive_dag_filelocs: file paths of alive DAGs
         :type alive_dag_filelocs: list
         :param session: ORM Session
+        :type session: Session
         """
         alive_fileloc_hashes = [
             DagCode.dag_fileloc_hash(fileloc) for fileloc in alive_dag_filelocs]
@@ -179,6 +191,7 @@ class SerializedDagModel(Base):
         :param dag_id: the DAG to check
         :type dag_id: str
         :param session: ORM Session
+        :type session: Session
         :rtype: bool
         """
         return session.query(exists().where(cls.dag_id == dag_id)).scalar()
@@ -193,6 +206,7 @@ class SerializedDagModel(Base):
 
         :param dag_id: the DAG to fetch
         :param session: ORM Session
+        :type session: Session
         """
         from airflow.models.dag import DagModel
         row = session.query(cls).filter(cls.dag_id == dag_id).one_or_none()
diff --git a/tests/models/test_serialized_dag.py b/tests/models/test_serialized_dag.py
index c70db78..e25c7d7 100644
--- a/tests/models/test_serialized_dag.py
+++ b/tests/models/test_serialized_dag.py
@@ -75,6 +75,36 @@ class SerializedDagModelTest(unittest.TestCase):
                 # Verifies JSON schema.
                 SerializedDAG.validate_schema(result.data)
 
+    def test_serialized_dag_is_updated_only_if_dag_is_changed(self):
+        """Test Serialized DAG is updated if DAG is changed"""
+
+        example_dags = make_example_dags(example_dags_module)
+        example_bash_op_dag = example_dags.get("example_bash_operator")
+        SDM.write_dag(dag=example_bash_op_dag)
+
+        with db.create_session() as session:
+            last_updated = session.query(
+                SDM.last_updated).filter(SDM.dag_id == example_bash_op_dag.dag_id).one_or_none()
+
+            # Test that if DAG is not changed, Serialized DAG is not re-written and last_updated
+            # column is not updated
+            SDM.write_dag(dag=example_bash_op_dag)
+            last_updated_1 = session.query(
+                SDM.last_updated).filter(SDM.dag_id == example_bash_op_dag.dag_id).one_or_none()
+
+            self.assertEqual(last_updated, last_updated_1)
+
+            # Update DAG
+            example_bash_op_dag.tags += ["new_tag"]
+            self.assertCountEqual(example_bash_op_dag.tags, ["example", "new_tag"])
+
+            SDM.write_dag(dag=example_bash_op_dag)
+            new_s_dag = session.query(SDM.last_updated, SDM.data).filter(
+                SDM.dag_id == example_bash_op_dag.dag_id).one_or_none()
+
+            self.assertNotEqual(last_updated, new_s_dag.last_updated)
+            self.assertEqual(new_s_dag.data["dag"]["tags"], ["example", "new_tag"])
+
     def test_read_dags(self):
         """DAGs can be read from database."""
         example_dags = self._write_example_dags()


[airflow] 04/05: Update Serialized DAGs in Webserver when DAGs are Updated (#9851)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 4c15f1d9da74c3fa1ff06edadfb2375ce349f2e5
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Mon Jul 20 12:45:18 2020 +0100

    Update Serialized DAGs in Webserver when DAGs are Updated (#9851)
    
    Before this change, if DAG Serialization was enabled, the Webserver would not update the DAGs once they were fetched from the DB. The default worker_refresh_interval was `30`, so whenever the gunicorn workers were restarted, they would pull the updated DAGs when needed.
    
    This change will allow us to have a larger worker_refresh_interval (e.g. 30 mins or even 1 day).
    
    (cherry picked from commit 84b85d8acc181edfe1fdd21b82c1773c19c47044)
---
 airflow/config_templates/config.yml          |  8 +++
 airflow/config_templates/default_airflow.cfg |  4 ++
 airflow/models/dagbag.py                     | 40 +++++++++++----
 airflow/models/serialized_dag.py             | 14 ++++++
 airflow/settings.py                          |  5 ++
 docs/dag-serialization.rst                   | 11 ++++-
 tests/models/test_dagbag.py                  | 45 +++++++++++++++++
 tests/test_utils/asserts.py                  | 73 ++++++++++++++++++++++++++++
 8 files changed, 188 insertions(+), 12 deletions(-)

diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 75c47cb..9535d5b 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -447,6 +447,14 @@
       type: string
       example: ~
       default: "30"
+    - name: min_serialized_dag_fetch_interval
+      description: |
+        Fetching serialized DAG can not be faster than a minimum interval to reduce database
+        read rate. This config controls when your DAGs are updated in the Webserver
+      version_added: 1.10.12
+      type: string
+      example: ~
+      default: "10"
     - name: store_dag_code
       description: |
         Whether to persist DAG files code in DB.
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 3a9bba2..9729403 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -234,6 +234,10 @@ store_serialized_dags = False
 # Updating serialized DAG can not be faster than a minimum interval to reduce database write rate.
 min_serialized_dag_update_interval = 30
 
+# Fetching serialized DAG can not be faster than a minimum interval to reduce database
+# read rate. This config controls when your DAGs are updated in the Webserver
+min_serialized_dag_fetch_interval = 10
+
 # Whether to persist DAG files code in DB.
 # If set to True, Webserver reads file contents from DB instead of
 # trying to access files in a DAG folder. Defaults to same as the
diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py
index 48cbd3e..1b8be89 100644
--- a/airflow/models/dagbag.py
+++ b/airflow/models/dagbag.py
@@ -28,7 +28,7 @@ import sys
 import textwrap
 import zipfile
 from collections import namedtuple
-from datetime import datetime
+from datetime import datetime, timedelta
 
 from croniter import CroniterBadCronError, CroniterBadDateError, CroniterNotAlphaError, croniter
 import six
@@ -102,6 +102,7 @@ class DagBag(BaseDagBag, LoggingMixin):
         self.import_errors = {}
         self.has_logged = False
         self.store_serialized_dags = store_serialized_dags
+        self.dags_last_fetched = {}
 
         self.collect_dags(
             dag_folder=dag_folder,
@@ -127,20 +128,26 @@ class DagBag(BaseDagBag, LoggingMixin):
         """
         from airflow.models.dag import DagModel  # Avoid circular import
 
-        # Only read DAGs from DB if this dagbag is store_serialized_dags.
         if self.store_serialized_dags:
             # Import here so that serialized dag is only imported when serialization is enabled
             from airflow.models.serialized_dag import SerializedDagModel
             if dag_id not in self.dags:
                 # Load from DB if not (yet) in the bag
-                row = SerializedDagModel.get(dag_id)
-                if not row:
-                    return None
-
-                dag = row.dag
-                for subdag in dag.subdags:
-                    self.dags[subdag.dag_id] = subdag
-                self.dags[dag.dag_id] = dag
+                self._add_dag_from_db(dag_id=dag_id)
+                return self.dags.get(dag_id)
+
+            # If DAG is in the DagBag, check the following
+            # 1. if time has come to check if DAG is updated (controlled by min_serialized_dag_fetch_secs)
+            # 2. check the last_updated column in SerializedDag table to see if Serialized DAG is updated
+            # 3. if (2) is yes, fetch the Serialized DAG.
+            min_serialized_dag_fetch_secs = timedelta(seconds=settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL)
+            if (
+                dag_id in self.dags_last_fetched and
+                timezone.utcnow() > self.dags_last_fetched[dag_id] + min_serialized_dag_fetch_secs
+            ):
+                sd_last_updated_datetime = SerializedDagModel.get_last_updated_datetime(dag_id=dag_id)
+                if sd_last_updated_datetime > self.dags_last_fetched[dag_id]:
+                    self._add_dag_from_db(dag_id=dag_id)
 
             return self.dags.get(dag_id)
 
@@ -178,6 +185,19 @@ class DagBag(BaseDagBag, LoggingMixin):
                 del self.dags[dag_id]
         return self.dags.get(dag_id)
 
+    def _add_dag_from_db(self, dag_id):
+        """Add DAG to DagBag from DB"""
+        from airflow.models.serialized_dag import SerializedDagModel
+        row = SerializedDagModel.get(dag_id)
+        if not row:
+            raise ValueError("DAG '{}' not found in serialized_dag table".format(dag_id))
+
+        dag = row.dag
+        for subdag in dag.subdags:
+            self.dags[subdag.dag_id] = subdag
+        self.dags[dag.dag_id] = dag
+        self.dags_last_fetched[dag.dag_id] = timezone.utcnow()
+
     def process_file(self, filepath, only_if_updated=True, safe_mode=True):
         """
         Given a path to a python module or zip file, this method imports
diff --git a/airflow/models/serialized_dag.py b/airflow/models/serialized_dag.py
index 1313cac..d29e43c 100644
--- a/airflow/models/serialized_dag.py
+++ b/airflow/models/serialized_dag.py
@@ -219,3 +219,17 @@ class SerializedDagModel(Base):
             DagModel.root_dag_id).filter(DagModel.dag_id == dag_id).scalar()
 
         return session.query(cls).filter(cls.dag_id == root_dag_id).one_or_none()
+
+    @classmethod
+    @db.provide_session
+    def get_last_updated_datetime(cls, dag_id, session):
+        """
+        Get the date when the Serialized DAG associated to DAG was last updated
+        in serialized_dag table
+
+        :param dag_id: DAG ID
+        :type dag_id: str
+        :param session: ORM Session
+        :type session: Session
+        """
+        return session.query(cls.last_updated).filter(cls.dag_id == dag_id).scalar()
diff --git a/airflow/settings.py b/airflow/settings.py
index 0158ec8..e39c960 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -428,6 +428,11 @@ STORE_SERIALIZED_DAGS = conf.getboolean('core', 'store_serialized_dags', fallbac
 MIN_SERIALIZED_DAG_UPDATE_INTERVAL = conf.getint(
     'core', 'min_serialized_dag_update_interval', fallback=30)
 
+# Fetching serialized DAG can not be faster than a minimum interval to reduce database
+# read rate. This config controls when your DAGs are updated in the Webserver
+MIN_SERIALIZED_DAG_FETCH_INTERVAL = conf.getint(
+    'core', 'min_serialized_dag_fetch_interval', fallback=10)
+
 # Whether to persist DAG files code in DB. If set to True, Webserver reads file contents
 # from DB instead of trying to access files in a DAG folder.
 # Defaults to same as the store_serialized_dags setting.
diff --git a/docs/dag-serialization.rst b/docs/dag-serialization.rst
index 0edd644..e2fcf14 100644
--- a/docs/dag-serialization.rst
+++ b/docs/dag-serialization.rst
@@ -57,14 +57,21 @@ Add the following settings in ``airflow.cfg``:
 
     [core]
     store_serialized_dags = True
+    store_dag_code = True
+
+    # You can also update the following default configurations based on your needs
     min_serialized_dag_update_interval = 30
+    min_serialized_dag_fetch_interval = 10
 
 *   ``store_serialized_dags``: This flag decides whether to serialise DAGs and persist them in DB.
     If set to True, Webserver reads from DB instead of parsing DAG files
-*   ``min_serialized_dag_update_interval``: This flag sets the minimum interval (in seconds) after which
-    the serialized DAG in DB should be updated. This helps in reducing database write rate.
 *   ``store_dag_code``: This flag decides whether to persist DAG files code in DB.
     If set to True, Webserver reads file contents from DB instead of trying to access files in a DAG folder.
+*   ``min_serialized_dag_update_interval``: This flag sets the minimum interval (in seconds) after which
+    the serialized DAG in DB should be updated. This helps in reducing database write rate.
+*   ``min_serialized_dag_fetch_interval``: This flag controls how often a SerializedDAG will be re-fetched
+    from the DB when it's already loaded in the DagBag in the Webserver. Setting this higher will reduce
+    load on the DB, but at the expense of displaying a possibly stale cached version of the DAG.
 
 If you are updating Airflow from <1.10.7, please do not forget to run ``airflow upgradedb``.
 
diff --git a/tests/models/test_dagbag.py b/tests/models/test_dagbag.py
index 04c2372..b9d18ac 100644
--- a/tests/models/test_dagbag.py
+++ b/tests/models/test_dagbag.py
@@ -19,6 +19,7 @@
 
 import inspect
 import os
+import six
 import shutil
 import textwrap
 import unittest
@@ -26,15 +27,19 @@ from datetime import datetime
 from tempfile import NamedTemporaryFile, mkdtemp
 
 from mock import patch, ANY
+from freezegun import freeze_time
 
 from airflow import models
 from airflow.configuration import conf
 from airflow.utils.dag_processing import SimpleTaskInstance
 from airflow.models import DagModel, DagBag, TaskInstance as TI
+from airflow.models.serialized_dag import SerializedDagModel
+from airflow.utils.dates import timezone as tz
 from airflow.utils.db import create_session
 from airflow.utils.state import State
 from airflow.utils.timezone import utc
 from tests.models import TEST_DAGS_FOLDER, DEFAULT_DATE
+from tests.test_utils.asserts import assert_queries_count
 from tests.test_utils.config import conf_vars
 import airflow.example_dags
 
@@ -650,3 +655,43 @@ class DagBagTest(unittest.TestCase):
         # clean up
         with create_session() as session:
             session.query(DagModel).filter(DagModel.dag_id == 'test_deactivate_unknown_dags').delete()
+
+    @patch("airflow.models.dagbag.settings.STORE_SERIALIZED_DAGS", True)
+    @patch("airflow.models.dagbag.settings.MIN_SERIALIZED_DAG_UPDATE_INTERVAL", 5)
+    @patch("airflow.models.dagbag.settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL", 5)
+    def test_get_dag_with_dag_serialization(self):
+        """
+        Test that Serialized DAG is updated in DagBag when it is updated in
+        Serialized DAG table after 'min_serialized_dag_fetch_interval' seconds are passed.
+        """
+
+        with freeze_time(tz.datetime(2020, 1, 5, 0, 0, 0)):
+            example_bash_op_dag = DagBag(include_examples=True).dags.get("example_bash_operator")
+            SerializedDagModel.write_dag(dag=example_bash_op_dag)
+
+            dag_bag = DagBag(store_serialized_dags=True)
+            ser_dag_1 = dag_bag.get_dag("example_bash_operator")
+            ser_dag_1_update_time = dag_bag.dags_last_fetched["example_bash_operator"]
+            self.assertEqual(example_bash_op_dag.tags, ser_dag_1.tags)
+            self.assertEqual(ser_dag_1_update_time, tz.datetime(2020, 1, 5, 0, 0, 0))
+
+        # Check that if min_serialized_dag_fetch_interval has not passed we do not fetch the DAG
+        # from DB
+        with freeze_time(tz.datetime(2020, 1, 5, 0, 0, 4)):
+            with assert_queries_count(0):
+                self.assertEqual(dag_bag.get_dag("example_bash_operator").tags, ["example"])
+
+        # Make a change in the DAG and write Serialized DAG to the DB
+        with freeze_time(tz.datetime(2020, 1, 5, 0, 0, 6)):
+            example_bash_op_dag.tags += ["new_tag"]
+            SerializedDagModel.write_dag(dag=example_bash_op_dag)
+
+        # Since min_serialized_dag_fetch_interval is passed verify that calling 'dag_bag.get_dag'
+        # fetches the Serialized DAG from DB
+        with freeze_time(tz.datetime(2020, 1, 5, 0, 0, 8)):
+            with assert_queries_count(2):
+                updated_ser_dag_1 = dag_bag.get_dag("example_bash_operator")
+                updated_ser_dag_1_update_time = dag_bag.dags_last_fetched["example_bash_operator"]
+
+        six.assertCountEqual(self, updated_ser_dag_1.tags, ["example", "new_tag"])
+        self.assertGreater(updated_ser_dag_1_update_time, ser_dag_1_update_time)
diff --git a/tests/test_utils/asserts.py b/tests/test_utils/asserts.py
new file mode 100644
index 0000000..ca3cf2f
--- /dev/null
+++ b/tests/test_utils/asserts.py
@@ -0,0 +1,73 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import logging
+import re
+from contextlib import contextmanager
+
+from sqlalchemy import event
+
+# Long import to not create a copy of the reference, but to refer to one place.
+import airflow.settings
+
+log = logging.getLogger(__name__)
+
+
+def assert_equal_ignore_multiple_spaces(case, first, second, msg=None):
+    def _trim(s):
+        return re.sub(r"\s+", " ", s.strip())
+    return case.assertEqual(_trim(first), _trim(second), msg)
+
+
+class CountQueriesResult:
+    def __init__(self):
+        self.count = 0
+
+
+class CountQueries:
+    """
+    Counts the number of queries sent to Airflow Database in a given context.
+
+    Does not support multiple processes. When a new process is started in context, its queries will
+    not be included.
+    """
+    def __init__(self):
+        self.result = CountQueriesResult()
+
+    def __enter__(self):
+        event.listen(airflow.settings.engine, "after_cursor_execute", self.after_cursor_execute)
+        return self.result
+
+    def __exit__(self, type_, value, traceback):
+        event.remove(airflow.settings.engine, "after_cursor_execute", self.after_cursor_execute)
+        log.debug("Queries count: %d", self.result.count)
+
+    def after_cursor_execute(self, *args, **kwargs):
+        self.result.count += 1
+
+
+count_queries = CountQueries  # pylint: disable=invalid-name
+
+
+@contextmanager
+def assert_queries_count(expected_count, message_fmt=None):
+    with count_queries() as result:
+        yield None
+    message_fmt = message_fmt or "The expected number of db queries is {expected_count}. " \
+                                 "The current number is {current_count}."
+    message = message_fmt.format(current_count=result.count, expected_count=expected_count)
+    assert expected_count == result.count, message


[airflow] 02/05: Add __repr__ to SerializedDagModel (#9862)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit d5372d5f33d35abb6b959ddb95f022a4da5de4e3
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Sat Jul 18 22:28:31 2020 +0100

    Add __repr__ to SerializedDagModel (#9862)
    
    Before: `<airflow.models.serialized_dag.SerializedDagModel at 0x7fab30d68c50>`
    After: `<SerializedDag: example_xcom_args>`
    (cherry picked from commit 1dc852d4afe8073cf9b410cc5e98955d2028cb4a)
---
 airflow/models/serialized_dag.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/airflow/models/serialized_dag.py b/airflow/models/serialized_dag.py
index c1a647a..c655e34 100644
--- a/airflow/models/serialized_dag.py
+++ b/airflow/models/serialized_dag.py
@@ -76,6 +76,9 @@ class SerializedDagModel(Base):
         self.data = SerializedDAG.to_dict(dag)
         self.last_updated = timezone.utcnow()
 
+    def __repr__(self):
+        return "<SerializedDag: {}>".format(self.dag_id)
+
     @classmethod
     @db.provide_session
     def write_dag(cls,