Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/11/10 07:05:42 UTC

[GitHub] [airflow] dstandish opened a new pull request #19505: XCom.serialize_value should have all params set does

dstandish opened a new pull request #19505:
URL: https://github.com/apache/airflow/pull/19505


   When implementing a custom XCom backend that stores XCom objects organized by dag_id, run_id, etc., we need those params to be passed to `serialize_value`.
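
As a minimal sketch of why the extra params matter: a backend that writes values to an object store could build its storage path from the identifying fields. The `build_object_key` helper, the `xcom/...` path layout and the `prefix` parameter are hypothetical illustrations, not Airflow API; `map_index` is included only because it comes up later in this thread.

```python
def build_object_key(
    *,
    dag_id: str,
    run_id: str,
    task_id: str,
    key: str,
    map_index: int = -1,
    prefix: str = "xcom",
) -> str:
    """Build a hypothetical object-store path grouping XCom values by DAG run and task."""
    parts = [prefix, dag_id, run_id, task_id]
    # Only mapped task instances (map_index >= 0) get an extra path segment.
    if map_index >= 0:
        parts.append(str(map_index))
    parts.append(f"{key}.json")
    return "/".join(parts)


print(build_object_key(dag_id="etl", run_id="manual__2021-11-10", task_id="extract", key="return_value"))
# xcom/etl/manual__2021-11-10/extract/return_value.json
```

Without `dag_id`, `run_id` and `task_id` reaching `serialize_value`, a backend following this pattern has nothing to build such a path from.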
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] dstandish commented on pull request #19505: Add params dag_id, task_id etc to XCom.serialize_value

Posted by GitBox <gi...@apache.org>.
dstandish commented on pull request #19505:
URL: https://github.com/apache/airflow/pull/19505#issuecomment-1033156101


   @potiuk I think adding context is a perfectly reasonable idea, but I don't see any reason it needs to be lumped together into this PR. Probably better handled separately IMO. This one is just about making `serialize` consistent with `set`. Later we can consider extending what is passed to `set` (and update `serialize` in the same way).





[GitHub] [airflow] ashb commented on a change in pull request #19505: Add params dag_id, task_id etc to XCom.serialize_value

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #19505:
URL: https://github.com/apache/airflow/pull/19505#discussion_r803520412



##########
File path: airflow/models/xcom.py
##########
@@ -458,8 +469,8 @@ def clear(
         return query.delete()
 
     @staticmethod
-    def serialize_value(value: Any):
-        """Serialize Xcom value to str or pickled object"""
+    def serialize_value(value: Any, key=None, task_id=None, dag_id=None, execution_date=None, run_id=None):

Review comment:
       We shouldn't have execution_date on main anymore, and we should add map_index now.
   ```suggestion
       def serialize_value(*, value: Any, key: str, task_id: str, dag_id: str, run_id: str, map_index: int = -1):
   ```
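
The leading `*` in the suggested signature makes every parameter keyword-only. A minimal sketch with a stand-in class (not Airflow's `BaseXCom`) shows the effect: keyword calls work and `map_index` defaults to `-1`, while positional calls raise `TypeError`.

```python
from typing import Any


class ExampleXComBackend:
    """Stand-in class (not Airflow's BaseXCom) using the suggested keyword-only signature."""

    @staticmethod
    def serialize_value(*, value: Any, key: str, task_id: str, dag_id: str, run_id: str, map_index: int = -1):
        # Echo the arguments back so the call pattern is easy to inspect.
        return {"value": value, "key": key, "task_id": task_id, "dag_id": dag_id, "run_id": run_id, "map_index": map_index}


# Keyword arguments work and map_index falls back to -1 (an unmapped task).
result = ExampleXComBackend.serialize_value(value=1, key="k", task_id="t", dag_id="d", run_id="r")
assert result["map_index"] == -1

# Positional arguments are rejected because of the leading `*`.
try:
    ExampleXComBackend.serialize_value(1, "k", "t", "d", "r")
except TypeError as err:
    print("TypeError:", err)
```

Forcing keywords also means new params can be added later without breaking callers that rely on argument order.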







[GitHub] [airflow] dstandish commented on a change in pull request #19505: XCom.serialize_value should have all params set does

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #19505:
URL: https://github.com/apache/airflow/pull/19505#discussion_r757135654



##########
File path: airflow/models/xcom.py
##########
@@ -364,6 +372,12 @@ def orm_deserialize_value(self) -> Any:
         return BaseXCom.deserialize_value(self)
 
 
+def serialize_value_shim(**kwargs):
+    signature = inspect_function_arguments(XCom.serialize_value)
+    kwargs = {k: v for k, v in kwargs.items() if k in signature.bound_arguments}
+    return XCom.serialize_value(**kwargs)

Review comment:
       Yeah, totally. Truthfully, I intended to add a docstring there but was just taking a couple of minutes to refactor last night; it probably should have been marked as a draft. I think flake8 fails when you omit a docstring too, but I `SKIP` locally sometimes when I don't want Docker running. I just meant to indicate "yup, I have some explanation ready to go".
   
   Anyway, the docstring is there now. I tweaked the approach slightly to iterate over `bound_arguments` instead of the kwargs keys; it seems a little cleaner this way.
   
   One potential optimization: we could "cache" the signature inspection so we don't have to do it on every call, or even cache the shim function so it doesn't have to do any logic. But both operations are cheap, and it's probably not worth the clutter.
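
The "cache the signature inspection" idea could look roughly like this sketch, which memoizes per function object with `functools.lru_cache`. The `accepted_kwargs` name is hypothetical, and unlike the helper in this PR it also drops positional-only parameters, since those cannot be passed by keyword.

```python
import functools
import inspect
from typing import Callable, Tuple


@functools.lru_cache(maxsize=None)
def accepted_kwargs(func: Callable) -> Tuple[str, ...]:
    """Return the names of parameters that can be passed by keyword, computed once per function."""
    params = inspect.signature(func).parameters.values()
    return tuple(
        p.name for p in params if p.kind in (p.POSITIONAL_OR_KEYWORD, p.KEYWORD_ONLY)
    )


def old_style_serialize(value):
    return str(value)


print(accepted_kwargs(old_style_serialize))  # ('value',)
print(accepted_kwargs.cache_info().misses)  # 1
accepted_kwargs(old_style_serialize)
print(accepted_kwargs.cache_info().hits)  # 1
```

The cache key is the function object itself, so swapping in a different backend class simply produces a new cache entry.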
   
   lmkt
   
   thanks







[GitHub] [airflow] uranusjr commented on a change in pull request #19505: XCom.serialize_value should have all params set does

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #19505:
URL: https://github.com/apache/airflow/pull/19505#discussion_r756591891



##########
File path: airflow/models/xcom.py
##########
@@ -364,6 +372,12 @@ def orm_deserialize_value(self) -> Any:
         return BaseXCom.deserialize_value(self)
 
 
+def serialize_value_shim(**kwargs):
+    signature = inspect_function_arguments(XCom.serialize_value)
+    kwargs = {k: v for k, v in kwargs.items() if k in signature.bound_arguments}
+    return XCom.serialize_value(**kwargs)

Review comment:
       Worth a docstring explaining why this is needed.







[GitHub] [airflow] potiuk commented on pull request #19505: Add params dag_id, task_id etc to XCom.serialize_value

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #19505:
URL: https://github.com/apache/airflow/pull/19505#issuecomment-1033169677


   > @potiuk I think adding context is a perfectly reasonable idea, but I don't see any reason it needs to be lumped together into this PR. Probably better handled separately IMO. This one is just about making `serialize` consistent with `set`. Later we can consider extending what is passed to `set` (and update `serialize` in the same way).
   
   Fine for me





[GitHub] [airflow] dstandish commented on a change in pull request #19505: Add params dag_id, task_id etc to XCom.serialize_value

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #19505:
URL: https://github.com/apache/airflow/pull/19505#discussion_r802975969



##########
File path: airflow/utils/parameters.py
##########
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import inspect
+from collections import namedtuple
+
+InspectSignatureResult = namedtuple('InspectSignatureResult', ['bound_arguments', 'has_kwargs'])
+
+
+def inspect_function_arguments(function) -> InspectSignatureResult:
+    """
+    Returns the list of variables names of a function and if it
+    accepts keyword arguments.
+
+    :param function: The function to inspect
+    :rtype: InspectSignatureResult
+    """
+    parameters = inspect.signature(function).parameters
+    bound_arguments = [
+        name for name, p in parameters.items() if p.kind not in (p.VAR_POSITIONAL, p.VAR_KEYWORD)
+    ]
+    has_kwargs = any(p.kind == p.VAR_KEYWORD for p in parameters.values())
+    return InspectSignatureResult(list(bound_arguments), has_kwargs)

Review comment:
       Short answer: no.
   
   But there are around 10 usages of `inspect.signature` in the Airflow repo, and I think this helper function is convenient; those other usages would probably benefit from it.
   
   BUT your point is taken: we could inline it here and then (possibly) make it a util and refactor usages in a later PR, where the justification for such a util would be more readily apparent. Shall I inline it for now?
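
The helper also reports `has_kwargs`, which the shim in this PR does not consult: a backend declaring `serialize_value(value, **kwargs)` would receive only `value`. A minimal sketch of a filter that honours a catch-all, assuming such a backend should receive every field (`filter_kwargs_for` and the two backends are hypothetical):

```python
import inspect
from typing import Any, Callable, Dict


def filter_kwargs_for(func: Callable, kwargs: Dict[str, Any]) -> Dict[str, Any]:
    """Keep only the kwargs ``func`` can accept; pass everything if it declares ``**kwargs``."""
    params = inspect.signature(func).parameters.values()
    if any(p.kind == p.VAR_KEYWORD for p in params):
        # A catch-all **kwargs accepts any keyword, so forward all of them.
        return dict(kwargs)
    named = {p.name for p in params if p.kind in (p.POSITIONAL_OR_KEYWORD, p.KEYWORD_ONLY)}
    return {k: v for k, v in kwargs.items() if k in named}


def old_backend(value):
    return value


def catch_all_backend(value, **kwargs):
    return value, kwargs


fields = {"value": 42, "key": "k", "task_id": "t", "dag_id": "d", "run_id": "r"}
print(filter_kwargs_for(old_backend, fields))  # {'value': 42}
print(sorted(filter_kwargs_for(catch_all_backend, fields)))  # ['dag_id', 'key', 'run_id', 'task_id', 'value']
```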







[GitHub] [airflow] ashb commented on a change in pull request #19505: XCom.serialize_value should have all params set does

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #19505:
URL: https://github.com/apache/airflow/pull/19505#discussion_r746729667



##########
File path: airflow/models/xcom.py
##########
@@ -97,7 +97,14 @@ def set(cls, key, value, task_id, dag_id, execution_date=None, run_id=None, sess
 
             execution_date = dag_run.execution_date
 
-        value = XCom.serialize_value(value)
+        value = XCom.serialize_value(
+            value=value,
+            key=key,
+            task_id=task_id,
+            dag_id=dag_id,
+            execution_date=execution_date,
+            run_id=run_id,
+        )

Review comment:
       Not sure -- haven't thought about it in detail.
   
   It should probably be forced to use named args (but not just a `**kwargs` catch-all).
   
   We should bear AIP-42 in mind here too, which might add some form of `mapping_index` argument.







[GitHub] [airflow] dstandish commented on a change in pull request #19505: Add params dag_id, task_id etc to XCom.serialize_value

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #19505:
URL: https://github.com/apache/airflow/pull/19505#discussion_r802975969



##########
File path: airflow/utils/parameters.py
##########
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import inspect
+from collections import namedtuple
+
+InspectSignatureResult = namedtuple('InspectSignatureResult', ['bound_arguments', 'has_kwargs'])
+
+
+def inspect_function_arguments(function) -> InspectSignatureResult:
+    """
+    Returns the list of variables names of a function and if it
+    accepts keyword arguments.
+
+    :param function: The function to inspect
+    :rtype: InspectSignatureResult
+    """
+    parameters = inspect.signature(function).parameters
+    bound_arguments = [
+        name for name, p in parameters.items() if p.kind not in (p.VAR_POSITIONAL, p.VAR_KEYWORD)
+    ]
+    has_kwargs = any(p.kind == p.VAR_KEYWORD for p in parameters.values())
+    return InspectSignatureResult(list(bound_arguments), has_kwargs)

Review comment:
       Short answer: no.
   
   But there are a lot of usages of `inspect.signature` in the Airflow repo, and I think this helper function is convenient; those other usages would probably benefit from it. Sometimes the usages just look at `parameters` without filtering on `kind`, which could be undesirable in more unusual cases.
   
   BUT your point is taken: we could inline it here and then, in a later PR, (possibly) make it a util and refactor usages (where the justification for such a util would be more readily apparent).
   
   And yeah, we could stick it in some other module, e.g. helpers.
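
The `kind` point can be seen with a small example: listing `parameters` directly includes the names of `*args` and `**kwargs`, so a caller filtering keywords by name would treat `args` and `kwargs` as real parameter names. The `backend` function here is a made-up example.

```python
import inspect


def backend(value, *args, **kwargs):
    return value


# Naive: every parameter name, including the variadic ones.
naive = list(inspect.signature(backend).parameters)

# Filtered by kind, as the helper in this PR does.
filtered = [
    name
    for name, p in inspect.signature(backend).parameters.items()
    if p.kind not in (p.VAR_POSITIONAL, p.VAR_KEYWORD)
]

print(naive)     # ['value', 'args', 'kwargs']
print(filtered)  # ['value']
```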







[GitHub] [airflow] dstandish commented on a change in pull request #19505: Add params dag_id, task_id etc to XCom.serialize_value

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #19505:
URL: https://github.com/apache/airflow/pull/19505#discussion_r803357741



##########
File path: airflow/utils/parameters.py
##########
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import inspect
+from collections import namedtuple
+
+InspectSignatureResult = namedtuple('InspectSignatureResult', ['bound_arguments', 'has_kwargs'])
+
+
+def inspect_function_arguments(function) -> InspectSignatureResult:
+    """
+    Returns the list of variables names of a function and if it
+    accepts keyword arguments.
+
+    :param function: The function to inspect
+    :rtype: InspectSignatureResult
+    """
+    parameters = inspect.signature(function).parameters
+    bound_arguments = [
+        name for name, p in parameters.items() if p.kind not in (p.VAR_POSITIONAL, p.VAR_KEYWORD)
+    ]
+    has_kwargs = any(p.kind == p.VAR_KEYWORD for p in parameters.values())
+    return InspectSignatureResult(list(bound_arguments), has_kwargs)

Review comment:
       inlined it!







[GitHub] [airflow] uranusjr commented on pull request #19505: Add params dag_id, task_id etc to XCom.serialize_value

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #19505:
URL: https://github.com/apache/airflow/pull/19505#issuecomment-1012003261


   The approach looks good to me if you really don’t want to go with the version number approach. It’s unfortunate we need to do this, but there are not too many choices.





[GitHub] [airflow] dstandish commented on a change in pull request #19505: Add params dag_id, task_id etc to XCom.serialize_value

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #19505:
URL: https://github.com/apache/airflow/pull/19505#discussion_r802977082



##########
File path: airflow/www/views.py
##########
@@ -3380,12 +3381,26 @@ def action_muldelete(self, items):
     def pre_add(self, item):
         """Pre add hook."""
         item.execution_date = timezone.make_aware(item.execution_date)
-        item.value = XCom.serialize_value(item.value)
+        item.value = serialize_value_shim(
+            value=item.value,
+            key=item.key,
+            task_id=item.task_id,
+            dag_id=item.dag_id,
+            execution_date=item.execution_date,
+            run_id=item.run_id,
+        )
 
     def pre_update(self, item):
         """Pre update hook."""
         item.execution_date = timezone.make_aware(item.execution_date)
-        item.value = XCom.serialize_value(item.value)
+        item.value = serialize_value_shim(

Review comment:
       sure lemme try that







[GitHub] [airflow] potiuk commented on a change in pull request #19505: XCom.serialize_value should have all params set does

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #19505:
URL: https://github.com/apache/airflow/pull/19505#discussion_r757127766



##########
File path: airflow/models/xcom.py
##########
@@ -364,6 +372,12 @@ def orm_deserialize_value(self) -> Any:
         return BaseXCom.deserialize_value(self)
 
 
+def serialize_value_shim(**kwargs):
+    signature = inspect_function_arguments(XCom.serialize_value)
+    kwargs = {k: v for k, v in kwargs.items() if k in signature.bound_arguments}
+    return XCom.serialize_value(**kwargs)

Review comment:
       Very much agree with that. Additionally, it's good to add more explanation in the commit message, so that it can be found in the history as well.







[GitHub] [airflow] ashb commented on a change in pull request #19505: XCom.serialize_value should have all params set does

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #19505:
URL: https://github.com/apache/airflow/pull/19505#discussion_r746430191



##########
File path: airflow/models/xcom.py
##########
@@ -97,7 +97,14 @@ def set(cls, key, value, task_id, dag_id, execution_date=None, run_id=None, sess
 
             execution_date = dag_run.execution_date
 
-        value = XCom.serialize_value(value)
+        value = XCom.serialize_value(
+            value=value,
+            key=key,
+            task_id=task_id,
+            dag_id=dag_id,
+            execution_date=execution_date,
+            run_id=run_id,
+        )

Review comment:
       This counts as a breaking change -- an existing custom XCom backend would likely break as a result, so we can't do this the "simple" way.
   
   We'd have to look at the function signature and check before calling it.
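
A minimal sketch of both halves of this comment, using a stand-in class for a backend written against the old one-argument signature: the "simple" keyword call raises `TypeError`, while checking the signature first lets the caller fall back to the old call. `LegacyBackend` and the `"key" in params` test are illustrative assumptions, not the PR's code.

```python
import inspect


class LegacyBackend:
    """Stand-in for a custom backend written against the old one-argument signature."""

    @staticmethod
    def serialize_value(value):
        return repr(value).encode()


new_style_call = dict(value=[1, 2], key="k", task_id="t", dag_id="d", run_id="r")

# Calling it the "simple" way fails for the legacy signature.
try:
    LegacyBackend.serialize_value(**new_style_call)
except TypeError as err:
    print("breaks:", err)

# Checking the signature first lets the caller fall back to the old call.
params = inspect.signature(LegacyBackend.serialize_value).parameters
if "key" in params:
    payload = LegacyBackend.serialize_value(**new_style_call)
else:
    payload = LegacyBackend.serialize_value(new_style_call["value"])
print(payload)  # b'[1, 2]'
```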







[GitHub] [airflow] dstandish commented on pull request #19505: Add params dag_id, task_id etc to XCom.serialize_value

Posted by GitBox <gi...@apache.org>.
dstandish commented on pull request #19505:
URL: https://github.com/apache/airflow/pull/19505#issuecomment-985829944


   > @uranusjr needs to make a similar sort of change (where the signature for BaseOperatorLink changes) and I think he was proposing putting `version` class attribute instead of using sig.inspect.
   > 
   > I guess the main difference there is the number of args wasn't changing, but sig.inspect is relatively slow. Thoughts TP? Should we use the same `Class.version` attribute approach here?
   
   Yeah, sounds reasonable. Probably not the end of the world to have a little slowness here, but it does sound like a clean and direct way to achieve the same goal. LMK
   
   @jmaldon1 
   
   > Slightly off-topic but curious to know if anyone can help me out.
   > When is the clear method called in the DAG process?
   > How are we avoiding the case where we clear the data from the external database, but the task fails and has to rerun?
   > If the task reruns, won't it need to query for the data again? But if we deleted it already, then it won't be there anymore.
   > Also, what is the difference between delete and clear? When is delete called?
   
   Clear will be called at the start of the run, no matter whether it's the first attempt, second attempt, a retry, a backfill -- whatever: XCom is cleared at the start of the task.
   
   Delete vs. clear: delete is just lower level and will delete a collection of XCom objects. Clear removes all XCom records for a specific task instance.
   
   But now that you bring this up: yeah, depending on how you implement your custom XCom backend, the backing files might not be purged. E.g. in the example backend on the Astronomer blog they wouldn't be; but the database would not know they are there, so in effect they are not there, apart from showing up on your S3 bill.
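
If a backend does want to purge its backing files, one option is to delete every object under the task instance's prefix at the point where XCom is cleared. A minimal in-memory sketch, assuming objects are stored under a hypothetical `xcom/<dag_id>/<run_id>/<task_id>/` layout; the dict stands in for a bucket, and which Airflow hook would call `purge_prefix` is left open here.

```python
from typing import Dict


def purge_prefix(store: Dict[str, bytes], prefix: str) -> int:
    """Delete every object whose key starts with ``prefix``; return how many were removed."""
    doomed = [k for k in store if k.startswith(prefix)]
    for k in doomed:
        del store[k]
    return len(doomed)


# Hypothetical bucket contents left behind by an earlier attempt of "extract".
bucket = {
    "xcom/etl/run1/extract/return_value.json": b"old",
    "xcom/etl/run1/extract/rows.json": b"old",
    "xcom/etl/run1/load/return_value.json": b"keep",
}

# Purging at the start of a new attempt of "extract" removes only that task's objects.
removed = purge_prefix(bucket, "xcom/etl/run1/extract/")
print(removed)         # 2
print(sorted(bucket))  # ['xcom/etl/run1/load/return_value.json']
```

The trailing `/` in the prefix keeps a sibling task such as `extract_v2` from being matched.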





[GitHub] [airflow] jmaldon1 commented on pull request #19505: Add params dag_id, task_id etc to XCom.serialize_value

Posted by GitBox <gi...@apache.org>.
jmaldon1 commented on pull request #19505:
URL: https://github.com/apache/airflow/pull/19505#issuecomment-985870054


   So if my thinking is right, this is the flow of a DAG:
   
   <pre>
   (Clear Task A XCOM from airflow DB) Task A -> (Clear Task B XCOM  from airflow DB) Task B -> (Clear Task C XCOM  from airflow DB) Task C
   ^ Task A data does not exist in S3 yet, so we can't clear it.
   </pre>
   
   Just wondering when is the time to be purging S3?
   





[GitHub] [airflow] dstandish commented on a change in pull request #19505: Add params dag_id, task_id etc to XCom.serialize_value

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #19505:
URL: https://github.com/apache/airflow/pull/19505#discussion_r808038341



##########
File path: airflow/models/xcom.py
##########
@@ -458,8 +469,8 @@ def clear(
         return query.delete()
 
     @staticmethod
-    def serialize_value(value: Any):
-        """Serialize Xcom value to str or pickled object"""
+    def serialize_value(value: Any, key=None, task_id=None, dag_id=None, execution_date=None, run_id=None):

Review comment:
       Updated. Any reason you put the `*` before `value` and not after?







[GitHub] [airflow] ashb commented on a change in pull request #19505: Add params dag_id, task_id etc to XCom.serialize_value

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #19505:
URL: https://github.com/apache/airflow/pull/19505#discussion_r808087235



##########
File path: airflow/models/xcom.py
##########
@@ -458,8 +469,8 @@ def clear(
         return query.delete()
 
     @staticmethod
-    def serialize_value(value: Any):
-        """Serialize Xcom value to str or pickled object"""
+    def serialize_value(value: Any, key=None, task_id=None, dag_id=None, execution_date=None, run_id=None):

Review comment:
       To make them all keyword-only arguments







[GitHub] [airflow] uranusjr commented on a change in pull request #19505: XCom.serialize_value should have all params set does

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #19505:
URL: https://github.com/apache/airflow/pull/19505#discussion_r757082478



##########
File path: airflow/models/xcom.py
##########
@@ -364,6 +372,12 @@ def orm_deserialize_value(self) -> Any:
         return BaseXCom.deserialize_value(self)
 
 
+def serialize_value_shim(**kwargs):
+    signature = inspect_function_arguments(XCom.serialize_value)
+    kwargs = {k: v for k, v in kwargs.items() if k in signature.bound_arguments}
+    return XCom.serialize_value(**kwargs)

Review comment:
       Being permissive is OK IMO; no need to complicate things too much for cases that likely aren't going to work in the end anyway.
   
   Putting the explanation in the implementation is much better than in tests, since it’s easy to find the implementation when you can’t understand a test, but the other way around is much more difficult.







[GitHub] [airflow] ashb commented on pull request #19505: Add params dag_id, task_id etc to XCom.serialize_value

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #19505:
URL: https://github.com/apache/airflow/pull/19505#issuecomment-985323347


   @uranusjr needs to make a similar sort of change (where the signature for BaseOperatorLink changes) and I think he was proposing putting `version` class attribute instead of using sig.inspect.
   
   I guess the main difference there is the number of args wasn't changing, but sig.inspect is relatively slow. Thoughts TP? Should we use the same `Class.version` attribute approach here?
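
For comparison, the `Class.version` idea could look roughly like this sketch: the caller reads a declared attribute instead of inspecting the signature. The attribute name `serialize_api_version` and both backend classes are hypothetical, not an existing Airflow convention.

```python
from typing import Any


class LegacyBackend:
    # No version attribute: treated as the original one-argument API.
    @staticmethod
    def serialize_value(value: Any) -> str:
        return str(value)


class ModernBackend:
    serialize_api_version = 2  # hypothetical attribute name

    @staticmethod
    def serialize_value(*, value: Any, key: str, task_id: str, dag_id: str, run_id: str) -> str:
        return f"{dag_id}/{run_id}/{task_id}/{key}={value}"


def call_serialize(backend: type, **fields: Any) -> str:
    """Dispatch on a declared version instead of inspecting the signature at call time."""
    if getattr(backend, "serialize_api_version", 1) >= 2:
        return backend.serialize_value(**fields)
    return backend.serialize_value(fields["value"])


fields = dict(value=7, key="k", task_id="t", dag_id="d", run_id="r")
print(call_serialize(LegacyBackend, **fields))  # 7
print(call_serialize(ModernBackend, **fields))  # d/r/t/k=7
```

The trade-off: an attribute lookup is cheap, but it relies on backend authors bumping the version, whereas signature inspection works without their cooperation.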





[GitHub] [airflow] uranusjr commented on a change in pull request #19505: Add params dag_id, task_id etc to XCom.serialize_value

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #19505:
URL: https://github.com/apache/airflow/pull/19505#discussion_r783826687



##########
File path: airflow/models/xcom.py
##########
@@ -478,6 +489,27 @@ def orm_deserialize_value(self) -> Any:
         return BaseXCom.deserialize_value(self)
 
 
+def serialize_value_shim(**kwargs):
+    """
+    Previously XCom.serialize_value only accepted one argument ``value``.  In order to give
+    custom XCom backends more flexibility with how they store values we now forward to
+    ``XCom.serialize_value`` all params passed to ``XCom.set``.  In order to maintain
+    compatibility with XCom backends written with the old signature we need to use this
+    compatibility shim, which forwards only those kwargs which are accepted by the
+    XCom backend.
+    """
+    signature = inspect_function_arguments(XCom.serialize_value)
+    kwargs = {k: kwargs.get(k) for k in signature.bound_arguments}
+    if set(kwargs) == {'value'}:
+        warnings.warn(
+            f"Method `serialize_value` in XCom backend {XCom.__name__} is using outdated signature and"
+            f"must be updated to accept all params in `BaseXCom.set` except `session`.",

Review comment:
       ```suggestion
               f"Method `serialize_value` in XCom backend {XCom.__name__} is using outdated signature, "
               f"please update it to accept all params in `BaseXCom.set` except `session`.",
   ```
   
   Softer language since this is a deprecation warning, not a breakage error. I wonder if it makes sense to explicitly list out all the arguments, or even check what arguments are missing and ask for them?
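
The "check what arguments are missing and ask for them" idea could be sketched as below. The `EXPECTED` list, the helper names and the choice of `DeprecationWarning` are assumptions for illustration. Note also that adjacent f-strings are concatenated with no separator: in the reviewed diff the first fragment ends in `and` with no trailing space, so the rendered message would read "signature andmust"; the suggestion's trailing `", "` avoids that.

```python
import inspect
import warnings
from typing import Callable, List

# Hypothetical list of parameters the new-style serialize_value is expected to accept.
EXPECTED = ["value", "key", "task_id", "dag_id", "run_id"]


def missing_params(func: Callable) -> List[str]:
    params = inspect.signature(func).parameters
    if any(p.kind == p.VAR_KEYWORD for p in params.values()):
        return []  # **kwargs accepts everything
    return [name for name in EXPECTED if name not in params]


def warn_if_outdated(func: Callable, backend_name: str) -> None:
    missing = missing_params(func)
    if missing:
        warnings.warn(
            f"Method `serialize_value` in XCom backend {backend_name} is using an outdated signature; "
            f"please update it to accept these params: {', '.join(missing)}.",
            DeprecationWarning,
            stacklevel=2,
        )


def old_serialize(value):
    return value


# DeprecationWarning is hidden by default outside __main__, so force it for the demo.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    warn_if_outdated(old_serialize, "MyBackend")
print(caught[0].message)
```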







[GitHub] [airflow] dstandish commented on a change in pull request #19505: Add params dag_id, task_id etc to XCom.serialize_value

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #19505:
URL: https://github.com/apache/airflow/pull/19505#discussion_r802971199



##########
File path: airflow/models/xcom.py
##########
@@ -499,6 +509,27 @@ def orm_deserialize_value(self) -> Any:
         return BaseXCom.deserialize_value(self)
 
 
+def serialize_value_shim(**kwargs):
+    """
+    Previously XCom.serialize_value only accepted one argument ``value``.  In order to give
+    custom XCom backends more flexibility with how they store values we now forward to
+    ``XCom.serialize_value`` all params passed to ``XCom.set``.  In order to maintain
+    compatibility with XCom backends written with the old signature we need to use this
+    compatibility shim, which forwards only those kwargs which are accepted by the
+    XCom backend.

Review comment:
       nice, thanks







[GitHub] [airflow] dstandish commented on a change in pull request #19505: Add params dag_id, task_id etc to XCom.serialize_value

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #19505:
URL: https://github.com/apache/airflow/pull/19505#discussion_r803153687



##########
File path: airflow/models/xcom.py
##########
@@ -499,6 +509,27 @@ def orm_deserialize_value(self) -> Any:
         return BaseXCom.deserialize_value(self)
 
 
+def serialize_value_shim(**kwargs):
+    """
+    Previously XCom.serialize_value only accepted one argument ``value``.  In order to give
+    custom XCom backends more flexibility with how they store values we now forward to
+    ``XCom.serialize_value`` all params passed to ``XCom.set``.  In order to maintain
+    compatibility with XCom backends written with the old signature we need to use this
+    compatibility shim, which forwards only those kwargs which are accepted by the
+    XCom backend.
+    """
+    signature = inspect_function_arguments(XCom.serialize_value)
+    kwargs = {k: kwargs.get(k) for k in signature.bound_arguments}

Review comment:
       done







[GitHub] [airflow] dstandish commented on a change in pull request #19505: XCom.serialize_value should have all params set does

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #19505:
URL: https://github.com/apache/airflow/pull/19505#discussion_r757081246



##########
File path: airflow/models/xcom.py
##########
@@ -364,6 +372,12 @@ def orm_deserialize_value(self) -> Any:
         return BaseXCom.deserialize_value(self)
 
 
+def serialize_value_shim(**kwargs):
+    signature = inspect_function_arguments(XCom.serialize_value)
+    kwargs = {k: v for k, v in kwargs.items() if k in signature.bound_arguments}
+    return XCom.serialize_value(**kwargs)

Review comment:
       > Worth a docstring explaining why this is needed.
   
   yeah true i have explanation in test i can move or copy here
   
   > Also I wonder if it’d be a good idea to emit a warning telling the XCom backend to upgrade to the new arguments.
   
   yeah if we're going deprecation route that makes sense
   
   also, i've been very permissive here allowing any combination of kwargs --- do you think i should stick with that or just allow 2 options: (1)  value only and (2) all params in `set`?
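For comparison, here is a sketch of the stricter "two options" variant. It accepts either a legacy `value`-only signature or one that takes every param forwarded from `set` (or `**kwargs`), and it raises on anything in between. `call_serialize_value` and `SET_PARAMS` are hypothetical names, not part of Airflow.

```python
import inspect

# Params forwarded from `set` (assumed to match the call in this PR).
SET_PARAMS = ("value", "key", "task_id", "dag_id", "execution_date", "run_id")


def call_serialize_value(serialize_value, **kwargs):
    """Call `serialize_value` using either the legacy or the full signature.

    Only two shapes are allowed: (1) `value` only, or (2) every param in
    SET_PARAMS (or **kwargs). Any other combination raises TypeError.
    """
    params = inspect.signature(serialize_value).parameters
    names = {
        name
        for name, p in params.items()
        if p.kind not in (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD)
    }
    accepts_kwargs = any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values())
    if names == {"value"} and not accepts_kwargs:
        return serialize_value(value=kwargs["value"])  # option (1): legacy signature
    if accepts_kwargs or set(SET_PARAMS) <= names:
        return serialize_value(**kwargs)  # option (2): full signature
    raise TypeError(
        f"serialize_value must accept either only `value` or all of {SET_PARAMS}; "
        f"got {sorted(names)}"
    )
```

The trade-off is that the permissive shim silently forwards whatever subset a backend declares, whereas this version turns a half-updated signature into an immediate `TypeError`.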







[GitHub] [airflow] jmaldon1 commented on pull request #19505: Add params dag_id, task_id etc to XCom.serialize_value

Posted by GitBox <gi...@apache.org>.
jmaldon1 commented on pull request #19505:
URL: https://github.com/apache/airflow/pull/19505#issuecomment-985838537


   @dstandish So clear really isn't meant to be a way to clean up the data that was stored in S3 or any other external database? It's more so meant to ensure we don't reread old data?
   
   What if I wanted to ensure that my S3 is purged once the DAG completes (maybe there is sensitive data that I don't want sitting in S3)? Does overriding any of these functions help in that case?





[GitHub] [airflow] dstandish merged pull request #19505: Add params dag_id, task_id etc to XCom.serialize_value

Posted by GitBox <gi...@apache.org>.
dstandish merged pull request #19505:
URL: https://github.com/apache/airflow/pull/19505


   





[GitHub] [airflow] ldacey commented on pull request #19505: Add params dag_id, task_id etc to XCom.serialize_value

Posted by GitBox <gi...@apache.org>.
ldacey commented on pull request #19505:
URL: https://github.com/apache/airflow/pull/19505#issuecomment-1031649209


   Is there any other metadata that we can pass, potentially? I can't have a generic container with mixed data from many clients since the @task functions will save the actual data to the backend. People can only access data from certain clients (at least on Azure Blob, security is at the container/bucket level), so I am hoping for a way to provide this information somehow.
   
   Overriding the `set()` function works, but in the example below I assume that `dag_id.split("-")[0]` is the name of the container/bucket/cloud storage folder. This is not universal, though, so I need to map each container name to a DAG ID. That is feasible, but it would be great if we could pass some metadata.
   
   I also need a clarification about the XCom backend. When I transform raw data, I want to keep the result permanently and reuse it for reporting, validation, etc. A common task might be unnesting JSON data into a tabular format.
   
   1) Save data to cloud storage with an Operator, pass the list of files/fragments/partitions saved to XCom to be read by downstream tasks. This is the current status, and the benefit is that I can control where the data is saved and I can reuse that data for other things.
   
   2) Return a value in a specific format (a pyarrow Table in my example below) and the file will be saved automatically. This is what happens with the custom XCom backend. It reduces the amount of boilerplate and seems like a cleaner approach. However, there is less control over where this data gets saved. Am I misusing the XCom backend by wanting to keep and reuse the data as in option 1? Otherwise it seems like a waste, since the exact same data is undergoing the exact same transformation.
   
   Here is my example custom XCom backend class:
   
   ```python
   from datetime import datetime
   from typing import Any
   
   import pyarrow as pa
   import pyarrow.parquet as pq
   from airflow.models.xcom import BaseXCom
   from airflow.utils.session import provide_session
   
   # AdlfsHook is assumed to be a user-defined hook (not part of Airflow) whose `.fs`
   # attribute is a filesystem that pyarrow accepts; import it from your own project.
   
   
   class WasbXComBackend(BaseXCom):
       @classmethod
       @provide_session
       def set(
           cls, key, value, task_id, dag_id, execution_date=None, run_id=None, session=None
       ):
           """Store an XCom value"""
           from airflow.models.xcom import XCom
   
           if not (execution_date is None) ^ (run_id is None):
               raise ValueError("Exactly one of execution_date or run_id must be passed")
           if run_id:
               from airflow.models.dagrun import DagRun
   
               dag_run = (
                   session.query(DagRun).filter_by(dag_id=dag_id, run_id=run_id).one()
               )
               execution_date = dag_run.execution_date
           value = WasbXComBackend.serialize_value(
               dag_id, task_id, run_id, execution_date, value
           )
           session.query(cls).filter(
               cls.key == key,
               cls.execution_date == execution_date,
               cls.task_id == task_id,
               cls.dag_id == dag_id,
           ).delete()
           session.commit()
           session.add(
               XCom(
                   key=key,
                   value=value,
                   execution_date=execution_date,
                   task_id=task_id,
                   dag_id=dag_id,
               )
           )
           session.commit()
           session.flush()
   
       @staticmethod
       def serialize_value(
           dag_id: str, task_id: str, run_id: str, execution_date: datetime, value: Any
       ):
           if isinstance(value, pa.Table):
               fs = AdlfsHook(wasb_conn_id="azure_blob_conn_str").fs
               folder = dag_id.split("-")[0]
               if run_id is not None:
                   id = run_id.replace("-", "").replace(":", "")
               else:
                   id = execution_date.strftime("%Y%m%dT%H%M%S")
               path = f"{folder}/xcom/{task_id}/{id}.parquet"
               pq.write_table(
                   table=value,
                   where=path,
                   filesystem=fs,
                   use_dictionary=True,
                   compression="snappy",
                   flavor="spark",
               )
               value = path
           return BaseXCom.serialize_value(value)
   
       @staticmethod
       def deserialize_value(result) -> Any:
           result = BaseXCom.deserialize_value(result)
           if isinstance(result, str) and "xcom" in result:
               path = result
               fs = AdlfsHook(wasb_conn_id="azure_blob_conn_str").fs
               if path.endswith(".parquet"):
                   return pq.read_table(
                       source=path, use_pandas_metadata=True, filesystem=fs
                   )
           return result
   
   ```
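Here is a sketch of the explicit DAG-to-container mapping mentioned above, as an alternative to the `dag_id.split("-")[0]` convention. The DAG IDs and container names are made up. Raising on an unknown DAG, rather than guessing a prefix, is a deliberate assumption so that one client's data is never written to another client's container.

```python
from typing import Mapping

# Hypothetical DAG ID -> container/bucket mapping; these entries are examples only.
DAG_CONTAINERS: Mapping[str, str] = {
    "clienta-daily-sales": "clienta",
    "clientb-orders": "clientb",
}


def container_for(dag_id: str, mapping: Mapping[str, str] = DAG_CONTAINERS) -> str:
    """Return the storage container for `dag_id`, failing loudly if it is unmapped."""
    try:
        return mapping[dag_id]
    except KeyError:
        raise KeyError(f"No storage container configured for DAG {dag_id!r}") from None
```

Inside `serialize_value`, `folder = container_for(dag_id)` would replace the `split` line.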





[GitHub] [airflow] dstandish commented on a change in pull request #19505: XCom.serialize_value should have all params set does

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #19505:
URL: https://github.com/apache/airflow/pull/19505#discussion_r746694895



##########
File path: airflow/models/xcom.py
##########
@@ -97,7 +97,14 @@ def set(cls, key, value, task_id, dag_id, execution_date=None, run_id=None, sess
 
             execution_date = dag_run.execution_date
 
-        value = XCom.serialize_value(value)
+        value = XCom.serialize_value(
+            value=value,
+            key=key,
+            task_id=task_id,
+            dag_id=dag_id,
+            execution_date=execution_date,
+            run_id=run_id,
+        )

Review comment:
       @ashb with that in mind, for the _new_ signature, do you think `serialize_value` should just take `kwargs`, or should we add all these params?
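To make the trade-off concrete, here are the two candidate shapes side by side. The function names are hypothetical, and `map_index` stands in for a parameter that might be added to `set` later. Explicit parameters document themselves, while `**kwargs` keeps working when `set` grows a new parameter.

```python
from typing import Any


# Option A: explicit parameters. Self-documenting, but closed to new params.
def serialize_explicit(value: Any, key=None, task_id=None, dag_id=None,
                       execution_date=None, run_id=None) -> str:
    # Return the storage path a backend might derive (storing `value` is omitted).
    return f"{dag_id}/{task_id}/{run_id}/{key}"


# Option B: **kwargs. Open to params that `set` may forward in the future.
def serialize_kwargs(value: Any, **kwargs: Any) -> str:
    return f"{kwargs.get('dag_id')}/{kwargs.get('task_id')}/{kwargs.get('run_id')}/{kwargs.get('key')}"
```

There is one caveat if the `**kwargs` form is allowed. A shim that forwards only named parameters will pass nothing but `value` to `serialize_value(value, **kwargs)`, so it should also check for a `VAR_KEYWORD` parameter.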







[GitHub] [airflow] dstandish commented on a change in pull request #19505: Add params dag_id, task_id etc to XCom.serialize_value

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #19505:
URL: https://github.com/apache/airflow/pull/19505#discussion_r802975969



##########
File path: airflow/utils/parameters.py
##########
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import inspect
+from collections import namedtuple
+
+InspectSignatureResult = namedtuple('InspectSignatureResult', ['bound_arguments', 'has_kwargs'])
+
+
+def inspect_function_arguments(function) -> InspectSignatureResult:
+    """
+    Returns the list of variables names of a function and if it
+    accepts keyword arguments.
+
+    :param function: The function to inspect
+    :rtype: InspectSignatureResult
+    """
+    parameters = inspect.signature(function).parameters
+    bound_arguments = [
+        name for name, p in parameters.items() if p.kind not in (p.VAR_POSITIONAL, p.VAR_KEYWORD)
+    ]
+    has_kwargs = any(p.kind == p.VAR_KEYWORD for p in parameters.values())
+    return InspectSignatureResult(list(bound_arguments), has_kwargs)

Review comment:
       short answer no.
   
   but there are around 10 usages of inspect.signature in airflow repo.  and this helper function i think is convenient and those other usages would probably benefit from it.
   
   BUT your  point is taken and we could inline it here and then in a later PR (possibly) make it a util and refactor usages (where the justification for such a util would be more readily apparent).   shall i inline for now?
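If the helper is inlined, the shim only needs a few lines of `inspect.signature`. The sketch below assumes the target is any callable; `call_with_accepted_kwargs` is a hypothetical name. Unlike a filter on named parameters alone, it forwards everything when the target declares `**kwargs`.

```python
import inspect
from typing import Any, Callable


def call_with_accepted_kwargs(func: Callable[..., Any], **kwargs: Any) -> Any:
    """Inspect `func` and forward only the keyword arguments it can accept."""
    params = inspect.signature(func).parameters
    if any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values()):
        # `func` declares **kwargs, so it can take everything.
        return func(**kwargs)
    accepted = {
        name
        for name, p in params.items()
        if p.kind in (inspect.Parameter.POSITIONAL_OR_KEYWORD, inspect.Parameter.KEYWORD_ONLY)
    }
    return func(**{k: v for k, v in kwargs.items() if k in accepted})
```

With a legacy `serialize_value(value)` signature, only `value` is passed, so older backends keep working unchanged.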







[GitHub] [airflow] dstandish commented on a change in pull request #19505: Add params dag_id, task_id etc to XCom.serialize_value

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #19505:
URL: https://github.com/apache/airflow/pull/19505#discussion_r808115736



##########
File path: airflow/models/xcom.py
##########
@@ -458,8 +469,8 @@ def clear(
         return query.delete()
 
     @staticmethod
-    def serialize_value(value: Any):
-        """Serialize Xcom value to str or pickled object"""
+    def serialize_value(value: Any, key=None, task_id=None, dag_id=None, execution_date=None, run_id=None):

Review comment:
       ha, yes of course.  i was thinking in comparison with [here](https://github.com/apache/airflow/blob/ddb5246bd1576e2ce6abf8c80c3328d7d71a75ce/airflow/models/xcom.py#L101) but upon closer inspection that's just an overload method and not the actual signature. and i also had in mind the question of signature change -- making `value` kwarg only where previously it was not. but i guess since we're  changing signature anyway it may not matter.







[GitHub] [airflow] github-actions[bot] commented on pull request #19505: Add params dag_id, task_id etc to XCom.serialize_value

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #19505:
URL: https://github.com/apache/airflow/pull/19505#issuecomment-1024860175


   The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] uranusjr commented on a change in pull request #19505: Add params dag_id, task_id etc to XCom.serialize_value

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #19505:
URL: https://github.com/apache/airflow/pull/19505#discussion_r783826687



##########
File path: airflow/models/xcom.py
##########
@@ -478,6 +489,27 @@ def orm_deserialize_value(self) -> Any:
         return BaseXCom.deserialize_value(self)
 
 
+def serialize_value_shim(**kwargs):
+    """
+    Previously XCom.serialize_value only accepted one argument ``value``.  In order to give
+    custom XCom backends more flexibility with how they store values we now forward to
+    ``XCom.serialize_value`` all params passed to ``XCom.set``.  In order to maintain
+    compatibility with XCom backends written with the old signature we need to use this
+    compatibility shim, which forwards only those kwargs which are accepted by the
+    XCom backend.
+    """
+    signature = inspect_function_arguments(XCom.serialize_value)
+    kwargs = {k: kwargs.get(k) for k in signature.bound_arguments}
+    if set(kwargs) == {'value'}:
+        warnings.warn(
+            f"Method `serialize_value` in XCom backend {XCom.__name__} is using outdated signature and"
+            f"must be updated to accept all params in `BaseXCom.set` except `session`.",

Review comment:
       ```suggestion
               f"Method `serialize_value` in XCom backend {XCom.__name__} is using outdated signature, "
               f"please update it to accept all params in `BaseXCom.set` except `session`.",
   ```
   
   Softer language since this is a deprecation warning, not a breakage error.
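The original message also has a concatenation bug that the suggestion fixes. Adjacent string literals are joined with no separator, so `"...signature and"` followed by `"must..."` renders as `andmust`. Here is a small check, using a hypothetical backend name:

```python
backend = "MyXCom"  # hypothetical backend name

# Original wording: adjacent (f-)string literals are concatenated with nothing in between.
original = (
    f"Method `serialize_value` in XCom backend {backend} is using outdated signature and"
    f"must be updated to accept all params in `BaseXCom.set` except `session`."
)

# Suggested wording: the separating space lives inside the first literal.
# The second literal has no placeholders, so it does not need the `f` prefix.
suggested = (
    f"Method `serialize_value` in XCom backend {backend} is using outdated signature, "
    "please update it to accept all params in `BaseXCom.set` except `session`."
)

print(original)
print(suggested)
```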







[GitHub] [airflow] jmaldon1 edited a comment on pull request #19505: Add params dag_id, task_id etc to XCom.serialize_value

Posted by GitBox <gi...@apache.org>.
jmaldon1 edited a comment on pull request #19505:
URL: https://github.com/apache/airflow/pull/19505#issuecomment-985752461


   Slightly off-topic but curious to know if anyone can help me out.
   
   When is the [`clear`](https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/xcom/index.html#airflow.models.xcom.BaseXCom.clear) method called in the DAG process?
   How do we avoid the case where we clear the data from the external database, but the task fails and has to rerun?
   If the task reruns, won't it need to query for the data again? If we already deleted it, it won't be there anymore.
   
   Also what is the difference between [`delete`](https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/xcom/index.html#airflow.models.xcom.BaseXCom.delete) and `clear`. When is `delete` called?





[GitHub] [airflow] potiuk commented on a change in pull request #19505: XCom.serialize_value should have all params set does

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #19505:
URL: https://github.com/apache/airflow/pull/19505#discussion_r757128435



##########
File path: airflow/models/xcom.py
##########
@@ -364,6 +372,12 @@ def orm_deserialize_value(self) -> Any:
         return BaseXCom.deserialize_value(self)
 
 
+def serialize_value_shim(**kwargs):
+    signature = inspect_function_arguments(XCom.serialize_value)
+    kwargs = {k: v for k, v in kwargs.items() if k in signature.bound_arguments}
+    return XCom.serialize_value(**kwargs)

Review comment:
       I learned to provide all the "why's" and context in the commit messages mostly for my future self, as I realized that I can't remember some of the why's few months after I made a change. It saved me quite a number of times.







[GitHub] [airflow] potiuk commented on pull request #19505: Add params dag_id, task_id etc to XCom.serialize_value

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #19505:
URL: https://github.com/apache/airflow/pull/19505#issuecomment-1030836230


   It seems this one is indeed needed. There are no details yet, but there may be more comments from users here: https://apache-airflow.slack.com/archives/CCR6P6JRL/p1644129201867359?thread_ts=1643448538.080509&cid=CCR6P6JRL





[GitHub] [airflow] dstandish commented on a change in pull request #19505: Add params dag_id, task_id etc to XCom.serialize_value

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #19505:
URL: https://github.com/apache/airflow/pull/19505#discussion_r808550810



##########
File path: airflow/models/xcom.py
##########
@@ -458,8 +469,8 @@ def clear(
         return query.delete()
 
     @staticmethod
-    def serialize_value(value: Any):
-        """Serialize Xcom value to str or pickled object"""
+    def serialize_value(value: Any, key=None, task_id=None, dag_id=None, execution_date=None, run_id=None):

Review comment:
       hmm... yeah it does break some tests to make `value` keyword-only
   
   ```
   tests/models/test_xcom.py::TestXCom::test_resolve_xcom_class_fallback_to_basexcom: TypeError: serialize_value() takes 0 positional arguments but 1 was given
   tests/models/test_xcom.py::TestXCom::test_resolve_xcom_class_fallback_to_basexcom_no_config: TypeError: serialize_value() takes 0 positional arguments but 1 was given
   Error: Process completed with exit code 1.
   ```
   
   e.g. here's one test:
   ```python
       @conf_vars({("core", "xcom_backend"): "", ("core", "enable_xcom_pickling"): "False"})
       def test_resolve_xcom_class_fallback_to_basexcom(self):
           cls = resolve_xcom_backend()
           assert issubclass(cls, BaseXCom)
   
           assert cls().serialize_value([1]) == b"[1]"
   ```
   
   of course we can simply update the tests, and it does seem unlikely that a user would be relying on this, but it's possible. for now i've updated it to keep `value` as positional, but please do let me know your thoughts.
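For reference, here is a minimal sketch of why the tests fail. A bare `*` makes every following parameter keyword-only, so a positional call like `serialize_value([1])` raises the `TypeError` quoted above. The function names are hypothetical.

```python
from typing import Any


def serialize_kw_only(*, value: Any, key: Any = None) -> Any:
    """`value` follows a bare `*`, so it can only be passed by keyword."""
    return value


def serialize_positional(value: Any, key: Any = None) -> Any:
    """`value` stays positional-or-keyword, so `f([1])` keeps working."""
    return value
```

Keeping `value` first and positional-or-keyword, with the new params defaulting to `None`, lets existing `serialize_value(value)` calls keep working.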







[GitHub] [airflow] dstandish commented on a change in pull request #19505: XCom.serialize_value should have all params set does

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #19505:
URL: https://github.com/apache/airflow/pull/19505#discussion_r757135654



##########
File path: airflow/models/xcom.py
##########
@@ -364,6 +372,12 @@ def orm_deserialize_value(self) -> Any:
         return BaseXCom.deserialize_value(self)
 
 
+def serialize_value_shim(**kwargs):
+    signature = inspect_function_arguments(XCom.serialize_value)
+    kwargs = {k: v for k, v in kwargs.items() if k in signature.bound_arguments}
+    return XCom.serialize_value(**kwargs)

Review comment:
       yeah totally, and truthfully i intended to add a docstring there but was just taking a couple minutes to refactor last night... probably should have been marked as a draft...  i think flake8 fails when you omit docstring too but i disable locally sometimes when i don't want docker running...  i just meant to indicate "yup i have some explanation ready to go"
   
   anyway docstring is there now.  i tweaked the approach slightly to iterate over `bound_arguments` instead of keys ... seems a little cleaner this way.  
   
   one potential optimization is we could sort of "cache" the signature inspection so we don't have to do it every call.  or even cache the shim function so it doesn't have to do any logic... but both operations are cheap and probably it's not worth the clutter.
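If the inspection ever did show up in a profile, caching on the function object would be a small change. Here is a sketch using `functools.lru_cache`; `accepted_params` is a hypothetical name. A backend swapped in via config has a different function object, so it gets its own cache entry.

```python
import functools
import inspect
from typing import Callable, FrozenSet, Tuple


@functools.lru_cache(maxsize=None)
def accepted_params(func: Callable) -> Tuple[FrozenSet[str], bool]:
    """Return (named params, accepts **kwargs) for `func`, computed once per function."""
    params = inspect.signature(func).parameters
    names = frozenset(
        name
        for name, p in params.items()
        if p.kind in (inspect.Parameter.POSITIONAL_OR_KEYWORD, inspect.Parameter.KEYWORD_ONLY)
    )
    has_kwargs = any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values())
    return names, has_kwargs
```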
   
   lmkt
   
   thanks







[GitHub] [airflow] dstandish commented on pull request #19505: Add params dag_id, task_id etc to XCom.serialize_value

Posted by GitBox <gi...@apache.org>.
dstandish commented on pull request #19505:
URL: https://github.com/apache/airflow/pull/19505#issuecomment-985854128


   > @dstandish So clear really isn't meant to be a way to clean up the data that was stored in S3 or any other external database
   
   I wouldn't say quite that... if you are making a custom backend and you don't want orphaned objects, you very well might want to override clear to have it do that.  Admittedly BaseXCom might eventually want a refactor to be more friendly for this issue, and for other aspects of implementing custom xcom backends also.
   
   > its moreso meant to ensure each task starts with a clean slate and doesn't duplicate its own XComs?
   
   Airflow does have the behavior of clearing xcoms at execution start.
   
   > What if I wanted to ensure that my S3 is purged once the dag completes (maybe there is sensitive data that I dont want sitting in S3). Does overriding any of these functions help in that case?
   
   Yes of course you can do whatever you want to do in your custom xcom backend so long as it plays nicely enough with airflow 🙂
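The toy, in-memory model below illustrates the "override clear so objects aren't orphaned" idea without relying on Airflow's actual `clear` signature. `ToyExternalXCom`, `rows` and `blobs` are hypothetical. `rows` stands in for the XCom table, which holds only a reference, and `blobs` stands in for external storage such as S3.

```python
from typing import Any, Dict, Tuple

RowKey = Tuple[str, str, str, str]  # (dag_id, task_id, run_id, key)


class ToyExternalXCom:
    """Toy in-memory model of a custom XCom backend; NOT Airflow's API."""

    def __init__(self) -> None:
        self.rows: Dict[RowKey, str] = {}  # stands in for the XCom table: row -> blob path
        self.blobs: Dict[str, Any] = {}  # stands in for S3/blob storage: path -> payload

    def set(self, key: str, value: Any, task_id: str, dag_id: str, run_id: str) -> None:
        # Store the payload externally; the "table" only keeps a reference to it.
        path = f"{dag_id}/{task_id}/{run_id}/{key}"
        self.blobs[path] = value
        self.rows[(dag_id, task_id, run_id, key)] = path

    def clear(self, dag_id: str, task_id: str, run_id: str) -> int:
        """Delete matching rows and the external objects they point to."""
        doomed = [row for row in self.rows if row[:3] == (dag_id, task_id, run_id)]
        for row in doomed:
            # Remove the row, then delete the blob it referenced.
            self.blobs.pop(self.rows.pop(row), None)
        return len(doomed)
```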
   
   
   
   





[GitHub] [airflow] jmaldon1 edited a comment on pull request #19505: Add params dag_id, task_id etc to XCom.serialize_value

Posted by GitBox <gi...@apache.org>.
jmaldon1 edited a comment on pull request #19505:
URL: https://github.com/apache/airflow/pull/19505#issuecomment-985875424


   > 
   
   I think I need your email? I could also make a thread on the discussion tab, if you'd like.
   
   Discussion: https://github.com/apache/airflow/discussions/20029





[GitHub] [airflow] dstandish commented on pull request #19505: Add params dag_id, task_id etc to XCom.serialize_value

Posted by GitBox <gi...@apache.org>.
dstandish commented on pull request #19505:
URL: https://github.com/apache/airflow/pull/19505#issuecomment-992657189


   @uranusjr @ashb is this the right way to approach this? Or should we use a version decorator?





[GitHub] [airflow] dstandish commented on pull request #19505: XCom.serialize_value should have all params set does

Posted by GitBox <gi...@apache.org>.
dstandish commented on pull request #19505:
URL: https://github.com/apache/airflow/pull/19505#issuecomment-965502184


   ok @ashb i updated it to look at signature
   
   to do so, i appropriated a [nice convenience function from connexion](https://github.com/apache/airflow/blob/8505d2f0a4524313e3eff7a4f16b9a9439c7a79f/airflow/_vendor/connexion/decorators/parameter.py)
   
   





[GitHub] [airflow] dstandish commented on pull request #19505: Add params dag_id, task_id etc to XCom.serialize_value

Posted by GitBox <gi...@apache.org>.
dstandish commented on pull request #19505:
URL: https://github.com/apache/airflow/pull/19505#issuecomment-985870325


   > So if my thinking is right, this is the flow of a DAG:
   > 
   > (Clear Task A XCOM from airflow DB) Task A -> (Clear Task B XCOM  from airflow DB) Task B -> (Clear Task C XCOM  from airflow DB) Task C
   > ^ Task A data does not exist in S3 yet, so we can't clear it.
   > Just wondering when is the time to be purging S3?
   
   ping me on slack





[GitHub] [airflow] jmaldon1 commented on pull request #19505: Add params dag_id, task_id etc to XCom.serialize_value

Posted by GitBox <gi...@apache.org>.
jmaldon1 commented on pull request #19505:
URL: https://github.com/apache/airflow/pull/19505#issuecomment-984987238


   Are there any workarounds available until this gets merged?





[GitHub] [airflow] jmaldon1 edited a comment on pull request #19505: Add params dag_id, task_id etc to XCom.serialize_value

Posted by GitBox <gi...@apache.org>.
jmaldon1 edited a comment on pull request #19505:
URL: https://github.com/apache/airflow/pull/19505#issuecomment-985838537


   @dstandish So clear really isn't meant to be a way to clean up the data that was stored in S3 or any other external database? It's more so meant to ensure each task starts with a clean slate and doesn't duplicate its own XComs?
   
   What if I wanted to ensure that my S3 is purged once the DAG completes (maybe there is sensitive data that I don't want sitting in S3)? Does overriding any of these functions help in that case?





[GitHub] [airflow] dstandish commented on a change in pull request #19505: Add params dag_id, task_id etc to XCom.serialize_value

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #19505:
URL: https://github.com/apache/airflow/pull/19505#discussion_r784150880



##########
File path: airflow/models/xcom.py
##########
@@ -478,6 +489,27 @@ def orm_deserialize_value(self) -> Any:
         return BaseXCom.deserialize_value(self)
 
 
+def serialize_value_shim(**kwargs):
+    """
+    Previously XCom.serialize_value only accepted one argument ``value``.  In order to give
+    custom XCom backends more flexibility with how they store values we now forward to
+    ``XCom.serialize_value`` all params passed to ``XCom.set``.  In order to maintain
+    compatibility with XCom backends written with the old signature we need to use this
+    compatibility shim, which forwards only those kwargs which are accepted by the
+    XCom backend.
+    """
+    signature = inspect_function_arguments(XCom.serialize_value)
+    kwargs = {k: kwargs.get(k) for k in signature.bound_arguments}
+    if set(kwargs) == {'value'}:
+        warnings.warn(
+            f"Method `serialize_value` in XCom backend {XCom.__name__} is using outdated signature and "
+            f"must be updated to accept all params in `BaseXCom.set` except `session`.",

Review comment:
       > I wonder if it makes sense to explicitly list out all the arguments, or even check what arguments are missing and ask for them?
   
   sure we can do that
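A minimal sketch of what listing the missing arguments could look like, using only `inspect.signature`. The expected parameter names are taken from the `serialize_value` signature in this PR's diff (everything `set` receives except `session`); `describe_missing_params`, `OldBackend` and `NewBackend` are hypothetical names for illustration, not Airflow code.

```python
import inspect
from typing import Any, Callable, List, Optional

# Params forwarded from set() to serialize_value in this PR's diff
# (everything except `session`); treat this tuple as an assumption.
EXPECTED_PARAMS = ("value", "key", "task_id", "dag_id", "execution_date", "run_id")


def describe_missing_params(func: Callable[..., Any]) -> Optional[str]:
    """Return a warning message naming the expected params ``func`` lacks, or None."""
    params = inspect.signature(func).parameters
    # A **kwargs catch-all accepts every expected keyword.
    if any(p.kind == inspect.Parameter.VAR_KEYWORD for p in params.values()):
        return None
    missing: List[str] = [name for name in EXPECTED_PARAMS if name not in params]
    if not missing:
        return None
    return (
        "`serialize_value` is using an outdated signature and must be updated "
        f"to accept these params: {', '.join(missing)}"
    )


class OldBackend:
    """Hypothetical backend written against the old one-argument API."""

    @staticmethod
    def serialize_value(value):
        return str(value).encode()


class NewBackend:
    """Hypothetical backend using the new signature."""

    @staticmethod
    def serialize_value(value, key=None, task_id=None, dag_id=None,
                        execution_date=None, run_id=None):
        return str(value).encode()


print(describe_missing_params(OldBackend.serialize_value))
print(describe_missing_params(NewBackend.serialize_value))  # None
```

Naming the missing params tells backend authors exactly what to add, instead of only saying the signature is outdated.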







[GitHub] [airflow] dstandish commented on a change in pull request #19505: Add params dag_id, task_id etc to XCom.serialize_value

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #19505:
URL: https://github.com/apache/airflow/pull/19505#discussion_r802975969



##########
File path: airflow/utils/parameters.py
##########
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import inspect
+from collections import namedtuple
+
+InspectSignatureResult = namedtuple('InspectSignatureResult', ['bound_arguments', 'has_kwargs'])
+
+
+def inspect_function_arguments(function) -> InspectSignatureResult:
+    """
+    Returns the list of variable names of a function and whether it
+    accepts keyword arguments.
+
+    :param function: The function to inspect
+    :rtype: InspectSignatureResult
+    """
+    parameters = inspect.signature(function).parameters
+    bound_arguments = [
+        name for name, p in parameters.items() if p.kind not in (p.VAR_POSITIONAL, p.VAR_KEYWORD)
+    ]
+    has_kwargs = any(p.kind == p.VAR_KEYWORD for p in parameters.values())
+    return InspectSignatureResult(list(bound_arguments), has_kwargs)

Review comment:
       Short answer: no.
   
   But there are around 10 usages of `inspect.signature` in the Airflow repo, and I think this helper function is convenient; those other usages would probably benefit from it.
   
   But your point is taken: we could inline it here and then, in a later PR, possibly make it a util and refactor those usages (where the justification for such a util would be more readily apparent). Shall I inline it for now?
   
   Or we could put it in some other module, e.g. `helpers`.
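For comparison, inlining the helper takes only a few lines. This sketch assumes the shim only needs the set of named parameters, so no namedtuple is required; `accepted_kwargs` and `legacy_serialize` are hypothetical names used for illustration.

```python
import inspect
from typing import Any, Callable, Dict


def accepted_kwargs(func: Callable[..., Any], kwargs: Dict[str, Any]) -> Dict[str, Any]:
    """Keep only the kwargs that ``func`` declares as named parameters."""
    params = inspect.signature(func).parameters
    # Same filter as inspect_function_arguments: skip the *args and **kwargs entries.
    names = {
        name
        for name, p in params.items()
        if p.kind not in (p.VAR_POSITIONAL, p.VAR_KEYWORD)
    }
    return {k: v for k, v in kwargs.items() if k in names}


def legacy_serialize(value):
    """Hypothetical old-style backend method that only accepts ``value``."""
    return repr(value).encode()


filtered = accepted_kwargs(legacy_serialize, {"value": [1], "dag_id": "d", "run_id": "r"})
print(filtered)                      # {'value': [1]}
print(legacy_serialize(**filtered))  # b'[1]'
```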







[GitHub] [airflow] ashb commented on a change in pull request #19505: Add params dag_id, task_id etc to XCom.serialize_value

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #19505:
URL: https://github.com/apache/airflow/pull/19505#discussion_r802516280



##########
File path: airflow/models/xcom.py
##########
@@ -499,6 +509,27 @@ def orm_deserialize_value(self) -> Any:
         return BaseXCom.deserialize_value(self)
 
 
+def serialize_value_shim(**kwargs):
+    """
+    Previously XCom.serialize_value only accepted one argument ``value``.  In order to give
+    custom XCom backends more flexibility with how they store values we now forward to
+    ``XCom.serialize_value`` all params passed to ``XCom.set``.  In order to maintain
+    compatibility with XCom backends written with the old signature we need to use this
+    compatibility shim, which forwards only those kwargs which are accepted by the
+    XCom backend.

Review comment:
       ```suggestion
       XCom backend.
       
       :meta private:
   ```
   
   This doesn't need to go into the rendered docs.

##########
File path: airflow/models/xcom.py
##########
@@ -499,6 +509,27 @@ def orm_deserialize_value(self) -> Any:
         return BaseXCom.deserialize_value(self)
 
 
+def serialize_value_shim(**kwargs):
+    """
+    Previously XCom.serialize_value only accepted one argument ``value``.  In order to give
+    custom XCom backends more flexibility with how they store values we now forward to
+    ``XCom.serialize_value`` all params passed to ``XCom.set``.  In order to maintain
+    compatibility with XCom backends written with the old signature we need to use this
+    compatibility shim, which forwards only those kwargs which are accepted by the
+    XCom backend.
+    """
+    signature = inspect_function_arguments(XCom.serialize_value)
+    kwargs = {k: kwargs.get(k) for k in signature.bound_arguments}

Review comment:
       I think we should do this once in `resolve_xcom_backend`, rather than on every call to this function.

##########
File path: airflow/utils/parameters.py
##########
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import inspect
+from collections import namedtuple
+
+InspectSignatureResult = namedtuple('InspectSignatureResult', ['bound_arguments', 'has_kwargs'])
+
+
+def inspect_function_arguments(function) -> InspectSignatureResult:
+    """
+    Returns the list of variable names of a function and whether it
+    accepts keyword arguments.
+
+    :param function: The function to inspect
+    :rtype: InspectSignatureResult
+    """
+    parameters = inspect.signature(function).parameters
+    bound_arguments = [
+        name for name, p in parameters.items() if p.kind not in (p.VAR_POSITIONAL, p.VAR_KEYWORD)
+    ]
+    has_kwargs = any(p.kind == p.VAR_KEYWORD for p in parameters.values())
+    return InspectSignatureResult(list(bound_arguments), has_kwargs)

Review comment:
       This is only used in one place -- do we really need a new module and a named tuple, rather than just inlining it? 

##########
File path: airflow/models/xcom.py
##########
@@ -499,6 +509,27 @@ def orm_deserialize_value(self) -> Any:
         return BaseXCom.deserialize_value(self)
 
 
+def serialize_value_shim(**kwargs):
+    """
+    Previously XCom.serialize_value only accepted one argument ``value``.  In order to give
+    custom XCom backends more flexibility with how they store values we now forward to
+    ``XCom.serialize_value`` all params passed to ``XCom.set``.  In order to maintain
+    compatibility with XCom backends written with the old signature we need to use this
+    compatibility shim, which forwards only those kwargs which are accepted by the
+    XCom backend.
+    """
+    signature = inspect_function_arguments(XCom.serialize_value)
+    kwargs = {k: kwargs.get(k) for k in signature.bound_arguments}
+    if set(kwargs) == {'value'}:
+        warnings.warn(
+            f"Method `serialize_value` in XCom backend {XCom.__name__} is using outdated signature and "
+            f"must be updated to accept all params in `BaseXCom.set` except `session`.",
+            DeprecationWarning,
+            stacklevel=2,

Review comment:
       ```suggestion
   ```
   
   No wrapper is involved here, so 2 would place it at the caller's caller -- i.e. too high up
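To check which frame each `stacklevel` value points at, here is a small self-contained sketch; `shim` and `view_hook` are hypothetical stand-ins for `serialize_value_shim` and a caller such as the `pre_add` hook in `views.py`. With `stacklevel=1` the warning is attributed to the `warnings.warn` call inside `shim`; with `stacklevel=2` it is attributed to the `shim(...)` call inside `view_hook`.

```python
import warnings
from typing import List


def shim(stacklevel: int) -> None:
    """Hypothetical stand-in for serialize_value_shim."""
    warnings.warn(
        "serialize_value uses an outdated signature",
        DeprecationWarning,
        stacklevel=stacklevel,
    )


def view_hook(stacklevel: int) -> None:
    """Hypothetical stand-in for a caller such as a pre_add hook."""
    shim(stacklevel)


def reported_lines(levels: List[int]) -> List[int]:
    """Return the line number each stacklevel attributes the warning to."""
    linenos = []
    for level in levels:
        with warnings.catch_warnings(record=True) as caught:
            warnings.simplefilter("always")  # record every warning, bypassing default filters
            view_hook(level)
        linenos.append(caught[0].lineno)
    return linenos


shim_line, caller_line = reported_lines([1, 2])
# shim_line falls inside shim(); caller_line is the shim(...) line in view_hook().
print(shim_line, caller_line)
```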

##########
File path: airflow/www/views.py
##########
@@ -3380,12 +3381,26 @@ def action_muldelete(self, items):
     def pre_add(self, item):
         """Pre add hook."""
         item.execution_date = timezone.make_aware(item.execution_date)
-        item.value = XCom.serialize_value(item.value)
+        item.value = serialize_value_shim(
+            value=item.value,
+            key=item.key,
+            task_id=item.task_id,
+            dag_id=item.dag_id,
+            execution_date=item.execution_date,
+            run_id=item.run_id,
+        )
 
     def pre_update(self, item):
         """Pre update hook."""
         item.execution_date = timezone.make_aware(item.execution_date)
-        item.value = XCom.serialize_value(item.value)
+        item.value = serialize_value_shim(

Review comment:
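       I'm not a fan of this from an API standpoint -- we should instead _shim_/wrap the old backends in `resolve_xcom_backend`, something like:
   
   ```python
       serialize_value = clazz.serialize_value
       if old_serialize(serialize_value):  # old_serialize: placeholder check for the value-only signature
           def shim(value, **kwargs):
               # Drop the extra params the old backend cannot accept.
               return serialize_value(value)
           clazz.serialize_value = staticmethod(shim)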
   ```
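A runnable version of the idea above, as a hedged sketch. `BaseXComStub`, `old_serialize` (filled in here with an assumed detection rule: the method accepts exactly one parameter named `value`), `patch_old_serialize` and `OldStyleBackend` are hypothetical stand-ins, not Airflow's actual implementation.

```python
import inspect
from typing import Any


class BaseXComStub:
    """Hypothetical stand-in for BaseXCom with the new-style signature."""

    @staticmethod
    def serialize_value(value: Any, key=None, task_id=None, dag_id=None,
                        execution_date=None, run_id=None) -> bytes:
        return repr(value).encode()


def old_serialize(func) -> bool:
    """Assumed detection rule: the method accepts exactly one param, ``value``."""
    return list(inspect.signature(func).parameters) == ["value"]


def patch_old_serialize(clazz: type) -> type:
    """Wrap an old-style serialize_value once, when the backend class is resolved."""
    serialize_value = clazz.serialize_value
    if old_serialize(serialize_value):
        def shim(value: Any, **kwargs: Any) -> Any:
            # Drop the extra params the old backend cannot accept.
            return serialize_value(value)

        # staticmethod() so instance.serialize_value(...) does not bind ``self``.
        clazz.serialize_value = staticmethod(shim)
    return clazz


class OldStyleBackend(BaseXComStub):
    """Hypothetical custom backend written against the old one-argument API."""

    @staticmethod
    def serialize_value(value: Any) -> bytes:
        return b"old:" + repr(value).encode()


Backend = patch_old_serialize(OldStyleBackend)
print(Backend.serialize_value([1], dag_id="d", run_id="r"))  # b'old:[1]'
```

Because the check runs once at resolution time, callers such as `pre_add`/`pre_update` can pass every param unconditionally. Patching twice is harmless: after wrapping, the signature is `(value, **kwargs)`, which no longer matches the old-style rule.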







[GitHub] [airflow] dstandish commented on a change in pull request #19505: Add params dag_id, task_id etc to XCom.serialize_value

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #19505:
URL: https://github.com/apache/airflow/pull/19505#discussion_r808550810



##########
File path: airflow/models/xcom.py
##########
@@ -458,8 +469,8 @@ def clear(
         return query.delete()
 
     @staticmethod
-    def serialize_value(value: Any):
-        """Serialize Xcom value to str or pickled object"""
+    def serialize_value(value: Any, key=None, task_id=None, dag_id=None, execution_date=None, run_id=None):

Review comment:
   Hmm... yeah, making `value` keyword-only does break some tests:
   
   ```
   tests/models/test_xcom.py::TestXCom::test_resolve_xcom_class_fallback_to_basexcom: TypeError: serialize_value() takes 0 positional arguments but 1 was given
   tests/models/test_xcom.py::TestXCom::test_resolve_xcom_class_fallback_to_basexcom_no_config: TypeError: serialize_value() takes 0 positional arguments but 1 was given
   Error: Process completed with exit code 1.
   ```
   
   For example, here's one of them:
   ```python
       @conf_vars({("core", "xcom_backend"): "", ("core", "enable_xcom_pickling"): "False"})
       def test_resolve_xcom_class_fallback_to_basexcom(self):
           cls = resolve_xcom_backend()
           assert issubclass(cls, BaseXCom)
   
           assert cls().serialize_value([1]) == b"[1]"
   ```
   
   It seems unlikely that a user would be relying on this, but it's possible. For now I've updated it to keep `value` positional -- but please do let me know your thoughts.
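A sketch of the trade-off, using hypothetical classes: making `value` keyword-only breaks positional calls such as `cls().serialize_value([1])`, whereas keeping `value` positional and placing only the new params after a bare `*` keeps those calls working. The `*` variant is one possible option and an assumption here; the diff above shows all params as positional-or-keyword. The JSON encoding mirrors the `b"[1]"` expectation in the quoted test.

```python
import json
from typing import Any


class KeywordOnlyXCom:
    """Hypothetical: every parameter, including ``value``, is keyword-only."""

    @staticmethod
    def serialize_value(*, value: Any, key=None, task_id=None, dag_id=None,
                        execution_date=None, run_id=None) -> bytes:
        return json.dumps(value).encode("UTF-8")


class PositionalValueXCom:
    """Hypothetical: ``value`` stays positional; the new params are keyword-only."""

    @staticmethod
    def serialize_value(value: Any, *, key=None, task_id=None, dag_id=None,
                        execution_date=None, run_id=None) -> bytes:
        return json.dumps(value).encode("UTF-8")


def call_positionally(cls: type) -> str:
    """Call serialize_value([1]) positionally; return the TypeError text, or ''."""
    try:
        cls().serialize_value([1])
    except TypeError as exc:
        return str(exc)
    return ""


print(call_positionally(KeywordOnlyXCom))      # ... takes 0 positional arguments but 1 was given
print(call_positionally(PositionalValueXCom))  # empty string: the call succeeds
print(PositionalValueXCom().serialize_value([1], dag_id="d", run_id="r"))  # b'[1]'
```

The keyword-only marker on the new params also keeps their order out of the public contract, so more params could be added later without breaking callers.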







[GitHub] [airflow] jmaldon1 commented on pull request #19505: Add params dag_id, task_id etc to XCom.serialize_value

Posted by GitBox <gi...@apache.org>.
jmaldon1 commented on pull request #19505:
URL: https://github.com/apache/airflow/pull/19505#issuecomment-985752461


   Slightly off-topic, but I'm curious whether anyone can help me out.
   
   When is the `clear` method called during a DAG run?
   How do we avoid the case where we clear the data from the external database, but the task fails and has to rerun?
   If the task reruns, won't it need to query for the data again? If we've already deleted it, it won't be there anymore.
   
   Also, what is the difference between `delete` and `clear`? When is `delete` called?





[GitHub] [airflow] jmaldon1 commented on pull request #19505: Add params dag_id, task_id etc to XCom.serialize_value

Posted by GitBox <gi...@apache.org>.
jmaldon1 commented on pull request #19505:
URL: https://github.com/apache/airflow/pull/19505#issuecomment-985875424


   > 
   
   I think I need your email? I could also start a thread on the Discussions tab, if you'd like.





[GitHub] [airflow] dstandish edited a comment on pull request #19505: Add params dag_id, task_id etc to XCom.serialize_value

Posted by GitBox <gi...@apache.org>.
dstandish edited a comment on pull request #19505:
URL: https://github.com/apache/airflow/pull/19505#issuecomment-985854128


   > @dstandish So clear really isn't meant to be a way to clean up the data that was stored in S3 or any other external database
   
   I wouldn't quite say that... If you are making a custom backend and you don't want orphaned objects, you may very well want to override `clear` to delete them. Admittedly, `BaseXCom` might eventually need a refactor to be friendlier for this use case, and for other aspects of implementing custom XCom backends too.
   
   > its moreso meant to ensure each task starts with a clean slate and doesn't duplicate its own XComs?
   
   Airflow does clear XComs at the start of task execution.
   
   > What if I wanted to ensure that my S3 is purged once the dag completes (maybe there is sensitive data that I dont want sitting in S3). Does overriding any of these functions help in that case?
   
   Yes, of course: you can do whatever you want in your custom XCom backend, so long as it plays nicely enough with Airflow 🙂
   
   Maybe ping me on Slack if you want to continue the conversation on this topic, which is separate from this PR.
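A minimal sketch of overriding `clear` so a custom backend does not leave orphaned objects behind. An in-memory dict stands in for S3, and `PurgingXComBackend`, `_prefix` and both method signatures are hypothetical; they illustrate the idea, not the real `BaseXCom` API. Organizing object paths by dag_id/run_id/task_id is what passing those params to `serialize_value` makes possible.

```python
import json
from typing import Any, Dict, Optional

# Hypothetical in-memory stand-in for an S3 bucket: object path -> bytes.
OBJECTS: Dict[str, bytes] = {}


def _prefix(dag_id: str, run_id: str, task_id: Optional[str] = None) -> str:
    # The trailing "/" keeps run "r1" from matching run "r10" in prefix deletes.
    prefix = f"xcom/{dag_id}/{run_id}/"
    if task_id is not None:
        prefix += f"{task_id}/"
    return prefix


class PurgingXComBackend:
    """Hypothetical custom backend; signatures are illustrative, not Airflow's."""

    @staticmethod
    def serialize_value(value: Any, *, key: str, task_id: str, dag_id: str, run_id: str) -> str:
        path = _prefix(dag_id, run_id, task_id) + key
        OBJECTS[path] = json.dumps(value).encode("UTF-8")
        return path  # only this reference would be stored in the Airflow DB

    @staticmethod
    def clear(*, dag_id: str, run_id: str, task_id: Optional[str] = None) -> int:
        """Delete external objects for one task (or a whole run); return how many."""
        prefix = _prefix(dag_id, run_id, task_id)
        doomed = [path for path in OBJECTS if path.startswith(prefix)]
        for path in doomed:
            del OBJECTS[path]
        # A real backend would also delete its rows from the XCom table here.
        return len(doomed)


for task, run in [("a", "r1"), ("b", "r1"), ("a", "r10")]:
    PurgingXComBackend.serialize_value([1], key="k", task_id=task, dag_id="d", run_id=run)

removed_task = PurgingXComBackend.clear(dag_id="d", run_id="r1", task_id="a")  # 1
removed_run = PurgingXComBackend.clear(dag_id="d", run_id="r1")  # 1 (only task b's object was left)
print(sorted(OBJECTS))  # ['xcom/d/r10/a/k']
```

Clearing by task mirrors the clean-slate behaviour at task start; clearing by run is one way to purge everything a run wrote once it is done.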
   
   
   





[GitHub] [airflow] dstandish commented on pull request #19505: Add params dag_id, task_id etc to XCom.serialize_value

Posted by GitBox <gi...@apache.org>.
dstandish commented on pull request #19505:
URL: https://github.com/apache/airflow/pull/19505#issuecomment-985072178


   > Is there any work arounds available until this gets merged?
   
   You could do it by overriding `set`, which receives all the params; it just doesn't pass them on to `serialize_value`.
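A sketch of that workaround under stated assumptions: `StubBaseXCom` is a hypothetical stand-in for the pre-PR behaviour (its `set` signature is simplified and is not Airflow's), and `ROWS`/`EXTERNAL` are in-memory stand-ins for the XCom table and an object store. The subclass does the path-dependent work in `set`, where the identifying params are available, and hands only a reference to the parent.

```python
import json
from typing import Any, Dict, List

ROWS: List[Dict[str, Any]] = []   # hypothetical stand-in for the XCom table
EXTERNAL: Dict[str, bytes] = {}   # hypothetical stand-in for an object store


class StubBaseXCom:
    """Hypothetical pre-PR behaviour: ``set`` receives the identifying params
    but passes only ``value`` to ``serialize_value``."""

    @classmethod
    def set(cls, key: str, value: Any, task_id: str, dag_id: str, run_id: str) -> None:
        ROWS.append({"key": key, "task_id": task_id, "dag_id": dag_id,
                     "run_id": run_id, "value": cls.serialize_value(value)})

    @staticmethod
    def serialize_value(value: Any) -> bytes:
        return json.dumps(value).encode("UTF-8")


class PathAwareXCom(StubBaseXCom):
    """Workaround sketch: use the params ``set`` already receives."""

    @classmethod
    def set(cls, key: str, value: Any, task_id: str, dag_id: str, run_id: str) -> None:
        path = f"{dag_id}/{run_id}/{task_id}/{key}"
        EXTERNAL[path] = json.dumps(value).encode("UTF-8")
        # Hand only the reference to the parent, which serializes it into the table.
        super().set(key=key, value=path, task_id=task_id, dag_id=dag_id, run_id=run_id)


PathAwareXCom.set(key="return_value", value={"a": 1}, task_id="t", dag_id="d", run_id="r")
print(EXTERNAL)          # {'d/r/t/return_value': b'{"a": 1}'}
print(ROWS[0]["value"])  # b'"d/r/t/return_value"'
```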





[GitHub] [airflow] potiuk commented on pull request #19505: Add params dag_id, task_id etc to XCom.serialize_value

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #19505:
URL: https://github.com/apache/airflow/pull/19505#issuecomment-1030836483


   Would it be useful to pass the context here as well?





[GitHub] [airflow] kaxil commented on pull request #19505: XCom.serialize_value should have all params set does

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #19505:
URL: https://github.com/apache/airflow/pull/19505#issuecomment-980218684


   Can you update the PR title as well? It feels incomplete.





[GitHub] [airflow] uranusjr commented on a change in pull request #19505: XCom.serialize_value should have all params set does

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #19505:
URL: https://github.com/apache/airflow/pull/19505#discussion_r756592281



##########
File path: airflow/models/xcom.py
##########
@@ -364,6 +372,12 @@ def orm_deserialize_value(self) -> Any:
         return BaseXCom.deserialize_value(self)
 
 
+def serialize_value_shim(**kwargs):
+    signature = inspect_function_arguments(XCom.serialize_value)
+    kwargs = {k: v for k, v in kwargs.items() if k in signature.bound_arguments}
+    return XCom.serialize_value(**kwargs)

Review comment:
       Also, I wonder if it'd be a good idea to emit a warning telling XCom backend authors to upgrade to the new arguments.



