You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2021/03/20 22:31:09 UTC

[airflow] branch master updated: Sort lists, sets and tuples in Serialized DAGs (#14909)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 4531168  Sort lists, sets and tuples in Serialized DAGs (#14909)
4531168 is described below

commit 4531168e9011984145f3048105215342dc949bef
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Sat Mar 20 22:30:55 2021 +0000

    Sort lists, sets and tuples in Serialized DAGs (#14909)
    
    Currently we check whether the DAG changed via dag_hash.
    The problem is that, since insertion order is not guaranteed, the same DAG
    can produce a different hash and hence result in an unnecessary DB write.
    
    This commit fixes it by sorting lists, sets and tuples when serializing.
---
 airflow/serialization/serialized_objects.py   |  6 ++---
 tests/serialization/test_dag_serialization.py | 34 ++++++++++++++++++++++++---
 2 files changed, 34 insertions(+), 6 deletions(-)

diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index d609c09..a890cd1 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -214,7 +214,7 @@ class BaseSerialization:
         elif isinstance(var, dict):
             return cls._encode({str(k): cls._serialize(v) for k, v in var.items()}, type_=DAT.DICT)
         elif isinstance(var, list):
-            return [cls._serialize(v) for v in var]
+            return sorted(cls._serialize(v) for v in var)
         elif HAS_KUBERNETES and isinstance(var, k8s.V1Pod):
             json_pod = PodGenerator.serialize_pod(var)
             return cls._encode(json_pod, type_=DAT.POD)
@@ -240,10 +240,10 @@ class BaseSerialization:
             return str(get_python_source(var))
         elif isinstance(var, set):
             # FIXME: casts set to list in customized serialization in future.
-            return cls._encode([cls._serialize(v) for v in var], type_=DAT.SET)
+            return cls._encode(sorted(cls._serialize(v) for v in var), type_=DAT.SET)
         elif isinstance(var, tuple):
             # FIXME: casts tuple to list in customized serialization in future.
-            return cls._encode([cls._serialize(v) for v in var], type_=DAT.TUPLE)
+            return cls._encode(sorted(cls._serialize(v) for v in var), type_=DAT.TUPLE)
         elif isinstance(var, TaskGroup):
             return SerializedTaskGroup.serialize_task_group(var)
         else:
diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py
index c445e19..5b17e6a 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -353,9 +353,10 @@ class TestStringifiedDAGs(unittest.TestCase):
             "_task_group",
         }
         for field in fields_to_check:
-            assert getattr(serialized_dag, field) == getattr(
-                dag, field
-            ), f'{dag.dag_id}.{field} does not match'
+            dag_field = getattr(dag, field)
+            if isinstance(dag_field, list):
+                dag_field = sorted(dag_field)
+            assert getattr(serialized_dag, field) == dag_field, f'{dag.dag_id}.{field} does not match'
 
         if dag.default_args:
             for k, v in dag.default_args.items():
@@ -1027,6 +1028,33 @@ class TestStringifiedDAGs(unittest.TestCase):
 
         assert deserialized_dag.has_on_failure_callback is expected_value
 
+    @parameterized.expand(
+        [
+            (
+                ['task_1', 'task_5', 'task_2', 'task_4'],
+                ['task_1', 'task_2', 'task_4', 'task_5'],
+            ),
+            (
+                {'task_1', 'task_5', 'task_2', 'task_4'},
+                ['task_1', 'task_2', 'task_4', 'task_5'],
+            ),
+            (
+                ('task_1', 'task_5', 'task_2', 'task_4'),
+                ['task_1', 'task_2', 'task_4', 'task_5'],
+            ),
+            (
+                {"task3": "test3", "task2": "test2", "task1": "test1"},
+                {"task1": "test1", "task2": "test2", "task3": "test3"},
+            ),
+        ]
+    )
+    def test_serialized_objects_are_sorted(self, object_to_serialized, expected_output):
+        """Test Serialized Lists, Sets and Tuples are sorted"""
+        serialized_obj = SerializedDAG._serialize(object_to_serialized)
+        if isinstance(serialized_obj, dict) and "__type" in serialized_obj:
+            serialized_obj = serialized_obj["__var"]
+        assert serialized_obj == expected_output
+
 
 def test_kubernetes_optional():
     """Serialisation / deserialisation continues to work without kubernetes installed"""