You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2021/04/16 02:39:35 UTC

[airflow] branch master updated: Bugfix: ``TypeError`` when Serializing & sorting iterables (#15395)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new d115040  Bugfix: ``TypeError`` when Serializing & sorting iterables (#15395)
d115040 is described below

commit d1150403a35c497a774a4ffbb1ca4546c532dc81
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Fri Apr 16 03:39:26 2021 +0100

    Bugfix: ``TypeError`` when Serializing & sorting iterables (#15395)
    
    This bug was introduced in #14909. Sorting is removed for lists and tuples because, unlike sets, they already preserve element order.
    
    The following DAG errors with: `TypeError: '<' not supported between instances of 'dict' and 'dict'`
    
    ```python
    from airflow import models
    from airflow.operators.dummy import DummyOperator
    from datetime import datetime, timedelta
    params = {
        "staging_schema": [{"key:":"foo","value":"bar"},
                           {"key:":"this","value":"that"}]
    }
    
    with models.DAG(dag_id='test-dag',
                    start_date=datetime(2019, 2, 14),
                    schedule_interval='30 13 * * *',
                    catchup=False,
                    max_active_runs=1,
                    params=params
                    ) as dag:
        my_task = DummyOperator(
            task_id='task1'
        )
    ```
    
    Full Error:
    
    ```
      File "/usr/local/lib/python3.7/site-packages/airflow/serialization/serialized_objects.py", line 210, in <dictcomp>
        return cls._encode({str(k): cls._serialize(v) for k, v in var.items()}, type_=DAT.DICT)
      File "/usr/local/lib/python3.7/site-packages/airflow/serialization/serialized_objects.py", line 212, in _serialize
        return sorted(cls._serialize(v) for v in var)
    TypeError: '<' not supported between instances of 'dict' and 'dict'
    During handling of the above exception, another exception occurred:
    ...
    ```
    
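    This is because `sorted()` cannot order dicts: `dict` does not support the `<` comparison. Removing the sorting from lists & tuples fixes it.
    Sorting also fails for a set that contains elements of multiple types, so sets are still sorted when possible and fall back to unsorted order on `TypeError`.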
---
 airflow/serialization/serialized_objects.py   |  9 ++++---
 tests/serialization/test_dag_serialization.py | 36 +++++++++++++++++++++------
 2 files changed, 35 insertions(+), 10 deletions(-)

diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index 25eb1fe..8a6fdc8 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -214,7 +214,7 @@ class BaseSerialization:
         elif isinstance(var, dict):
             return cls._encode({str(k): cls._serialize(v) for k, v in var.items()}, type_=DAT.DICT)
         elif isinstance(var, list):
-            return sorted(cls._serialize(v) for v in var)
+            return [cls._serialize(v) for v in var]
         elif HAS_KUBERNETES and isinstance(var, k8s.V1Pod):
             json_pod = PodGenerator.serialize_pod(var)
             return cls._encode(json_pod, type_=DAT.POD)
@@ -240,10 +240,13 @@ class BaseSerialization:
             return str(get_python_source(var))
         elif isinstance(var, set):
             # FIXME: casts set to list in customized serialization in future.
-            return cls._encode(sorted(cls._serialize(v) for v in var), type_=DAT.SET)
+            try:
+                return cls._encode(sorted(cls._serialize(v) for v in var), type_=DAT.SET)
+            except TypeError:
+                return cls._encode([cls._serialize(v) for v in var], type_=DAT.SET)
         elif isinstance(var, tuple):
             # FIXME: casts tuple to list in customized serialization in future.
-            return cls._encode(sorted(cls._serialize(v) for v in var), type_=DAT.TUPLE)
+            return cls._encode([cls._serialize(v) for v in var], type_=DAT.TUPLE)
         elif isinstance(var, TaskGroup):
             return SerializedTaskGroup.serialize_task_group(var)
         else:
diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py
index 830388d..6a186c5 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -357,10 +357,9 @@ class TestStringifiedDAGs(unittest.TestCase):
             "_task_group",
         }
         for field in fields_to_check:
-            dag_field = getattr(dag, field)
-            if isinstance(dag_field, list):
-                dag_field = sorted(dag_field)
-            assert getattr(serialized_dag, field) == dag_field, f'{dag.dag_id}.{field} does not match'
+            assert getattr(serialized_dag, field) == getattr(
+                dag, field
+            ), f'{dag.dag_id}.{field} does not match'
 
         if dag.default_args:
             for k, v in dag.default_args.items():
@@ -1062,7 +1061,7 @@ class TestStringifiedDAGs(unittest.TestCase):
         [
             (
                 ['task_1', 'task_5', 'task_2', 'task_4'],
-                ['task_1', 'task_2', 'task_4', 'task_5'],
+                ['task_1', 'task_5', 'task_2', 'task_4'],
             ),
             (
                 {'task_1', 'task_5', 'task_2', 'task_4'},
@@ -1070,16 +1069,39 @@ class TestStringifiedDAGs(unittest.TestCase):
             ),
             (
                 ('task_1', 'task_5', 'task_2', 'task_4'),
-                ['task_1', 'task_2', 'task_4', 'task_5'],
+                ['task_1', 'task_5', 'task_2', 'task_4'],
+            ),
+            (
+                {
+                    "staging_schema": [
+                        {"key:": "foo", "value": "bar"},
+                        {"key:": "this", "value": "that"},
+                        "test_conf",
+                    ]
+                },
+                {
+                    "staging_schema": [
+                        {"__type": "dict", "__var": {"key:": "foo", "value": "bar"}},
+                        {
+                            "__type": "dict",
+                            "__var": {"key:": "this", "value": "that"},
+                        },
+                        "test_conf",
+                    ]
+                },
             ),
             (
                 {"task3": "test3", "task2": "test2", "task1": "test1"},
                 {"task1": "test1", "task2": "test2", "task3": "test3"},
             ),
+            (
+                ('task_1', 'task_5', 'task_2', 3, ["x", "y"]),
+                ['task_1', 'task_5', 'task_2', 3, ["x", "y"]],
+            ),
         ]
     )
     def test_serialized_objects_are_sorted(self, object_to_serialized, expected_output):
-        """Test Serialized Lists, Sets and Tuples are sorted"""
+        """Test Serialized Sets are sorted while list and tuple preserve order"""
         serialized_obj = SerializedDAG._serialize(object_to_serialized)
         if isinstance(serialized_obj, dict) and "__type" in serialized_obj:
             serialized_obj = serialized_obj["__var"]