You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by as...@apache.org on 2021/04/16 07:47:21 UTC
[airflow] 01/04: Bugfix: ``TypeError`` when Serializing & sorting
iterables (#15395)
This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 0e0dc73c0b511598414f8d89e595efd09e5087ad
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Fri Apr 16 03:39:26 2021 +0100
Bugfix: ``TypeError`` when Serializing & sorting iterables (#15395)
This bug was introduced in #14909. Sorting has been removed from the serialization of lists and tuples, since lists and tuples preserve insertion order, unlike sets.
The following DAG errors with: `TypeError: '<' not supported between instances of 'dict' and 'dict'`
```python
from airflow import models
from airflow.operators.dummy import DummyOperator
from datetime import datetime, timedelta
params = {
"staging_schema": [{"key:":"foo","value":"bar"},
{"key:":"this","value":"that"}]
}
with models.DAG(dag_id='test-dag',
start_date=datetime(2019, 2, 14),
schedule_interval='30 13 * * *',
catchup=False,
max_active_runs=1,
params=params
) as dag:
my_task = DummyOperator(
task_id='task1'
)
```
Full Error:
```
File "/usr/local/lib/python3.7/site-packages/airflow/serialization/serialized_objects.py", line 210, in <dictcomp>
return cls._encode({str(k): cls._serialize(v) for k, v in var.items()}, type_=DAT.DICT)
File "/usr/local/lib/python3.7/site-packages/airflow/serialization/serialized_objects.py", line 212, in _serialize
return sorted(cls._serialize(v) for v in var)
TypeError: '<' not supported between instances of 'dict' and 'dict'
During handling of the above exception, another exception occurred:
...
```
This is because `sorted()` does not work with dicts: dicts cannot be ordered with `<`. Removing the sorting of lists and tuples fixes it.
Sorting also fails when a set contains elements of multiple incomparable types.
(cherry picked from commit d1150403a35c497a774a4ffbb1ca4546c532dc81)
---
airflow/serialization/serialized_objects.py | 9 ++++---
tests/serialization/test_dag_serialization.py | 36 +++++++++++++++++++++------
2 files changed, 35 insertions(+), 10 deletions(-)
diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index a890cd1..b6cfdf2 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -214,7 +214,7 @@ class BaseSerialization:
elif isinstance(var, dict):
return cls._encode({str(k): cls._serialize(v) for k, v in var.items()}, type_=DAT.DICT)
elif isinstance(var, list):
- return sorted(cls._serialize(v) for v in var)
+ return [cls._serialize(v) for v in var]
elif HAS_KUBERNETES and isinstance(var, k8s.V1Pod):
json_pod = PodGenerator.serialize_pod(var)
return cls._encode(json_pod, type_=DAT.POD)
@@ -240,10 +240,13 @@ class BaseSerialization:
return str(get_python_source(var))
elif isinstance(var, set):
# FIXME: casts set to list in customized serialization in future.
- return cls._encode(sorted(cls._serialize(v) for v in var), type_=DAT.SET)
+ try:
+ return cls._encode(sorted(cls._serialize(v) for v in var), type_=DAT.SET)
+ except TypeError:
+ return cls._encode([cls._serialize(v) for v in var], type_=DAT.SET)
elif isinstance(var, tuple):
# FIXME: casts tuple to list in customized serialization in future.
- return cls._encode(sorted(cls._serialize(v) for v in var), type_=DAT.TUPLE)
+ return cls._encode([cls._serialize(v) for v in var], type_=DAT.TUPLE)
elif isinstance(var, TaskGroup):
return SerializedTaskGroup.serialize_task_group(var)
else:
diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py
index e447751..895f2cf 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -357,10 +357,9 @@ class TestStringifiedDAGs(unittest.TestCase):
"_task_group",
}
for field in fields_to_check:
- dag_field = getattr(dag, field)
- if isinstance(dag_field, list):
- dag_field = sorted(dag_field)
- assert getattr(serialized_dag, field) == dag_field, f'{dag.dag_id}.{field} does not match'
+ assert getattr(serialized_dag, field) == getattr(
+ dag, field
+ ), f'{dag.dag_id}.{field} does not match'
if dag.default_args:
for k, v in dag.default_args.items():
@@ -1041,7 +1040,7 @@ class TestStringifiedDAGs(unittest.TestCase):
[
(
['task_1', 'task_5', 'task_2', 'task_4'],
- ['task_1', 'task_2', 'task_4', 'task_5'],
+ ['task_1', 'task_5', 'task_2', 'task_4'],
),
(
{'task_1', 'task_5', 'task_2', 'task_4'},
@@ -1049,16 +1048,39 @@ class TestStringifiedDAGs(unittest.TestCase):
),
(
('task_1', 'task_5', 'task_2', 'task_4'),
- ['task_1', 'task_2', 'task_4', 'task_5'],
+ ['task_1', 'task_5', 'task_2', 'task_4'],
+ ),
+ (
+ {
+ "staging_schema": [
+ {"key:": "foo", "value": "bar"},
+ {"key:": "this", "value": "that"},
+ "test_conf",
+ ]
+ },
+ {
+ "staging_schema": [
+ {"__type": "dict", "__var": {"key:": "foo", "value": "bar"}},
+ {
+ "__type": "dict",
+ "__var": {"key:": "this", "value": "that"},
+ },
+ "test_conf",
+ ]
+ },
),
(
{"task3": "test3", "task2": "test2", "task1": "test1"},
{"task1": "test1", "task2": "test2", "task3": "test3"},
),
+ (
+ ('task_1', 'task_5', 'task_2', 3, ["x", "y"]),
+ ['task_1', 'task_5', 'task_2', 3, ["x", "y"]],
+ ),
]
)
def test_serialized_objects_are_sorted(self, object_to_serialized, expected_output):
- """Test Serialized Lists, Sets and Tuples are sorted"""
+ """Test Serialized Sets are sorted while list and tuple preserve order"""
serialized_obj = SerializedDAG._serialize(object_to_serialized)
if isinstance(serialized_obj, dict) and "__type" in serialized_obj:
serialized_obj = serialized_obj["__var"]