Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/08/07 20:30:27 UTC

[GitHub] [airflow] kaxil opened a new pull request #10227: Use Hash of Serialized DAG to determine DAG is changed or not

kaxil opened a new pull request #10227:
URL: https://github.com/apache/airflow/pull/10227


   closes #10116
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk commented on a change in pull request #10227: Use Hash of Serialized DAG to determine DAG is changed or not

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #10227:
URL: https://github.com/apache/airflow/pull/10227#discussion_r467570749



##########
File path: airflow/models/serialized_dag.py
##########
@@ -76,6 +82,7 @@ def __init__(self, dag: DAG):
         self.fileloc_hash = DagCode.dag_fileloc_hash(self.fileloc)
         self.data = SerializedDAG.to_dict(dag)
         self.last_updated = timezone.utcnow()
+        self.dag_hash = hashlib.md5(json.dumps(self.data, sort_keys=True).encode("utf-8")).hexdigest()

Review comment:
       Ah, right! Makes sense indeed. Great to get some input from the Airbnb folks on that one :)
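The `dag_hash` line in the diff can be exercised on its own. This is a minimal sketch, assuming a plain dict stands in for the output of `SerializedDAG.to_dict(dag)`; `serialized_dag_hash` is a hypothetical helper name, not an Airflow API. It shows why `sort_keys=True` matters: two payloads with the same content but different key insertion order hash identically, while list order is still significant.

```python
import hashlib
import json


def serialized_dag_hash(data: dict) -> str:
    """MD5 of the JSON text, computed the same way as the `dag_hash` line in the diff."""
    # sort_keys=True sorts dict keys at every nesting level, so key insertion
    # order does not affect the digest. List order is NOT normalized.
    return hashlib.md5(json.dumps(data, sort_keys=True).encode("utf-8")).hexdigest()


# Hypothetical serialized payloads: same content, different key insertion order.
a = {"dag_id": "example", "schedule_interval": "@daily", "tasks": [{"task_id": "t1"}, {"task_id": "t2"}]}
b = {"tasks": [{"task_id": "t1"}, {"task_id": "t2"}], "schedule_interval": "@daily", "dag_id": "example"}
# Same tasks, but in a different list order.
c = {"dag_id": "example", "schedule_interval": "@daily", "tasks": [{"task_id": "t2"}, {"task_id": "t1"}]}

print(serialized_dag_hash(a) == serialized_dag_hash(b))  # True
print(serialized_dag_hash(a) == serialized_dag_hash(c))  # False
```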







[GitHub] [airflow] kaxil commented on a change in pull request #10227: Use Hash of Serialized DAG to determine DAG is changed or not

kaxil commented on a change in pull request #10227:
URL: https://github.com/apache/airflow/pull/10227#discussion_r467569001



##########
File path: airflow/models/serialized_dag.py
##########
@@ -65,6 +66,11 @@ class SerializedDagModel(Base):
     fileloc_hash = Column(BigInteger, nullable=False)
     data = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=False)
     last_updated = Column(UtcDateTime, nullable=False)
+    # TODO: Make dag_hash not nullable in Airflow 1.10.13 or Airflow 2.0??

Review comment:
       Good point. I was tempted to do that; the only reason I didn't was to avoid giving any impression that it is a hash, but I think that might not be a problem.







[GitHub] [airflow] potiuk commented on a change in pull request #10227: Use Hash of Serialized DAG to determine DAG is changed or not

potiuk commented on a change in pull request #10227:
URL: https://github.com/apache/airflow/pull/10227#discussion_r467566683



##########
File path: airflow/models/serialized_dag.py
##########
@@ -76,6 +82,7 @@ def __init__(self, dag: DAG):
         self.fileloc_hash = DagCode.dag_fileloc_hash(self.fileloc)
         self.data = SerializedDAG.to_dict(dag)
         self.last_updated = timezone.utcnow()
+        self.dag_hash = hashlib.md5(json.dumps(self.data, sort_keys=True).encode("utf-8")).hexdigest()

Review comment:
       I believe it would be better to use a hash of the file itself rather than of the JSON representation. Not only would it be faster (you would not have to parse the file and create a SerializedDAG), but it would also be more accurate: some changes, such as comments or Jinja templates, might leave the Serialized DAG unchanged. Of course, that would still be OK (we care about the serialized representation in this case), but I think it would be more accurate to refresh the Serialized DAG and its hash every time the file changes. It would also be much easier to reason about that hash and to verify whether it is correct or whether the file has changed since: we would just need to check the hash of the file on disk.
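The alternative proposed here, hashing the DAG file itself, can be sketched with `hashlib` over the file's raw bytes. This is a minimal sketch, assuming MD5 for parity with the diff; `file_hash` is a hypothetical helper, not an Airflow API. It shows that a comment-only edit changes the file hash even though the DAG the file defines is unchanged.

```python
import hashlib
import os
import tempfile


def file_hash(path: str) -> str:
    """Return the MD5 hex digest of a file's raw bytes (hypothetical helper)."""
    digest = hashlib.md5()
    with open(path, "rb") as fh:
        # Read in chunks so large files are not loaded into memory at once.
        for chunk in iter(lambda: fh.read(65536), b""):
            digest.update(chunk)
    return digest.hexdigest()


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "my_dag.py")
    with open(path, "wb") as fh:
        fh.write(b"dag_id = 'example'\n")
    before = file_hash(path)

    # A comment-only edit: the DAG defined by the file is unchanged.
    with open(path, "ab") as fh:
        fh.write(b"# just a comment\n")
    after = file_hash(path)

print(before != after)  # True
```

The trade-off raised in the thread applies: a file hash is cheap to check on disk, but it cannot see changes that arrive through modules the DAG file imports.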

##########
File path: airflow/models/serialized_dag.py
##########
@@ -65,6 +66,11 @@ class SerializedDagModel(Base):
     fileloc_hash = Column(BigInteger, nullable=False)
     data = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=False)
     last_updated = Column(UtcDateTime, nullable=False)
+    # TODO: Make dag_hash not nullable in Airflow 1.10.13 or Airflow 2.0??

Review comment:
       We can make it non-nullable and add a default value in the migration ("Hash not calculated yet"). This way it will be refreshed automagically on the next run and we won't have to worry about nullability.

##########
File path: airflow/models/serialized_dag.py
##########
@@ -102,9 +109,11 @@ def write_dag(cls, dag: DAG, min_update_interval: Optional[int] = None, session:
                 return
 
         log.debug("Checking if DAG (%s) changed", dag.dag_id)
-        serialized_dag_from_db: SerializedDagModel = session.query(cls).get(dag.dag_id)
         new_serialized_dag = cls(dag)
-        if serialized_dag_from_db and (serialized_dag_from_db.data == new_serialized_dag.data):
+        serialized_dag_hash_from_db = session.query(
+            cls.dag_hash).filter(cls.dag_id == dag.dag_id).scalar()
+
+        if serialized_dag_hash_from_db and (serialized_dag_hash_from_db == new_serialized_dag.dag_hash):

Review comment:
       If we make the column non-nullable with a default, this could just be `==`.
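The non-nullable-column-with-default suggestion can be tried end to end with the standard-library `sqlite3` module. This is a sketch under stated assumptions: a simplified, hypothetical `serialized_dag` table, raw SQL in place of the Alembic migration and SQLAlchemy query Airflow actually uses, and a hypothetical `dag_changed` helper. It shows a `NOT NULL` `dag_hash` column with the "Hash not calculated yet" placeholder as its default, and a change check that fetches only the hash and compares it with a plain `!=`.

```python
import hashlib
import sqlite3

# Hypothetical, simplified stand-in for the serialized_dag table before the upgrade.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE serialized_dag (dag_id TEXT PRIMARY KEY, data TEXT NOT NULL)")
conn.execute("INSERT INTO serialized_dag VALUES ('existing_dag', '{}')")

# Migration step: a NOT NULL column with a placeholder default. Rows written
# before the upgrade get a value that can never equal a real MD5 hex digest.
conn.execute(
    "ALTER TABLE serialized_dag "
    "ADD COLUMN dag_hash TEXT NOT NULL DEFAULT 'Hash not calculated yet'"
)


def dag_changed(conn: sqlite3.Connection, dag_id: str, new_hash: str) -> bool:
    """Return True when the stored hash differs from new_hash or no row exists."""
    # Fetch only the hash column instead of the whole serialized payload.
    row = conn.execute(
        "SELECT dag_hash FROM serialized_dag WHERE dag_id = ?", (dag_id,)
    ).fetchone()
    stored_hash = row[0] if row else None
    # A plain comparison is enough: the placeholder and a missing row
    # both compare unequal to any real hash.
    return stored_hash != new_hash


new_hash = hashlib.md5(b"{}").hexdigest()
print(dag_changed(conn, "existing_dag", new_hash))  # True: placeholder still stored

conn.execute("UPDATE serialized_dag SET dag_hash = ? WHERE dag_id = ?", (new_hash, "existing_dag"))
print(dag_changed(conn, "existing_dag", new_hash))  # False: hashes match
```

Since `None` never equals a hex digest, the plain comparison behaves the same as the guarded one even for a missing row; the non-nullable default mainly removes the need to reason about `NULL` values in the column.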







[GitHub] [airflow] kaxil merged pull request #10227: Use Hash of Serialized DAG to determine DAG is changed or not

kaxil merged pull request #10227:
URL: https://github.com/apache/airflow/pull/10227


   





[GitHub] [airflow] kaxil commented on a change in pull request #10227: Use Hash of Serialized DAG to determine DAG is changed or not

kaxil commented on a change in pull request #10227:
URL: https://github.com/apache/airflow/pull/10227#discussion_r467569314



##########
File path: airflow/models/serialized_dag.py
##########
@@ -76,6 +82,7 @@ def __init__(self, dag: DAG):
         self.fileloc_hash = DagCode.dag_fileloc_hash(self.fileloc)
         self.data = SerializedDAG.to_dict(dag)
         self.last_updated = timezone.utcnow()
+        self.dag_hash = hashlib.md5(json.dumps(self.data, sort_keys=True).encode("utf-8")).hexdigest()

Review comment:
       For example, I have seen cases where people import the schedule interval, operators, and other properties from a different file. If they change the imported file, the hash of the DAG file remains the same but the Serialized DAG changes.
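This counterexample can be reproduced in a few lines. A minimal sketch under explicit assumptions: the DAG file is a hypothetical source string that builds a plain dict instead of a real `DAG`, `common_settings` is a hypothetical module injected through `sys.modules`, and `json.dumps(..., sort_keys=True)` stands in for `SerializedDAG.to_dict`. Changing only the imported setting leaves the file hash untouched while the serialized hash changes.

```python
import hashlib
import json
import sys
import types

# Hypothetical DAG file: a plain dict stands in for a real DAG object, and the
# schedule comes from a separate (also hypothetical) settings module.
DAG_FILE_SOURCE = (
    "from common_settings import SCHEDULE\n"
    "dag = {'dag_id': 'example', 'schedule_interval': SCHEDULE}\n"
)


def md5_hex(data: bytes) -> str:
    return hashlib.md5(data).hexdigest()


def load_and_hash(schedule: str) -> tuple:
    """Run the DAG file against a given setting; return (file hash, serialized hash)."""
    settings = types.ModuleType("common_settings")
    settings.SCHEDULE = schedule
    sys.modules["common_settings"] = settings  # the `import` resolves to this module
    try:
        namespace: dict = {}
        exec(DAG_FILE_SOURCE, namespace)
    finally:
        del sys.modules["common_settings"]
    file_hash = md5_hex(DAG_FILE_SOURCE.encode("utf-8"))
    serialized_hash = md5_hex(json.dumps(namespace["dag"], sort_keys=True).encode("utf-8"))
    return file_hash, serialized_hash


file_before, serialized_before = load_and_hash("@daily")
file_after, serialized_after = load_and_hash("@hourly")

print(file_before == file_after)              # True: DAG file bytes unchanged
print(serialized_before == serialized_after)  # False: the DAG itself changed
```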







[GitHub] [airflow] potiuk commented on pull request #10227: Use Hash of Serialized DAG to determine DAG is changed or not

potiuk commented on pull request #10227:
URL: https://github.com/apache/airflow/pull/10227#issuecomment-672797017


   :tada: :+1: 





[GitHub] [airflow] kaxil commented on a change in pull request #10227: Use Hash of Serialized DAG to determine DAG is changed or not

kaxil commented on a change in pull request #10227:
URL: https://github.com/apache/airflow/pull/10227#discussion_r468781710



##########
File path: airflow/models/serialized_dag.py
##########
@@ -102,9 +109,11 @@ def write_dag(cls, dag: DAG, min_update_interval: Optional[int] = None, session:
                 return
 
         log.debug("Checking if DAG (%s) changed", dag.dag_id)
-        serialized_dag_from_db: SerializedDagModel = session.query(cls).get(dag.dag_id)
         new_serialized_dag = cls(dag)
-        if serialized_dag_from_db and (serialized_dag_from_db.data == new_serialized_dag.data):
+        serialized_dag_hash_from_db = session.query(
+            cls.dag_hash).filter(cls.dag_id == dag.dag_id).scalar()
+
+        if serialized_dag_hash_from_db and (serialized_dag_hash_from_db == new_serialized_dag.dag_hash):

Review comment:
       Updated







[GitHub] [airflow] kaxil commented on pull request #10227: Use Hash of Serialized DAG to determine DAG is changed or not

kaxil commented on pull request #10227:
URL: https://github.com/apache/airflow/pull/10227#issuecomment-671029234


   Ping @potiuk 





[GitHub] [airflow] kaxil commented on a change in pull request #10227: Use Hash of Serialized DAG to determine DAG is changed or not

kaxil commented on a change in pull request #10227:
URL: https://github.com/apache/airflow/pull/10227#discussion_r467569173



##########
File path: airflow/models/serialized_dag.py
##########
@@ -76,6 +82,7 @@ def __init__(self, dag: DAG):
         self.fileloc_hash = DagCode.dag_fileloc_hash(self.fileloc)
         self.data = SerializedDAG.to_dict(dag)
         self.last_updated = timezone.utcnow()
+        self.dag_hash = hashlib.md5(json.dumps(self.data, sort_keys=True).encode("utf-8")).hexdigest()

Review comment:
       The main reason for using the hash of the Serialized DAG relates to a question I asked the Airbnb folks:
   
   ![image](https://user-images.githubusercontent.com/8811558/89730618-39257000-da38-11ea-828f-076cff27102e.png)
   
   >Question for Cong Zhu: Do you store the hash of the DAG file? What happens when the modules imported by the DAG file change but not the DAG file itself? In that case the DAG file hash remains the same but the DAG might not be the same.
   >Cong Zhu: Yes, we store the hash of the serialized DAG
   
   



