You are viewing a plain text version of this content. The canonical link to it was a hyperlink in the original page and is not preserved in this text.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/08/15 03:32:55 UTC

[airflow] 20/47: Don't Update Serialized DAGs in DB if DAG didn't change (#9850)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit b9d2aaf4f901f738f9df22628a6ed8ba4208107c
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Mon Jul 20 12:31:05 2020 +0100

    Don't Update Serialized DAGs in DB if DAG didn't change (#9850)
    
    We should not update the "last_updated" column unnecessarily. This is the first of a few optimizations to DAG Serialization that will also aid in DAG Versioning.
    
    (cherry picked from commit 1a32c45126f1086a52eeee52d4c19427af06274b)
---
 airflow/models/serialized_dag.py    | 22 ++++++++++++++++++----
 tests/models/test_serialized_dag.py | 31 +++++++++++++++++++++++++++++++
 2 files changed, 49 insertions(+), 4 deletions(-)

diff --git a/airflow/models/serialized_dag.py b/airflow/models/serialized_dag.py
index c655e34..1313cac 100644
--- a/airflow/models/serialized_dag.py
+++ b/airflow/models/serialized_dag.py
@@ -17,7 +17,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Serialzed DAG table in database."""
+"""Serialized DAG table in database."""
 
 import logging
 from datetime import timedelta
@@ -25,6 +25,7 @@ from typing import Any, Optional
 
 import sqlalchemy_jsonfield
 from sqlalchemy import BigInteger, Column, Index, String, and_
+from sqlalchemy.orm import Session  # noqa: F401
 from sqlalchemy.sql import exists
 
 from airflow.models.base import ID_LEN, Base
@@ -86,12 +87,13 @@ class SerializedDagModel(Base):
                   min_update_interval=None,   # type: Optional[int]
                   session=None):
         """Serializes a DAG and writes it into database.
+        If the record already exists, it checks if the Serialized DAG changed or not. If it is
+        changed, it updates the record, ignores otherwise.
 
         :param dag: a DAG to be written into database
         :param min_update_interval: minimal interval in seconds to update serialized DAG
         :param session: ORM Session
         """
-        log.debug("Writing DAG: %s to the DB", dag)
         # Checks if (Current Time - Time when the DAG was written to DB) < min_update_interval
         # If Yes, does nothing
         # If No or the DAG does not exists, updates / writes Serialized DAG to DB
@@ -102,8 +104,15 @@ class SerializedDagModel(Base):
             ).scalar():
                 return
 
-        log.debug("Writing DAG: %s to the DB", dag.dag_id)
-        session.merge(cls(dag))
+        log.debug("Checking if DAG (%s) changed", dag.dag_id)
+        serialized_dag_from_db = session.query(cls).get(dag.dag_id)    # type: SerializedDagModel
+        new_serialized_dag = cls(dag)
+        if serialized_dag_from_db and (serialized_dag_from_db.data == new_serialized_dag.data):
+            log.debug("Serialized DAG (%s) is unchanged. Skipping writing to DB", dag.dag_id)
+            return
+
+        log.debug("Writing Serialized DAG: %s to the DB", dag.dag_id)
+        session.merge(new_serialized_dag)
         log.debug("DAG: %s written to the DB", dag.dag_id)
 
     @classmethod
@@ -112,6 +121,7 @@ class SerializedDagModel(Base):
         """Reads all DAGs in serialized_dag table.
 
         :param session: ORM Session
+        :type session: Session
         :returns: a dict of DAGs read from database
         """
         serialized_dags = session.query(cls)
@@ -148,6 +158,7 @@ class SerializedDagModel(Base):
         :param dag_id: dag_id to be deleted
         :type dag_id: str
         :param session: ORM Session
+        :type session: Session
         """
         session.execute(cls.__table__.delete().where(cls.dag_id == dag_id))
 
@@ -159,6 +170,7 @@ class SerializedDagModel(Base):
         :param alive_dag_filelocs: file paths of alive DAGs
         :type alive_dag_filelocs: list
         :param session: ORM Session
+        :type session: Session
         """
         alive_fileloc_hashes = [
             DagCode.dag_fileloc_hash(fileloc) for fileloc in alive_dag_filelocs]
@@ -179,6 +191,7 @@ class SerializedDagModel(Base):
         :param dag_id: the DAG to check
         :type dag_id: str
         :param session: ORM Session
+        :type session: Session
         :rtype: bool
         """
         return session.query(exists().where(cls.dag_id == dag_id)).scalar()
@@ -193,6 +206,7 @@ class SerializedDagModel(Base):
 
         :param dag_id: the DAG to fetch
         :param session: ORM Session
+        :type session: Session
         """
         from airflow.models.dag import DagModel
         row = session.query(cls).filter(cls.dag_id == dag_id).one_or_none()
diff --git a/tests/models/test_serialized_dag.py b/tests/models/test_serialized_dag.py
index c70db78..4e0e8ea 100644
--- a/tests/models/test_serialized_dag.py
+++ b/tests/models/test_serialized_dag.py
@@ -19,6 +19,7 @@
 
 """Unit tests for SerializedDagModel."""
 
+import six
 import unittest
 
 from airflow import example_dags as example_dags_module
@@ -75,6 +76,36 @@ class SerializedDagModelTest(unittest.TestCase):
                 # Verifies JSON schema.
                 SerializedDAG.validate_schema(result.data)
 
+    def test_serialized_dag_is_updated_only_if_dag_is_changed(self):
+        """Test Serialized DAG is updated if DAG is changed"""
+
+        example_dags = make_example_dags(example_dags_module)
+        example_bash_op_dag = example_dags.get("example_bash_operator")
+        SDM.write_dag(dag=example_bash_op_dag)
+
+        with db.create_session() as session:
+            last_updated = session.query(
+                SDM.last_updated).filter(SDM.dag_id == example_bash_op_dag.dag_id).one_or_none()
+
+            # Test that if DAG is not changed, Serialized DAG is not re-written and last_updated
+            # column is not updated
+            SDM.write_dag(dag=example_bash_op_dag)
+            last_updated_1 = session.query(
+                SDM.last_updated).filter(SDM.dag_id == example_bash_op_dag.dag_id).one_or_none()
+
+            self.assertEqual(last_updated, last_updated_1)
+
+            # Update DAG
+            example_bash_op_dag.tags += ["new_tag"]
+            six.assertCountEqual(self, example_bash_op_dag.tags, ["example", "new_tag"])
+
+            SDM.write_dag(dag=example_bash_op_dag)
+            new_s_dag = session.query(SDM.last_updated, SDM.data).filter(
+                SDM.dag_id == example_bash_op_dag.dag_id).one_or_none()
+
+            self.assertNotEqual(last_updated, new_s_dag.last_updated)
+            self.assertEqual(new_s_dag.data["dag"]["tags"], ["example", "new_tag"])
+
     def test_read_dags(self):
         """DAGs can be read from database."""
         example_dags = self._write_example_dags()