Posted to commits@airflow.apache.org by "uranusjr (via GitHub)" <gi...@apache.org> on 2023/02/23 11:51:24 UTC

[GitHub] [airflow] uranusjr opened a new pull request, #29721: Make serde lazy-load dependent modules

uranusjr opened a new pull request, #29721:
URL: https://github.com/apache/airflow/pull/29721

   Currently serde imports every serializer module on initialization, and with them every third-party library those modules depend on. This includes numpy and kubernetes, which are notoriously slow to import, even if they are never used.
   
   Instead of importing modules to discover serializers, this new mechanism initializes only a collection of strings at startup and imports the relevant serializer module only when it is needed. This should greatly improve startup time.
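   A minimal sketch of the idea, assuming a nested string registry shaped like the `_AIRFLOW_SERIALIZERS` mapping in the diff below (top-level package, then the rest of the qualified class name, then the serializer module path). `REGISTRY`, `qualname_of` and `find_serializer_module` are hypothetical names, and the `decimal` entry points at a stdlib stand-in so the sketch runs without Airflow:

```python
from __future__ import annotations

import importlib
from types import ModuleType

# Hypothetical registry in the same shape as the PR's _AIRFLOW_SERIALIZERS:
# {top-level package: {rest of the qualified class name: serializer module}}.
# It holds only strings, so building it imports nothing.
REGISTRY: dict[str, dict[str, str]] = {
    # Stand-in target: a stdlib module, so the sketch runs without Airflow.
    "decimal": {"Decimal": "decimal"},
    # Never imported unless a numpy.int64 value is actually looked up.
    "numpy": {"int64": "airflow.serialization.serializers.numpy"},
}


def qualname_of(obj: object) -> str:
    """Return the fully qualified name of the object's type."""
    cls = type(obj)
    return f"{cls.__module__}.{cls.__qualname__}"


def find_serializer_module(obj: object) -> ModuleType | None:
    """Import and return the serializer module registered for obj's type, if any."""
    package, _, rest = qualname_of(obj).partition(".")
    target = REGISTRY.get(package, {}).get(rest)
    if target is None:
        return None
    # The possibly expensive import happens here, on first use only
    # (later calls are served from the sys.modules cache).
    return importlib.import_module(target)
```

   The trade-off is that the string table must be kept in sync with the serializer modules by hand, which is exactly what the review below debates.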


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] uranusjr closed pull request #29721: Make serde lazy-load dependent modules

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr closed pull request #29721: Make serde lazy-load dependent modules
URL: https://github.com/apache/airflow/pull/29721


[GitHub] [airflow] bolkedebruin commented on a diff in pull request #29721: Make serde lazy-load dependent modules

Posted by "bolkedebruin (via GitHub)" <gi...@apache.org>.
bolkedebruin commented on code in PR #29721:
URL: https://github.com/apache/airflow/pull/29721#discussion_r1133906070


##########
airflow/serialization/serde.py:
##########
@@ -270,38 +273,94 @@ def _stringify(classname: str, version: int, value: T | None) -> str:
     return s
 
 
-def _register():
-    """Register builtin serializers and deserializers for types that don't have any themselves"""
-    _serializers.clear()
-    _deserializers.clear()
-
-    for _, name, _ in iter_namespace(airflow.serialization.serializers):
-        name = import_module(name)
-        for s in getattr(name, "serializers", list()):
-            if not isinstance(s, str):
-                s = qualname(s)
-            if s in _serializers and _serializers[s] != name:
-                raise AttributeError(f"duplicate {s} for serialization in {name} and {_serializers[s]}")
-            log.debug("registering %s for serialization")
-            _serializers[s] = name
-        for d in getattr(name, "deserializers", list()):
-            if not isinstance(d, str):
-                d = qualname(d)
-            if d in _deserializers and _deserializers[d] != name:
-                raise AttributeError(f"duplicate {d} for deserialization in {name} and {_serializers[d]}")
-            log.debug("registering %s for deserialization", d)
-            _deserializers[d] = name
-            _extra_allowed.add(d)
-
-
-def _compile_patterns():
-    patterns = conf.get("core", "allowed_deserialization_classes").split()
-
-    _patterns.clear()  # ensure to reinit
-    for p in patterns:
-        p = re.sub(r"(\w)\.", r"\1\..", p)
-        _patterns.append(re.compile(p))
-
-
-_register()
-_compile_patterns()
+_AIRFLOW_SERIALIZERS = {
+    "decimal": {"Decimal": "airflow.serialization.serializers.bignum"},
+    "datetime": {
+        "date": "airflow.serialization.serializers.datetime",
+        "datetime": "airflow.serialization.serializers.datetime",
+        "timedelta": "airflow.serialization.serializers.datetime",
+    },
+    "kubernetes": {
+        "client.models.v1_resource_requirements.V1ResourceRequirements": (
+            "airflow.serialization.serializers.kubernetes"
+        ),
+        "client.models.v1_pod.V1Pod": "airflow.serialization.serializers.kubernetes",
+    },
+    "numpy": {
+        "bool_": "airflow.serialization.serializers.numpy",
+        "complex64": "airflow.serialization.serializers.numpy",
+        "complex128": "airflow.serialization.serializers.numpy",
+        "float16": "airflow.serialization.serializers.numpy",
+        "float64": "airflow.serialization.serializers.numpy",
+        "intc": "airflow.serialization.serializers.numpy",
+        "intp": "airflow.serialization.serializers.numpy",
+        "int8": "airflow.serialization.serializers.numpy",
+        "int16": "airflow.serialization.serializers.numpy",
+        "int32": "airflow.serialization.serializers.numpy",
+        "int64": "airflow.serialization.serializers.numpy",
+        "uint8": "airflow.serialization.serializers.numpy",
+        "uint16": "airflow.serialization.serializers.numpy",
+        "uint32": "airflow.serialization.serializers.numpy",
+        "uint64": "airflow.serialization.serializers.numpy",
+    },
+    "pendulum": {
+        "datetime.DateTime": "airflow.serialization.serializers.datetime",
+        "tz.timezone.FixedTimezone": "airflow.serialization.serializers.timezone",
+        "tz.timezone.Timezone": "airflow.serialization.serializers.timezone",
+    },
+}
+
+_AIRFLOW_DESERIALIZERS = {

Review Comment:
   It isn't there because it is only used within DAG serialization, which I haven't implemented yet. But please see below; I think this 'sync' issue should be prevented by implementing the change differently.


[GitHub] [airflow] bolkedebruin commented on pull request #29721: Make serde lazy-load dependent modules

Posted by "bolkedebruin (via GitHub)" <gi...@apache.org>.
bolkedebruin commented on PR #29721:
URL: https://github.com/apache/airflow/pull/29721#issuecomment-1467607698

   Well, I think the fact that you have encountered the issue yourself proves otherwise? 
   
   The issue is that currently you can drop a new (de)serializer into the namespace (as an administrator), and it will then register itself. With your change you cannot do that anymore: you made the registry static, which requires core changes whenever a (de)serializer is added. The plugin system was intentional and I do not think it makes sense to remove it.
   
   If we are so afraid of hitting a performance bottleneck due to an oversight during review, we could `timeit` it during CI/CD (with a flamegraph to make it fancy) or do AST parsing instead of module loading.
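   A minimal sketch of the AST-parsing option, assuming each serializer module declares module-level `serializers`/`deserializers` lists (the names the old `_register()` in the diff reads via `getattr`). `declared_names` and `scan_serializers` are hypothetical helpers, not Airflow code. Parsing never executes the module, so an `import numpy` at its top costs nothing:

```python
from __future__ import annotations

import ast
from pathlib import Path


def declared_names(source: str, attr: str) -> list[str]:
    """Read string entries of a module-level `attr = [...]` list without running the module."""
    names: list[str] = []
    for node in ast.parse(source).body:
        if not isinstance(node, ast.Assign):
            continue
        if not any(isinstance(t, ast.Name) and t.id == attr for t in node.targets):
            continue
        if isinstance(node.value, (ast.List, ast.Tuple)):
            for elt in node.value.elts:
                # Only string literals can be read statically; a bare class
                # reference would still need an import to resolve its name.
                if isinstance(elt, ast.Constant) and isinstance(elt.value, str):
                    names.append(elt.value)
    return names


def scan_serializers(package_dir: Path) -> dict[str, tuple[list[str], list[str]]]:
    """Map each module file in package_dir to its (serializers, deserializers) names."""
    found = {}
    for file in sorted(package_dir.glob("*.py")):
        if file.name == "__init__.py":
            continue
        source = file.read_text(encoding="utf-8")
        found[file.stem] = (
            declared_names(source, "serializers"),
            declared_names(source, "deserializers"),
        )
    return found
```

   The limitation is that entries written as class objects rather than strings cannot be resolved this way, so modules would have to declare their types as strings.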


[GitHub] [airflow] uranusjr commented on a diff in pull request #29721: Make serde lazy-load dependent modules

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #29721:
URL: https://github.com/apache/airflow/pull/29721#discussion_r1116057525


##########
airflow/serialization/serde.py:
##########
@@ -270,38 +273,94 @@ def _stringify(classname: str, version: int, value: T | None) -> str:
+_AIRFLOW_DESERIALIZERS = {

Review Comment:
   Yes, apparently. This was already the case before the patch, and I’m not sure why.


[GitHub] [airflow] bolkedebruin commented on pull request #29721: Make serde lazy-load dependent modules

Posted by "bolkedebruin (via GitHub)" <gi...@apache.org>.
bolkedebruin commented on PR #29721:
URL: https://github.com/apache/airflow/pull/29721#issuecomment-1465915157

   Okay, I have given this some thought. The challenge is that loading certain modules is expensive. Your current approach is to hardcode the so-called `AIRFLOW_SERIALIZERS` within the `serde` module. This creates a maintenance burden, because (de)serialization is implemented in the individual serializer modules, which can lead to sync issues: this already happened when you noticed that I hadn't yet implemented deserialization for a Kubernetes pod.
   
   The current implementation of the `serde` module already allows for string lookups. So I think it is better to do the lazy loading in the place where the serialization happens. For example, the `kubernetes` serializer could just register a string such as `kubernetes.1.pod` (not the actual name) and load the module only when (de)serialization actually happens. The error raised when trying to deserialize while the underlying module (e.g. kubernetes) isn't installed is slightly different, but I do not think that is a problem.
   
   In other words, I think we can decentralize the lazy loading mechanism, and no change is required to the serde module. This has the benefits of allowing eager and lazy loading to be mixed, easier maintenance, better extensibility, and fewer code changes.
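   A minimal sketch of what such a decentralized serializer module could look like, using `fractions.Fraction` as a stand-in for an expensive dependency such as kubernetes. The module-level `serializers`/`deserializers` names mirror what the old `_register()` reads; the `serialize`/`deserialize` signatures here are simplified assumptions, not serde's actual interface:

```python
from __future__ import annotations

from typing import Any

# Registration by string: importing this module does not import the library
# it handles. "fractions.Fraction" stands in for e.g. a kubernetes class.
serializers = ["fractions.Fraction"]
deserializers = serializers


def serialize(o: object) -> dict[str, Any] | None:
    """Serialize a Fraction; return None for anything else."""
    # Deferred import: runs only when an object is actually serialized.
    from fractions import Fraction

    if not isinstance(o, Fraction):
        return None
    return {"numerator": o.numerator, "denominator": o.denominator}


def deserialize(classname: str, data: dict[str, Any]) -> object:
    """Rebuild a Fraction from the dict produced by serialize()."""
    if classname not in deserializers:
        raise TypeError(f"do not know how to deserialize {classname}")
    from fractions import Fraction  # deferred for the same reason

    return Fraction(data["numerator"], data["denominator"])
```

   The cost of this design is the one raised in the reply below: every module author has to remember to keep heavy imports inside the functions.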
   

[GitHub] [airflow] ashb commented on a diff in pull request #29721: Make serde lazy-load dependent modules

Posted by "ashb (via GitHub)" <gi...@apache.org>.
ashb commented on code in PR #29721:
URL: https://github.com/apache/airflow/pull/29721#discussion_r1115651322


##########
airflow/serialization/serde.py:
##########
@@ -270,38 +273,94 @@ def _stringify(classname: str, version: int, value: T | None) -> str:
+_AIRFLOW_DESERIALIZERS = {

Review Comment:
   We don't have a deserializer for Kube?


[GitHub] [airflow] kaxil commented on a diff in pull request #29721: Make serde lazy-load dependent modules

Posted by "kaxil (via GitHub)" <gi...@apache.org>.
kaxil commented on code in PR #29721:
URL: https://github.com/apache/airflow/pull/29721#discussion_r1116203925


##########
airflow/serialization/serde.py:
##########
@@ -270,38 +273,94 @@ def _stringify(classname: str, version: int, value: T | None) -> str:
+_AIRFLOW_DESERIALIZERS = {

Review Comment:
   cc @bolkedebruin 


[GitHub] [airflow] uranusjr commented on pull request #29721: Make serde lazy-load dependent modules

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on PR #29721:
URL: https://github.com/apache/airflow/pull/29721#issuecomment-1469628487

   The “missing” numpy registrations are intentional; those are aliases and can never be identified by the `isinstance` logic.
   
   ```pycon
   >>> import numpy
   >>> numpy.int_
   <class 'numpy.int64'>
   ```
   
   This is on a 64-bit machine; on 32-bit it’s aliased to `int32`. But the point is that no object would ever report itself as an instance of `numpy.int_`, only either one of the actual int classes.
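   The same effect can be shown without numpy, using a stdlib alias. In Python 3, `IOError` is an alias of `OSError`, so a registry keyed on the name `builtins.IOError` would never match any instance. `qualname_of` is a hypothetical helper that computes the name a type-keyed registry would look up:

```python
def qualname_of(obj: object) -> str:
    """Qualified name of an object's runtime type, as a type-keyed registry sees it."""
    cls = type(obj)
    return f"{cls.__module__}.{cls.__qualname__}"


# A registry entry under the alias name; lookups use the real class name instead.
registry = {"builtins.IOError": "handler-for-ioerror"}

err = IOError("disk full")
print(IOError is OSError)            # True: the alias is the same class object
print(qualname_of(err))              # builtins.OSError
print(qualname_of(err) in registry)  # False: the alias entry is unreachable
```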
   
   But IMO that’s beside the point either way. Missing entries are a problem no matter where you register the types; you can just as easily forget to add a type to a per-module registry as to the global registry. The only downside of putting all the registration in one module is when you miss registering _an entire submodule in `airflow.serialization.serializers`_, which IMO is very easy to catch (and can be automated with pre-commit if needed), and is worth it considering how much it simplifies implementing individual serializer modules in general (no need to consider import performance for each module, since that is handled by the global registry).
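   A minimal sketch of such a pre-commit check, assuming the registry keeps the nested shape of `_AIRFLOW_SERIALIZERS` in the diff and that each serializer submodule is referenced by its full dotted path. `unregistered_submodules` and `list_submodules` are hypothetical helpers; a real hook would feed the output of `list_submodules` (run on the package directory) into the first function:

```python
from __future__ import annotations

import pkgutil
from collections.abc import Iterable, Mapping


def unregistered_submodules(
    package_name: str,
    submodules: Iterable[str],
    registry: Mapping[str, Mapping[str, str]],
) -> set[str]:
    """Return serializer submodules that no registry entry points to."""
    referenced = {target for entries in registry.values() for target in entries.values()}
    return {f"{package_name}.{name}" for name in submodules} - referenced


def list_submodules(path: str) -> list[str]:
    """Names of the modules in a package directory, found without importing them."""
    return [info.name for info in pkgutil.iter_modules([path])]
```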


[GitHub] [airflow] uranusjr commented on pull request #29721: Make serde lazy-load dependent modules

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on PR #29721:
URL: https://github.com/apache/airflow/pull/29721#issuecomment-1466145151

   I have to disagree on the maintenance part. While keeping each (de)serialiser's declarations in its own module should reduce mental overhead, the requirement that comes with it (keeping expensive imports local instead of module-level) creates at least as much mental overhead, if not more. As an example, we profiled the secret masking module when it was first introduced (because the module is loaded when logging is configured and needs to be reasonably performant), but a slowdown very easily slipped through maintainer reviews when a later contribution was made. Personally, I feel it is much easier to keep a central registry in sync, since it is more obvious to reviewers when a pull request adds a new serialiser than when it adds a seemingly innocent module-level import.
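   One way to catch such a regression mechanically, sketched here as an assumption rather than anything proposed in the PR, is a test that imports a module in a fresh interpreter and reports which known-slow modules it pulled in. `heavy_modules_loaded_by` is a hypothetical helper; in CI one might call it with `"airflow.serialization.serde"` and `{"numpy", "kubernetes"}` and assert the result is empty:

```python
from __future__ import annotations

import subprocess
import sys


def heavy_modules_loaded_by(module: str, heavy: set[str]) -> set[str]:
    """Import `module` in a fresh interpreter; return which `heavy` modules got loaded."""
    code = (
        "import importlib, sys\n"
        f"importlib.import_module({module!r})\n"
        "print('\\n'.join(sys.modules))\n"
    )
    # -I (isolated mode) ignores PYTHON* environment variables and the user
    # site directory, so the result reflects the import chain, not local setup.
    result = subprocess.run(
        [sys.executable, "-I", "-c", code],
        capture_output=True,
        text=True,
        check=True,
    )
    return heavy & set(result.stdout.split())
```

   A fresh process is used because the current interpreter's `sys.modules` may already contain the heavy modules for unrelated reasons.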


[GitHub] [airflow] uranusjr commented on pull request #29721: Make serde lazy-load dependent modules

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on PR #29721:
URL: https://github.com/apache/airflow/pull/29721#issuecomment-1472798163

   Continued in #30094.