Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/06/21 08:31:31 UTC

[GitHub] [airflow] Jorricks opened a new pull request #16554: Feature: add serialisation support for custom DAGs and BaseOperators

Jorricks opened a new pull request #16554:
URL: https://github.com/apache/airflow/pull/16554


   Currently, the extra fields that custom subclasses of BaseOperator or DAG add through `get_serialized_fields()` can only be primitive types.
   If you try to add a complex type, it fails at the `_deserialize()` step with a KeyError.
   However, the existing serialisation already supports many complex types.
   This PR makes these complex types available to the extra fields of custom classes that extend the BaseOperator or DAG classes.
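To make this failure mode concrete, here is a toy round trip in plain Python. It is a simplified sketch, not Airflow's `BaseSerialization`: the `serialize`/`deserialize` helpers and the `"dict"`/`"tuple"` type tags are hypothetical stand-ins for `_serialize()`/`_deserialize()`, kept just close enough to show why stripping the `__type`/`__var` wrapper from a complex value leads to a KeyError.

```python
from typing import Any


def serialize(value: Any) -> Any:
    """Toy encoder (hypothetical): wrap dicts and tuples as {"__type": ..., "__var": ...}."""
    if isinstance(value, dict):
        return {"__type": "dict", "__var": {k: serialize(v) for k, v in value.items()}}
    if isinstance(value, tuple):
        return {"__type": "tuple", "__var": [serialize(v) for v in value]}
    return value  # primitives (str, int, float, bool, None) pass through unchanged


def deserialize(encoded: Any) -> Any:
    """Toy decoder (hypothetical): every JSON object is assumed to carry the wrapper."""
    if isinstance(encoded, dict):
        type_ = encoded["__type"]  # raises KeyError if the wrapper was stripped
        var = encoded["__var"]
        if type_ == "dict":
            return {k: deserialize(v) for k, v in var.items()}
        if type_ == "tuple":
            return tuple(deserialize(v) for v in var)
        raise ValueError(f"unknown __type: {type_!r}")
    return encoded
```

With this sketch, `deserialize(serialize({"a": 1, "b": (2, 3)}))` returns the original dict, while `deserialize(serialize({"a": 1})["__var"])` (the wrapper stripped, as the clause changed in this PR does for non-decorated fields) raises `KeyError: '__type'`.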
   
   Example use-case:
   A plugin loads the DagBag and wants to access the custom operators' fields.
   Currently, we would still need to fully load the DagBag (without serialisation) to access these fields.
   This PR would considerably improve webserver performance for views that need these fields.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] Jorricks commented on a change in pull request #16554: Feature: add serialisation support for custom DAGs and BaseOperators

Posted by GitBox <gi...@apache.org>.
Jorricks commented on a change in pull request #16554:
URL: https://github.com/apache/airflow/pull/16554#discussion_r656028290



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -186,13 +186,18 @@ def serialize_to_json(
             if cls._is_excluded(value, key, object_to_serialize):
                 continue
 
-            if key in decorated_fields:
-                serialized_object[key] = cls._serialize(value)
-            else:
-                value = cls._serialize(value)
-                if isinstance(value, dict) and "__type" in value:
+            value = cls._serialize(value)
+            if key not in decorated_fields and isinstance(value, dict) and "__type" in value:
+                # Extra check to make sure for custom operators or dags that the non-standard
+                # serialized_fields still support non-primitive types (e.g. dicts).
+                if isinstance(object_to_serialize, DAG) and key in DAG.get_serialized_fields():
+                    value = value["__var"]
+                elif (
+                    isinstance(object_to_serialize, BaseOperator)
+                    and key in BaseOperator.get_serialized_fields()
+                ):

Review comment:
       Alright. I am amazed by the speed and code quality. Thanks for the explanation :)







[GitHub] [airflow] Jorricks commented on a change in pull request #16554: Feature: add serialisation support for custom DAGs and BaseOperators

Jorricks commented on a change in pull request #16554:
URL: https://github.com/apache/airflow/pull/16554#discussion_r656040921



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -186,13 +186,18 @@ def serialize_to_json(
             if cls._is_excluded(value, key, object_to_serialize):
                 continue
 
-            if key in decorated_fields:
-                serialized_object[key] = cls._serialize(value)
-            else:
-                value = cls._serialize(value)
-                if isinstance(value, dict) and "__type" in value:
+            value = cls._serialize(value)
+            if key not in decorated_fields and isinstance(value, dict) and "__type" in value:
+                # Extra check to make sure for custom operators or dags that the non-standard
+                # serialized_fields still support non-primitive types (e.g. dicts).
+                if isinstance(object_to_serialize, DAG) and key in DAG.get_serialized_fields():
+                    value = value["__var"]
+                elif (
+                    isinstance(object_to_serialize, BaseOperator)
+                    and key in BaseOperator.get_serialized_fields()
+                ):

Review comment:
       The difference is that a person could extend the DAG class and implement the `get_serialized_fields()` themselves.
   ```python
   from typing import Optional, Tuple

   from airflow.models.dag import DAG


   class MyDAG(DAG):
       # Cached field set; name-mangled to _MyDAG__serialized_fields.
       __serialized_fields: Optional[frozenset] = None

       def __init__(self, dag_id: str, a_tuple_value: Tuple, **kwargs):
           super().__init__(dag_id, **kwargs)
           self.a_tuple_value = a_tuple_value

       @classmethod
       def get_serialized_fields(cls):
           # Standard DAG fields plus this subclass's custom field.
           if not cls.__serialized_fields:
               custom_fields = {"a_tuple_value"}
               cls.__serialized_fields = frozenset(super().get_serialized_fields() | custom_fields)
           return cls.__serialized_fields
   ```
   
   In this case, the `a_tuple_value` would be in `object_to_serialize.get_serialized_fields()` but would not be in `DAG.get_serialized_fields()`.
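A stripped-down sketch of that distinction which runs without Airflow installed. `FakeDAG` is a hypothetical stand-in for `DAG`, and its field names are illustrative, not the real serialized field set; `MyDAG` reuses the caching pattern from the example. It also shows that calling `get_serialized_fields()` on the instance itself, as ashb suggested for `object_to_serialize`, returns the subclass's extended set.

```python
from typing import FrozenSet, Optional


class FakeDAG:
    """Hypothetical stand-in for airflow's DAG; field names are illustrative."""

    @classmethod
    def get_serialized_fields(cls) -> FrozenSet[str]:
        return frozenset({"dag_id", "description", "start_date"})


class MyDAG(FakeDAG):
    # Name-mangled to _MyDAG__serialized_fields, so this cache is private to MyDAG.
    __serialized_fields: Optional[FrozenSet[str]] = None

    @classmethod
    def get_serialized_fields(cls) -> FrozenSet[str]:
        if not cls.__serialized_fields:
            custom_fields = {"a_tuple_value"}
            cls.__serialized_fields = frozenset(super().get_serialized_fields() | custom_fields)
        return cls.__serialized_fields


def custom_fields_of(obj: FakeDAG) -> FrozenSet[str]:
    """Fields the concrete class serializes on top of the base class's fields."""
    # obj.get_serialized_fields() dispatches to the subclass override (if any),
    # while FakeDAG.get_serialized_fields() always returns only the base set.
    return obj.get_serialized_fields() - FakeDAG.get_serialized_fields()
```

Here `custom_fields_of(MyDAG())` yields `frozenset({"a_tuple_value"})` and `custom_fields_of(FakeDAG())` yields an empty set, which is exactly the gap between the two calls being compared.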







[GitHub] [airflow] ashb commented on a change in pull request #16554: Feature: add serialisation support for custom DAGs and BaseOperators

ashb commented on a change in pull request #16554:
URL: https://github.com/apache/airflow/pull/16554#discussion_r656005921



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -186,13 +186,18 @@ def serialize_to_json(
             if cls._is_excluded(value, key, object_to_serialize):
                 continue
 
-            if key in decorated_fields:
-                serialized_object[key] = cls._serialize(value)
-            else:
-                value = cls._serialize(value)
-                if isinstance(value, dict) and "__type" in value:
+            value = cls._serialize(value)
+            if key not in decorated_fields and isinstance(value, dict) and "__type" in value:
+                # Extra check to make sure for custom operators or dags that the non-standard
+                # serialized_fields still support non-primitive types (e.g. dicts).
+                if isinstance(object_to_serialize, DAG) and key in DAG.get_serialized_fields():
+                    value = value["__var"]
+                elif (
+                    isinstance(object_to_serialize, BaseOperator)
+                    and key in BaseOperator.get_serialized_fields()
+                ):

Review comment:
       I find these conditions really hard to follow -- I'm not quite sure how to change them.
   
   I don't think we need the isinstance check: given `object_to_serialize: Union[BaseOperator, DAG]`, we could call `object_to_serialize.get_serialized_fields()`, couldn't we?







[GitHub] [airflow] ashb commented on a change in pull request #16554: Feature: add serialisation support for custom DAGs and BaseOperators

ashb commented on a change in pull request #16554:
URL: https://github.com/apache/airflow/pull/16554#discussion_r656035666



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -186,13 +186,18 @@ def serialize_to_json(
             if cls._is_excluded(value, key, object_to_serialize):
                 continue
 
-            if key in decorated_fields:
-                serialized_object[key] = cls._serialize(value)
-            else:
-                value = cls._serialize(value)
-                if isinstance(value, dict) and "__type" in value:
+            value = cls._serialize(value)
+            if key not in decorated_fields and isinstance(value, dict) and "__type" in value:
+                # Extra check to make sure for custom operators or dags that the non-standard
+                # serialized_fields still support non-primitive types (e.g. dicts).
+                if isinstance(object_to_serialize, DAG) and key in DAG.get_serialized_fields():
+                    value = value["__var"]
+                elif (
+                    isinstance(object_to_serialize, BaseOperator)
+                    and key in BaseOperator.get_serialized_fields()
+                ):

Review comment:
       What's the difference between `DAG.get_serialized_fields()` and `object_to_serialize.get_serialized_fields()`?







[GitHub] [airflow] Jorricks commented on a change in pull request #16554: Feature: add serialisation support for custom DAGs and BaseOperators

Jorricks commented on a change in pull request #16554:
URL: https://github.com/apache/airflow/pull/16554#discussion_r656040921



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -186,13 +186,18 @@ def serialize_to_json(
             if cls._is_excluded(value, key, object_to_serialize):
                 continue
 
-            if key in decorated_fields:
-                serialized_object[key] = cls._serialize(value)
-            else:
-                value = cls._serialize(value)
-                if isinstance(value, dict) and "__type" in value:
+            value = cls._serialize(value)
+            if key not in decorated_fields and isinstance(value, dict) and "__type" in value:
+                # Extra check to make sure for custom operators or dags that the non-standard
+                # serialized_fields still support non-primitive types (e.g. dicts).
+                if isinstance(object_to_serialize, DAG) and key in DAG.get_serialized_fields():
+                    value = value["__var"]
+                elif (
+                    isinstance(object_to_serialize, BaseOperator)
+                    and key in BaseOperator.get_serialized_fields()
+                ):

Review comment:
       The difference is that a person could extend the DAG class and implement the `get_serialized_fields()` themselves.
   Example from tests:
   ```python
   from typing import Optional, Tuple

   from airflow.models.dag import DAG


   class MyDAG(DAG):
       # Cached field set; name-mangled to _MyDAG__serialized_fields.
       __serialized_fields: Optional[frozenset] = None

       def __init__(self, dag_id: str, a_tuple_value: Tuple, **kwargs):
           super().__init__(dag_id, **kwargs)
           self.a_tuple_value = a_tuple_value

       @classmethod
       def get_serialized_fields(cls):
           # Standard DAG fields plus this subclass's custom field.
           if not cls.__serialized_fields:
               custom_fields = {"a_tuple_value"}
               cls.__serialized_fields = frozenset(super().get_serialized_fields() | custom_fields)
           return cls.__serialized_fields
   ```
   
   In this case, the `a_tuple_value` would be in `object_to_serialize.get_serialized_fields()` but would not be in `DAG.get_serialized_fields()`.







[GitHub] [airflow] Jorricks commented on a change in pull request #16554: Feature: add serialisation support for custom DAGs and BaseOperators

Jorricks commented on a change in pull request #16554:
URL: https://github.com/apache/airflow/pull/16554#discussion_r656032477



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -186,13 +186,18 @@ def serialize_to_json(
             if cls._is_excluded(value, key, object_to_serialize):
                 continue
 
-            if key in decorated_fields:
-                serialized_object[key] = cls._serialize(value)
-            else:
-                value = cls._serialize(value)
-                if isinstance(value, dict) and "__type" in value:
+            value = cls._serialize(value)
+            if key not in decorated_fields and isinstance(value, dict) and "__type" in value:
+                # Extra check to make sure for custom operators or dags that the non-standard
+                # serialized_fields still support non-primitive types (e.g. dicts).
+                if isinstance(object_to_serialize, DAG) and key in DAG.get_serialized_fields():
+                    value = value["__var"]
+                elif (
+                    isinstance(object_to_serialize, BaseOperator)
+                    and key in BaseOperator.get_serialized_fields()
+                ):

Review comment:
       Anyway, what do you think of changing the code to:
   ```python
   if key in DAG.get_serialized_fields() | BaseOperator.get_serialized_fields():
   ```
   This would still allow complex custom values, as long as their keys are not among the serialized fields of `DAG` or `BaseOperator`.
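A minimal sketch of how that condition could sit in the `serialize_to_json()` loop, assuming `value` has already been passed through `cls._serialize()`. The field sets and the `encode_field` helper are hypothetical and exist only to illustrate the rule; in the real code the two sets would come from `DAG.get_serialized_fields()` and `BaseOperator.get_serialized_fields()`.

```python
from typing import Any, FrozenSet

# Hypothetical stand-ins for DAG.get_serialized_fields() and
# BaseOperator.get_serialized_fields(); the real sets are different and larger.
DAG_FIELDS: FrozenSet[str] = frozenset({"dag_id", "known_dag_mapping"})
OPERATOR_FIELDS: FrozenSet[str] = frozenset({"task_id", "known_op_mapping"})
STANDARD_FIELDS: FrozenSet[str] = DAG_FIELDS | OPERATOR_FIELDS


def encode_field(key: str, value: Any, decorated_fields: FrozenSet[str]) -> Any:
    """Apply the proposed rule to a value that has already been serialized.

    Only fields known to DAG or BaseOperator lose their __type/__var wrapper;
    extra fields declared by subclasses keep it, so they can be decoded later.
    """
    is_wrapped = isinstance(value, dict) and "__type" in value
    if key not in decorated_fields and is_wrapped and key in STANDARD_FIELDS:
        return value["__var"]  # type is known in advance for standard fields
    return value
```

Because the union depends only on the two base classes, it can be computed once rather than per key. The trade-off in this sketch is that a custom field which happens to share a name with a standard field is treated as standard and loses its wrapper.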
   







[GitHub] [airflow] Jorricks closed pull request #16554: Feature: add serialisation support for custom DAGs and BaseOperators

Jorricks closed pull request #16554:
URL: https://github.com/apache/airflow/pull/16554


   





[GitHub] [airflow] Jorricks edited a comment on pull request #16554: Feature: add serialisation support for custom DAGs and BaseOperators

Jorricks edited a comment on pull request #16554:
URL: https://github.com/apache/airflow/pull/16554#issuecomment-867640321


   Something went wrong with the rebase.
   Sorry to everyone who received notifications.
   Opening a new PR soon.





[GitHub] [airflow] Jorricks commented on pull request #16554: Feature: add serialisation support for custom DAGs and BaseOperators

Jorricks commented on pull request #16554:
URL: https://github.com/apache/airflow/pull/16554#issuecomment-867640321


   Something went wrong with the rebase.
   Opening a new PR.





[GitHub] [airflow] ashb commented on a change in pull request #16554: Feature: add serialisation support for custom DAGs and BaseOperators

ashb commented on a change in pull request #16554:
URL: https://github.com/apache/airflow/pull/16554#discussion_r656022723



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -186,13 +186,18 @@ def serialize_to_json(
             if cls._is_excluded(value, key, object_to_serialize):
                 continue
 
-            if key in decorated_fields:
-                serialized_object[key] = cls._serialize(value)
-            else:
-                value = cls._serialize(value)
-                if isinstance(value, dict) and "__type" in value:
+            value = cls._serialize(value)
+            if key not in decorated_fields and isinstance(value, dict) and "__type" in value:
+                # Extra check to make sure for custom operators or dags that the non-standard
+                # serialized_fields still support non-primitive types (e.g. dicts).
+                if isinstance(object_to_serialize, DAG) and key in DAG.get_serialized_fields():
+                    value = value["__var"]
+                elif (
+                    isinstance(object_to_serialize, BaseOperator)
+                    and key in BaseOperator.get_serialized_fields()
+                ):

Review comment:
       > why we are not always surrounding complex types in a dict with __var and __type keys
   
   Simply put: to reduce the size of the output JSON. In testing with large DAGs this made a surprising difference to the output size, and it is only done for cases where we _know_ the type of a column, so we don't have to decorate it.
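A rough, standard-library-only illustration of the size argument. The task dicts and the count of 1000 are made up, and a real serialized DAG contains much more; the point is only that each `{"__type": "dict", "__var": ...}` envelope adds a fixed overhead (29 characters with `json.dumps` default separators) to every value it wraps.

```python
import json
from typing import Any, Dict, List


def wrap(value: Dict[str, Any]) -> Dict[str, Any]:
    """Decorate a dict with the __type/__var envelope used for typed values."""
    return {"__type": "dict", "__var": value}


# Hypothetical per-task fields; a large DAG repeats a structure like this many times.
tasks: List[Dict[str, Any]] = [
    {"task_id": f"task_{i}", "retries": 3, "owner": "airflow"} for i in range(1000)
]

plain_size = len(json.dumps(tasks))
decorated_size = len(json.dumps([wrap(t) for t in tasks]))
# Each wrapper adds '{"__type": "dict", "__var": ' plus a closing '}'.
overhead_per_task = (decorated_size - plain_size) // len(tasks)
```

The overhead grows linearly with the number of wrapped values, which is why skipping the wrapper for fields whose type is known up front pays off most on large DAGs.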







[GitHub] [airflow] Jorricks commented on a change in pull request #16554: Feature: add serialisation support for custom DAGs and BaseOperators

Jorricks commented on a change in pull request #16554:
URL: https://github.com/apache/airflow/pull/16554#discussion_r656018821



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -186,13 +186,18 @@ def serialize_to_json(
             if cls._is_excluded(value, key, object_to_serialize):
                 continue
 
-            if key in decorated_fields:
-                serialized_object[key] = cls._serialize(value)
-            else:
-                value = cls._serialize(value)
-                if isinstance(value, dict) and "__type" in value:
+            value = cls._serialize(value)
+            if key not in decorated_fields and isinstance(value, dict) and "__type" in value:
+                # Extra check to make sure for custom operators or dags that the non-standard
+                # serialized_fields still support non-primitive types (e.g. dicts).
+                if isinstance(object_to_serialize, DAG) and key in DAG.get_serialized_fields():
+                    value = value["__var"]
+                elif (
+                    isinstance(object_to_serialize, BaseOperator)
+                    and key in BaseOperator.get_serialized_fields()
+                ):

Review comment:
       It's a bit of a pickle, I agree. Let me explain it with an example.
   I copied this example from the test case.
   ```python
   from typing import Dict, Optional

   from airflow.models.baseoperator import BaseOperator


   class MyOperator(BaseOperator):
       # Cached field set; name-mangled to _MyOperator__serialized_fields.
       __serialized_fields: Optional[frozenset] = None

       def __init__(self, a_dictionary_value: Dict, **kwargs):
           super().__init__(**kwargs)
           self.a_dictionary_value = a_dictionary_value

       @classmethod
       def get_serialized_fields(cls):
           # Standard BaseOperator fields plus this subclass's custom field.
           if not cls.__serialized_fields:
               custom_fields = {"a_dictionary_value"}
               cls.__serialized_fields = frozenset(super().get_serialized_fields() | custom_fields)
           return cls.__serialized_fields
   ```
   When we serialize this operator and the extra field is a complex type, deserialization fails because of the clause above: it looks for `__var` and `__type` keys that are no longer there.
   The proposal here is that anything outside the standard fields of BaseOperator or DAG is serialized in full, wrapped in a `__var` and `__type` dictionary.
   
   So if we called `object_to_serialize.get_serialized_fields()` on this object, `a_dictionary_value` would again be stored as a bare value (just a dict without `__var` and `__type`), which would raise an exception once we deserialize.







[GitHub] [airflow] Jorricks commented on a change in pull request #16554: Feature: add serialisation support for custom DAGs and BaseOperators

Jorricks commented on a change in pull request #16554:
URL: https://github.com/apache/airflow/pull/16554#discussion_r656018821



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -186,13 +186,18 @@ def serialize_to_json(
             if cls._is_excluded(value, key, object_to_serialize):
                 continue
 
-            if key in decorated_fields:
-                serialized_object[key] = cls._serialize(value)
-            else:
-                value = cls._serialize(value)
-                if isinstance(value, dict) and "__type" in value:
+            value = cls._serialize(value)
+            if key not in decorated_fields and isinstance(value, dict) and "__type" in value:
+                # Extra check to make sure for custom operators or dags that the non-standard
+                # serialized_fields still support non-primitive types (e.g. dicts).
+                if isinstance(object_to_serialize, DAG) and key in DAG.get_serialized_fields():
+                    value = value["__var"]
+                elif (
+                    isinstance(object_to_serialize, BaseOperator)
+                    and key in BaseOperator.get_serialized_fields()
+                ):

Review comment:
       It's a bit of a pickle, I agree. Let me explain it with an example.
   I copied this example from the test case.
   ```python
   from typing import Dict, Optional

   from airflow.models.baseoperator import BaseOperator


   class MyOperator(BaseOperator):
       # Cached field set; name-mangled to _MyOperator__serialized_fields.
       __serialized_fields: Optional[frozenset] = None

       def __init__(self, a_dictionary_value: Dict, **kwargs):
           super().__init__(**kwargs)
           self.a_dictionary_value = a_dictionary_value

       @classmethod
       def get_serialized_fields(cls):
           # Standard BaseOperator fields plus this subclass's custom field.
           if not cls.__serialized_fields:
               custom_fields = {"a_dictionary_value"}
               cls.__serialized_fields = frozenset(super().get_serialized_fields() | custom_fields)
           return cls.__serialized_fields
   ```
   When we serialize this operator and the extra field (`a_dictionary_value`) is a complex type, deserialization fails because the `__var` and `__type` keys it expects are no longer there.
   The proposal here is that anything outside the standard fields of BaseOperator or DAG is serialized in full, wrapped in a `__var` and `__type` dictionary.
   
   So if we called `object_to_serialize.get_serialized_fields()` on this object, `a_dictionary_value` would be treated as a 'standard' value (just a dict without `__var` and `__type`), which would raise an exception once we deserialize.
   
   Getting this to work was quite a hassle, as there is so much custom logic. It makes me wonder why this clause was added and why we don't always wrap complex types in a dict with `__var` and `__type` keys.



