You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text rendering.)
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/08/15 03:32:55 UTC
[airflow] 20/47: Don't Update Serialized DAGs in DB if DAG didn't change (#9850)
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit b9d2aaf4f901f738f9df22628a6ed8ba4208107c
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Mon Jul 20 12:31:05 2020 +0100
Don't Update Serialized DAGs in DB if DAG didn't change (#9850)
We should not update the "last_updated" column unnecessarily. This is the first of a few optimizations to DAG Serialization that would also aid in DAG Versioning.
(cherry picked from commit 1a32c45126f1086a52eeee52d4c19427af06274b)
---
airflow/models/serialized_dag.py | 22 ++++++++++++++++++----
tests/models/test_serialized_dag.py | 31 +++++++++++++++++++++++++++++++
2 files changed, 49 insertions(+), 4 deletions(-)
diff --git a/airflow/models/serialized_dag.py b/airflow/models/serialized_dag.py
index c655e34..1313cac 100644
--- a/airflow/models/serialized_dag.py
+++ b/airflow/models/serialized_dag.py
@@ -17,7 +17,7 @@
# specific language governing permissions and limitations
# under the License.
-"""Serialzed DAG table in database."""
+"""Serialized DAG table in database."""
import logging
from datetime import timedelta
@@ -25,6 +25,7 @@ from typing import Any, Optional
import sqlalchemy_jsonfield
from sqlalchemy import BigInteger, Column, Index, String, and_
+from sqlalchemy.orm import Session # noqa: F401
from sqlalchemy.sql import exists
from airflow.models.base import ID_LEN, Base
@@ -86,12 +87,13 @@ class SerializedDagModel(Base):
min_update_interval=None, # type: Optional[int]
session=None):
"""Serializes a DAG and writes it into database.
+ If the record already exists, it checks if the Serialized DAG changed or not. If it is
+ changed, it updates the record, ignores otherwise.
:param dag: a DAG to be written into database
:param min_update_interval: minimal interval in seconds to update serialized DAG
:param session: ORM Session
"""
- log.debug("Writing DAG: %s to the DB", dag)
# Checks if (Current Time - Time when the DAG was written to DB) < min_update_interval
# If Yes, does nothing
# If No or the DAG does not exists, updates / writes Serialized DAG to DB
@@ -102,8 +104,15 @@ class SerializedDagModel(Base):
).scalar():
return
- log.debug("Writing DAG: %s to the DB", dag.dag_id)
- session.merge(cls(dag))
+ log.debug("Checking if DAG (%s) changed", dag.dag_id)
+ serialized_dag_from_db = session.query(cls).get(dag.dag_id) # type: SerializedDagModel
+ new_serialized_dag = cls(dag)
+ if serialized_dag_from_db and (serialized_dag_from_db.data == new_serialized_dag.data):
+ log.debug("Serialized DAG (%s) is unchanged. Skipping writing to DB", dag.dag_id)
+ return
+
+ log.debug("Writing Serialized DAG: %s to the DB", dag.dag_id)
+ session.merge(new_serialized_dag)
log.debug("DAG: %s written to the DB", dag.dag_id)
@classmethod
@@ -112,6 +121,7 @@ class SerializedDagModel(Base):
"""Reads all DAGs in serialized_dag table.
:param session: ORM Session
+ :type session: Session
:returns: a dict of DAGs read from database
"""
serialized_dags = session.query(cls)
@@ -148,6 +158,7 @@ class SerializedDagModel(Base):
:param dag_id: dag_id to be deleted
:type dag_id: str
:param session: ORM Session
+ :type session: Session
"""
session.execute(cls.__table__.delete().where(cls.dag_id == dag_id))
@@ -159,6 +170,7 @@ class SerializedDagModel(Base):
:param alive_dag_filelocs: file paths of alive DAGs
:type alive_dag_filelocs: list
:param session: ORM Session
+ :type session: Session
"""
alive_fileloc_hashes = [
DagCode.dag_fileloc_hash(fileloc) for fileloc in alive_dag_filelocs]
@@ -179,6 +191,7 @@ class SerializedDagModel(Base):
:param dag_id: the DAG to check
:type dag_id: str
:param session: ORM Session
+ :type session: Session
:rtype: bool
"""
return session.query(exists().where(cls.dag_id == dag_id)).scalar()
@@ -193,6 +206,7 @@ class SerializedDagModel(Base):
:param dag_id: the DAG to fetch
:param session: ORM Session
+ :type session: Session
"""
from airflow.models.dag import DagModel
row = session.query(cls).filter(cls.dag_id == dag_id).one_or_none()
diff --git a/tests/models/test_serialized_dag.py b/tests/models/test_serialized_dag.py
index c70db78..4e0e8ea 100644
--- a/tests/models/test_serialized_dag.py
+++ b/tests/models/test_serialized_dag.py
@@ -19,6 +19,7 @@
"""Unit tests for SerializedDagModel."""
+import six
import unittest
from airflow import example_dags as example_dags_module
@@ -75,6 +76,36 @@ class SerializedDagModelTest(unittest.TestCase):
# Verifies JSON schema.
SerializedDAG.validate_schema(result.data)
+ def test_serialized_dag_is_updated_only_if_dag_is_changed(self):
+ """Test Serialized DAG is updated if DAG is changed"""
+
+ example_dags = make_example_dags(example_dags_module)
+ example_bash_op_dag = example_dags.get("example_bash_operator")
+ SDM.write_dag(dag=example_bash_op_dag)
+
+ with db.create_session() as session:
+ last_updated = session.query(
+ SDM.last_updated).filter(SDM.dag_id == example_bash_op_dag.dag_id).one_or_none()
+
+ # Test that if DAG is not changed, Serialized DAG is not re-written and last_updated
+ # column is not updated
+ SDM.write_dag(dag=example_bash_op_dag)
+ last_updated_1 = session.query(
+ SDM.last_updated).filter(SDM.dag_id == example_bash_op_dag.dag_id).one_or_none()
+
+ self.assertEqual(last_updated, last_updated_1)
+
+ # Update DAG
+ example_bash_op_dag.tags += ["new_tag"]
+ six.assertCountEqual(self, example_bash_op_dag.tags, ["example", "new_tag"])
+
+ SDM.write_dag(dag=example_bash_op_dag)
+ new_s_dag = session.query(SDM.last_updated, SDM.data).filter(
+ SDM.dag_id == example_bash_op_dag.dag_id).one_or_none()
+
+ self.assertNotEqual(last_updated, new_s_dag.last_updated)
+ self.assertEqual(new_s_dag.data["dag"]["tags"], ["example", "new_tag"])
+
def test_read_dags(self):
"""DAGs can be read from database."""
example_dags = self._write_example_dags()