Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/02/09 10:42:24 UTC

[GitHub] [airflow] ashb commented on a change in pull request #19505: Add params dag_id, task_id etc to XCom.serialize_value

ashb commented on a change in pull request #19505:
URL: https://github.com/apache/airflow/pull/19505#discussion_r802516280



##########
File path: airflow/models/xcom.py
##########
@@ -499,6 +509,27 @@ def orm_deserialize_value(self) -> Any:
         return BaseXCom.deserialize_value(self)
 
 
+def serialize_value_shim(**kwargs):
+    """
+    Previously XCom.serialize_value only accepted one argument ``value``.  In order to give
+    custom XCom backends more flexibility with how they store values we now forward to
+    ``XCom.serialize_value`` all params passed to ``XCom.set``.  In order to maintain
+    compatibility with XCom backends written with the old signature we need to use this
+    compatibility shim, which forwards only those kwargs which are accepted by the
+    XCom backend.

Review comment:
       ```suggestion
       XCom backend.
       
       :meta private:
   ```
   
   This doesn't need to go into the rendered docs.

##########
File path: airflow/models/xcom.py
##########
@@ -499,6 +509,27 @@ def orm_deserialize_value(self) -> Any:
         return BaseXCom.deserialize_value(self)
 
 
+def serialize_value_shim(**kwargs):
+    """
+    Previously XCom.serialize_value only accepted one argument ``value``.  In order to give
+    custom XCom backends more flexibility with how they store values we now forward to
+    ``XCom.serialize_value`` all params passed to ``XCom.set``.  In order to maintain
+    compatibility with XCom backends written with the old signature we need to use this
+    compatibility shim, which forwards only those kwargs which are accepted by the
+    XCom backend.
+    """
+    signature = inspect_function_arguments(XCom.serialize_value)
+    kwargs = {k: kwargs.get(k) for k in signature.bound_arguments}

Review comment:
       I think we should do this signature inspection once, in `resolve_xcom_backend`, rather than on every call to the shim.
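
A minimal sketch of the "inspect once" idea, under stated assumptions: `resolve_backend`, `forward`, `accepted_kwargs`, the `_accepted` attribute and both backend classes are hypothetical stand-ins, not Airflow's actual code. The signature is examined a single time when the backend is resolved, and each later call only filters against the cached names.

```python
import inspect


class OldStyleBackend:
    """Hypothetical backend written against the old one-argument signature."""

    @staticmethod
    def serialize_value(value):
        return repr(value)


class NewStyleBackend:
    """Hypothetical backend that accepts the extra identifiers."""

    @staticmethod
    def serialize_value(value, *, key=None, task_id=None, dag_id=None, run_id=None):
        return f"{dag_id}.{task_id}.{key}={value!r}"


def accepted_kwargs(func):
    """Return the keyword names ``func`` accepts, or None if it takes **kwargs."""
    params = inspect.signature(func).parameters
    if any(p.kind == p.VAR_KEYWORD for p in params.values()):
        return None
    return frozenset(name for name, p in params.items() if p.kind != p.VAR_POSITIONAL)


def resolve_backend(clazz):
    """Stand-in for the resolver: pay the inspection cost once, here."""
    clazz._accepted = accepted_kwargs(clazz.serialize_value)
    return clazz


def forward(clazz, **kwargs):
    """Per-call path: no inspection, only a filter against the cached names."""
    accepted = clazz._accepted
    if accepted is not None:
        kwargs = {k: v for k, v in kwargs.items() if k in accepted}
    return clazz.serialize_value(**kwargs)
```

With this shape the per-call work is a dictionary filter, and the resolver is the only place that needs to know about old signatures.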

##########
File path: airflow/utils/parameters.py
##########
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import inspect
+from collections import namedtuple
+
+InspectSignatureResult = namedtuple('InspectSignatureResult', ['bound_arguments', 'has_kwargs'])
+
+
+def inspect_function_arguments(function) -> InspectSignatureResult:
+    """
+    Returns the list of variables names of a function and if it
+    accepts keyword arguments.
+
+    :param function: The function to inspect
+    :rtype: InspectSignatureResult
+    """
+    parameters = inspect.signature(function).parameters
+    bound_arguments = [
+        name for name, p in parameters.items() if p.kind not in (p.VAR_POSITIONAL, p.VAR_KEYWORD)
+    ]
+    has_kwargs = any(p.kind == p.VAR_KEYWORD for p in parameters.values())
+    return InspectSignatureResult(list(bound_arguments), has_kwargs)

Review comment:
       This is only used in one place -- do we really need a new module and a named tuple, rather than just inlining it? 
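
For reference, a sketch of what inlining could look like; the inspected `serialize_value` below is a made-up example function, not the real one. The two facts the helper returns come straight from `inspect.signature`, so no extra module or named tuple is strictly required.

```python
import inspect


def serialize_value(value, key=None, *args, **kwargs):
    """Hypothetical function to inspect."""
    return value


params = inspect.signature(serialize_value).parameters
# Named parameters only: skip *args and **kwargs.
named = [n for n, p in params.items() if p.kind not in (p.VAR_POSITIONAL, p.VAR_KEYWORD)]
# True when the function declares a **kwargs catch-all.
has_kwargs = any(p.kind == p.VAR_KEYWORD for p in params.values())
```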

##########
File path: airflow/models/xcom.py
##########
@@ -499,6 +509,27 @@ def orm_deserialize_value(self) -> Any:
         return BaseXCom.deserialize_value(self)
 
 
+def serialize_value_shim(**kwargs):
+    """
+    Previously XCom.serialize_value only accepted one argument ``value``.  In order to give
+    custom XCom backends more flexibility with how they store values we now forward to
+    ``XCom.serialize_value`` all params passed to ``XCom.set``.  In order to maintain
+    compatibility with XCom backends written with the old signature we need to use this
+    compatibility shim, which forwards only those kwargs which are accepted by the
+    XCom backend.
+    """
+    signature = inspect_function_arguments(XCom.serialize_value)
+    kwargs = {k: kwargs.get(k) for k in signature.bound_arguments}
+    if set(kwargs) == {'value'}:
+        warnings.warn(
+            f"Method `serialize_value` in XCom backend {XCom.__name__} is using outdated signature and"
+            f"must be updated to accept all params in `BaseXCom.set` except `session`.",
+            DeprecationWarning,
+            stacklevel=2,

Review comment:
       ```suggestion
   ```
   
   No wrapper is involved here, so `stacklevel=2` would place the warning at the caller's caller -- i.e. too high up.
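
To check where a given `stacklevel` lands, here is a small standalone sketch; all function names in it are hypothetical. It records the warning and compares the line it is attributed to with the default `stacklevel` and with `stacklevel=2`.

```python
import warnings


def shim_level_1():
    warnings.warn("old signature", DeprecationWarning)


def shim_level_2():
    warnings.warn("old signature", DeprecationWarning, stacklevel=2)


def caller(fn):
    fn()


def attributed_line(fn):
    """Return the line number the warning raised via ``fn`` is attributed to."""
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        caller(fn)
    return caught[0].lineno
```

With the default, the recorded line is the `warnings.warn` call itself; with `stacklevel=2` it is the `fn()` line inside `caller`, one frame further up.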

##########
File path: airflow/www/views.py
##########
@@ -3380,12 +3381,26 @@ def action_muldelete(self, items):
     def pre_add(self, item):
         """Pre add hook."""
         item.execution_date = timezone.make_aware(item.execution_date)
-        item.value = XCom.serialize_value(item.value)
+        item.value = serialize_value_shim(
+            value=item.value,
+            key=item.key,
+            task_id=item.task_id,
+            dag_id=item.dag_id,
+            execution_date=item.execution_date,
+            run_id=item.run_id,
+        )
 
     def pre_update(self, item):
         """Pre update hook."""
         item.execution_date = timezone.make_aware(item.execution_date)
-        item.value = XCom.serialize_value(item.value)
+        item.value = serialize_value_shim(

Review comment:
       I'm not a fan of this as an API -- we should instead _shim_/wrap the old backends in `resolve_xcom_backend`, something like
   
   ```python
       serialize_value = clazz.serialize_value
       # old_serialize: placeholder check for the old one-argument signature
       if old_serialize(serialize_value):
           def shim(value, **kwargs):
               return serialize_value(value)
           clazz.serialize_value = shim
   ```
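
A runnable version of that idea, as a sketch only: `uses_old_signature`, `wrap_old_backend` and `LegacyBackend` are hypothetical names, and the exact "old signature" test is an assumption. The backend class is patched once, so call sites can always pass the full set of keyword arguments.

```python
import inspect
import warnings


def uses_old_signature(func):
    """Hypothetical check: True if ``func`` accepts only ``value``."""
    return list(inspect.signature(func).parameters) == ["value"]


def wrap_old_backend(clazz):
    """Hypothetical resolver step: patch old-style backends once."""
    original = clazz.serialize_value
    if uses_old_signature(original):
        warnings.warn(
            f"serialize_value in {clazz.__name__} uses the old one-argument signature",
            DeprecationWarning,
        )

        def shim(value, **kwargs):
            # Drop the extra identifiers the old backend does not understand.
            return original(value)

        # staticmethod keeps the shim callable from both the class and instances.
        clazz.serialize_value = staticmethod(shim)
    return clazz


class LegacyBackend:
    """Hypothetical custom backend using the old signature."""

    @staticmethod
    def serialize_value(value):
        return str(value).encode()
```

After wrapping, `LegacyBackend.serialize_value(value=5, key="k")` accepts the extra arguments and ignores them, and the deprecation warning fires once at resolution time rather than on every call.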




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org