Posted to commits@airflow.apache.org by ka...@apache.org on 2020/08/13 23:42:15 UTC

[airflow] branch v1-10-test updated (4c383a5 -> ab5dd24)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a change to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git.


 discard 4c383a5  Use Hash of Serialized DAG to determine DAG is changed or not (#10227)
 discard 4c15f1d  Update Serialized DAGs in Webserver when DAGs are Updated (#9851)
 discard a35b283  Don't Update Serialized DAGs in DB if DAG didn't change (#9850)
     new fde8f04  Don't Update Serialized DAGs in DB if DAG didn't change (#9850)
     new 8d2753e  Update Serialized DAGs in Webserver when DAGs are Updated (#9851)
     new ab5dd24  Use Hash of Serialized DAG to determine DAG is changed or not (#10227)

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (4c383a5)
            \
             N -- N -- N   refs/heads/v1-10-test (ab5dd24)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 tests/models/test_serialized_dag.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)


[airflow] 02/03: Update Serialized DAGs in Webserver when DAGs are Updated (#9851)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 8d2753ed9d54ba752bc5aa3500b1c44090792c6d
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Mon Jul 20 12:45:18 2020 +0100

    Update Serialized DAGs in Webserver when DAGs are Updated (#9851)
    
    Before this change, if DAG Serialization was enabled the Webserver would not update the DAGs once they were fetched from the DB. The default worker_refresh_interval was `30`, so whenever the gunicorn workers were restarted they pulled the updated DAGs as needed.
    
    This change will allow us to have a larger worker_refresh_interval (e.g. 30 mins or even 1 day).
    
    (cherry picked from commit 84b85d8acc181edfe1fdd21b82c1773c19c47044)
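
In outline, the change turns DagBag.get_dag() into a read-through cache with a minimum
re-check interval. The sketch below is illustrative only, not the actual DagBag code (the
real implementation is in airflow/models/dagbag.py in the diff that follows); the
fetch_from_db and get_db_last_updated callables are hypothetical stand-ins for
SerializedDagModel.get() and SerializedDagModel.get_last_updated_datetime().

    from datetime import datetime, timedelta

    FETCH_INTERVAL = timedelta(seconds=10)  # plays the role of min_serialized_dag_fetch_interval

    class SerializedDagCache(object):
        """Illustrative read-through cache; not the real DagBag."""

        def __init__(self, fetch_from_db, get_db_last_updated):
            # fetch_from_db(dag_id) -> DAG, get_db_last_updated(dag_id) -> datetime
            self._fetch_from_db = fetch_from_db
            self._get_db_last_updated = get_db_last_updated
            self._dags = {}        # dag_id -> cached DAG
            self._fetched_at = {}  # dag_id -> datetime of last DB fetch

        def get_dag(self, dag_id):
            now = datetime.utcnow()
            if dag_id not in self._dags:
                self._refresh(dag_id, now)  # not cached yet: always hit the DB
            elif now > self._fetched_at[dag_id] + FETCH_INTERVAL:
                # Interval elapsed: cheap single-column staleness probe first;
                # do the full re-fetch only if the DB copy is newer than ours
                if self._get_db_last_updated(dag_id) > self._fetched_at[dag_id]:
                    self._refresh(dag_id, now)
            return self._dags.get(dag_id)

        def _refresh(self, dag_id, now):
            self._dags[dag_id] = self._fetch_from_db(dag_id)
            self._fetched_at[dag_id] = now

This is why the test below expects 0 queries within the interval and exactly 2 queries
(the staleness probe plus the re-fetch) once the interval has elapsed.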
---
 airflow/config_templates/config.yml          |  8 +++
 airflow/config_templates/default_airflow.cfg |  4 ++
 airflow/models/dagbag.py                     | 40 +++++++++++----
 airflow/models/serialized_dag.py             | 14 ++++++
 airflow/settings.py                          |  5 ++
 docs/dag-serialization.rst                   | 11 ++++-
 tests/models/test_dagbag.py                  | 45 +++++++++++++++++
 tests/test_utils/asserts.py                  | 73 ++++++++++++++++++++++++++++
 8 files changed, 188 insertions(+), 12 deletions(-)

diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 75c47cb..9535d5b 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -447,6 +447,14 @@
       type: string
       example: ~
       default: "30"
+    - name: min_serialized_dag_fetch_interval
+      description: |
+        Fetching serialized DAG can not be faster than a minimum interval to reduce database
+        read rate. This config controls when your DAGs are updated in the Webserver
+      version_added: 1.10.12
+      type: string
+      example: ~
+      default: "10"
     - name: store_dag_code
       description: |
         Whether to persist DAG files code in DB.
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 3a9bba2..9729403 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -234,6 +234,10 @@ store_serialized_dags = False
 # Updating serialized DAG can not be faster than a minimum interval to reduce database write rate.
 min_serialized_dag_update_interval = 30
 
+# Fetching serialized DAG can not be faster than a minimum interval to reduce database
+# read rate. This config controls when your DAGs are updated in the Webserver
+min_serialized_dag_fetch_interval = 10
+
 # Whether to persist DAG files code in DB.
 # If set to True, Webserver reads file contents from DB instead of
 # trying to access files in a DAG folder. Defaults to same as the
diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py
index 48cbd3e..1b8be89 100644
--- a/airflow/models/dagbag.py
+++ b/airflow/models/dagbag.py
@@ -28,7 +28,7 @@ import sys
 import textwrap
 import zipfile
 from collections import namedtuple
-from datetime import datetime
+from datetime import datetime, timedelta
 
 from croniter import CroniterBadCronError, CroniterBadDateError, CroniterNotAlphaError, croniter
 import six
@@ -102,6 +102,7 @@ class DagBag(BaseDagBag, LoggingMixin):
         self.import_errors = {}
         self.has_logged = False
         self.store_serialized_dags = store_serialized_dags
+        self.dags_last_fetched = {}
 
         self.collect_dags(
             dag_folder=dag_folder,
@@ -127,20 +128,26 @@ class DagBag(BaseDagBag, LoggingMixin):
         """
         from airflow.models.dag import DagModel  # Avoid circular import
 
-        # Only read DAGs from DB if this dagbag is store_serialized_dags.
         if self.store_serialized_dags:
             # Import here so that serialized dag is only imported when serialization is enabled
             from airflow.models.serialized_dag import SerializedDagModel
             if dag_id not in self.dags:
                 # Load from DB if not (yet) in the bag
-                row = SerializedDagModel.get(dag_id)
-                if not row:
-                    return None
-
-                dag = row.dag
-                for subdag in dag.subdags:
-                    self.dags[subdag.dag_id] = subdag
-                self.dags[dag.dag_id] = dag
+                self._add_dag_from_db(dag_id=dag_id)
+                return self.dags.get(dag_id)
+
+            # If DAG is in the DagBag, check the following
+            # 1. if time has come to check if DAG is updated (controlled by min_serialized_dag_fetch_secs)
+            # 2. check the last_updated column in SerializedDag table to see if Serialized DAG is updated
+            # 3. if (2) is yes, fetch the Serialized DAG.
+            min_serialized_dag_fetch_secs = timedelta(seconds=settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL)
+            if (
+                dag_id in self.dags_last_fetched and
+                timezone.utcnow() > self.dags_last_fetched[dag_id] + min_serialized_dag_fetch_secs
+            ):
+                sd_last_updated_datetime = SerializedDagModel.get_last_updated_datetime(dag_id=dag_id)
+                if sd_last_updated_datetime > self.dags_last_fetched[dag_id]:
+                    self._add_dag_from_db(dag_id=dag_id)
 
             return self.dags.get(dag_id)
 
@@ -178,6 +185,19 @@ class DagBag(BaseDagBag, LoggingMixin):
                 del self.dags[dag_id]
         return self.dags.get(dag_id)
 
+    def _add_dag_from_db(self, dag_id):
+        """Add DAG to DagBag from DB"""
+        from airflow.models.serialized_dag import SerializedDagModel
+        row = SerializedDagModel.get(dag_id)
+        if not row:
+            raise ValueError("DAG '{}' not found in serialized_dag table".format(dag_id))
+
+        dag = row.dag
+        for subdag in dag.subdags:
+            self.dags[subdag.dag_id] = subdag
+        self.dags[dag.dag_id] = dag
+        self.dags_last_fetched[dag.dag_id] = timezone.utcnow()
+
     def process_file(self, filepath, only_if_updated=True, safe_mode=True):
         """
         Given a path to a python module or zip file, this method imports
diff --git a/airflow/models/serialized_dag.py b/airflow/models/serialized_dag.py
index 1313cac..d29e43c 100644
--- a/airflow/models/serialized_dag.py
+++ b/airflow/models/serialized_dag.py
@@ -219,3 +219,17 @@ class SerializedDagModel(Base):
             DagModel.root_dag_id).filter(DagModel.dag_id == dag_id).scalar()
 
         return session.query(cls).filter(cls.dag_id == root_dag_id).one_or_none()
+
+    @classmethod
+    @db.provide_session
+    def get_last_updated_datetime(cls, dag_id, session):
+        """
+        Get the date when the Serialized DAG associated to DAG was last updated
+        in serialized_dag table
+
+        :param dag_id: DAG ID
+        :type dag_id: str
+        :param session: ORM Session
+        :type session: Session
+        """
+        return session.query(cls.last_updated).filter(cls.dag_id == dag_id).scalar()
diff --git a/airflow/settings.py b/airflow/settings.py
index 0158ec8..e39c960 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -428,6 +428,11 @@ STORE_SERIALIZED_DAGS = conf.getboolean('core', 'store_serialized_dags', fallbac
 MIN_SERIALIZED_DAG_UPDATE_INTERVAL = conf.getint(
     'core', 'min_serialized_dag_update_interval', fallback=30)
 
+# Fetching serialized DAG can not be faster than a minimum interval to reduce database
+# read rate. This config controls when your DAGs are updated in the Webserver
+MIN_SERIALIZED_DAG_FETCH_INTERVAL = conf.getint(
+    'core', 'min_serialized_dag_fetch_interval', fallback=10)
+
 # Whether to persist DAG files code in DB. If set to True, Webserver reads file contents
 # from DB instead of trying to access files in a DAG folder.
 # Defaults to same as the store_serialized_dags setting.
diff --git a/docs/dag-serialization.rst b/docs/dag-serialization.rst
index 0edd644..e2fcf14 100644
--- a/docs/dag-serialization.rst
+++ b/docs/dag-serialization.rst
@@ -57,14 +57,21 @@ Add the following settings in ``airflow.cfg``:
 
     [core]
     store_serialized_dags = True
+    store_dag_code = True
+
+    # You can also update the following default configurations based on your needs
     min_serialized_dag_update_interval = 30
+    min_serialized_dag_fetch_interval = 10
 
 *   ``store_serialized_dags``: This flag decides whether to serialise DAGs and persist them in DB.
     If set to True, Webserver reads from DB instead of parsing DAG files
-*   ``min_serialized_dag_update_interval``: This flag sets the minimum interval (in seconds) after which
-    the serialized DAG in DB should be updated. This helps in reducing database write rate.
 *   ``store_dag_code``: This flag decides whether to persist DAG files code in DB.
     If set to True, Webserver reads file contents from DB instead of trying to access files in a DAG folder.
+*   ``min_serialized_dag_update_interval``: This flag sets the minimum interval (in seconds) after which
+    the serialized DAG in DB should be updated. This helps in reducing database write rate.
+*   ``min_serialized_dag_fetch_interval``: This flag controls how often a SerializedDAG will be re-fetched
+    from the DB when it's already loaded in the DagBag in the Webserver. Setting this higher will reduce
+    load on the DB, but at the expense of displaying a possibly stale cached version of the DAG.
 
 If you are updating Airflow from <1.10.7, please do not forget to run ``airflow upgradedb``.
 
diff --git a/tests/models/test_dagbag.py b/tests/models/test_dagbag.py
index 04c2372..b9d18ac 100644
--- a/tests/models/test_dagbag.py
+++ b/tests/models/test_dagbag.py
@@ -19,6 +19,7 @@
 
 import inspect
 import os
+import six
 import shutil
 import textwrap
 import unittest
@@ -26,15 +27,19 @@ from datetime import datetime
 from tempfile import NamedTemporaryFile, mkdtemp
 
 from mock import patch, ANY
+from freezegun import freeze_time
 
 from airflow import models
 from airflow.configuration import conf
 from airflow.utils.dag_processing import SimpleTaskInstance
 from airflow.models import DagModel, DagBag, TaskInstance as TI
+from airflow.models.serialized_dag import SerializedDagModel
+from airflow.utils.dates import timezone as tz
 from airflow.utils.db import create_session
 from airflow.utils.state import State
 from airflow.utils.timezone import utc
 from tests.models import TEST_DAGS_FOLDER, DEFAULT_DATE
+from tests.test_utils.asserts import assert_queries_count
 from tests.test_utils.config import conf_vars
 import airflow.example_dags
 
@@ -650,3 +655,43 @@ class DagBagTest(unittest.TestCase):
         # clean up
         with create_session() as session:
             session.query(DagModel).filter(DagModel.dag_id == 'test_deactivate_unknown_dags').delete()
+
+    @patch("airflow.models.dagbag.settings.STORE_SERIALIZED_DAGS", True)
+    @patch("airflow.models.dagbag.settings.MIN_SERIALIZED_DAG_UPDATE_INTERVAL", 5)
+    @patch("airflow.models.dagbag.settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL", 5)
+    def test_get_dag_with_dag_serialization(self):
+        """
+        Test that Serialized DAG is updated in DagBag when it is updated in
+        Serialized DAG table after 'min_serialized_dag_fetch_interval' seconds are passed.
+        """
+
+        with freeze_time(tz.datetime(2020, 1, 5, 0, 0, 0)):
+            example_bash_op_dag = DagBag(include_examples=True).dags.get("example_bash_operator")
+            SerializedDagModel.write_dag(dag=example_bash_op_dag)
+
+            dag_bag = DagBag(store_serialized_dags=True)
+            ser_dag_1 = dag_bag.get_dag("example_bash_operator")
+            ser_dag_1_update_time = dag_bag.dags_last_fetched["example_bash_operator"]
+            self.assertEqual(example_bash_op_dag.tags, ser_dag_1.tags)
+            self.assertEqual(ser_dag_1_update_time, tz.datetime(2020, 1, 5, 0, 0, 0))
+
+        # Check that if min_serialized_dag_fetch_interval has not passed we do not fetch the DAG
+        # from DB
+        with freeze_time(tz.datetime(2020, 1, 5, 0, 0, 4)):
+            with assert_queries_count(0):
+                self.assertEqual(dag_bag.get_dag("example_bash_operator").tags, ["example"])
+
+        # Make a change in the DAG and write Serialized DAG to the DB
+        with freeze_time(tz.datetime(2020, 1, 5, 0, 0, 6)):
+            example_bash_op_dag.tags += ["new_tag"]
+            SerializedDagModel.write_dag(dag=example_bash_op_dag)
+
+        # Since min_serialized_dag_fetch_interval is passed verify that calling 'dag_bag.get_dag'
+        # fetches the Serialized DAG from DB
+        with freeze_time(tz.datetime(2020, 1, 5, 0, 0, 8)):
+            with assert_queries_count(2):
+                updated_ser_dag_1 = dag_bag.get_dag("example_bash_operator")
+                updated_ser_dag_1_update_time = dag_bag.dags_last_fetched["example_bash_operator"]
+
+        six.assertCountEqual(self, updated_ser_dag_1.tags, ["example", "new_tag"])
+        self.assertGreater(updated_ser_dag_1_update_time, ser_dag_1_update_time)
diff --git a/tests/test_utils/asserts.py b/tests/test_utils/asserts.py
new file mode 100644
index 0000000..ca3cf2f
--- /dev/null
+++ b/tests/test_utils/asserts.py
@@ -0,0 +1,73 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import logging
+import re
+from contextlib import contextmanager
+
+from sqlalchemy import event
+
+# Import the module rather than the engine object, so we always refer to the live airflow.settings.engine.
+import airflow.settings
+
+log = logging.getLogger(__name__)
+
+
+def assert_equal_ignore_multiple_spaces(case, first, second, msg=None):
+    def _trim(s):
+        return re.sub(r"\s+", " ", s.strip())
+    return case.assertEqual(_trim(first), _trim(second), msg)
+
+
+class CountQueriesResult:
+    def __init__(self):
+        self.count = 0
+
+
+class CountQueries:
+    """
+    Counts the number of queries sent to Airflow Database in a given context.
+
+    Does not support multiple processes. When a new process is started in context, its queries will
+    not be included.
+    """
+    def __init__(self):
+        self.result = CountQueriesResult()
+
+    def __enter__(self):
+        event.listen(airflow.settings.engine, "after_cursor_execute", self.after_cursor_execute)
+        return self.result
+
+    def __exit__(self, type_, value, traceback):
+        event.remove(airflow.settings.engine, "after_cursor_execute", self.after_cursor_execute)
+        log.debug("Queries count: %d", self.result.count)
+
+    def after_cursor_execute(self, *args, **kwargs):
+        self.result.count += 1
+
+
+count_queries = CountQueries  # pylint: disable=invalid-name
+
+
+@contextmanager
+def assert_queries_count(expected_count, message_fmt=None):
+    with count_queries() as result:
+        yield None
+    message_fmt = message_fmt or "The expected number of db queries is {expected_count}. " \
+                                 "The current number is {current_count}."
+    message = message_fmt.format(current_count=result.count, expected_count=expected_count)
+    assert expected_count == result.count, message


[airflow] 01/03: Don't Update Serialized DAGs in DB if DAG didn't change (#9850)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit fde8f04a61034344f06afaeeebb06cb8935deeb6
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Mon Jul 20 12:31:05 2020 +0100

    Don't Update Serialized DAGs in DB if DAG didn't change (#9850)
    
    We should not update the "last_updated" column unnecessarily. This is the first of a few optimizations to DAG Serialization that will also aid DAG Versioning.
    
    (cherry picked from commit 1a32c45126f1086a52eeee52d4c19427af06274b)
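
Stripped of the ORM details, the change is a compare-before-write: only touch the row
(and hence its last_updated timestamp) when the serialized payload actually differs. A
minimal sketch, assuming a plain dict stands in for the serialized_dag table (the real
change to SerializedDagModel.write_dag() is in the diff below):

    from datetime import datetime

    def write_if_changed(table, dag_id, new_payload):
        """table maps dag_id -> {"data": ..., "last_updated": ...}.
        Returns True if a write happened, False if it was skipped."""
        row = table.get(dag_id)
        if row is not None and row["data"] == new_payload:
            return False  # unchanged: skip the write, last_updated keeps its old value
        table[dag_id] = {"data": new_payload, "last_updated": datetime.utcnow()}
        return True

    table = {}
    assert write_if_changed(table, "example_bash_operator", {"tasks": ["run_me"]})
    assert not write_if_changed(table, "example_bash_operator", {"tasks": ["run_me"]})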
---
 airflow/models/serialized_dag.py    | 22 ++++++++++++++++++----
 tests/models/test_serialized_dag.py | 31 +++++++++++++++++++++++++++++++
 2 files changed, 49 insertions(+), 4 deletions(-)

diff --git a/airflow/models/serialized_dag.py b/airflow/models/serialized_dag.py
index c655e34..1313cac 100644
--- a/airflow/models/serialized_dag.py
+++ b/airflow/models/serialized_dag.py
@@ -17,7 +17,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Serialzed DAG table in database."""
+"""Serialized DAG table in database."""
 
 import logging
 from datetime import timedelta
@@ -25,6 +25,7 @@ from typing import Any, Optional
 
 import sqlalchemy_jsonfield
 from sqlalchemy import BigInteger, Column, Index, String, and_
+from sqlalchemy.orm import Session  # noqa: F401
 from sqlalchemy.sql import exists
 
 from airflow.models.base import ID_LEN, Base
@@ -86,12 +87,13 @@ class SerializedDagModel(Base):
                   min_update_interval=None,   # type: Optional[int]
                   session=None):
         """Serializes a DAG and writes it into database.
+        If the record already exists, it checks if the Serialized DAG changed or not. If it is
+        changed, it updates the record, ignores otherwise.
 
         :param dag: a DAG to be written into database
         :param min_update_interval: minimal interval in seconds to update serialized DAG
         :param session: ORM Session
         """
-        log.debug("Writing DAG: %s to the DB", dag)
         # Checks if (Current Time - Time when the DAG was written to DB) < min_update_interval
         # If Yes, does nothing
         # If No or the DAG does not exists, updates / writes Serialized DAG to DB
@@ -102,8 +104,15 @@ class SerializedDagModel(Base):
             ).scalar():
                 return
 
-        log.debug("Writing DAG: %s to the DB", dag.dag_id)
-        session.merge(cls(dag))
+        log.debug("Checking if DAG (%s) changed", dag.dag_id)
+        serialized_dag_from_db = session.query(cls).get(dag.dag_id)    # type: SerializedDagModel
+        new_serialized_dag = cls(dag)
+        if serialized_dag_from_db and (serialized_dag_from_db.data == new_serialized_dag.data):
+            log.debug("Serialized DAG (%s) is unchanged. Skipping writing to DB", dag.dag_id)
+            return
+
+        log.debug("Writing Serialized DAG: %s to the DB", dag.dag_id)
+        session.merge(new_serialized_dag)
         log.debug("DAG: %s written to the DB", dag.dag_id)
 
     @classmethod
@@ -112,6 +121,7 @@ class SerializedDagModel(Base):
         """Reads all DAGs in serialized_dag table.
 
         :param session: ORM Session
+        :type session: Session
         :returns: a dict of DAGs read from database
         """
         serialized_dags = session.query(cls)
@@ -148,6 +158,7 @@ class SerializedDagModel(Base):
         :param dag_id: dag_id to be deleted
         :type dag_id: str
         :param session: ORM Session
+        :type session: Session
         """
         session.execute(cls.__table__.delete().where(cls.dag_id == dag_id))
 
@@ -159,6 +170,7 @@ class SerializedDagModel(Base):
         :param alive_dag_filelocs: file paths of alive DAGs
         :type alive_dag_filelocs: list
         :param session: ORM Session
+        :type session: Session
         """
         alive_fileloc_hashes = [
             DagCode.dag_fileloc_hash(fileloc) for fileloc in alive_dag_filelocs]
@@ -179,6 +191,7 @@ class SerializedDagModel(Base):
         :param dag_id: the DAG to check
         :type dag_id: str
         :param session: ORM Session
+        :type session: Session
         :rtype: bool
         """
         return session.query(exists().where(cls.dag_id == dag_id)).scalar()
@@ -193,6 +206,7 @@ class SerializedDagModel(Base):
 
         :param dag_id: the DAG to fetch
         :param session: ORM Session
+        :type session: Session
         """
         from airflow.models.dag import DagModel
         row = session.query(cls).filter(cls.dag_id == dag_id).one_or_none()
diff --git a/tests/models/test_serialized_dag.py b/tests/models/test_serialized_dag.py
index c70db78..4e0e8ea 100644
--- a/tests/models/test_serialized_dag.py
+++ b/tests/models/test_serialized_dag.py
@@ -19,6 +19,7 @@
 
 """Unit tests for SerializedDagModel."""
 
+import six
 import unittest
 
 from airflow import example_dags as example_dags_module
@@ -75,6 +76,36 @@ class SerializedDagModelTest(unittest.TestCase):
                 # Verifies JSON schema.
                 SerializedDAG.validate_schema(result.data)
 
+    def test_serialized_dag_is_updated_only_if_dag_is_changed(self):
+        """Test Serialized DAG is updated if DAG is changed"""
+
+        example_dags = make_example_dags(example_dags_module)
+        example_bash_op_dag = example_dags.get("example_bash_operator")
+        SDM.write_dag(dag=example_bash_op_dag)
+
+        with db.create_session() as session:
+            last_updated = session.query(
+                SDM.last_updated).filter(SDM.dag_id == example_bash_op_dag.dag_id).one_or_none()
+
+            # Test that if DAG is not changed, Serialized DAG is not re-written and last_updated
+            # column is not updated
+            SDM.write_dag(dag=example_bash_op_dag)
+            last_updated_1 = session.query(
+                SDM.last_updated).filter(SDM.dag_id == example_bash_op_dag.dag_id).one_or_none()
+
+            self.assertEqual(last_updated, last_updated_1)
+
+            # Update DAG
+            example_bash_op_dag.tags += ["new_tag"]
+            six.assertCountEqual(self, example_bash_op_dag.tags, ["example", "new_tag"])
+
+            SDM.write_dag(dag=example_bash_op_dag)
+            new_s_dag = session.query(SDM.last_updated, SDM.data).filter(
+                SDM.dag_id == example_bash_op_dag.dag_id).one_or_none()
+
+            self.assertNotEqual(last_updated, new_s_dag.last_updated)
+            self.assertEqual(new_s_dag.data["dag"]["tags"], ["example", "new_tag"])
+
     def test_read_dags(self):
         """DAGs can be read from database."""
         example_dags = self._write_example_dags()


[airflow] 03/03: Use Hash of Serialized DAG to determine DAG is changed or not (#10227)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit ab5dd2442f44bc94cbcbcf2c1ed52ff80cd4e41e
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Tue Aug 11 22:31:55 2020 +0100

    Use Hash of Serialized DAG to determine DAG is changed or not (#10227)
    
    closes #10116
    
    (cherry picked from commit adce6f029609e89f3651a89df40700589ec16237)
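
The fingerprint is an MD5 digest of the canonical (sort_keys=True) JSON form of the
serialized DAG, exactly as the diff below computes it in SerializedDagModel.__init__;
comparing two 32-character hashes then replaces loading and comparing the full JSON
blob. A standalone sketch of the same idea:

    import hashlib
    import json

    def dag_hash(serialized_dag_dict):
        # sort_keys=True canonicalises the JSON, so the same dict always
        # produces the same digest regardless of key insertion order
        payload = json.dumps(serialized_dag_dict, sort_keys=True).encode("utf-8")
        return hashlib.md5(payload).hexdigest()

    assert dag_hash({"a": 1, "b": 2}) == dag_hash({"b": 2, "a": 1})
    assert dag_hash({"a": 1, "b": 2}) != dag_hash({"a": 1, "b": 3})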
---
 ...c3a5a_add_dag_hash_column_to_serialized_dag_.py | 46 ++++++++++++++++++++++
 airflow/models/serialized_dag.py                   | 11 ++++--
 tests/models/test_serialized_dag.py                | 17 ++++----
 3 files changed, 62 insertions(+), 12 deletions(-)

diff --git a/airflow/migrations/versions/da3f683c3a5a_add_dag_hash_column_to_serialized_dag_.py b/airflow/migrations/versions/da3f683c3a5a_add_dag_hash_column_to_serialized_dag_.py
new file mode 100644
index 0000000..4acda3b
--- /dev/null
+++ b/airflow/migrations/versions/da3f683c3a5a_add_dag_hash_column_to_serialized_dag_.py
@@ -0,0 +1,46 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add dag_hash Column to serialized_dag table
+
+Revision ID: da3f683c3a5a
+Revises: 8d48763f6d53
+Create Date: 2020-08-07 20:52:09.178296
+
+"""
+
+import sqlalchemy as sa
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision = 'da3f683c3a5a'
+down_revision = 'a66efa278eea'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    """Apply Add dag_hash Column to serialized_dag table"""
+    op.add_column(
+        'serialized_dag',
+        sa.Column('dag_hash', sa.String(32), nullable=False, server_default='Hash not calculated yet'))
+
+
+def downgrade():
+    """Unapply Add dag_hash Column to serialized_dag table"""
+    op.drop_column('serialized_dag', 'dag_hash')
diff --git a/airflow/models/serialized_dag.py b/airflow/models/serialized_dag.py
index d29e43c..f33e67b 100644
--- a/airflow/models/serialized_dag.py
+++ b/airflow/models/serialized_dag.py
@@ -19,6 +19,7 @@
 
 """Serialized DAG table in database."""
 
+import hashlib
 import logging
 from datetime import timedelta
 from typing import Any, Optional
@@ -53,7 +54,7 @@ class SerializedDagModel(Base):
       interval of deleting serialized DAGs in DB when the files are deleted, suggest
       to use a smaller interval such as 60
 
-    It is used by webserver to load dagbags when ``store_serialized_dags=True``.
+    It is used by webserver to load dags when ``store_serialized_dags=True``.
     Because reading from database is lightweight compared to importing from files,
     it solves the webserver scalability issue.
     """
@@ -65,6 +66,7 @@ class SerializedDagModel(Base):
     fileloc_hash = Column(BigInteger, nullable=False)
     data = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=False)
     last_updated = Column(UtcDateTime, nullable=False)
+    dag_hash = Column(String(32), nullable=False)
 
     __table_args__ = (
         Index('idx_fileloc_hash', fileloc_hash, unique=False),
@@ -76,6 +78,7 @@ class SerializedDagModel(Base):
         self.fileloc_hash = DagCode.dag_fileloc_hash(self.fileloc)
         self.data = SerializedDAG.to_dict(dag)
         self.last_updated = timezone.utcnow()
+        self.dag_hash = hashlib.md5(json.dumps(self.data, sort_keys=True).encode("utf-8")).hexdigest()
 
     def __repr__(self):
         return "<SerializedDag: {}>".format(self.dag_id)
@@ -105,9 +108,11 @@ class SerializedDagModel(Base):
                 return
 
         log.debug("Checking if DAG (%s) changed", dag.dag_id)
-        serialized_dag_from_db = session.query(cls).get(dag.dag_id)    # type: SerializedDagModel
         new_serialized_dag = cls(dag)
-        if serialized_dag_from_db and (serialized_dag_from_db.data == new_serialized_dag.data):
+        serialized_dag_hash_from_db = session.query(
+            cls.dag_hash).filter(cls.dag_id == dag.dag_id).scalar()
+
+        if serialized_dag_hash_from_db == new_serialized_dag.dag_hash:
             log.debug("Serialized DAG (%s) is unchanged. Skipping writing to DB", dag.dag_id)
             return
 
diff --git a/tests/models/test_serialized_dag.py b/tests/models/test_serialized_dag.py
index 4e0e8ea..56782ed 100644
--- a/tests/models/test_serialized_dag.py
+++ b/tests/models/test_serialized_dag.py
@@ -84,27 +84,26 @@ class SerializedDagModelTest(unittest.TestCase):
         SDM.write_dag(dag=example_bash_op_dag)
 
         with db.create_session() as session:
-            last_updated = session.query(
-                SDM.last_updated).filter(SDM.dag_id == example_bash_op_dag.dag_id).one_or_none()
+            s_dag = session.query(SDM).get(example_bash_op_dag.dag_id)
 
             # Test that if DAG is not changed, Serialized DAG is not re-written and last_updated
             # column is not updated
             SDM.write_dag(dag=example_bash_op_dag)
-            last_updated_1 = session.query(
-                SDM.last_updated).filter(SDM.dag_id == example_bash_op_dag.dag_id).one_or_none()
+            s_dag_1 = session.query(SDM).get(example_bash_op_dag.dag_id)
 
-            self.assertEqual(last_updated, last_updated_1)
+            self.assertEqual(s_dag_1.dag_hash, s_dag.dag_hash)
+            self.assertEqual(s_dag.last_updated, s_dag_1.last_updated)
 
             # Update DAG
             example_bash_op_dag.tags += ["new_tag"]
             six.assertCountEqual(self, example_bash_op_dag.tags, ["example", "new_tag"])
 
             SDM.write_dag(dag=example_bash_op_dag)
-            new_s_dag = session.query(SDM.last_updated, SDM.data).filter(
-                SDM.dag_id == example_bash_op_dag.dag_id).one_or_none()
+            s_dag_2 = session.query(SDM).get(example_bash_op_dag.dag_id)
 
-            self.assertNotEqual(last_updated, new_s_dag.last_updated)
-            self.assertEqual(new_s_dag.data["dag"]["tags"], ["example", "new_tag"])
+            self.assertNotEqual(s_dag.last_updated, s_dag_2.last_updated)
+            self.assertNotEqual(s_dag.dag_hash, s_dag_2.dag_hash)
+            self.assertEqual(s_dag_2.data["dag"]["tags"], ["example", "new_tag"])
 
     def test_read_dags(self):
         """DAGs can be read from database."""