You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by je...@apache.org on 2022/09/26 15:09:21 UTC
[airflow] 12/13: Fix xcom arg.py .zip bug (#26636)
This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 237f2240b0b011c9d97858ba8f5c63795f692cdf
Author: Robert J. McGinness <co...@gmail.com>
AuthorDate: Mon Sep 26 05:02:55 2022 -0400
Fix xcom arg.py .zip bug (#26636)
(cherry picked from commit f219bfbe22e662a8747af19d688bbe843e1a953d)
---
airflow/models/xcom_arg.py | 12 ++++++------
1 file changed, 6 insertions(+), 6 deletions(-)
diff --git a/airflow/models/xcom_arg.py b/airflow/models/xcom_arg.py
index 9be82976ae..b70f26cd4f 100644
--- a/airflow/models/xcom_arg.py
+++ b/airflow/models/xcom_arg.py
@@ -31,7 +31,7 @@ from airflow.models.xcom import XCOM_RETURN_KEY
from airflow.utils.context import Context
from airflow.utils.edgemodifier import EdgeModifier
from airflow.utils.session import NEW_SESSION, provide_session
-from airflow.utils.types import NOTSET
+from airflow.utils.types import NOTSET, ArgNotSet
if TYPE_CHECKING:
from airflow.models.dag import DAG
@@ -322,7 +322,7 @@ class PlainXComArg(XComArg):
def resolve(self, context: Context, session: Session = NEW_SESSION) -> Any:
task_id = self.operator.task_id
result = context["ti"].xcom_pull(task_ids=task_id, key=str(self.key), default=NOTSET, session=session)
- if result is not NOTSET:
+ if not isinstance(result, ArgNotSet):
return result
if self.key == XCOM_RETURN_KEY:
return None
@@ -437,7 +437,7 @@ class _ZipResult(Sequence):
def __len__(self) -> int:
lengths = (len(v) for v in self.values)
- if self.fillvalue is NOTSET:
+ if isinstance(self.fillvalue, ArgNotSet):
return min(lengths)
return max(lengths)
@@ -460,13 +460,13 @@ class ZipXComArg(XComArg):
args_iter = iter(self.args)
first = repr(next(args_iter))
rest = ", ".join(repr(arg) for arg in args_iter)
- if self.fillvalue is NOTSET:
+ if isinstance(self.fillvalue, ArgNotSet):
return f"{first}.zip({rest})"
return f"{first}.zip({rest}, fillvalue={self.fillvalue!r})"
def _serialize(self) -> dict[str, Any]:
args = [serialize_xcom_arg(arg) for arg in self.args]
- if self.fillvalue is NOTSET:
+ if isinstance(self.fillvalue, ArgNotSet):
return {"args": args}
return {"args": args, "fillvalue": self.fillvalue}
@@ -486,7 +486,7 @@ class ZipXComArg(XComArg):
ready_lengths = [length for length in all_lengths if length is not None]
if len(ready_lengths) != len(self.args):
return None # If any of the referenced XComs is not ready, we are not ready either.
- if self.fillvalue is NOTSET:
+ if isinstance(self.fillvalue, ArgNotSet):
return min(ready_lengths)
return max(ready_lengths)