Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/07/03 01:02:47 UTC

[GitHub] [airflow] luoyuliuyin opened a new pull request #16734: fix: Correctly handle custom deps and task_group during DAG serialization #16690

luoyuliuyin opened a new pull request #16734:
URL: https://github.com/apache/airflow/pull/16734


   Airflow decides whether a DAG has changed by comparing its dag_hash, so deps and task_group must be serialized deterministically; otherwise the same DAG can produce a different dag_hash across serializations.
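A minimal sketch of why element order matters for the hash. It assumes the hash is an MD5 digest of `json.dumps(..., sort_keys=True)`; the `sort_keys` call is quoted in a code comment later in this thread, while the MD5 choice and the helper names `compute_hash` and `serialize_deps` are assumptions for illustration, not Airflow's API:

```python
import hashlib
import json
from typing import Any, Dict, Iterable


def compute_hash(data: Dict[str, Any]) -> str:
    """Hash serialized data; MD5 over sorted-key JSON is an assumption here."""
    # sort_keys=True makes dict key order irrelevant, but list order still matters.
    payload = json.dumps(data, sort_keys=True)
    return hashlib.md5(payload.encode("utf-8")).hexdigest()


def serialize_deps(deps: Iterable[str], stable: bool = True) -> Dict[str, Any]:
    """Hypothetical helper: emit dep names as a JSON-friendly list."""
    return {"deps": sorted(deps) if stable else list(deps)}


# Two traversals of the same set may yield its elements in different orders.
order_a = ["TriggerRuleDep", "PrevDagrunDep", "NotInRetryPeriodDep"]
order_b = ["PrevDagrunDep", "NotInRetryPeriodDep", "TriggerRuleDep"]

unstable_equal = compute_hash(serialize_deps(order_a, stable=False)) == compute_hash(
    serialize_deps(order_b, stable=False)
)
stable_equal = compute_hash(serialize_deps(order_a)) == compute_hash(serialize_deps(order_b))

print(unstable_equal)  # False: same deps, different hash
print(stable_equal)    # True: sorting makes the hash independent of traversal order
```

`sort_keys=True` only normalizes dictionary keys, so any list built from a set still needs its own sort.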
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] luoyuliuyin commented on pull request #16734: fix: Correctly handle custom deps and task_group during DAG serialization #16690

Posted by GitBox <gi...@apache.org>.
luoyuliuyin commented on pull request #16734:
URL: https://github.com/apache/airflow/pull/16734#issuecomment-873394174


   > Re-triggering CI
   
   Is the CI service still not working?



[GitHub] [airflow] luoyuliuyin commented on a change in pull request #16734: fix: dag_hash generation is unstable #16690

luoyuliuyin commented on a change in pull request #16734:
URL: https://github.com/apache/airflow/pull/16734#discussion_r662276286



##########
File path: tests/serialization/test_dag_serialization.py
##########
@@ -978,6 +978,107 @@ def check_task_group(node):
 
         check_task_group(serialized_dag.task_group)
 
+    def test_deps_sorted(self):
+        """
+        Tests serialize_operator, make sure the deps is in order
+        """
+        from airflow.operators.dummy import DummyOperator
+        from airflow.sensors.external_task import ExternalTaskSensor
+
+        execution_date = datetime(2020, 1, 1)
+        with DAG(dag_id="test_deps_sorted", start_date=execution_date) as dag:
+            task1 = ExternalTaskSensor(
+                task_id="task1",
+                external_dag_id="external_dag_id",
+                mode="reschedule",
+            )
+            task2 = DummyOperator(task_id="task2")
+            task1 >> task2
+
+        serialize_op = SerializedBaseOperator.serialize_operator(dag.task_dict["task1"])
+        deps = serialize_op["deps"]
+        assert len(deps) == 5
+        assert deps[0] == 'airflow.ti_deps.deps.not_in_retry_period_dep.NotInRetryPeriodDep'
+        assert deps[1] == 'airflow.ti_deps.deps.not_previously_skipped_dep.NotPreviouslySkippedDep'
+        assert deps[2] == 'airflow.ti_deps.deps.prev_dagrun_dep.PrevDagrunDep'
+        assert deps[3] == 'airflow.ti_deps.deps.ready_to_reschedule.ReadyToRescheduleDep'
+        assert deps[4] == 'airflow.ti_deps.deps.trigger_rule_dep.TriggerRuleDep'

Review comment:
       fixed





[GitHub] [airflow] kaxil merged pull request #16734: fix: Correctly handle custom deps and task_group during DAG serialization #16690

kaxil merged pull request #16734:
URL: https://github.com/apache/airflow/pull/16734


   



[GitHub] [airflow] boring-cyborg[bot] commented on pull request #16734: fix: Correctly handle custom deps and task_group during DAG serialization #16690

boring-cyborg[bot] commented on pull request #16734:
URL: https://github.com/apache/airflow/pull/16734#issuecomment-874727085


   Awesome work, congrats on your first merged pull request!
   



[GitHub] [airflow] kaxil commented on pull request #16734: fix: Correctly handle custom deps and task_group during DAG serialization #16690

kaxil commented on pull request #16734:
URL: https://github.com/apache/airflow/pull/16734#issuecomment-874727176


   Good work @luoyuliuyin 👍



[GitHub] [airflow] luoyuliuyin commented on pull request #16734: fix: Correctly handle custom deps and task_group during DAG serialization #16690

luoyuliuyin commented on pull request #16734:
URL: https://github.com/apache/airflow/pull/16734#issuecomment-874390928


   > Re-triggering CI
   
   Static checks have passed, and I have updated the PR description and title.



[GitHub] [airflow] luoyuliuyin commented on a change in pull request #16734: fix: dag_hash generation is unstable #16690

luoyuliuyin commented on a change in pull request #16734:
URL: https://github.com/apache/airflow/pull/16734#discussion_r662276029



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -808,10 +808,10 @@ def serialize_task_group(cls, task_group: TaskGroup) -> Optional[Union[Dict[str,
                 else (DAT.TASK_GROUP, SerializedTaskGroup.serialize_task_group(child))
                 for label, child in task_group.children.items()
             },
-            "upstream_group_ids": cls._serialize(list(task_group.upstream_group_ids)),
-            "downstream_group_ids": cls._serialize(list(task_group.downstream_group_ids)),
-            "upstream_task_ids": cls._serialize(list(task_group.upstream_task_ids)),
-            "downstream_task_ids": cls._serialize(list(task_group.downstream_task_ids)),
+            "upstream_group_ids": cls._serialize(sorted(list(task_group.upstream_group_ids))),

Review comment:
       fixed





[GitHub] [airflow] github-actions[bot] commented on pull request #16734: fix: Correctly handle custom deps and task_group during DAG serialization #16690

github-actions[bot] commented on pull request #16734:
URL: https://github.com/apache/airflow/pull/16734#issuecomment-874726623


   The PR most likely needs to run the full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly, please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.



[GitHub] [airflow] potiuk commented on pull request #16734: fix: Correctly handle custom deps and task_group during DAG serialization #16690

potiuk commented on pull request #16734:
URL: https://github.com/apache/airflow/pull/16734#issuecomment-873400119


   > > Re-triggering CI
   > 
   > Is the CI service still not working?
   
   No. The GitHub Packages stability issue was addressed yesterday (we switched to ghcr.io, which is much more stable). You need to fix the static checks: you have badly formatted files. I recommend installing pre-commit locally (see CONTRIBUTING.rst); pre-commit will then fix them for you automatically when you run `git commit --amend`.



[GitHub] [airflow] kaxil commented on pull request #16734: fix: Correctly handle custom deps and task_group during DAG serialization #16690

kaxil commented on pull request #16734:
URL: https://github.com/apache/airflow/pull/16734#issuecomment-873320008


   Re-triggering CI



[GitHub] [airflow] boring-cyborg[bot] commented on pull request #16734: fix: dag_hash generation is unstable #16690

boring-cyborg[bot] commented on pull request #16734:
URL: https://github.com/apache/airflow/pull/16734#issuecomment-871543297


   Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst).
   Here are some useful points:
   - Pay attention to the quality of your code (flake8, mypy and type annotations). Our [pre-commits]( https://github.com/apache/airflow/blob/main/STATIC_CODE_CHECKS.rst#prerequisites-for-pre-commit-hooks) will help you with that.
   - In case of a new feature add useful documentation (in docstrings or in `docs/` directory). Adding a new operator? Check this short [guide](https://github.com/apache/airflow/blob/main/docs/apache-airflow/howto/custom-operator.rst) Consider adding an example DAG that shows how users should use it.
   - Consider using [Breeze environment](https://github.com/apache/airflow/blob/main/BREEZE.rst) for testing locally, it’s a heavy docker but it ships with a working Airflow and a lot of integrations.
   - Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
   - Please follow [ASF Code of Conduct](https://www.apache.org/foundation/policies/conduct) for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
   - Be sure to read the [Airflow Coding style]( https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#coding-style-and-best-practices).
   Apache Airflow is a community-driven project and together we are making it better 🚀.
   In case of doubts contact the developers at:
   Mailing List: dev@airflow.apache.org
   Slack: https://s.apache.org/airflow-slack
   



[GitHub] [airflow] kaxil edited a comment on pull request #16734: fix: dag_hash generation is unstable #16690

kaxil edited a comment on pull request #16734:
URL: https://github.com/apache/airflow/pull/16734#issuecomment-871664594


   And like Ash explains in https://github.com/apache/airflow/pull/15395#discussion_r614418669 , lists and tuples should not be sorted.
   
   If there is an issue with how we are storing `deps`, we should convert it to a set.
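The distinction drawn here (order is part of a list's or tuple's value, but not of a set's) can be sketched with a small, hypothetical `canonicalize` helper. It is not Airflow code, and it assumes that the elements of any set it sorts are mutually comparable:

```python
from typing import Any


def canonicalize(value: Any) -> Any:
    """Hypothetical helper: make a value's JSON form order-stable without changing its meaning.

    Sets are unordered, so emitting them as a sorted list loses nothing.
    Lists and tuples are ordered, so their element order is kept as-is.
    """
    if isinstance(value, (set, frozenset)):
        # Assumes the set's elements are mutually comparable (e.g. all str).
        return sorted(canonicalize(v) for v in value)
    if isinstance(value, (list, tuple)):
        return [canonicalize(v) for v in value]
    if isinstance(value, dict):
        return {k: canonicalize(v) for k, v in value.items()}
    return value


print(canonicalize([2, 0, 3]))           # [2, 0, 3]: list order preserved
print(canonicalize({"b.Dep", "a.Dep"}))  # ['a.Dep', 'b.Dep']: set sorted
print(canonicalize({"deps": {"y", "x"}, "order": (3, 1)}))
# {'deps': ['x', 'y'], 'order': [3, 1]}
```

Sorting only where the source type is a set keeps `[2, 0, 3]` distinct from `[0, 2, 3]` while still giving every set a single serialized form.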



[GitHub] [airflow] luoyuliuyin commented on pull request #16734: fix: dag_hash generation is unstable #16690

luoyuliuyin commented on pull request #16734:
URL: https://github.com/apache/airflow/pull/16734#issuecomment-872165612


   > Which version did you try this with? We had already fixed some of it here: #15395
   
   I am using version 2.1.0, and the problem still exists.



[GitHub] [airflow] kaxil closed pull request #16734: fix: Correctly handle custom deps and task_group during DAG serialization #16690

kaxil closed pull request #16734:
URL: https://github.com/apache/airflow/pull/16734


   



[GitHub] [airflow] uranusjr commented on a change in pull request #16734: fix: dag_hash generation is unstable #16690

uranusjr commented on a change in pull request #16734:
URL: https://github.com/apache/airflow/pull/16734#discussion_r661630220



##########
File path: tests/serialization/test_dag_serialization.py
##########
@@ -978,6 +978,107 @@ def check_task_group(node):
 
         check_task_group(serialized_dag.task_group)
 
+    def test_deps_sorted(self):
+        """
+        Tests serialize_operator, make sure the deps is in order
+        """
+        from airflow.operators.dummy import DummyOperator
+        from airflow.sensors.external_task import ExternalTaskSensor
+
+        execution_date = datetime(2020, 1, 1)
+        with DAG(dag_id="test_deps_sorted", start_date=execution_date) as dag:
+            task1 = ExternalTaskSensor(
+                task_id="task1",
+                external_dag_id="external_dag_id",
+                mode="reschedule",
+            )
+            task2 = DummyOperator(task_id="task2")
+            task1 >> task2
+
+        serialize_op = SerializedBaseOperator.serialize_operator(dag.task_dict["task1"])
+        deps = serialize_op["deps"]
+        assert len(deps) == 5
+        assert deps[0] == 'airflow.ti_deps.deps.not_in_retry_period_dep.NotInRetryPeriodDep'
+        assert deps[1] == 'airflow.ti_deps.deps.not_previously_skipped_dep.NotPreviouslySkippedDep'
+        assert deps[2] == 'airflow.ti_deps.deps.prev_dagrun_dep.PrevDagrunDep'
+        assert deps[3] == 'airflow.ti_deps.deps.ready_to_reschedule.ReadyToRescheduleDep'
+        assert deps[4] == 'airflow.ti_deps.deps.trigger_rule_dep.TriggerRuleDep'

Review comment:
       ```suggestion
           assert deps == [
               'airflow.ti_deps.deps.not_in_retry_period_dep.NotInRetryPeriodDep',
               'airflow.ti_deps.deps.not_previously_skipped_dep.NotPreviouslySkippedDep',
               'airflow.ti_deps.deps.prev_dagrun_dep.PrevDagrunDep',
               'airflow.ti_deps.deps.ready_to_reschedule.ReadyToRescheduleDep',
               'airflow.ti_deps.deps.trigger_rule_dep.TriggerRuleDep',
           ]
   ```
   
   Same for similar assert statements below.

##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -808,10 +808,10 @@ def serialize_task_group(cls, task_group: TaskGroup) -> Optional[Union[Dict[str,
                 else (DAT.TASK_GROUP, SerializedTaskGroup.serialize_task_group(child))
                 for label, child in task_group.children.items()
             },
-            "upstream_group_ids": cls._serialize(list(task_group.upstream_group_ids)),
-            "downstream_group_ids": cls._serialize(list(task_group.downstream_group_ids)),
-            "upstream_task_ids": cls._serialize(list(task_group.upstream_task_ids)),
-            "downstream_task_ids": cls._serialize(list(task_group.downstream_task_ids)),
+            "upstream_group_ids": cls._serialize(sorted(list(task_group.upstream_group_ids))),

Review comment:
       ```suggestion
               "upstream_group_ids": cls._serialize(sorted(task_group.upstream_group_ids)),
   ```
   
   Same for others below.
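A minimal sketch of the suggested form, using a hypothetical `FakeTaskGroup` in place of Airflow's `TaskGroup` and leaving out the `cls._serialize` wrapper from the hunk:

```python
from typing import Dict, List, Set

ID_FIELDS = (
    "upstream_group_ids",
    "downstream_group_ids",
    "upstream_task_ids",
    "downstream_task_ids",
)


class FakeTaskGroup:
    """Hypothetical stand-in for TaskGroup, holding only the four id sets."""

    def __init__(self) -> None:
        self.upstream_group_ids: Set[str] = {"group_b", "group_a"}
        self.downstream_group_ids: Set[str] = set()
        self.upstream_task_ids: Set[str] = {"t3", "t1", "t2"}
        self.downstream_task_ids: Set[str] = {"t9"}


def serialize_id_sets(task_group: FakeTaskGroup) -> Dict[str, List[str]]:
    # sorted() accepts any iterable (a set included) and always returns a new list,
    # so wrapping the set in list() first would only build a throwaway copy.
    return {name: sorted(getattr(task_group, name)) for name in ID_FIELDS}


print(serialize_id_sets(FakeTaskGroup()))
# {'upstream_group_ids': ['group_a', 'group_b'], 'downstream_group_ids': [],
#  'upstream_task_ids': ['t1', 't2', 't3'], 'downstream_task_ids': ['t9']}
```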





[GitHub] [airflow] kaxil commented on pull request #16734: fix: dag_hash generation is unstable #16690

kaxil commented on pull request #16734:
URL: https://github.com/apache/airflow/pull/16734#issuecomment-871664594


   And like Ash explains in https://github.com/apache/airflow/pull/15395#discussion_r614418669 , lists and tuples should be sorted.
   
   If there is an issue with how we are storing `deps`, we should convert it to a set.



[GitHub] [airflow] luoyuliuyin commented on pull request #16734: fix: dag_hash generation is unstable #16690

luoyuliuyin commented on pull request #16734:
URL: https://github.com/apache/airflow/pull/16734#issuecomment-872206401


   > And like Ash explains in [#15395 (comment)](https://github.com/apache/airflow/pull/15395#discussion_r614418669) , lists and tuples should be sorted.
   > 
   > If there is an issue with how we are storing `deps`, we should convert it to a set.
   
   I agree with ashb: `[2, 0, 3]` and `[0, 2, 3]` are not the same.
   
   To determine whether a DAG has changed, we serialize it to JSON and hash the resulting string. JSON has no set type, so `json.dumps` cannot serialize a set directly; the usual workaround is to convert the set to a list. In the current serialization scenario, converting sets to lists is therefore reasonable, as long as the resulting list is sorted: a set's iteration order is not guaranteed, so an unsorted list can differ between runs and change the hash.
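A minimal sketch of the set-to-list conversion described above, using the standard `json` module only; `serialize_deps_json` is a hypothetical helper, not Airflow code:

```python
import json
from typing import Iterable


def serialize_deps_json(deps: Iterable[str]) -> str:
    """Hypothetical helper: turn an unordered collection of dep names into stable JSON."""
    # JSON has no set type, so convert to a list; sort it so the order never depends
    # on the set's iteration order (which, for str, can change between interpreter runs
    # because of hash randomization).
    return json.dumps({"deps": sorted(deps)}, sort_keys=True)


deps = {"TriggerRuleDep", "PrevDagrunDep"}

try:
    json.dumps({"deps": deps})
    set_is_serializable = True
except TypeError:
    # json.dumps rejects sets: "Object of type set is not JSON serializable".
    set_is_serializable = False

print(set_is_serializable)        # False
print(serialize_deps_json(deps))  # {"deps": ["PrevDagrunDep", "TriggerRuleDep"]}
```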
   
   `sorted()` raises a `TypeError` when the elements cannot be compared with each other, so before sorting we must make sure the input is iterable and its elements are mutually comparable. In this change, both `deps` and the `task_group.*_ids` collections contain only `str`, so they can be compared. However, we cannot be sure that the return value of `cls._serialize(v)` is comparable, or that sorting it would preserve its semantics, so the sort should not happen in the generic `_serialize` function; the choice should be left to the specific caller that knows what it is serializing.
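A minimal sketch of the failure mode described above: a hypothetical `can_sort` probe shows that `sorted()` succeeds on a collection of `str` but raises `TypeError` on mixed or unordered types, which is why a generic serializer cannot sort blindly:

```python
from typing import Any, Iterable


def can_sort(items: Iterable[Any]) -> bool:
    """Hypothetical probe: True if sorted() succeeds, i.e. the elements are mutually comparable."""
    try:
        sorted(items)
    except TypeError:
        return False
    return True


print(can_sort({"upstream_b", "upstream_a"}))  # True: all str, like deps and the id sets
print(can_sort([1, "a"]))                      # False: int and str cannot be compared
print(can_sort([{"a": 1}, {"b": 2}]))          # False: dicts define no ordering
```

Even when the probe returns True, sorting a list or tuple would still change its meaning, so comparability alone does not make a generic sort safe.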



[GitHub] [airflow] luoyuliuyin commented on pull request #16734: fix: dag_hash generation is unstable #16690

luoyuliuyin commented on pull request #16734:
URL: https://github.com/apache/airflow/pull/16734#issuecomment-872247210


   > Logic looks good to me, some coding style comments inline.
   > 
   > Could you also add comments to where `sorted` is called to describe the reason?
   
   fixed



[GitHub] [airflow] kaxil commented on a change in pull request #16734: fix: dag_hash generation is unstable #16690

kaxil commented on a change in pull request #16734:
URL: https://github.com/apache/airflow/pull/16734#discussion_r662406523



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -424,7 +424,10 @@ def serialize_operator(cls, op: BaseOperator) -> Dict[str, Any]:
                     )
 
                 deps.append(f'{module_name}.{klass.__name__}')
-            serialize_op['deps'] = deps
+            # deps needs to be sorted here, because op.deps is a set, which is unstable when traversing,
+            # and the same call may get different results.
+            # When calling json.dumps(self.data, sort_keys=True) to generate dag_hash, misjudgment will occur
+            serialize_op['deps'] = sorted(deps)

Review comment:
       Are you using custom deps? Could you please update the PR description and title to better reflect the changes, something along the lines of "Handle custom deps correctly in DAG Serialization"?
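The sorted-deps change in the hunk above can be sketched in isolation. The dep classes below are hypothetical placeholders (one stands in for a custom dep), and the sketch omits whatever checks the real `serialize_operator` performs before appending each name:

```python
from typing import Iterable, List


# Hypothetical stand-ins for dep classes; Airflow's built-in deps live under
# airflow.ti_deps.deps, while a custom dep could come from any module.
class TriggerRuleDep:
    pass


class PrevDagrunDep:
    pass


class MyCustomDep:
    pass


def serialize_dep_names(dep_classes: Iterable[type]) -> List[str]:
    """Build fully qualified names the way the hunk does, then sort them."""
    deps = []
    for klass in dep_classes:
        module_name = klass.__module__
        deps.append(f"{module_name}.{klass.__name__}")
    # The input may be a set, whose iteration order is not guaranteed; sorting
    # yields the same list for the same collection of deps on every call.
    return sorted(deps)


print(serialize_dep_names({TriggerRuleDep, MyCustomDep, PrevDagrunDep}))
# The module prefix depends on where the classes are defined ("__main__" when run
# as a script); the class names always come out as MyCustomDep, PrevDagrunDep,
# TriggerRuleDep.
```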





[GitHub] [airflow] luoyuliuyin commented on a change in pull request #16734: fix: Correctly handle custom deps and task_group during DAG serialization #16690

luoyuliuyin commented on a change in pull request #16734:
URL: https://github.com/apache/airflow/pull/16734#discussion_r662730291



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -424,7 +424,10 @@ def serialize_operator(cls, op: BaseOperator) -> Dict[str, Any]:
                     )
 
                 deps.append(f'{module_name}.{klass.__name__}')
-            serialize_op['deps'] = deps
+            # deps needs to be sorted here, because op.deps is a set, which is unstable when traversing,
+            # and the same call may get different results.
+            # When calling json.dumps(self.data, sort_keys=True) to generate dag_hash, misjudgment will occur
+            serialize_op['deps'] = sorted(deps)

Review comment:
       Thank you for your comments, I have updated the PR description and title.



