You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/08/13 23:42:18 UTC

[airflow] 03/03: Use Hash of Serialized DAG to determine DAG is changed or not (#10227)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit ab5dd2442f44bc94cbcbcf2c1ed52ff80cd4e41e
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Tue Aug 11 22:31:55 2020 +0100

    Use Hash of Serialized DAG to determine DAG is changed or not (#10227)
    
    closes #10116
    
    (cherry picked from commit adce6f029609e89f3651a89df40700589ec16237)
---
 ...c3a5a_add_dag_hash_column_to_serialized_dag_.py | 46 ++++++++++++++++++++++
 airflow/models/serialized_dag.py                   | 11 ++++--
 tests/models/test_serialized_dag.py                | 17 ++++----
 3 files changed, 62 insertions(+), 12 deletions(-)

diff --git a/airflow/migrations/versions/da3f683c3a5a_add_dag_hash_column_to_serialized_dag_.py b/airflow/migrations/versions/da3f683c3a5a_add_dag_hash_column_to_serialized_dag_.py
new file mode 100644
index 0000000..4acda3b
--- /dev/null
+++ b/airflow/migrations/versions/da3f683c3a5a_add_dag_hash_column_to_serialized_dag_.py
@@ -0,0 +1,46 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add dag_hash Column to serialized_dag table
+
+Revision ID: da3f683c3a5a
+Revises: a66efa278eea
+Create Date: 2020-08-07 20:52:09.178296
+
+"""
+
+import sqlalchemy as sa
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision = 'da3f683c3a5a'
+down_revision = 'a66efa278eea'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    """Apply Add dag_hash Column to serialized_dag table"""
+    op.add_column(
+        'serialized_dag',
+        sa.Column('dag_hash', sa.String(32), nullable=False, server_default='Hash not calculated yet'))
+
+
+def downgrade():
+    """Unapply Add dag_hash Column to serialized_dag table"""
+    op.drop_column('serialized_dag', 'dag_hash')
diff --git a/airflow/models/serialized_dag.py b/airflow/models/serialized_dag.py
index d29e43c..f33e67b 100644
--- a/airflow/models/serialized_dag.py
+++ b/airflow/models/serialized_dag.py
@@ -19,6 +19,7 @@
 
 """Serialized DAG table in database."""
 
+import hashlib
 import logging
 from datetime import timedelta
 from typing import Any, Optional
@@ -53,7 +54,7 @@ class SerializedDagModel(Base):
       interval of deleting serialized DAGs in DB when the files are deleted, suggest
       to use a smaller interval such as 60
 
-    It is used by webserver to load dagbags when ``store_serialized_dags=True``.
+    It is used by webserver to load dags when ``store_serialized_dags=True``.
     Because reading from database is lightweight compared to importing from files,
     it solves the webserver scalability issue.
     """
@@ -65,6 +66,7 @@ class SerializedDagModel(Base):
     fileloc_hash = Column(BigInteger, nullable=False)
     data = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=False)
     last_updated = Column(UtcDateTime, nullable=False)
+    dag_hash = Column(String(32), nullable=False)
 
     __table_args__ = (
         Index('idx_fileloc_hash', fileloc_hash, unique=False),
@@ -76,6 +78,7 @@ class SerializedDagModel(Base):
         self.fileloc_hash = DagCode.dag_fileloc_hash(self.fileloc)
         self.data = SerializedDAG.to_dict(dag)
         self.last_updated = timezone.utcnow()
+        self.dag_hash = hashlib.md5(json.dumps(self.data, sort_keys=True).encode("utf-8")).hexdigest()
 
     def __repr__(self):
         return "<SerializedDag: {}>".format(self.dag_id)
@@ -105,9 +108,11 @@ class SerializedDagModel(Base):
                 return
 
         log.debug("Checking if DAG (%s) changed", dag.dag_id)
-        serialized_dag_from_db = session.query(cls).get(dag.dag_id)    # type: SerializedDagModel
         new_serialized_dag = cls(dag)
-        if serialized_dag_from_db and (serialized_dag_from_db.data == new_serialized_dag.data):
+        serialized_dag_hash_from_db = session.query(
+            cls.dag_hash).filter(cls.dag_id == dag.dag_id).scalar()
+
+        if serialized_dag_hash_from_db == new_serialized_dag.dag_hash:
             log.debug("Serialized DAG (%s) is unchanged. Skipping writing to DB", dag.dag_id)
             return
 
diff --git a/tests/models/test_serialized_dag.py b/tests/models/test_serialized_dag.py
index 4e0e8ea..56782ed 100644
--- a/tests/models/test_serialized_dag.py
+++ b/tests/models/test_serialized_dag.py
@@ -84,27 +84,26 @@ class SerializedDagModelTest(unittest.TestCase):
         SDM.write_dag(dag=example_bash_op_dag)
 
         with db.create_session() as session:
-            last_updated = session.query(
-                SDM.last_updated).filter(SDM.dag_id == example_bash_op_dag.dag_id).one_or_none()
+            s_dag = session.query(SDM).get(example_bash_op_dag.dag_id)
 
             # Test that if DAG is not changed, Serialized DAG is not re-written and last_updated
             # column is not updated
             SDM.write_dag(dag=example_bash_op_dag)
-            last_updated_1 = session.query(
-                SDM.last_updated).filter(SDM.dag_id == example_bash_op_dag.dag_id).one_or_none()
+            s_dag_1 = session.query(SDM).get(example_bash_op_dag.dag_id)
 
-            self.assertEqual(last_updated, last_updated_1)
+            self.assertEqual(s_dag_1.dag_hash, s_dag.dag_hash)
+            self.assertEqual(s_dag.last_updated, s_dag_1.last_updated)
 
             # Update DAG
             example_bash_op_dag.tags += ["new_tag"]
             six.assertCountEqual(self, example_bash_op_dag.tags, ["example", "new_tag"])
 
             SDM.write_dag(dag=example_bash_op_dag)
-            new_s_dag = session.query(SDM.last_updated, SDM.data).filter(
-                SDM.dag_id == example_bash_op_dag.dag_id).one_or_none()
+            s_dag_2 = session.query(SDM).get(example_bash_op_dag.dag_id)
 
-            self.assertNotEqual(last_updated, new_s_dag.last_updated)
-            self.assertEqual(new_s_dag.data["dag"]["tags"], ["example", "new_tag"])
+            self.assertNotEqual(s_dag.last_updated, s_dag_2.last_updated)
+            self.assertNotEqual(s_dag.dag_hash, s_dag_2.dag_hash)
+            self.assertEqual(s_dag_2.data["dag"]["tags"], ["example", "new_tag"])
 
     def test_read_dags(self):
         """DAGs can be read from database."""