You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/06/30 16:24:12 UTC

[GitHub] [airflow] uranusjr commented on a change in pull request #16734: fix: dag_hash generation is unstable #16690

uranusjr commented on a change in pull request #16734:
URL: https://github.com/apache/airflow/pull/16734#discussion_r661630220



##########
File path: tests/serialization/test_dag_serialization.py
##########
@@ -978,6 +978,107 @@ def check_task_group(node):
 
         check_task_group(serialized_dag.task_group)
 
+    def test_deps_sorted(self):
+        """
+        Tests serialize_operator, make sure the deps is in order
+        """
+        from airflow.operators.dummy import DummyOperator
+        from airflow.sensors.external_task import ExternalTaskSensor
+
+        execution_date = datetime(2020, 1, 1)
+        with DAG(dag_id="test_deps_sorted", start_date=execution_date) as dag:
+            task1 = ExternalTaskSensor(
+                task_id="task1",
+                external_dag_id="external_dag_id",
+                mode="reschedule",
+            )
+            task2 = DummyOperator(task_id="task2")
+            task1 >> task2
+
+        serialize_op = SerializedBaseOperator.serialize_operator(dag.task_dict["task1"])
+        deps = serialize_op["deps"]
+        assert len(deps) == 5
+        assert deps[0] == 'airflow.ti_deps.deps.not_in_retry_period_dep.NotInRetryPeriodDep'
+        assert deps[1] == 'airflow.ti_deps.deps.not_previously_skipped_dep.NotPreviouslySkippedDep'
+        assert deps[2] == 'airflow.ti_deps.deps.prev_dagrun_dep.PrevDagrunDep'
+        assert deps[3] == 'airflow.ti_deps.deps.ready_to_reschedule.ReadyToRescheduleDep'
+        assert deps[4] == 'airflow.ti_deps.deps.trigger_rule_dep.TriggerRuleDep'

Review comment:
       ```suggestion
           assert deps == [
               'airflow.ti_deps.deps.not_in_retry_period_dep.NotInRetryPeriodDep',
               'airflow.ti_deps.deps.not_previously_skipped_dep.NotPreviouslySkippedDep',
               'airflow.ti_deps.deps.prev_dagrun_dep.PrevDagrunDep',
               'airflow.ti_deps.deps.ready_to_reschedule.ReadyToRescheduleDep',
               'airflow.ti_deps.deps.trigger_rule_dep.TriggerRuleDep',
           ]
   ```
   
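   Same for similar assert statements below. Comparing the whole list in a single assert also checks its length, which makes the separate `assert len(deps) == 5` redundant. It also gives pytest a clearer diff when the test fails.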

##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -808,10 +808,10 @@ def serialize_task_group(cls, task_group: TaskGroup) -> Optional[Union[Dict[str,
                 else (DAT.TASK_GROUP, SerializedTaskGroup.serialize_task_group(child))
                 for label, child in task_group.children.items()
             },
-            "upstream_group_ids": cls._serialize(list(task_group.upstream_group_ids)),
-            "downstream_group_ids": cls._serialize(list(task_group.downstream_group_ids)),
-            "upstream_task_ids": cls._serialize(list(task_group.upstream_task_ids)),
-            "downstream_task_ids": cls._serialize(list(task_group.downstream_task_ids)),
+            "upstream_group_ids": cls._serialize(sorted(list(task_group.upstream_group_ids))),

Review comment:
       ```suggestion
               "upstream_group_ids": cls._serialize(sorted(task_group.upstream_group_ids)),
   ```
   
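   Same for the other `sorted(list(...))` calls below: `sorted()` accepts any iterable and always returns a new list, so converting the set with `list()` first is redundant.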




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org