Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/06/23 19:06:50 UTC

[GitHub] [airflow] josh-fell opened a new issue #16618: Access specific values within an `XCom` value using Taskflow API

josh-fell opened a new issue #16618:
URL: https://github.com/apache/airflow/issues/16618


   **Description**
   Currently, the `output` property of operators doesn't support accessing a specific value within an `XCom`, only the _entire_ `XCom` value. Ideally, the `XComArg` returned by the `output` property would behave like the `task_instance.xcom_pull()` method, where a user has immediate access to the `XCom` value and can directly access specific values within it.
   
   For example, in the [example DAG](https://github.com/apache/airflow/blob/main/airflow/providers/apache/beam/example_dags/example_beam.py) in the Apache Beam provider, the `job_id` argument of the `DataflowJobStatusSensor` task is a templated value that calls the `task_instance.xcom_pull()` method and then accesses the `dataflow_job_id` key within the `XCom` value:
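   ```python
   from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator
   from airflow.providers.google.cloud.hooks.dataflow import DataflowJobStatus
   from airflow.providers.google.cloud.operators.dataflow import DataflowConfiguration
   from airflow.providers.google.cloud.sensors.dataflow import DataflowJobStatusSensor
   
   # GCS_PYTHON_DATAFLOW_ASYNC, GCS_TMP, GCS_STAGING, GCS_OUTPUT and GCP_PROJECT_ID
   # are constants defined elsewhere in the example DAG.
   start_python_job_dataflow_runner_async = BeamRunPythonPipelineOperator(
       task_id="start_python_job_dataflow_runner_async",
       runner="DataflowRunner",
       py_file=GCS_PYTHON_DATAFLOW_ASYNC,
       pipeline_options={
           'tempLocation': GCS_TMP,
           'stagingLocation': GCS_STAGING,
           'output': GCS_OUTPUT,
       },
       py_options=[],
       py_requirements=['apache-beam[gcp]==2.26.0'],
       py_interpreter='python3',
       py_system_site_packages=False,
       dataflow_config=DataflowConfiguration(
           job_name='{{task.task_id}}',
           project_id=GCP_PROJECT_ID,
           location="us-central1",
           wait_until_finished=False,
       ),
   )
   
   # The template pulls the whole return value, then indexes into it.
   wait_for_python_job_dataflow_runner_async_done = DataflowJobStatusSensor(
       task_id="wait-for-python-job-async-done",
       job_id="{{task_instance.xcom_pull('start_python_job_dataflow_runner_async')['dataflow_job_id']}}",
       expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
       project_id=GCP_PROJECT_ID,
       location='us-central1',
   )
   ```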
   There is currently no equivalent way to directly access the `dataflow_job_id` value in the same manner using the `output` property.
   
   Using `start_python_job_dataflow_runner_async.output["dataflow_job_id"]` is equivalent to `task_instance.xcom_pull(task_ids='start_python_job_dataflow_runner_async', key='dataflow_job_id')`. In other words, it pulls a separate XCom key rather than indexing into the return value.
   
   Even `start_python_job_dataflow_runner_async.output["return_value"]["dataflow_job_id"]` yields the same result, `task_instance.xcom_pull(task_ids='start_python_job_dataflow_runner_async', key='dataflow_job_id')`, because the second subscript overrides the key again.
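The following minimal pure-Python model makes this behavior concrete. It is not Airflow's implementation. The names `FakeXComStore`, `xcom_pull` and `KeyOverridingXComArg` are hypothetical. The XCom table is assumed to be a plain dict keyed by `(task_id, key)`, and `job-123` is a made-up job ID. As described above, the model assumes that subscripting an `XComArg` replaces the XCom key instead of indexing into the pulled value.

```python
from typing import Any, Dict, Optional, Tuple

# Hypothetical stand-in for the XCom table: (task_id, key) -> value.
FakeXComStore = Dict[Tuple[str, str], Any]

RETURN_VALUE = "return_value"


def xcom_pull(store: FakeXComStore, task_id: str, key: str = RETURN_VALUE) -> Optional[Any]:
    """Mimic ti.xcom_pull(task_ids=..., key=...): missing entries yield None."""
    return store.get((task_id, key))


class KeyOverridingXComArg:
    """Toy model (assumption) of an XComArg whose __getitem__ swaps the XCom key."""

    def __init__(self, task_id: str, key: str = RETURN_VALUE) -> None:
        self.task_id = task_id
        self.key = key

    def __getitem__(self, item: str) -> "KeyOverridingXComArg":
        # Each subscript replaces the key; it never looks inside the value.
        return KeyOverridingXComArg(self.task_id, key=item)

    def resolve(self, store: FakeXComStore) -> Optional[Any]:
        return xcom_pull(store, self.task_id, self.key)


# The Beam operator pushes a single dict under the default "return_value" key.
store: FakeXComStore = {
    ("start_python_job_dataflow_runner_async", RETURN_VALUE): {"dataflow_job_id": "job-123"},
}
output = KeyOverridingXComArg("start_python_job_dataflow_runner_async")

# What the Jinja template does: pull the return value, then index into it.
print(xcom_pull(store, "start_python_job_dataflow_runner_async")["dataflow_job_id"])  # job-123

# Both subscript forms end up pulling key="dataflow_job_id", which was never pushed.
print(output["dataflow_job_id"].resolve(store))                  # None
print(output["return_value"]["dataflow_job_id"].resolve(store))  # None
```

The template path and the subscript path diverge only because the operator pushed one dict under `return_value` rather than a separate `dataflow_job_id` key.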
   
   **Use case / motivation**
   It would be intuitive for users to have direct access to specific values in an `XCom` through the `XComArg` in the TaskFlow API, as they do with the classic `xcom_pull()` method.
   
   **Are you willing to submit a PR?**
   I would love to, but I would certainly need some guidance on the nuances here.
   
   **Related Issues**
   https://github.com/apache/airflow/issues/10285
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ricardogaspar2 edited a comment on issue #16618: Access specific values within an `XCom` value using Taskflow API

Posted by GitBox <gi...@apache.org>.
ricardogaspar2 edited a comment on issue #16618:
URL: https://github.com/apache/airflow/issues/16618#issuecomment-892728928


   > > It would also be great if there were a way of NOT creating dependencies between tasks automatically.
   > > This is very common for cluster or job IDs (EMR, Databricks, etc.), and having these dependencies created automatically doesn't make much sense.
   > 
   > Without a dependency then you might try to get a value out of XCom that hasn't been written yet!
   
   Makes sense. I withdraw my comment, then.
   I think I got used to seeing how DAGs are now rendered with this new XComs/TaskFlow API (using `.output`). Before, Airflow wasn't creating edges between the tasks that were using the values.
   Not part of this topic, but it would be cool to have a visual representation of the variable being passed, much like Dagster does in their UI.





[GitHub] [airflow] uranusjr commented on issue #16618: Access specific values within an `XCom` value using Taskflow API

uranusjr commented on issue #16618:
URL: https://github.com/apache/airflow/issues/16618#issuecomment-879670664


   But wouldn’t `my_op.output.some_xcom_key` also be problematic if the XCom return value is a custom object? Admittedly, there would be significantly fewer edge cases.
   
   At this point, maybe the best solution would be to deprecate accessing `my_op.output` directly, and require doing something else to get the default return value, say `my_op.output[None]` or `my_op.return_value`?
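The following plain-Python sketch shows one way the proposal above could look, under stated assumptions. `SketchOperator` and `XComRef` are hypothetical classes, not Airflow APIs, and the XCom table is assumed to be a dict keyed by `(task_id, key)`. Because `my_op.output[None]` has to read `.output` first, the sketch defers the deprecation warning until a bare `output` reference is actually resolved.

```python
import warnings
from typing import Any, Dict, Optional, Tuple

RETURN_VALUE = "return_value"
Store = Dict[Tuple[str, str], Any]  # hypothetical XCom table: (task_id, key) -> value


class XComRef:
    """Toy XCom reference; `explicit` records whether the key was chosen deliberately."""

    def __init__(self, task_id: str, key: str, explicit: bool) -> None:
        self.task_id = task_id
        self.key = key
        self.explicit = explicit

    def __getitem__(self, item: Optional[str]) -> "XComRef":
        # Proposed spelling: output[None] explicitly selects the default return value.
        return XComRef(self.task_id, RETURN_VALUE if item is None else item, explicit=True)

    def resolve(self, store: Store) -> Optional[Any]:
        # The warning is deferred to here because output[None] must read .output first.
        if not self.explicit:
            warnings.warn(
                "Using .output directly is deprecated; use .output[None] or .return_value",
                DeprecationWarning,
                stacklevel=2,
            )
        return store.get((self.task_id, self.key))


class SketchOperator:
    """Hypothetical operator exposing both spellings from the proposal."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id

    @property
    def output(self) -> XComRef:
        return XComRef(self.task_id, RETURN_VALUE, explicit=False)

    @property
    def return_value(self) -> XComRef:
        return XComRef(self.task_id, RETURN_VALUE, explicit=True)


store: Store = {("my_op", RETURN_VALUE): 42}
my_op = SketchOperator("my_op")

print(my_op.output[None].resolve(store))  # 42, no warning
print(my_op.return_value.resolve(store))  # 42, no warning

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    print(my_op.output.resolve(store))  # 42, but a DeprecationWarning is recorded

print(caught[0].category.__name__)  # DeprecationWarning
```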





[GitHub] [airflow] ricardogaspar2 commented on issue #16618: Access specific values within an `XCom` value using Taskflow API

ricardogaspar2 commented on issue #16618:
URL: https://github.com/apache/airflow/issues/16618#issuecomment-892728928


   > > It would also be great if there were a way of NOT creating dependencies between tasks automatically.
   > > This is very common for cluster or job IDs (EMR, Databricks, etc.), and having these dependencies created automatically doesn't make much sense.
   > 
   > Without a dependency then you might try to get a value out of XCom that hasn't been written yet!
   
   Makes sense. I think I got used to seeing how DAGs are now rendered with this new XComs/TaskFlow API (using `.output`). Before, Airflow wasn't creating edges between the tasks that were using the values.
   Not part of this topic, but it would be cool to have a visual representation of the variable being passed, much like Dagster does in their UI.





[GitHub] [airflow] ashb commented on issue #16618: Access specific values within an `XCom` value using Taskflow API

ashb commented on issue #16618:
URL: https://github.com/apache/airflow/issues/16618#issuecomment-876290087


   I guess part of the confusion is that some operators push a dict as their `return_value` XCom, while others push separate XCom keys directly.
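The following toy example illustrates the two styles. It assumes a dict-backed XCom table keyed by `(task_id, key)`. The task IDs `dict_style` and `keyed_style` and all values are hypothetical, and none of this is Airflow code. The same field needs a different lookup depending on how it was pushed, which is why a single subscript syntax cannot serve both.

```python
from typing import Any, Dict, Optional, Tuple

RETURN_VALUE = "return_value"

# Hypothetical XCom table: (task_id, key) -> value.
store: Dict[Tuple[str, str], Any] = {}


def xcom_push(task_id: str, key: str, value: Any) -> None:
    store[(task_id, key)] = value


def xcom_pull(task_id: str, key: str = RETURN_VALUE) -> Optional[Any]:
    # Like ti.xcom_pull with its default, a key that was never pushed yields None.
    return store.get((task_id, key))


# Style 1: the task returns a dict, stored as ONE XCom under "return_value".
xcom_push("dict_style", RETURN_VALUE, {"cluster_id": "c-1", "job_id": "j-1"})

# Style 2: the task pushes each value under its own XCom key.
xcom_push("keyed_style", "cluster_id", "c-2")
xcom_push("keyed_style", "job_id", "j-2")

print(xcom_pull("dict_style")["job_id"])       # j-1: index into the return value
print(xcom_pull("keyed_style", key="job_id"))  # j-2: select the key
print(xcom_pull("dict_style", key="job_id"))   # None: no such key was pushed
print(xcom_pull("keyed_style"))                # None: nothing was returned
```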





[GitHub] [airflow] ricardogaspar2 commented on issue #16618: Access specific values within an `XCom` value using Taskflow API

ricardogaspar2 commented on issue #16618:
URL: https://github.com/apache/airflow/issues/16618#issuecomment-899396032


   > > Not part of this topic, but it would be cool to have a visual representation of the variable that is being passed, much like Dagster does in their UI
   > 
   > @ricardogaspar2 Sounds useful -- do you have any screenshot you could show?
   
   Sure thing.
   The screenshot below was grabbed from this talk: https://www.youtube.com/watch?v=D_1VJapCscc&t=1055s
   
   ![Screenshot 2021-08-16 at 11 17 09](https://user-images.githubusercontent.com/3660448/129548809-94c88c08-62a0-4bb6-965a-b09641e766c1.png)
   
   There is also some info here: https://dagster.io/blog/dagster-airflow
   





[GitHub] [airflow] ashb commented on issue #16618: Access specific values within an `XCom` value using Taskflow API

ashb commented on issue #16618:
URL: https://github.com/apache/airflow/issues/16618#issuecomment-899326125


   > Not part of this topic, but it would be cool to have a visual representation of the variable that is being passed, much like Dagster does in their UI
   
   @ricardogaspar2 Sounds useful -- do you have any screenshot you could show?





[GitHub] [airflow] ashb commented on issue #16618: Access specific values within an `XCom` value using Taskflow API

ashb commented on issue #16618:
URL: https://github.com/apache/airflow/issues/16618#issuecomment-891827633


   > It would also be great if there were a way of NOT creating dependencies between tasks automatically.
   > This is very common for cluster or job IDs (EMR, Databricks, etc.), and having these dependencies created automatically doesn't make much sense.
   
   Without a dependency then you might try to get a value out of XCom that hasn't been written yet!





[GitHub] [airflow] ashb commented on issue #16618: Access specific values within an `XCom` value using Taskflow API

ashb commented on issue #16618:
URL: https://github.com/apache/airflow/issues/16618#issuecomment-876288276


   Hmmm, the "native" way I would like this to work is:
   
   ```python
   my_op.output['dataflow_job_id']
   # Ideally, this would be equivalent to my_op.xcom_pull(key='return_value')['dataflow_job_id']
   ```
   
   But sadly, XComArg already has a `__getitem__` that overrides the key, which means it is actually:
   
   ```python
   my_op.output['dataflow_job_id']
   # Actually equivalent to my_op.xcom_pull(key='dataflow_job_id')
   ```
   
   If we hadn't already released it, I would have suggested that the ability to change the key be provided via `__getattr__`, yielding:
   
   ```python
   my_op.output.some_xcom_key # my_op.xcom_pull(key='some_xcom_key')
   my_op.output['attr'] # my_op.xcom_pull(key='return_value')['attr']
   ```
   
   But that ship has sailed, so I'm not sure how to proceed nicely.
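The following minimal sketch shows the `__getattr__`/`__getitem__` split proposed above. It is written in plain Python rather than Airflow code. `ProposedXComArg` is a hypothetical class, and the XCom table is assumed to be a dict keyed by `(task_id, key)`. Subscripts are recorded and applied lazily when the reference is resolved.

```python
from typing import Any, Dict, Optional, Tuple, Union

RETURN_VALUE = "return_value"
Store = Dict[Tuple[str, str], Any]  # hypothetical XCom table: (task_id, key) -> value


class ProposedXComArg:
    """Toy model: attribute access picks the XCom key, subscripts index into its value."""

    def __init__(
        self,
        task_id: str,
        key: str = RETURN_VALUE,
        path: Tuple[Union[str, int], ...] = (),
    ) -> None:
        self.task_id = task_id
        self.key = key
        self.path = path  # lookups applied to the pulled value, in order

    def __getattr__(self, name: str) -> "ProposedXComArg":
        # Only called for names that are not real attributes (task_id, key, path, resolve).
        # Refuse private/dunder names so protocol lookups like __deepcopy__ aren't hijacked.
        if name.startswith("_"):
            raise AttributeError(name)
        # Selecting a key starts a fresh lookup with no pending subscripts.
        return ProposedXComArg(self.task_id, key=name)

    def __getitem__(self, item: Union[str, int]) -> "ProposedXComArg":
        # Record the lookup; it is applied only when the reference is resolved.
        return ProposedXComArg(self.task_id, key=self.key, path=self.path + (item,))

    def resolve(self, store: Store) -> Optional[Any]:
        value = store.get((self.task_id, self.key))
        for item in self.path:
            value = value[item]
        return value


store: Store = {
    ("my_op", RETURN_VALUE): {"attr": "from the return value"},
    ("my_op", "some_xcom_key"): "from a separate key",
}
output = ProposedXComArg("my_op")

print(output.some_xcom_key.resolve(store))  # from a separate key
print(output["attr"].resolve(store))        # from the return value
```

This design has a trade-off. Any XCom key that collides with a real attribute or method (here `key`, `path`, `task_id` or `resolve`) cannot be selected with attribute syntax, and `hasattr()` returns `True` for every name that doesn't start with an underscore.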






[GitHub] [airflow] boring-cyborg[bot] commented on issue #16618: Access specific values within an `XCom` value using Taskflow API

boring-cyborg[bot] commented on issue #16618:
URL: https://github.com/apache/airflow/issues/16618#issuecomment-867087959


   Thanks for opening your first issue here! Be sure to follow the issue template!
   






[GitHub] [airflow] ashb commented on issue #16618: Access specific values within an `XCom` value using Taskflow API

ashb commented on issue #16618:
URL: https://github.com/apache/airflow/issues/16618#issuecomment-879700551


   > At this point, maybe the best solution would be to deprecate accessing `my_op.output` directly, and require doing something else to get the default return value, say `my_op.output[None]` or `my_op.return_value`?
   
   I'm kind of on a big kick to reduce "boilerplate" in DAGs, so while both of these would be explicit, they're longer and a bit "un-pythonic".
   
   Not sure.





[GitHub] [airflow] ricardogaspar2 commented on issue #16618: Access specific values within an `XCom` value using Taskflow API

ricardogaspar2 commented on issue #16618:
URL: https://github.com/apache/airflow/issues/16618#issuecomment-889400597


   Having this feature would be great! 👏 
   Can't wait for this!
   
   It would also be great if there were a way of NOT creating dependencies between tasks automatically.
   This is very common for cluster or job IDs (EMR, Databricks, etc.), and having these dependencies created automatically doesn't make much sense.
   
   

