You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by je...@apache.org on 2022/09/26 15:09:21 UTC

[airflow] 12/13: Fix xcom arg.py .zip bug (#26636)

This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 237f2240b0b011c9d97858ba8f5c63795f692cdf
Author: Robert J. McGinness <co...@gmail.com>
AuthorDate: Mon Sep 26 05:02:55 2022 -0400

    Fix xcom arg.py .zip bug (#26636)
    
    (cherry picked from commit f219bfbe22e662a8747af19d688bbe843e1a953d)
---
 airflow/models/xcom_arg.py | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/airflow/models/xcom_arg.py b/airflow/models/xcom_arg.py
index 9be82976ae..b70f26cd4f 100644
--- a/airflow/models/xcom_arg.py
+++ b/airflow/models/xcom_arg.py
@@ -31,7 +31,7 @@ from airflow.models.xcom import XCOM_RETURN_KEY
 from airflow.utils.context import Context
 from airflow.utils.edgemodifier import EdgeModifier
 from airflow.utils.session import NEW_SESSION, provide_session
-from airflow.utils.types import NOTSET
+from airflow.utils.types import NOTSET, ArgNotSet
 
 if TYPE_CHECKING:
     from airflow.models.dag import DAG
@@ -322,7 +322,7 @@ class PlainXComArg(XComArg):
     def resolve(self, context: Context, session: Session = NEW_SESSION) -> Any:
         task_id = self.operator.task_id
         result = context["ti"].xcom_pull(task_ids=task_id, key=str(self.key), default=NOTSET, session=session)
-        if result is not NOTSET:
+        if not isinstance(result, ArgNotSet):
             return result
         if self.key == XCOM_RETURN_KEY:
             return None
@@ -437,7 +437,7 @@ class _ZipResult(Sequence):
 
     def __len__(self) -> int:
         lengths = (len(v) for v in self.values)
-        if self.fillvalue is NOTSET:
+        if isinstance(self.fillvalue, ArgNotSet):
             return min(lengths)
         return max(lengths)
 
@@ -460,13 +460,13 @@ class ZipXComArg(XComArg):
         args_iter = iter(self.args)
         first = repr(next(args_iter))
         rest = ", ".join(repr(arg) for arg in args_iter)
-        if self.fillvalue is NOTSET:
+        if isinstance(self.fillvalue, ArgNotSet):
             return f"{first}.zip({rest})"
         return f"{first}.zip({rest}, fillvalue={self.fillvalue!r})"
 
     def _serialize(self) -> dict[str, Any]:
         args = [serialize_xcom_arg(arg) for arg in self.args]
-        if self.fillvalue is NOTSET:
+        if isinstance(self.fillvalue, ArgNotSet):
             return {"args": args}
         return {"args": args, "fillvalue": self.fillvalue}
 
@@ -486,7 +486,7 @@ class ZipXComArg(XComArg):
         ready_lengths = [length for length in all_lengths if length is not None]
         if len(ready_lengths) != len(self.args):
             return None  # If any of the referenced XComs is not ready, we are not ready either.
-        if self.fillvalue is NOTSET:
+        if isinstance(self.fillvalue, ArgNotSet):
             return min(ready_lengths)
         return max(ready_lengths)