Posted to commits@airflow.apache.org by "vemikhaylov (via GitHub)" <gi...@apache.org> on 2023/02/18 21:37:21 UTC

[GitHub] [airflow] vemikhaylov opened a new pull request, #29608: Enable passing --xcom-args to tasks test CLI command

vemikhaylov opened a new pull request, #29608:
URL: https://github.com/apache/airflow/pull/29608

   ### Context
   
   During the discussion under #28287, it was decided that it would be useful to be able to pass XCom args to a task when it is being tested with the `tasks test` CLI command, and to notify the user if the task depends on an XCom arg that hasn't been passed.
   
   ### Solution
   
   This PR introduces the `--xcom-args` argument for the `tasks test` command, which accepts a JSON document with XCom args. The JSON has the following structure: `{"task_id": {"xcom_key": "value"}}`. This lets the user handle both the implicit XCom used by the TaskFlow API (i.e., `xcom_key == "return_value"`) and XCom args with custom keys (e.g., manually pushed and/or used in templated kwargs).
   
   Hence, a full CLI call may look like:
   
   `airflow tasks test example_passing_xcom_args_via_test_command python_echo --xcom-args '{"get_python_echo_message": {"return_value": "test xcom arg"}}'`
   
   or
   
   `airflow tasks test run_bash_with_manually_pushed_command run_bash --xcom-args "{\"manually_push_bash_command\": {\"command\": \"echo 'test bash'\"}}"`
   
   If a user omits any required XCom args in their call, they will see a message like:
   
   ```
   airflow.exceptions.AirflowException: The task 'python_echo' is dependent on XCom args (task_id, key) passed from the upstream tasks: [('get_python_echo_message', 'return_value')]. Please, pass them via the --xcom-args argument (see --help for more details). The following (task_id, key) pairs are currently missed: [('get_python_echo_message', 'return_value')].
   ```
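
A minimal sketch of the check behind such a message, assuming the nested `{"task_id": {"xcom_key": "value"}}` format described above. The function name `find_missing_xcoms` and its signature are hypothetical and are not taken from the PR; the required pairs are passed in directly rather than being derived from a real task.

```python
import json


def find_missing_xcoms(required, xcom_args_json):
    """Return the required (task_id, key) pairs that the passed JSON lacks.

    Hypothetical helper: `required` is a list of (task_id, key) tuples and
    `xcom_args_json` is the raw string given on the command line.
    """
    passed = json.loads(xcom_args_json)
    # Flatten {"task_id": {"key": value}} into a set of (task_id, key) pairs.
    provided = {
        (task_id, key)
        for task_id, keyed_values in passed.items()
        for key in keyed_values
    }
    return [pair for pair in required if pair not in provided]


required = [("get_python_echo_message", "return_value")]
missing_when_empty = find_missing_xcoms(required, "{}")
missing_when_passed = find_missing_xcoms(
    required, '{"get_python_echo_message": {"return_value": "test xcom arg"}}'
)
print(missing_when_empty, missing_when_passed)
```

A non-empty result would be what triggers the error shown above.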
   
   closes: #28287


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] vemikhaylov commented on a diff in pull request #29608: Enable passing --xcom-args to tasks test CLI command

Posted by "vemikhaylov (via GitHub)" <gi...@apache.org>.
vemikhaylov commented on code in PR #29608:
URL: https://github.com/apache/airflow/pull/29608#discussion_r1117940737


##########
airflow/cli/cli_parser.py:
##########
@@ -36,13 +38,24 @@
 from airflow.exceptions import AirflowException
 from airflow.executors.executor_constants import CELERY_EXECUTOR, CELERY_KUBERNETES_EXECUTOR
 from airflow.executors.executor_loader import ExecutorLoader
+from airflow.models import XCOM_RETURN_KEY

Review Comment:
   Are we fine with this dependency on the model layer here, or would it be better to refactor the constant out?





[GitHub] [airflow] vemikhaylov commented on a diff in pull request #29608: Enable passing --xcom-args to tasks test CLI command

Posted by "vemikhaylov (via GitHub)" <gi...@apache.org>.
vemikhaylov commented on code in PR #29608:
URL: https://github.com/apache/airflow/pull/29608#discussion_r1117962738


##########
airflow/example_dags/example_passing_xcoms_via_test_command.py:
##########
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Example DAG demonstrating the usage of the XCom arguments in templated arguments."""
+from __future__ import annotations
+
+import pendulum
+
+from airflow.decorators import dag, task
+from airflow.models.xcom_arg import XComArg
+from airflow.operators.bash import BashOperator
+
+
+@dag(start_date=pendulum.datetime(2023, 1, 1, tz="UTC"))
+def example_passing_xcoms_via_test_command():
+    @task
+    def get_python_echo_message():
+        return "hello world"
+
+    @task
+    def python_echo(message: str, *args, **kwargs):
+        print(f"python_echo: {message}")
+
+    @task
+    def get_bash_command():
+        return "echo 'default'"
+
+    python_echo(get_python_echo_message())
+    BashOperator(task_id="run_bash", bash_command=get_bash_command())
+    bash_push = BashOperator(
+        task_id="manually_push_bash_command",
+        bash_command='{{ ti.xcom_push(key="command", value="echo \'default\'") }}',
+    )
+    BashOperator(
+        task_id="run_bash_with_manually_pushed_command", bash_command=XComArg(bash_push, key="command")
+    )
+    BashOperator(
+        task_id="run_bash_with_manually_pulled_command",
+        bash_command='{{ task_instance.xcom_pull(task_ids="manually_push_bash_command", key="command") }}',

Review Comment:
   We don't currently cover this case in the upfront validation of the passed XComs, since `XComArg.iter_xcom_references(task)` won't include it; the pull happens at a lower level. We could try to detect it at run time, when the XCom is being queried, in order to return a meaningful error.
   
   But the DAG generally doesn't identify it as a dependency either, because that seems hard to do, so it's probably acceptable to leave this use case mostly to the user?





[GitHub] [airflow] uranusjr commented on pull request #29608: Enable passing --xcom-args to tasks test CLI command

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on PR #29608:
URL: https://github.com/apache/airflow/pull/29608#issuecomment-1437292418

   Using JSON for this feels cumbersome to me. I don’t have a much better idea though. Also we need to find a way to support custom backends that may not store values as JSON.




[GitHub] [airflow] uranusjr commented on a diff in pull request #29608: Enable passing --xcoms to tasks test CLI command

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #29608:
URL: https://github.com/apache/airflow/pull/29608#discussion_r1132088607


##########
airflow/models/baseoperator.py:
##########
@@ -1119,6 +1121,12 @@ def set_xcomargs_dependencies(self) -> None:
                 arg = getattr(self, field)
                 XComArg.apply_upstream_relationship(self, arg)
 
+    def iter_xcom_dependencies(self) -> Iterator[tuple[Operator, str]]:
+        """Upstream dependencies that provide XComs used by this operator."""
+        from airflow.models.xcom_arg import XComArg
+
+        yield from XComArg.iter_xcom_references(self)

Review Comment:
   Why is this needed?





[GitHub] [airflow] vemikhaylov commented on pull request #29608: Enable passing --xcom-args to tasks test CLI command

Posted by "vemikhaylov (via GitHub)" <gi...@apache.org>.
vemikhaylov commented on PR #29608:
URL: https://github.com/apache/airflow/pull/29608#issuecomment-1437501187

   @uranusjr thank you for your feedback!
   
   > Using JSON for this feels cumbersome to me.
   
   It feels that way to me as well. At the same time, it's probably worth mentioning that `--env-vars` and `--task-params` already use JSON for this command, so it might not be a bad idea to keep `--xcom-args` consistent with them.
   
   Thinking out loud about which alternative ways of passing the values we might have:
   
   **1. Simplified format in CLI**
   
   (task_id, xcom_key, value) triplets joined with a delimiter (a comma below).
   
   ```
   --xcom-args get_python_echo_message,return_value,"test xcom arg" manually_push_bash_command,command,"echo \'test bash\'"
   ```
   
   All the escaping and ad-hoc handling would probably make things even worse.
   
   **2. Values passed through a file**
   
   ```
   --xcom-args task_args.json
   ```
   
   Sorry for the ".json": again, the contents of the file need some structure. We could go with YAML :)
   
   What doesn't look convenient here, whatever the file structure, is having to create a file even for simple scenarios (if passing values through a file is the only option).
   
   --
   > Also we need to find a way to support custom backends that may not store values as JSON.
   
   Let me clarify to make sure I understand correctly. Do you mean that the `BaseXCom` model generally allows us to store any binary (`LargeBinary`) `value`, and that not all such values can be passed as JSON or through the CLI itself? For that, approach 2 above could probably be used like this:
   
   ```
   --xcom-args task_args.pickle
   ```
   
   Though from a usability perspective I wouldn't make this the only available option. What do you think?
   
   




[GitHub] [airflow] uranusjr commented on a diff in pull request #29608: Enable passing --xcoms to tasks test CLI command

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #29608:
URL: https://github.com/apache/airflow/pull/29608#discussion_r1132142637


##########
airflow/models/taskinstance.py:
##########
@@ -2741,6 +2749,14 @@ def tg2(inp):
         map_index_start = ancestor_map_index * further_count
         return range(map_index_start, map_index_start + further_count)
 
+    def inject_xcoms(self, xcoms: list[dict[str, Any]]):
+        """
+        To inject upstream dependencies' output instead of trying
+        to read from DB when testing a task individually.
+        """

Review Comment:
   > Can the users use XCom directly without the API
   
   They can, via `XCom.get_one` or `get_many`. So I feel the only fully reliable way is to actually write the data to the database so they are available directly from the XCom interface.
   
   We already do something similar for DagRun (if you specify a DagRun that does not actually exist, we would create a temporary one in the database for the `test` run and delete it afterwards), so I can think of a couple of approaches:
   
   1. Write the custom XCom data to a separate table, and read them on `XCom`. Delete them after the test run.
   2. Write the custom data to the real XCom table, and delete them afterwards. Emit an error (and refuse to execute) if there is already existing data matching the custom input in the table. This feels reasonable since it would only be possible if you test a task _in an existing DAG run but don’t want to use the actual upstream XComs_, which feels very niche and not a necessary feature to me.
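
A minimal sketch of approach 2, using an in-memory SQLite table as a stand-in for the real XCom table. The table layout, the `temporary_xcoms` helper, and the choice of `ValueError` are assumptions made for illustration only; Airflow's actual model and session handling differ.

```python
import sqlite3
from contextlib import contextmanager


@contextmanager
def temporary_xcoms(conn, run_id, xcoms):
    """Insert (task_id, key, value) rows for one test run, then delete them."""
    # Refuse to run if any custom input clashes with data already stored.
    for task_id, key, _ in xcoms:
        clash = conn.execute(
            "SELECT 1 FROM xcom WHERE run_id = ? AND task_id = ? AND key = ?",
            (run_id, task_id, key),
        ).fetchone()
        if clash is not None:
            raise ValueError(f"XCom already exists for {(task_id, key)}; refusing to run")
    conn.executemany(
        "INSERT INTO xcom (run_id, task_id, key, value) VALUES (?, ?, ?, ?)",
        [(run_id, t, k, v) for t, k, v in xcoms],
    )
    conn.commit()
    try:
        yield
    finally:
        # Clean up only the rows this test run injected.
        conn.executemany(
            "DELETE FROM xcom WHERE run_id = ? AND task_id = ? AND key = ?",
            [(run_id, t, k) for t, k, _ in xcoms],
        )
        conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE xcom (run_id TEXT, task_id TEXT, key TEXT, value TEXT)")
with temporary_xcoms(conn, "test_run", [("task1", "return_value", "hello")]):
    rows_during = conn.execute("SELECT COUNT(*) FROM xcom").fetchone()[0]
rows_after = conn.execute("SELECT COUNT(*) FROM xcom").fetchone()[0]
print(rows_during, rows_after)
```

The cleanup here relies on `finally`, so it shares the limitation of any in-process cleanup: it does not run if the process is killed outright.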





[GitHub] [airflow] vemikhaylov commented on a diff in pull request #29608: Enable passing --xcoms to tasks test CLI command

Posted by "vemikhaylov (via GitHub)" <gi...@apache.org>.
vemikhaylov commented on code in PR #29608:
URL: https://github.com/apache/airflow/pull/29608#discussion_r1141132735


##########
airflow/models/taskinstance.py:
##########
@@ -2741,6 +2749,14 @@ def tg2(inp):
         map_index_start = ancestor_map_index * further_count
         return range(map_index_start, map_index_start + further_count)
 
+    def inject_xcoms(self, xcoms: list[dict[str, Any]]):
+        """
+        To inject upstream dependencies' output instead of trying
+        to read from DB when testing a task individually.
+        """

Review Comment:
   > Write the custom data to the real XCom table, and delete them afterwards. Emit an error (and refuse to execute) if there is already existing data matching the custom input in the table. This feels reasonable since it would only be possible if you test a task in an existing DAG run but don’t want to use the actual upstream XComs, which feels very niche and not a necessary feature to me.
   
   My concern with this one is that, in general, a script may exit in a tricky way, even skipping `finally` (https://stackoverflow.com/questions/49262379/does-finally-always-execute-in-python). When we're dealing with a temporary DAG run, that's relatively fine, and it is visible from the run id:
   
   https://github.com/apache/airflow/blob/main/airflow/cli/commands/task_command.py#L74-L80
   
   But for a normal DAG run it may leave the data in a strange state. Also, if this is an ongoing DAG run, then, if I'm not mistaken, there might be race conditions like "write XCom in test command" -> "write the same XCom in a real task executed in the DAG run" -> "delete XCom in test command (clean up)".
   
   What do you think?
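
To illustrate the `finally` concern, here is a small sketch that starts a child Python process which hard-exits with `os._exit` inside a `try` block. The printed strings are placeholders, not Airflow output, and `os._exit` is just one of several ways (a kill signal is another) in which cleanup can be skipped.

```python
import subprocess
import sys

# Child script: the hard exit bypasses `finally` blocks and atexit handlers.
CHILD = """
import os
try:
    print("xcom written", flush=True)
    os._exit(1)
finally:
    print("xcom deleted", flush=True)
"""

result = subprocess.run(
    [sys.executable, "-c", CHILD], capture_output=True, text=True
)
cleanup_ran = "xcom deleted" in result.stdout
print(result.stdout.strip(), "| cleanup ran:", cleanup_ran)
```

In this sketch the "write" step is observed but the "delete" step never happens, which is the state a real XCom table could be left in.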





[GitHub] [airflow] uranusjr commented on pull request #29608: Enable passing --xcom-args to tasks test CLI command

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on PR #29608:
URL: https://github.com/apache/airflow/pull/29608#issuecomment-1442811804

   I think your plan makes sense. So we’ll use this PR to implement value-passing, and add path support in a later PR.




[GitHub] [airflow] uranusjr commented on pull request #29608: Enable passing --xcom-args to tasks test CLI command

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on PR #29608:
URL: https://github.com/apache/airflow/pull/29608#issuecomment-1441235340

   Allowing a path makes sense to me. Since a JSON string has a pretty distinctive shape (starts and ends with either brackets or braces depending on which format we decide to use), I think we can allow both with one option.
   
   While path to a pickle object also makes sense in theory, generating that pickle file would be pretty cumbersome and I’m not sure people would want to use that. I would probably leave that out for now until someone asks for it.
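
A rough sketch of the "one option accepts both" idea: decide by shape whether the argument is inline JSON or a path. The helper name `load_xcoms` is hypothetical, and the heuristic (leading and trailing bracket or brace) is only the one described in this comment, not something the PR is known to implement.

```python
import json
import os
import tempfile


def load_xcoms(raw):
    """Parse inline JSON if the value looks like JSON, else read it as a file path."""
    stripped = raw.strip()
    if stripped.startswith(("[", "{")) and stripped.endswith(("]", "}")):
        return json.loads(stripped)
    with open(raw, encoding="utf-8") as fh:
        return json.load(fh)


inline = load_xcoms('[{"task_id": "get_python_echo_message", "value": "test xcom arg"}]')

# Round-trip the same payload through a temporary file to exercise the path branch.
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "my_xcoms.json")
    with open(path, "w", encoding="utf-8") as fh:
        json.dump(inline, fh)
    from_file = load_xcoms(path)

print(inline == from_file)
```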




[GitHub] [airflow] vemikhaylov commented on pull request #29608: Enable passing --xcom-args to tasks test CLI command

Posted by "vemikhaylov (via GitHub)" <gi...@apache.org>.
vemikhaylov commented on PR #29608:
URL: https://github.com/apache/airflow/pull/29608#issuecomment-1442592589

   > `--xcoms`
   
   I love it! Although the "task" declaration technically operates at the `XComArg` layer, the task may call `xcom_pull` and `xcom_push` directly during execution, which is pure `XCom`, so the shorter name may be fine and even more precise.
   
   > ```
   > airflow task run ... \
   >     --xcom task1 key value \
   >     --xcom task2 k1 v1 \
   >     --xcom task2 k2 v2
   > ```
   > 
   > I think this is possible by setting `nargs=3`.
   
   It looks really great! However, I see the following issues with the format:
   
   1. It's not very extensible. Imagine we want to pass one more argument (e.g., `map_index`; I'm not sure it's applicable, but let's imagine it is). Then it becomes `--xcom task1 key value map_index`, which is a less intuitive interface where you have to remember the order of the arguments.
   
   2. It's possible to introduce a default key (i.e., `return_value`) by interpreting 2 args as `--xcom task1 value`, but that makes it even harder to extend to more args in the future if needed.
   
   3. It's not consistent with `--env-vars` and `--task-params`, which already use JSON.
   
   Please let me know if those are valid concerns. I personally prefer more explicit interfaces and sometimes lean towards overly explicit ones, so this may all be rather subjective; feel free to push back :)
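
For reference, the quoted `nargs=3` idea can be sketched with plain `argparse` as below. The parser here is a standalone toy, not Airflow's CLI parser, and the `prog` name and `metavar` labels are made up for the example.

```python
import argparse

parser = argparse.ArgumentParser(prog="tasks-test-sketch")
parser.add_argument(
    "--xcom",
    nargs=3,           # each occurrence consumes exactly three values
    action="append",   # repeated occurrences accumulate into a list
    default=[],
    metavar=("TASK_ID", "KEY", "VALUE"),
)

args = parser.parse_args(
    [
        "--xcom", "task1", "key", "value",
        "--xcom", "task2", "k1", "v1",
        "--xcom", "task2", "k2", "v2",
    ]
)
triplets = [tuple(item) for item in args.xcom]
print(triplets)
```

The positional meaning of each of the three values is fixed by order alone, which is the extensibility concern raised in point 1.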
   
   But so far I would probably stick to:
   
   ```
   --xcoms [{"task_id": "get_python_echo_message", "value": "test xcom arg"}, ...]
   ```
   and
   ```
   --xcoms my_xcoms.json
   ```
   
   In terms of execution, would it be fine if I move iteratively here and split this into two PRs: 1) passing values via the CLI; 2) passing values via a file? Among other things, this should make the review easier. The first part is self-contained as well, so it should be a safe change to commit separately.




[GitHub] [airflow] uranusjr commented on a diff in pull request #29608: Enable passing --xcoms to tasks test CLI command

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #29608:
URL: https://github.com/apache/airflow/pull/29608#discussion_r1144316831


##########
airflow/models/baseoperator.py:
##########
@@ -1119,6 +1121,12 @@ def set_xcomargs_dependencies(self) -> None:
                 arg = getattr(self, field)
                 XComArg.apply_upstream_relationship(self, arg)
 
+    def iter_xcom_dependencies(self) -> Iterator[tuple[Operator, str]]:
+        """Upstream dependencies that provide XComs used by this operator."""
+        from airflow.models.xcom_arg import XComArg
+
+        yield from XComArg.iter_xcom_references(self)

Review Comment:
   `iter_xcom_references` is an internal interface that Airflow internals are free to use. MappedOperator has a wrapper for it because the wrapper also references a private member; this is not needed for BaseOperator.





[GitHub] [airflow] vemikhaylov commented on a diff in pull request #29608: Enable passing --xcom-args to tasks test CLI command

Posted by "vemikhaylov (via GitHub)" <gi...@apache.org>.
vemikhaylov commented on code in PR #29608:
URL: https://github.com/apache/airflow/pull/29608#discussion_r1117956211


##########
airflow/cli/commands/task_command.py:
##########
@@ -596,6 +597,10 @@ def task_test(args, dag=None):
         task, args.map_index, exec_date_or_run_id=args.execution_date_or_run_id, create_if_necessary="db"
     )
 
+    xcoms = args.xcoms or []
+    _validate_injected_xcoms(xcoms, task)

Review Comment:
   If we stick with the `ti.inject_xcoms` approach, we could consider moving the validation logic into `ti.inject_xcoms` itself. That would imply that when `inject_xcoms` is called, it must inject all the required XComs at once, but that's probably fine.
   
   Also, the exception raised from there would lack the `"Please, pass them via the --xcoms argument (see --help for more details)."` part (within the method we won't know where it's called from). It would need to be either caught and extended in `task_command`, or omitted.





[GitHub] [airflow] vemikhaylov commented on pull request #29608: Enable passing --xcom-args to tasks test CLI command

Posted by "vemikhaylov (via GitHub)" <gi...@apache.org>.
vemikhaylov commented on PR #29608:
URL: https://github.com/apache/airflow/pull/29608#issuecomment-1439054387

   Regarding the JSON direction, a potentially better way would be to make it more explicitly structured, like:
   
   ```
   {"get_python_echo_message": {"key": "return_value", "value": "test xcom arg"}}
   ```
   or just
   ```
   [{"task_id": "get_python_echo_message", "key": "return_value", "value": "test xcom arg"}, ...]
   ```
   
   `key` may be optional with `"return_value"` as the default.
   
   ```
   [{"task_id": "get_python_echo_message", "value": "test xcom arg"}, ...]
   ```
   
   It may look a bit more wordy and cumbersome, but at the same time clearer and more extensible in the future. Before adjusting the implementation, I would like to get more eyes on these to see if there are any concerns or if there are good alternatives to the JSON way.
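
A small sketch of how the list format with an optional `key` could be normalized into (task_id, key, value) triplets. The function name `normalize_xcoms` is hypothetical, and the constant is redefined locally instead of being imported from Airflow so that the snippet stays self-contained.

```python
import json

# Local stand-in for the default key named above; not imported from Airflow here.
XCOM_RETURN_KEY = "return_value"


def normalize_xcoms(raw_json):
    """Turn the list-of-objects format into (task_id, key, value) triplets."""
    entries = json.loads(raw_json)
    return [
        # `key` is optional and falls back to the default return key.
        (entry["task_id"], entry.get("key", XCOM_RETURN_KEY), entry["value"])
        for entry in entries
    ]


triplets = normalize_xcoms(
    '[{"task_id": "get_python_echo_message", "value": "test xcom arg"},'
    ' {"task_id": "manually_push_bash_command", "key": "command", "value": "echo 1"}]'
)
print(triplets)
```

Adding a further optional field later (for instance a map index) would only mean reading one more key per entry, which is the extensibility argument made above.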




[GitHub] [airflow] github-actions[bot] closed pull request #29608: Enable passing --xcoms to tasks test CLI command

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] closed pull request #29608: Enable passing --xcoms to tasks test CLI command
URL: https://github.com/apache/airflow/pull/29608




[GitHub] [airflow] github-actions[bot] commented on pull request #29608: Enable passing --xcoms to tasks test CLI command

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #29608:
URL: https://github.com/apache/airflow/pull/29608#issuecomment-1537578058

   This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.




[GitHub] [airflow] vemikhaylov commented on a diff in pull request #29608: Enable passing --xcoms to tasks test CLI command

Posted by "vemikhaylov (via GitHub)" <gi...@apache.org>.
vemikhaylov commented on code in PR #29608:
URL: https://github.com/apache/airflow/pull/29608#discussion_r1141083330


##########
airflow/models/baseoperator.py:
##########
@@ -1119,6 +1121,12 @@ def set_xcomargs_dependencies(self) -> None:
                 arg = getattr(self, field)
                 XComArg.apply_upstream_relationship(self, arg)
 
+    def iter_xcom_dependencies(self) -> Iterator[tuple[Operator, str]]:
+        """Upstream dependencies that provide XComs used by this operator."""
+        from airflow.models.xcom_arg import XComArg
+
+        yield from XComArg.iter_xcom_references(self)

Review Comment:
   Is the question why we need to define a method for that instead of calling `XComArg.iter_xcom_references` directly? As I see it, when we look at a `task` and want to understand which XComs it depends on, `XComArg.iter_xcom_references(task)` seems like a lower-level implementation detail. Something similar happens in the mapped operator:
   
   https://github.com/apache/airflow/blob/3f6b5574c61ef9765d077bdd08ccdaba14013e4a/airflow/models/mappedoperator.py#L638-L643





[GitHub] [airflow] uranusjr commented on a diff in pull request #29608: Enable passing --xcoms to tasks test CLI command

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #29608:
URL: https://github.com/apache/airflow/pull/29608#discussion_r1132087844


##########
airflow/cli/commands/task_command.py:
##########
@@ -596,6 +597,10 @@ def task_test(args, dag=None):
         task, args.map_index, exec_date_or_run_id=args.execution_date_or_run_id, create_if_necessary="db"
     )
 
+    xcoms = args.xcoms or []
+    _validate_injected_xcoms(xcoms, task)

Review Comment:
   > Also the exception raised from there will lack the [context]
   
   You can raise something other than AirflowException (I don’t see why you need to raise that exception), and catch it here to provide the additional context.
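
The pattern suggested here could look roughly like the sketch below: the lower layer raises a narrow exception, and the CLI layer catches it and adds the user-facing hint. All names (`MissingXComsError`, the simplified `inject_xcoms` and `task_test` signatures) are illustrative stand-ins and do not reflect the PR's actual code; `SystemExit` is used only as a convenient way to surface the message.

```python
class MissingXComsError(Exception):
    """Raised by the lower layer; knows nothing about the CLI."""

    def __init__(self, missing):
        super().__init__(f"missing XComs: {missing}")
        self.missing = missing


def inject_xcoms(required, provided):
    """Hypothetical stand-in: `provided` maps (task_id, key) pairs to values."""
    missing = [pair for pair in required if pair not in provided]
    if missing:
        raise MissingXComsError(missing)
    return dict(provided)


def task_test(required, provided):
    """CLI-level wrapper that adds the hint about the command-line option."""
    try:
        return inject_xcoms(required, provided)
    except MissingXComsError as err:
        raise SystemExit(
            f"{err}. Please, pass them via the --xcoms argument "
            "(see --help for more details)."
        ) from err


ok = task_test([("t1", "return_value")], {("t1", "return_value"): "v"})
print(ok)
```

This keeps the method reusable outside the CLI while the command-specific wording stays in the command.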





[GitHub] [airflow] potiuk commented on pull request #29608: Enable passing --xcoms to tasks test CLI command

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on PR #29608:
URL: https://github.com/apache/airflow/pull/29608#issuecomment-1479700104

   You should rebase now.




[GitHub] [airflow] vemikhaylov commented on a diff in pull request #29608: Enable passing --xcoms to tasks test CLI command

Posted by "vemikhaylov (via GitHub)" <gi...@apache.org>.
vemikhaylov commented on code in PR #29608:
URL: https://github.com/apache/airflow/pull/29608#discussion_r1146825368


##########
airflow/models/baseoperator.py:
##########
@@ -1119,6 +1121,12 @@ def set_xcomargs_dependencies(self) -> None:
                 arg = getattr(self, field)
                 XComArg.apply_upstream_relationship(self, arg)
 
+    def iter_xcom_dependencies(self) -> Iterator[tuple[Operator, str]]:
+        """Upstream dependencies that provide XComs used by this operator."""
+        from airflow.models.xcom_arg import XComArg
+
+        yield from XComArg.iter_xcom_references(self)

Review Comment:
   So the recommendation is to call `XComArg.iter_xcom_references` directly, right?





[GitHub] [airflow] dimberman commented on pull request #29608: Enable passing --xcom-args to tasks test CLI command

Posted by "dimberman (via GitHub)" <gi...@apache.org>.
dimberman commented on PR #29608:
URL: https://github.com/apache/airflow/pull/29608#issuecomment-1441061234

   Hi @vemikhaylov thank you for setting up this PR. This is a pretty important missing piece of the local dev story.
   
   It would also make sense to offer the option to pass in a JSON/YAML file path. For a more complex item, I imagine this would make more sense than needing to manually add the JSON object in the CLI arguments.
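
A minimal sketch of how such an option could behave, assuming the `{"task_id": {"xcom_key": "value"}}` structure from the PR description. The helper name `load_xcoms` is hypothetical and is not part of Airflow; YAML is left out to keep the example standard-library only.

```python
import json
from pathlib import Path


def load_xcoms(raw: str) -> dict:
    """Hypothetical helper: read XComs from a JSON file path or from inline JSON."""
    candidate = Path(raw)
    try:
        is_file = candidate.is_file()
    except (OSError, ValueError):
        # Inline JSON can be too long or otherwise invalid as a file name.
        is_file = False

    text = candidate.read_text() if is_file else raw
    data = json.loads(text)
    if not isinstance(data, dict):
        raise ValueError("expected a JSON object mapping task_id to {xcom_key: value}")
    return data
```

Checking for an existing file first keeps a single argument usable for both cases; a separate flag for the file path would be the more explicit alternative.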
   
   Another question is whether we should allow people to point at pickle files for pickle objects. I know we generally discourage people from turning on pickle, but it is a thing people do so we might want to allow those folks to locally test.
   
   cc: @uranusjr WDYT?




[GitHub] [airflow] uranusjr commented on pull request #29608: Enable passing --xcom-args to tasks test CLI command

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on PR #29608:
URL: https://github.com/apache/airflow/pull/29608#issuecomment-1441235817

   By the way, the argument name can probably be shortened to just `--xcom` or `--xcoms`?




[GitHub] [airflow] uranusjr commented on a diff in pull request #29608: Enable passing --xcoms to tasks test CLI command

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #29608:
URL: https://github.com/apache/airflow/pull/29608#discussion_r1127597041


##########
airflow/cli/cli_parser.py:
##########
@@ -36,13 +38,24 @@
 from airflow.exceptions import AirflowException
 from airflow.executors.executor_constants import CELERY_EXECUTOR, CELERY_KUBERNETES_EXECUTOR
 from airflow.executors.executor_loader import ExecutorLoader
+from airflow.models import XCOM_RETURN_KEY

Review Comment:
   Better to refactor the constant out.





[GitHub] [airflow] vemikhaylov commented on a diff in pull request #29608: Enable passing --xcom-args to tasks test CLI command

Posted by "vemikhaylov (via GitHub)" <gi...@apache.org>.
vemikhaylov commented on code in PR #29608:
URL: https://github.com/apache/airflow/pull/29608#discussion_r1117955516


##########
airflow/models/taskinstance.py:
##########
@@ -2741,6 +2749,14 @@ def tg2(inp):
         map_index_start = ancestor_map_index * further_count
         return range(map_index_start, map_index_start + further_count)
 
+    def inject_xcoms(self, xcoms: list[dict[str, Any]]):
+        """
+        To inject upstream dependencies' output instead of trying
+        to read from DB when testing a task individually.
+        """

Review Comment:
   To be honest, I'm not quite sure about the approach, so I'd like to ask for feedback. Potential risks and open questions around the current one:
   
   1. Users of the framework may start to use it as a public API, but we may want to change how it works at some point in the future. Is that a valid concern?
   
   2. Can users use `XCom` directly, without the APIs exposed by `TaskInstance`, or are the models unavailable in DAGs, so that covering the interactions with `XCom` at the `TaskInstance` layer is enough?
   
   --
   
   Alternative options that I thought of:
   
   1. Pass `xcoms` down the call stack, e.g. `ti.run(injected_xcoms=xcoms)`. Extending the `ti.run` signature didn't look attractive to me, though it may be clearer and more explicit than a preliminary "injection". But injection is similar to what we already do with `params`: https://github.com/apache/airflow/blob/4e3b5ae7248c2327864f64b25dc7a5bd7705430c/airflow/cli/commands/task_command.py#L590
   
   2. Have a module with a global variable, which we could initialise in `task_command` and then read in either `TaskInstance.xcom_pull` or `BaseXCom.get_one` / `.get_many`. That means global state, though.
   
   3. Monkeypatch `ti.xcom_pull` (dirty?)
   
   4. Implement a custom `XCom` backend to be used in `xcom.py`, which would read the injected data from memory. 
   
   5. If we are able to run everything inside a common transaction that is rolled back at the end of the command execution, then we could just "put" the corresponding XComs in the DB before running the task?
   
   --
   
   Since I wasn't sure about the feasibility and validity of some of the approaches above (especially 4 and 5), I implemented the most straightforward one, similar to what we already have (the `params` reference above), and am looking for feedback.
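
The injection idea discussed above can be illustrated with a small stand-in class. This is only a sketch under assumptions: `FakeTaskInstance` is hypothetical and is not Airflow's `TaskInstance`, the "DB" is a plain dict, and the injected data uses the `{"task_id": {"xcom_key": "value"}}` shape from the PR description rather than the list-based signature in the diff.

```python
from __future__ import annotations

from typing import Any


class FakeTaskInstance:
    """Hypothetical stand-in used to illustrate injection; not Airflow's TaskInstance."""

    def __init__(self, db_xcoms: dict[tuple[str, str], Any] | None = None) -> None:
        # A plain dict plays the role of the XCom table in the metadata DB.
        self._db_xcoms = dict(db_xcoms or {})
        self._injected: dict[tuple[str, str], Any] = {}

    def inject_xcoms(self, xcoms: dict[str, dict[str, Any]]) -> None:
        """Remember values that should be returned instead of reading from the DB."""
        for task_id, values_by_key in xcoms.items():
            for key, value in values_by_key.items():
                self._injected[(task_id, key)] = value

    def xcom_pull(self, task_id: str, key: str = "return_value") -> Any:
        """Prefer an injected value; fall back to the stored one."""
        if (task_id, key) in self._injected:
            return self._injected[(task_id, key)]
        return self._db_xcoms.get((task_id, key))


ti = FakeTaskInstance({("get_python_echo_message", "return_value"): "from db"})
ti.inject_xcoms({"get_python_echo_message": {"return_value": "test xcom arg"}})
print(ti.xcom_pull("get_python_echo_message"))
```

The trade-off raised in point 1 of the risks is visible here: once `inject_xcoms` is a regular method, nothing prevents callers outside the test command from relying on it.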





[GitHub] [airflow] potiuk commented on pull request #29608: Enable passing --xcoms to tasks test CLI command

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on PR #29608:
URL: https://github.com/apache/airflow/pull/29608#issuecomment-1479637874

   FYI: Fix to failing test in https://github.com/apache/airflow/pull/29608




[GitHub] [airflow] vemikhaylov commented on a diff in pull request #29608: Enable passing --xcoms to tasks test CLI command

Posted by "vemikhaylov (via GitHub)" <gi...@apache.org>.
vemikhaylov commented on code in PR #29608:
URL: https://github.com/apache/airflow/pull/29608#discussion_r1141076082


##########
airflow/cli/commands/task_command.py:
##########
@@ -596,6 +597,10 @@ def task_test(args, dag=None):
         task, args.map_index, exec_date_or_run_id=args.execution_date_or_run_id, create_if_necessary="db"
     )
 
+    xcoms = args.xcoms or []
+    _validate_injected_xcoms(xcoms, task)

Review Comment:
   > I don’t see why you need to raise that exception
   
   Let me clarify: are you questioning why we should raise an exception at all (rather than, say, returning a bool flag), or are you not sure we need the `AirflowException` class here? If it's the latter, what is the intended scenario for that class? I just noticed that it was already used for args validation in this module.
   
   But generally, I'm absolutely aligned! I also mentioned that it could be intercepted and enriched; that's definitely not a blocker, just a consideration.
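
For readers following the validation question, here is a rough sketch of the kind of check being discussed. It is not the PR's `_validate_injected_xcoms`: the function names and signatures are hypothetical, the provided XComs use the `{"task_id": {"xcom_key": "value"}}` shape from the PR description, and a plain `ValueError` stands in for `AirflowException` so the example runs without Airflow installed.

```python
from __future__ import annotations

from typing import Any, Iterable


def find_missing_xcoms(
    required: Iterable[tuple[str, str]],
    provided: dict[str, dict[str, Any]],
) -> list[tuple[str, str]]:
    """Return the (task_id, key) pairs that are required but were not provided."""
    return [
        (task_id, key)
        for task_id, key in required
        if key not in provided.get(task_id, {})
    ]


def validate_injected_xcoms(
    task_id: str,
    required: Iterable[tuple[str, str]],
    provided: dict[str, dict[str, Any]],
) -> None:
    """Raise if the task depends on XComs that were not passed in."""
    required = list(required)
    missing = find_missing_xcoms(required, provided)
    if missing:
        # ValueError is an assumption of this sketch, not the PR's choice.
        raise ValueError(
            f"The task {task_id!r} is dependent on XCom args (task_id, key) "
            f"passed from the upstream tasks: {required}. "
            f"The following (task_id, key) pairs are currently missed: {missing}."
        )
```

Splitting the check into a pure function that returns the missing pairs and a thin wrapper that raises makes either outcome of the discussion easy to support: callers that prefer a flag can test whether the returned list is empty.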





[GitHub] [airflow] vemikhaylov commented on a diff in pull request #29608: Enable passing --xcoms to tasks test CLI command

Posted by "vemikhaylov (via GitHub)" <gi...@apache.org>.
vemikhaylov commented on code in PR #29608:
URL: https://github.com/apache/airflow/pull/29608#discussion_r1141068460


##########
airflow/cli/cli_parser.py:
##########
@@ -36,13 +38,24 @@
 from airflow.exceptions import AirflowException
 from airflow.executors.executor_constants import CELERY_EXECUTOR, CELERY_KUBERNETES_EXECUTOR
 from airflow.executors.executor_loader import ExecutorLoader
+from airflow.models import XCOM_RETURN_KEY

Review Comment:
   Created it as a separate PR to keep this one more focused (#30180).


