Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/07/28 06:02:30 UTC

[GitHub] [airflow] uranusjr opened a new pull request, #25355: Check expand_kwargs() input type before unmapping

uranusjr opened a new pull request, #25355:
URL: https://github.com/apache/airflow/pull/25355

   This validation was originally done when the upstream pushed the value, but that check was removed when we implemented `map()`, so it should now be done when the value is pulled. Without these explicit checks the task would still fail, but with very cryptic error messages.
   
   Close #25352
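
A minimal, standalone sketch of the pull-time checks described above. The helper name `pick_expand_kwargs` and its signature are hypothetical (the PR puts this logic inside `resolve()` in `airflow/models/expandinput.py`); the checks and messages follow the diff under review.

```python
import collections.abc
from typing import Any, Mapping


def pick_expand_kwargs(mappings: Any, map_index: int) -> Mapping[str, Any]:
    """Hypothetical helper: validate the pulled value, then select one mapping."""
    # The pulled value must be list-like so it can be indexed by map_index.
    if not isinstance(mappings, collections.abc.Sequence):
        raise ValueError(f"expand_kwargs() expects a list[dict], not {type(mappings).__name__}")
    mapping = mappings[map_index]
    # Each element must be dict-like so it can be unpacked as keyword arguments.
    if not isinstance(mapping, collections.abc.Mapping):
        raise ValueError(f"expand_kwargs() expects a list[dict], not list[{type(mapping).__name__}]")
    # Keyword argument names must be strings.
    for key in mapping:
        if not isinstance(key, str):
            raise ValueError(f"expand_kwargs() input dict keys must be str, not {type(key).__name__}")
    return mapping


print(pick_expand_kwargs([{"x": 1}, {"x": 2}], 1))  # {'x': 2}
try:
    pick_expand_kwargs([1, 2], 0)
except ValueError as err:
    print(err)  # expand_kwargs() expects a list[dict], not list[int]
```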


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] uranusjr commented on a diff in pull request #25355: Check expand_kwargs() input type before unmapping

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #25355:
URL: https://github.com/apache/airflow/pull/25355#discussion_r931814073


##########
airflow/models/expandinput.py:
##########
@@ -245,12 +245,20 @@ def get_total_map_length(self, run_id: str, *, session: Session) -> int:
             raise NotFullyPopulated({"expand_kwargs() argument"})
         return value
 
-    def resolve(self, context: Context, session: Session) -> dict[str, Any]:
+    def resolve(self, context: Context, session: Session) -> Mapping[str, Any]:
         map_index = context["ti"].map_index
         if map_index < 0:
             raise RuntimeError("can't resolve task-mapping argument without expanding")
-        # Validation should be done when the upstream returns.
-        return self.value.resolve(context, session)[map_index]
+        mappings = self.value.resolve(context, session)
+        if not isinstance(mappings, collections.abc.Sequence):
+            raise ValueError(f"expand_kwargs() expects a list[dict], not {type(mappings).__name__}")

Review Comment:
   This one shouldn’t be possible in theory since the upstream ensures the return value is list-like, but let’s check anyway.
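
For reference, a small illustration of which values the `collections.abc.Sequence` check accepts. The sample values are made up for this sketch and do not come from the PR.

```python
import collections.abc

# Hypothetical sample values an upstream task might have returned.
samples = ([{"x": 1}], ({"x": 1},), {"x": 1}, 42)

# Map each type name to whether it passes the Sequence check.
results = {
    type(value).__name__: isinstance(value, collections.abc.Sequence)
    for value in samples
}
print(results)  # {'list': True, 'tuple': True, 'dict': False, 'int': False}
```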





[GitHub] [airflow] uranusjr commented on a diff in pull request #25355: Check expand_kwargs() input type before unmapping

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #25355:
URL: https://github.com/apache/airflow/pull/25355#discussion_r931813551


##########
airflow/models/expandinput.py:
##########
@@ -245,12 +245,20 @@ def get_total_map_length(self, run_id: str, *, session: Session) -> int:
             raise NotFullyPopulated({"expand_kwargs() argument"})
         return value
 
-    def resolve(self, context: Context, session: Session) -> dict[str, Any]:
+    def resolve(self, context: Context, session: Session) -> Mapping[str, Any]:
         map_index = context["ti"].map_index
         if map_index < 0:
             raise RuntimeError("can't resolve task-mapping argument without expanding")
-        # Validation should be done when the upstream returns.
-        return self.value.resolve(context, session)[map_index]
+        mappings = self.value.resolve(context, session)
+        if not isinstance(mappings, collections.abc.Sequence):
+            raise ValueError(f"expand_kwargs() expects a list[dict], not {type(mappings).__name__}")
+        mapping = mappings[map_index]
+        if not isinstance(mapping, collections.abc.Mapping):
+            raise ValueError(f"expand_kwargs() expects a list[dict], not list[{type(mapping).__name__}]")
+        for key in mapping:
+            if not isinstance(key, str):
+                raise ValueError(f"expand_kwargs() input dict keys must be str, not {type(key).__name__}")

Review Comment:
   This error message is not tested because it’s impossible to hit with the default XCom backend, which uses JSON and coerces all dict keys into str. It is still a possible code path for custom XCom backends, though.
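
The key coercion mentioned here can be seen with Python's standard `json` module alone. This is only an illustration of JSON behavior, not Airflow's actual XCom serialization code.

```python
import json

# A dict with non-str keys, as a custom (non-JSON) XCom backend might preserve it.
original = {1: "a", 2: "b"}

# A JSON round trip turns the int keys into str keys.
round_tripped = json.loads(json.dumps(original))
print(round_tripped)  # {'1': 'a', '2': 'b'}
print(all(isinstance(key, str) for key in round_tripped))  # True
```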





[GitHub] [airflow] uranusjr commented on a diff in pull request #25355: Check expand_kwargs() input type before unmapping

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #25355:
URL: https://github.com/apache/airflow/pull/25355#discussion_r932050721


##########
airflow/models/expandinput.py:
##########
@@ -245,12 +245,20 @@ def get_total_map_length(self, run_id: str, *, session: Session) -> int:
             raise NotFullyPopulated({"expand_kwargs() argument"})
         return value
 
-    def resolve(self, context: Context, session: Session) -> dict[str, Any]:
+    def resolve(self, context: Context, session: Session) -> Mapping[str, Any]:
         map_index = context["ti"].map_index
         if map_index < 0:
             raise RuntimeError("can't resolve task-mapping argument without expanding")
-        # Validation should be done when the upstream returns.
-        return self.value.resolve(context, session)[map_index]
+        mappings = self.value.resolve(context, session)
+        if not isinstance(mappings, collections.abc.Sequence):
+            raise ValueError(f"expand_kwargs() expects a list[dict], not {type(mappings).__name__}")
+        mapping = mappings[map_index]
+        if not isinstance(mapping, collections.abc.Mapping):
+            raise ValueError(f"expand_kwargs() expects a list[dict], not list[{type(mapping).__name__}]")
+        for key in mapping:
+            if not isinstance(key, str):
+                raise ValueError(f"expand_kwargs() input dict keys must be str, not {type(key).__name__}")

Review Comment:
   Ah yes, the key is probably safe to show.





[GitHub] [airflow] uranusjr merged pull request #25355: Check expand_kwargs() input type before unmapping

Posted by GitBox <gi...@apache.org>.
uranusjr merged PR #25355:
URL: https://github.com/apache/airflow/pull/25355




[GitHub] [airflow] ashb commented on a diff in pull request #25355: Check expand_kwargs() input type before unmapping

Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #25355:
URL: https://github.com/apache/airflow/pull/25355#discussion_r932034343


##########
airflow/models/expandinput.py:
##########
@@ -245,12 +245,20 @@ def get_total_map_length(self, run_id: str, *, session: Session) -> int:
             raise NotFullyPopulated({"expand_kwargs() argument"})
         return value
 
-    def resolve(self, context: Context, session: Session) -> dict[str, Any]:
+    def resolve(self, context: Context, session: Session) -> Mapping[str, Any]:
         map_index = context["ti"].map_index
         if map_index < 0:
             raise RuntimeError("can't resolve task-mapping argument without expanding")
-        # Validation should be done when the upstream returns.
-        return self.value.resolve(context, session)[map_index]
+        mappings = self.value.resolve(context, session)
+        if not isinstance(mappings, collections.abc.Sequence):
+            raise ValueError(f"expand_kwargs() expects a list[dict], not {type(mappings).__name__}")
+        mapping = mappings[map_index]
+        if not isinstance(mapping, collections.abc.Mapping):
+            raise ValueError(f"expand_kwargs() expects a list[dict], not list[{type(mapping).__name__}]")
+        for key in mapping:
+            if not isinstance(key, str):
+                raise ValueError(f"expand_kwargs() input dict keys must be str, not {type(key).__name__}")

Review Comment:
   Worth including `repr(key)` in the error message too?
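
A sketch of what including `repr(key)` could look like. The function name `describe_bad_key` and the exact wording are hypothetical, not the message that was merged.

```python
def describe_bad_key(key: object) -> str:
    """Hypothetical message builder that shows both the key's type and its repr."""
    return f"expand_kwargs() input dict keys must be str, not {type(key).__name__} ({key!r})"


print(describe_bad_key(1))  # expand_kwargs() input dict keys must be str, not int (1)
```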





[GitHub] [airflow] potiuk commented on a diff in pull request #25355: Check expand_kwargs() input type before unmapping

Posted by GitBox <gi...@apache.org>.
potiuk commented on code in PR #25355:
URL: https://github.com/apache/airflow/pull/25355#discussion_r932025204


##########
airflow/models/expandinput.py:
##########
@@ -245,12 +245,20 @@ def get_total_map_length(self, run_id: str, *, session: Session) -> int:
             raise NotFullyPopulated({"expand_kwargs() argument"})
         return value
 
-    def resolve(self, context: Context, session: Session) -> dict[str, Any]:
+    def resolve(self, context: Context, session: Session) -> Mapping[str, Any]:
         map_index = context["ti"].map_index
         if map_index < 0:
             raise RuntimeError("can't resolve task-mapping argument without expanding")
-        # Validation should be done when the upstream returns.
-        return self.value.resolve(context, session)[map_index]
+        mappings = self.value.resolve(context, session)
+        if not isinstance(mappings, collections.abc.Sequence):
+            raise ValueError(f"expand_kwargs() expects a list[dict], not {type(mappings).__name__}")

Review Comment:
   Yep. It does not cost us much.





[GitHub] [airflow] uranusjr commented on a diff in pull request #25355: Check expand_kwargs() input type before unmapping

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #25355:
URL: https://github.com/apache/airflow/pull/25355#discussion_r932050196


##########
airflow/models/expandinput.py:
##########
@@ -245,12 +245,20 @@ def get_total_map_length(self, run_id: str, *, session: Session) -> int:
             raise NotFullyPopulated({"expand_kwargs() argument"})
         return value
 
-    def resolve(self, context: Context, session: Session) -> dict[str, Any]:
+    def resolve(self, context: Context, session: Session) -> Mapping[str, Any]:
         map_index = context["ti"].map_index
         if map_index < 0:
             raise RuntimeError("can't resolve task-mapping argument without expanding")
-        # Validation should be done when the upstream returns.
-        return self.value.resolve(context, session)[map_index]
+        mappings = self.value.resolve(context, session)
+        if not isinstance(mappings, collections.abc.Sequence):
+            raise ValueError(f"expand_kwargs() expects a list[dict], not {type(mappings).__name__}")
+        mapping = mappings[map_index]
+        if not isinstance(mapping, collections.abc.Mapping):
+            raise ValueError(f"expand_kwargs() expects a list[dict], not list[{type(mapping).__name__}]")
+        for key in mapping:
+            if not isinstance(key, str):
+                raise ValueError(f"expand_kwargs() input dict keys must be str, not {type(key).__name__}")

Review Comment:
   No, because that can potentially be a huge list.


