Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/06/24 13:45:15 UTC

[GitHub] [airflow] Jorricks opened a new pull request #16633: Support serialization of non-primitive values for classes extending BaseOperator and DAG

Jorricks opened a new pull request #16633:
URL: https://github.com/apache/airflow/pull/16633


   Currently, the extra fields that custom classes extending BaseOperator expose through get_serialized_fields() can only hold primitive values.
   If you add a field with a complex type, deserialization fails in the _deserialize() step with a KeyError.
   However, the existing serialization already supports many complex types.
   This PR makes those complex types available to the extra fields used by custom classes extending the BaseOperator or DAG classes.
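The failure mode described above can be modelled with a small, self-contained sketch. It is a simplified toy, not Airflow's code: `toy_serialize`, `toy_deserialize` and `serialize_field` are hypothetical names. The only details borrowed from the thread are the `__type`/`__var` wrapper and the `else` branch removed by the diff further down, which strips that wrapper for non-decorated fields. The sketch assumes, as the description states, that the deserializer relies on the wrapper and raises a `KeyError` when it is missing.

```python
from typing import Any


def toy_serialize(value: Any) -> Any:
    """Wrap dicts in a type-tagged envelope; pass primitives through unchanged."""
    if isinstance(value, dict):
        return {"__type": "dict", "__var": {k: toy_serialize(v) for k, v in value.items()}}
    return value  # int, str, bool, None, ... are JSON-native and need no tag


def toy_deserialize(encoded: Any) -> Any:
    """Reverse toy_serialize; every dict is expected to carry the envelope."""
    if isinstance(encoded, dict):
        type_name = encoded["__type"]  # raises KeyError if the envelope was stripped
        if type_name == "dict":
            return {k: toy_deserialize(v) for k, v in encoded["__var"].items()}
        raise ValueError(f"unsupported type tag: {type_name!r}")
    return encoded


def serialize_field(value: Any, keep_envelope: bool) -> Any:
    """keep_envelope=False mimics the removed `else` branch, which strips __type/__var."""
    value = toy_serialize(value)
    if not keep_envelope and isinstance(value, dict) and "__type" in value:
        value = value["__var"]
    return value


if __name__ == "__main__":
    extra = {"a": 1337, "b": 7331}

    # Primitives survive the old rule: there is no envelope to strip.
    print(toy_deserialize(serialize_field(3, keep_envelope=False)))  # prints 3

    # Complex values do not: the bare dict has no "__type" key.
    try:
        toy_deserialize(serialize_field(extra, keep_envelope=False))
    except KeyError as err:
        print("KeyError:", err)  # prints KeyError: '__type'

    # Keeping the envelope lets the value round-trip.
    print(toy_deserialize(serialize_field(extra, keep_envelope=True)))  # prints {'a': 1337, 'b': 7331}
```

In this toy model the primitive case also explains why the existing built-in extra fields keep working: the strip step is a no-op for values that were never wrapped.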
   
   Example use-case:
   A plugin loads the DagBag and wants to access a custom operator's fields.
   Currently, we still need to load it fully (without serialization) to access these fields.
   This PR would considerably improve webserver performance for views that need this.
   
   Note: This PR was copied [from #16554 but something went wrong during rebasing](https://github.com/apache/airflow/pull/16554)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil commented on a change in pull request #16633: Support serialization of non-primitive values for classes extending BaseOperator and DAG

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #16633:
URL: https://github.com/apache/airflow/pull/16633#discussion_r664485095



##########
File path: tests/serialization/test_dag_serialization.py
##########
@@ -861,6 +892,30 @@ def __init__(self, do_xcom_push=False, **kwargs):
 
         assert serialized_op.do_xcom_push is False
 
+    def test_operator_custom_serialized_fields_present(self):
+        class MyOperator(BaseOperator):
+            __serialized_fields: Optional[frozenset] = None
+
+            def __init__(self, a_dictionary_value: Dict, **kwargs):
+                super().__init__(**kwargs)
+                self.a_dictionary_value = a_dictionary_value

Review comment:
       Why would you want this field in the serialized JSON? If the reason is that it is a template field, then this passes today without any changes to the actual code (only a test change), as follows:
   
   ```python
       def test_operator_custom_serialized_fields_present(self):
           class MyOperator(BaseOperator):
   
               template_fields = (
                   'a_dictionary_value',
               )
   
               def __init__(self, a_dictionary_value: Dict, **kwargs):
                   super().__init__(**kwargs)
                   self.a_dictionary_value = a_dictionary_value
   
           my_simple_dict = {"a": 1337, "b": 7331}
           op = MyOperator(task_id="dummy", a_dictionary_value=my_simple_dict)
           assert op.a_dictionary_value == my_simple_dict
   
           blob = SerializedBaseOperator.serialize_operator(op)
           serialized_op = SerializedBaseOperator.deserialize_operator(blob)
   
           assert serialized_op.a_dictionary_value == my_simple_dict  # type: ignore # pylint: disable=no-member
   ```







[GitHub] [airflow] kaxil commented on a change in pull request #16633: Support serialization of non-primitive values for classes extending BaseOperator and DAG

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #16633:
URL: https://github.com/apache/airflow/pull/16633#discussion_r658027093



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -186,13 +186,17 @@ def serialize_to_json(
             if cls._is_excluded(value, key, object_to_serialize):
                 continue
 
+            value = cls._serialize(value)
             if key in decorated_fields:
-                serialized_object[key] = cls._serialize(value)
-            else:
-                value = cls._serialize(value)
-                if isinstance(value, dict) and "__type" in value:
-                    value = value["__var"]
-                serialized_object[key] = value
+                pass
+            elif not (isinstance(value, dict) and "__type" in value):
+                pass
+            elif key in DAG.get_serialized_fields() | BaseOperator.get_serialized_fields():

Review comment:
       We already support overriding `get_serialized_fields` for a BaseOperator subclass, and the extra fields it returns should already be taken into account.
   
   Example:
   https://github.com/apache/airflow/blob/d181604739c048c6969d8997dbaf8b159607904b/airflow/sensors/external_task.py#L299-L304
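For reference, the linked `ExternalTaskMarker` code appears to use a cached-frozenset override that extends the parent's fields with `recursion_depth`. The sketch below reproduces that pattern against a hypothetical stand-in base class (`ToyBaseOperator`, with a made-up set of base fields) so it runs without Airflow installed; `MyMarker` is likewise illustrative, not the real operator.

```python
from typing import FrozenSet, Optional


class ToyBaseOperator:
    """Hypothetical stand-in for BaseOperator; the field names are illustrative only."""

    @classmethod
    def get_serialized_fields(cls) -> FrozenSet[str]:
        return frozenset({"task_id", "retries", "owner"})


class MyMarker(ToyBaseOperator):
    # Name-mangled to _MyMarker__serialized_fields, so it cannot clash with a parent's cache.
    __serialized_fields: Optional[FrozenSet[str]] = None

    def __init__(self, recursion_depth: int = 10) -> None:
        self.recursion_depth = recursion_depth

    @classmethod
    def get_serialized_fields(cls) -> FrozenSet[str]:
        # Compute the extended field set once, then reuse the cached frozenset.
        if not cls.__serialized_fields:
            cls.__serialized_fields = frozenset(super().get_serialized_fields() | {"recursion_depth"})
        return cls.__serialized_fields


if __name__ == "__main__":
    print(sorted(MyMarker.get_serialized_fields()))
```

As the follow-up in this thread notes, this works for `recursion_depth` because it holds a primitive value (an int).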
   







[GitHub] [airflow] Jorricks edited a comment on pull request #16633: Support serialization of non-primitive values for classes extending BaseOperator and DAG

Posted by GitBox <gi...@apache.org>.
Jorricks edited a comment on pull request #16633:
URL: https://github.com/apache/airflow/pull/16633#issuecomment-880730684


       Closing this PR, as the only remaining use would be to add an extra test that makes it more obvious this is possible.





[GitHub] [airflow] Jorricks commented on pull request #16633: Support serialization of non-primitive values for classes extending BaseOperator and DAG

Posted by GitBox <gi...@apache.org>.
Jorricks commented on pull request #16633:
URL: https://github.com/apache/airflow/pull/16633#issuecomment-880730684


       Closing this PR, as the only remaining use would be to add an extra test.





[GitHub] [airflow] Jorricks closed pull request #16633: Support serialization of non-primitive values for classes extending BaseOperator and DAG

Posted by GitBox <gi...@apache.org>.
Jorricks closed pull request #16633:
URL: https://github.com/apache/airflow/pull/16633


   





[GitHub] [airflow] kaxil commented on a change in pull request #16633: Support serialization of non-primitive values for classes extending BaseOperator and DAG

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #16633:
URL: https://github.com/apache/airflow/pull/16633#discussion_r658027093



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -186,13 +186,17 @@ def serialize_to_json(
             if cls._is_excluded(value, key, object_to_serialize):
                 continue
 
+            value = cls._serialize(value)
             if key in decorated_fields:
-                serialized_object[key] = cls._serialize(value)
-            else:
-                value = cls._serialize(value)
-                if isinstance(value, dict) and "__type" in value:
-                    value = value["__var"]
-                serialized_object[key] = value
+                pass
+            elif not (isinstance(value, dict) and "__type" in value):
+                pass
+            elif key in DAG.get_serialized_fields() | BaseOperator.get_serialized_fields():

Review comment:
       We already support overriding `get_serialized_fields` for a BaseOperator subclass, and the extra fields it returns should already be taken into account.
   
   Example, `ExternalTaskMarker`:
   https://github.com/apache/airflow/blob/d181604739c048c6969d8997dbaf8b159607904b/airflow/sensors/external_task.py#L299-L304
   







[GitHub] [airflow] Jorricks commented on a change in pull request #16633: Support serialization of non-primitive values for classes extending BaseOperator and DAG

Posted by GitBox <gi...@apache.org>.
Jorricks commented on a change in pull request #16633:
URL: https://github.com/apache/airflow/pull/16633#discussion_r664518731



##########
File path: tests/serialization/test_dag_serialization.py
##########
@@ -861,6 +892,30 @@ def __init__(self, do_xcom_push=False, **kwargs):
 
         assert serialized_op.do_xcom_push is False
 
+    def test_operator_custom_serialized_fields_present(self):
+        class MyOperator(BaseOperator):
+            __serialized_fields: Optional[frozenset] = None
+
+            def __init__(self, a_dictionary_value: Dict, **kwargs):
+                super().__init__(**kwargs)
+                self.a_dictionary_value = a_dictionary_value

Review comment:
       This is a valid workaround.
   I hadn't verified that it works without issues.
   
   Shall I modify the PR so that we incorporate these tests to make it a bit more visible that this is an option?







[GitHub] [airflow] kaxil commented on a change in pull request #16633: Support serialization of non-primitive values for classes extending BaseOperator and DAG

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #16633:
URL: https://github.com/apache/airflow/pull/16633#discussion_r664544616



##########
File path: tests/serialization/test_dag_serialization.py
##########
@@ -861,6 +892,30 @@ def __init__(self, do_xcom_push=False, **kwargs):
 
         assert serialized_op.do_xcom_push is False
 
+    def test_operator_custom_serialized_fields_present(self):
+        class MyOperator(BaseOperator):
+            __serialized_fields: Optional[frozenset] = None
+
+            def __init__(self, a_dictionary_value: Dict, **kwargs):
+                super().__init__(**kwargs)
+                self.a_dictionary_value = a_dictionary_value

Review comment:
       We already cover some of that: the `BashOperator` in `make_simple_dag` in `tests/serialization/test_dag_serialization.py` has a template field (`bash_command`), and we test that it is included in the serialized JSON.
   
   It was added in https://github.com/apache/airflow/pull/7633/files#diff-bb7c036c775e66eeef50932e062fbfc3
   
   I am curious whether that addresses your use case, and if not, what the reason is for using DAG serialization in your specific case.







[GitHub] [airflow] Jorricks commented on pull request #16633: Support serialization of non-primitive values for classes extending BaseOperator and DAG

Posted by GitBox <gi...@apache.org>.
Jorricks commented on pull request #16633:
URL: https://github.com/apache/airflow/pull/16633#issuecomment-872068546


   Rebased PR on latest main.





[GitHub] [airflow] Jorricks commented on a change in pull request #16633: Support serialization of non-primitive values for classes extending BaseOperator and DAG

Posted by GitBox <gi...@apache.org>.
Jorricks commented on a change in pull request #16633:
URL: https://github.com/apache/airflow/pull/16633#discussion_r658099699



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -186,13 +186,17 @@ def serialize_to_json(
             if cls._is_excluded(value, key, object_to_serialize):
                 continue
 
+            value = cls._serialize(value)
             if key in decorated_fields:
-                serialized_object[key] = cls._serialize(value)
-            else:
-                value = cls._serialize(value)
-                if isinstance(value, dict) and "__type" in value:
-                    value = value["__var"]
-                serialized_object[key] = value
+                pass
+            elif not (isinstance(value, dict) and "__type" in value):
+                pass
+            elif key in DAG.get_serialized_fields() | BaseOperator.get_serialized_fields():

Review comment:
       Yes, that is correct. It works for the types we currently have in the code base.
   But if `recursion_depth` were a complex type, a dictionary for example, it would fail.







[GitHub] [airflow] Jorricks commented on a change in pull request #16633: Support serialization of non-primitive values for classes extending BaseOperator and DAG

Posted by GitBox <gi...@apache.org>.
Jorricks commented on a change in pull request #16633:
URL: https://github.com/apache/airflow/pull/16633#discussion_r658099699



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -186,13 +186,17 @@ def serialize_to_json(
             if cls._is_excluded(value, key, object_to_serialize):
                 continue
 
+            value = cls._serialize(value)
             if key in decorated_fields:
-                serialized_object[key] = cls._serialize(value)
-            else:
-                value = cls._serialize(value)
-                if isinstance(value, dict) and "__type" in value:
-                    value = value["__var"]
-                serialized_object[key] = value
+                pass
+            elif not (isinstance(value, dict) and "__type" in value):
+                pass
+            elif key in DAG.get_serialized_fields() | BaseOperator.get_serialized_fields():

Review comment:
       Yes, that is correct. It works for the types we currently have in the code base because they are all primitive values.
   But if `recursion_depth` were a complex (non-primitive) type, a dictionary for example, it would fail.







[GitHub] [airflow] Jorricks commented on a change in pull request #16633: Support serialization of non-primitive values for classes extending BaseOperator and DAG

Posted by GitBox <gi...@apache.org>.
Jorricks commented on a change in pull request #16633:
URL: https://github.com/apache/airflow/pull/16633#discussion_r658099699



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -186,13 +186,17 @@ def serialize_to_json(
             if cls._is_excluded(value, key, object_to_serialize):
                 continue
 
+            value = cls._serialize(value)
             if key in decorated_fields:
-                serialized_object[key] = cls._serialize(value)
-            else:
-                value = cls._serialize(value)
-                if isinstance(value, dict) and "__type" in value:
-                    value = value["__var"]
-                serialized_object[key] = value
+                pass
+            elif not (isinstance(value, dict) and "__type" in value):
+                pass
+            elif key in DAG.get_serialized_fields() | BaseOperator.get_serialized_fields():

Review comment:
       Yes, that is correct.
   But if you make that a complex type, a dictionary for example, it will fail.



