Posted to commits@airflow.apache.org by ka...@apache.org on 2021/03/20 22:37:49 UTC

[airflow] branch v2-0-test updated (2e76ed1 -> 94e016d)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a change to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git.


    from 2e76ed1  Simplify cleaning string passed to origin param (#14738) (#14905)
     new 6df8184  Sort lists, sets and tuples in Serialized DAGs (#14909)
     new 94e016d  Multiple minor doc fixes (#14917)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 airflow/api_connexion/openapi/v1.yaml         |  2 +-
 airflow/serialization/serialized_objects.py   |  6 ++---
 docs/apache-airflow/backport-providers.rst    |  2 +-
 docs/apache-airflow/dag-serialization.rst     |  2 +-
 docs/apache-airflow/macros-ref.rst            |  2 +-
 tests/serialization/test_dag_serialization.py | 34 ++++++++++++++++++++++++---
 6 files changed, 38 insertions(+), 10 deletions(-)

[airflow] 01/02: Sort lists, sets and tuples in Serialized DAGs (#14909)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 6df81846035b2aef200a266df83e297962fe0eef
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Sat Mar 20 22:30:55 2021 +0000

    Sort lists, sets and tuples in Serialized DAGs (#14909)
    
    Currently we check whether the DAG has changed via dag_hash.
    The problem is that, since insertion order is not guaranteed, the same
    DAG can produce a different hash and hence cause an unnecessary DB write.

    This commit fixes that by sorting lists, sets and tuples during
    serialization.
    
    (cherry picked from commit 4531168e9011984145f3048105215342dc949bef)
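
An aside on the underlying problem, as a minimal sketch outside Airflow
(the md5-over-JSON hashing below mirrors how SerializedDagModel computes
dag_hash, but the helper itself is illustrative):

    import hashlib
    import json

    def dag_hash(serialized: dict) -> str:
        # Hash the serialized DAG; the scheduler compares this hash to
        # decide whether the row in the DB needs to be rewritten.
        return hashlib.md5(json.dumps(serialized, sort_keys=True).encode()).hexdigest()

    # Set iteration order depends on the per-process string hash seed, so
    # two parses of the same DAG file can serialize the same set of task
    # ids in different orders:
    first_parse = {"__var": ["task_1", "task_2", "task_4"]}
    second_parse = {"__var": ["task_2", "task_1", "task_4"]}

    # sort_keys=True sorts dict keys only, not list contents, so the two
    # hashes differ and a DB write happens even though nothing changed.
    assert dag_hash(first_parse) != dag_hash(second_parse)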
---
 airflow/serialization/serialized_objects.py   |  6 ++---
 tests/serialization/test_dag_serialization.py | 34 ++++++++++++++++++++++++---
 2 files changed, 34 insertions(+), 6 deletions(-)

diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index d609c09..a890cd1 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -214,7 +214,7 @@ class BaseSerialization:
         elif isinstance(var, dict):
             return cls._encode({str(k): cls._serialize(v) for k, v in var.items()}, type_=DAT.DICT)
         elif isinstance(var, list):
-            return [cls._serialize(v) for v in var]
+            return sorted(cls._serialize(v) for v in var)
         elif HAS_KUBERNETES and isinstance(var, k8s.V1Pod):
             json_pod = PodGenerator.serialize_pod(var)
             return cls._encode(json_pod, type_=DAT.POD)
@@ -240,10 +240,10 @@ class BaseSerialization:
             return str(get_python_source(var))
         elif isinstance(var, set):
             # FIXME: casts set to list in customized serialization in future.
-            return cls._encode([cls._serialize(v) for v in var], type_=DAT.SET)
+            return cls._encode(sorted(cls._serialize(v) for v in var), type_=DAT.SET)
         elif isinstance(var, tuple):
             # FIXME: casts tuple to list in customized serialization in future.
-            return cls._encode([cls._serialize(v) for v in var], type_=DAT.TUPLE)
+            return cls._encode(sorted(cls._serialize(v) for v in var), type_=DAT.TUPLE)
         elif isinstance(var, TaskGroup):
             return SerializedTaskGroup.serialize_task_group(var)
         else:
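
The hunks above, reduced to a standalone sketch (an illustration of the
idea, not the real BaseSerialization code):

    def serialize_collection(values):
        # Lists, sets and tuples are all emitted as *sorted* lists, so the
        # serialized form no longer depends on insertion or iteration order.
        return sorted(values)

    assert serialize_collection({'task_1', 'task_5', 'task_2'}) \
        == serialize_collection(('task_5', 'task_2', 'task_1')) \
        == ['task_1', 'task_2', 'task_5']

Note that sorted() assumes the serialized elements are mutually comparable;
a list mixing, say, strings and dicts would raise TypeError under this
scheme.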
diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py
index a775f5b..55d2c5a 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -353,9 +353,10 @@ class TestStringifiedDAGs(unittest.TestCase):
             "_task_group",
         }
         for field in fields_to_check:
-            assert getattr(serialized_dag, field) == getattr(
-                dag, field
-            ), f'{dag.dag_id}.{field} does not match'
+            dag_field = getattr(dag, field)
+            if isinstance(dag_field, list):
+                dag_field = sorted(dag_field)
+            assert getattr(serialized_dag, field) == dag_field, f'{dag.dag_id}.{field} does not match'
 
         if dag.default_args:
             for k, v in dag.default_args.items():
@@ -1027,6 +1028,33 @@ class TestStringifiedDAGs(unittest.TestCase):
 
         assert deserialized_dag.has_on_failure_callback is expected_value
 
+    @parameterized.expand(
+        [
+            (
+                ['task_1', 'task_5', 'task_2', 'task_4'],
+                ['task_1', 'task_2', 'task_4', 'task_5'],
+            ),
+            (
+                {'task_1', 'task_5', 'task_2', 'task_4'},
+                ['task_1', 'task_2', 'task_4', 'task_5'],
+            ),
+            (
+                ('task_1', 'task_5', 'task_2', 'task_4'),
+                ['task_1', 'task_2', 'task_4', 'task_5'],
+            ),
+            (
+                {"task3": "test3", "task2": "test2", "task1": "test1"},
+                {"task1": "test1", "task2": "test2", "task3": "test3"},
+            ),
+        ]
+    )
+    def test_serialized_objects_are_sorted(self, object_to_serialized, expected_output):
+        """Test Serialized Lists, Sets and Tuples are sorted"""
+        serialized_obj = SerializedDAG._serialize(object_to_serialized)
+        if isinstance(serialized_obj, dict) and "__type" in serialized_obj:
+            serialized_obj = serialized_obj["__var"]
+        assert serialized_obj == expected_output
+
 
 def test_kubernetes_optional():
     """Serialisation / deserialisation continues to work without kubernetes installed"""

[airflow] 02/02: Multiple minor doc fixes (#14917)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 94e016dc9b934facac32c87e265a928c5cc26f28
Author: Xiaodong DENG <xd...@apache.org>
AuthorDate: Sat Mar 20 23:33:54 2021 +0100

    Multiple minor doc fixes (#14917)
    
    
    (cherry picked from commit ed872a64f1d9e57fdd1e05f82a1f490d9e9e7fd1)
---
 airflow/api_connexion/openapi/v1.yaml      | 2 +-
 docs/apache-airflow/backport-providers.rst | 2 +-
 docs/apache-airflow/dag-serialization.rst  | 2 +-
 docs/apache-airflow/macros-ref.rst         | 2 +-
 4 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml
index 83dae6a..2a9f7cc 100644
--- a/airflow/api_connexion/openapi/v1.yaml
+++ b/airflow/api_connexion/openapi/v1.yaml
@@ -1445,7 +1445,7 @@ components:
           type: string
           readOnly: true
           nullable: true
-          description: If the DAG is SubDAG then it is the top level DAG identifier. Otherwise, nulll.
+          description: If the DAG is SubDAG then it is the top level DAG identifier. Otherwise, null.
         is_paused:
           type: boolean
           nullable: true
diff --git a/docs/apache-airflow/backport-providers.rst b/docs/apache-airflow/backport-providers.rst
index 016b30f..328dd30 100644
--- a/docs/apache-airflow/backport-providers.rst
+++ b/docs/apache-airflow/backport-providers.rst
@@ -87,7 +87,7 @@ Backport providers only work when they are installed in the same namespace as th
 This is majority of cases when you simply run pip install - it installs all packages in the same folder
 (usually in ``/usr/local/lib/pythonX.Y/site-packages``). But when you install the ``apache-airflow`` and
 ``apache-airflow-backport-package-*`` using different methods (for example using ``pip install -e .`` or
-``pip install --user`` they might be installed in different namespaces.
+``pip install --user``) they might be installed in different namespaces.
 
 If that's the case, the provider packages will not be importable (the error in such case is
 ``ModuleNotFoundError: No module named 'airflow.providers'``).
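
A quick way to check for the split-namespace problem described above (a
sketch; the printed paths will differ per environment):

    # Raises ModuleNotFoundError right here if the namespaces ended up split:
    import airflow
    import airflow.providers

    # Both should resolve under the same site-packages root:
    print(airflow.__file__)
    print(list(airflow.providers.__path__))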
diff --git a/docs/apache-airflow/dag-serialization.rst b/docs/apache-airflow/dag-serialization.rst
index c883f8b..cf5b7a0 100644
--- a/docs/apache-airflow/dag-serialization.rst
+++ b/docs/apache-airflow/dag-serialization.rst
@@ -41,7 +41,7 @@ as :class:`~airflow.models.serialized_dag.SerializedDagModel` model.
 The Webserver now instead of having to parse the DAG file again, reads the
 serialized DAGs in JSON, de-serializes them and create the DagBag and uses it
 to show in the UI. And the Scheduler does not need the actual DAG for making Scheduling decisions,
-instead of using the DAG files, we use Serialized DAGs that contain all the information needing to
+instead of using the DAG files, we use Serialized DAGs that contain all the information needed to
 schedule the DAGs from Airflow 2.0.0 (this was done as part of :ref:`Scheduler HA <scheduler:ha>`).
 
 One of the key features that is implemented as the part of DAG Serialization is that
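
The round trip this passage describes can be sketched with the
serialization helpers (illustrative; the webserver goes through
SerializedDagModel rather than calling these directly):

    from datetime import datetime

    from airflow import DAG
    from airflow.serialization.serialized_objects import SerializedDAG

    dag = DAG(dag_id='example', start_date=datetime(2021, 1, 1))

    # DAG processor side: DAG -> JSON-serializable dict stored in the DB
    serialized = SerializedDAG.to_dict(dag)

    # Webserver side: dict -> DAG object used to build the DagBag for the UI
    webserver_dag = SerializedDAG.from_dict(serialized)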
diff --git a/docs/apache-airflow/macros-ref.rst b/docs/apache-airflow/macros-ref.rst
index 6ef12c7..832dad7 100644
--- a/docs/apache-airflow/macros-ref.rst
+++ b/docs/apache-airflow/macros-ref.rst
@@ -62,7 +62,7 @@ Variable                                Description
 ``{{ ti }}``                            same as ``{{ task_instance }}``
 ``{{ params }}``                        a reference to the user-defined params dictionary which can be overridden by
                                         the dictionary passed through ``trigger_dag -c`` if you enabled
-                                        ``dag_run_conf_overrides_params` in ``airflow.cfg``
+                                        ``dag_run_conf_overrides_params`` in ``airflow.cfg``
 ``{{ var.value.my_var }}``              global defined variables represented as a dictionary
 ``{{ var.json.my_var.path }}``          global defined variables represented as a dictionary
                                         with deserialized JSON object, append the path to the