Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/07/05 18:21:21 UTC

[GitHub] [airflow] ephraimbuddy opened a new pull request #16823: Add ability to specify XCom key for operators and task decorator

ephraimbuddy opened a new pull request #16823:
URL: https://github.com/apache/airflow/pull/16823


   Currently, we cannot specify an XCom key through the task decorator
   or any operator unless we explicitly call ti.xcom_push(key=key, value=value).
       
   Whenever a task returns a value, the XCom is implicitly pushed with the key
   'return_value', and with the task decorator there is no way to change
   that key.
       
   This change adds an operator argument called `xcom_key` that sets the key
   used for the returned value.
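
The implicit behaviour described above can be pictured with a toy model. This is only a sketch, not Airflow's code: `ToyTaskInstance` and its `run` method are hypothetical stand-ins, and the only things borrowed from Airflow are the `return_value` default key and the `xcom_push(key=..., value=...)` call shape.

```python
from typing import Any, Callable, Dict, Optional

XCOM_RETURN_KEY = "return_value"  # the default key name mentioned above


class ToyTaskInstance:
    """Hypothetical in-memory stand-in for a task instance and its XComs."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id
        self.xcoms: Dict[str, Any] = {}

    def xcom_push(self, key: str, value: Any) -> None:
        # Explicit push: the caller chooses the key.
        self.xcoms[key] = value

    def run(self, fn: Callable[["ToyTaskInstance"], Optional[Any]]) -> None:
        # Implicit push: a non-None return value is stored under the default key.
        result = fn(self)
        if result is not None:
            self.xcom_push(XCOM_RETURN_KEY, result)


def returns_value(ti: ToyTaskInstance) -> list:
    return [3, 4, 5, 6]


def pushes_explicitly(ti: ToyTaskInstance) -> None:
    ti.xcom_push(key="mykey", value=[3, 4, 5, 6])


implicit = ToyTaskInstance("implicit")
implicit.run(returns_value)

explicit = ToyTaskInstance("explicit")
explicit.run(pushes_explicitly)
```

In this model, `implicit.xcoms` holds the list under `return_value`, while `explicit.xcoms` holds it only under `mykey`, because the callable returns nothing and pushes the value itself.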
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16823: Add ability to specify XCom key for operators and task decorator

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16823:
URL: https://github.com/apache/airflow/pull/16823#discussion_r664836811



##########
File path: tests/models/test_taskinstance.py
##########
@@ -1163,6 +1163,33 @@ def test_xcom_push_flag(self):
         ti.run()
         assert ti.xcom_pull(task_ids=task_id, key=models.XCOM_RETURN_KEY) is None
 
+    def test_xcom_push_with_a_specified_key_in_operator(self):
+        """
+        Tests the option for Operators to push XComs with a given key
+        """
+        key = 'mykey'
+        value = 'hello'
+        task_id = 'test_no_xcom_push'
+        dag = models.DAG(dag_id='test_xcom')
+
+        # XCom pushed with key=mykey
+        task = PythonOperator(
+            task_id=task_id,
+            dag=dag,
+            python_callable=lambda: value,
+            xcom_key=key,

Review comment:
       I have provided answers to why we may need it in all operators. Check https://github.com/apache/airflow/pull/16823#issuecomment-875037822







[GitHub] [airflow] ephraimbuddy commented on pull request #16823: Add ability to specify XCom key for operators and task decorator

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #16823:
URL: https://github.com/apache/airflow/pull/16823#issuecomment-896697397


   @ashb, I have just discovered that XCom key can actually be changed with the TaskFlow API currently:
   
   ```python
   
   from airflow import DAG
   from airflow.utils.dates import days_ago
   
   value_1 = [1, 2, 3]
   value_2 = {'a': 'b'}
   
   dag = DAG(
       'example_xcom',
       schedule_interval="@once",
       start_date=days_ago(2),
       tags=['example'],
   )
   
   @dag.task()
   def push(**kwargs):
       """Pushes an XCom without a specific target"""
       kwargs['ti'].xcom_push(key='value from pusher 1', value=value_1)
   
   
   @dag.task()
   def push_by_returning():
       """Pushes an XCom without a specific target, just by returning it"""
       return value_2
   
   
   @dag.task()
   def puller(data1, data2, **kwargs):
       """Pull all previously pushed XComs and check if the pushed values match the pulled values."""
       ti = kwargs['ti']
       # get value_1
       pulled_value_1 = ti.xcom_pull(key=None, task_ids='push')
       if pulled_value_1 != value_1:
           raise ValueError(f'The two values differ {pulled_value_1} and {value_1}')
   
       # get value_2
       pulled_value_2 = data2
       if pulled_value_2 != value_2:
           raise ValueError(f'The two values differ {pulled_value_2} and {value_2}')
   
       # get both value_1 and value_2
       pulled_value_1, pulled_value_2 = ti.xcom_pull(key=None, task_ids=['push', 'push_by_returning'])
       if pulled_value_1 != value_1:
           raise ValueError(f'The two values differ {pulled_value_1} and {value_1}')
       if pulled_value_2 != value_2:
           raise ValueError(f'The two values differ {pulled_value_2} and {value_2}')
   
   
   puller(push(), push_by_returning())
   
   ```
   The decorated function accepts `**kwargs`, so we can call `ti.xcom_push` and `ti.xcom_pull` inside it.
   
   Closing this PR!





[GitHub] [airflow] ephraimbuddy edited a comment on pull request #16823: Add ability to specify XCom key for operators and task decorator

Posted by GitBox <gi...@apache.org>.
ephraimbuddy edited a comment on pull request #16823:
URL: https://github.com/apache/airflow/pull/16823#issuecomment-874271273


   Tested with these two dags:
   
   ```
   from airflow import DAG
   from airflow.utils.dates import days_ago
   from airflow.operators.python import PythonOperator
   
   dag =  DAG(
       "example_xcom",
       schedule_interval="@once",
       start_date=days_ago(2)
   ) 
   
   def push_xcom(ti):
       return [3,4,5,6]
   
   def pull_xcom(ti):
       xcom = ti.xcom_pull(task_ids='push_xcom', key='mykey')
       print("Xcom pulled: ", xcom)
   
   
   with dag:
       task1 = PythonOperator(
           task_id="push_xcom",
           python_callable=push_xcom,
           xcom_key='mykey',
       )
       task2 = PythonOperator(
           task_id="pull_xcom",
           python_callable=pull_xcom
       )
       task1 >> task2
   ```
   Another one:
   ```
   from airflow import DAG
   from airflow.utils.dates import days_ago
   
   dag = DAG(dag_id="test-xcom-key", schedule_interval="@once", start_date=days_ago(2))
   
   @dag.task(xcom_key="mykey")
   def push_xcom():
       return {"mydata":[3,4,5,6]}
   
   @dag.task()
   def push_another_xcom(data):
       print(data)
       return {'mydata':"mydata"}
   
   
   push_another_xcom(push_xcom())
   ```





[GitHub] [airflow] ldacey commented on pull request #16823: Add ability to specify XCom key for operators and task decorator

Posted by GitBox <gi...@apache.org>.
ldacey commented on pull request #16823:
URL: https://github.com/apache/airflow/pull/16823#issuecomment-981532073


   Okay, thanks. So no way to actually _return_ a custom key name (without using `ti.xcom_push` explicitly)? I tried to override the `output` function from the BaseOperator, but no luck it seems. It is fine either way, I just assumed I was missing something obvious.





[GitHub] [airflow] ephraimbuddy commented on pull request #16823: Add ability to specify XCom key for operators and task decorator

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #16823:
URL: https://github.com/apache/airflow/pull/16823#issuecomment-875037822


   > Can you explain _why_ this is desirable behaviour?
   > 
   > Doesn't returning a dictionary (and setting multiple outputs?) already set different xcom keys, or am I making that up?
   
   It does set a separate XCom key for each key in the dictionary and, at the end, the whole dictionary is also stored with `return_value` as the key.
   
   What I want in this case is to change the `return_value` key that is output, which we currently have no control over.
   Take, for example, a PythonOperator task that pushes an XCom by returning a value:
   
   ```
   from airflow.operators.python import PythonOperator
   
   def push_xcom(ti):
       return [3,4,5,6]
   
   task1 = PythonOperator(
       task_id="push_xcom",
       python_callable=push_xcom,
   )
   ```
   Here, we can't change the key of the pushed XCom; the only way to do it is to use
   
   ```
   def push_xcom(ti):
       ti.xcom_push(key='mykey', value=[3,4,5,6])
   ```
   
   And with the task decorator we can't change the XCom key used for this function:
   ```
   @task()
   def push_xcom(ti):
       return [3,4,5,6]
   ```
   If we use multiple_outputs and do something like:
   ```
   @task(multiple_outputs=True)
   def push_xcom(ti):
       return {'mykey':[3,4,5,6]}
   ```
   Then the XComs pushed will be:
   ```
   key=mykey, value=[3,4,5,6]
   key=return_value, value={'mykey':[3,4,5,6]}
   ```
   With this change, if we have:
   ```
   @task(xcom_key='mykey')
   def push_xcom(ti):
       return [3,4,5,6]
   ```
   Xcom pushed will just be:
   ```
   key=mykey, value=[3,4,5,6]
   ```
   Which is mostly what anyone wants.
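
The difference between the two outcomes above can be modelled in a few lines. This is a rough sketch under assumptions, not the PR's implementation: `push_result` is a hypothetical helper that only returns the rows that would be written, and its `xcom_key` parameter mirrors the proposed argument.

```python
from typing import Any, Dict, Optional

XCOM_RETURN_KEY = "return_value"


def push_result(
    result: Any,
    multiple_outputs: bool = False,
    xcom_key: Optional[str] = None,  # hypothetical: mirrors the proposed argument
) -> Dict[str, Any]:
    """Return the XCom rows (key -> value) that a returned value would produce."""
    rows: Dict[str, Any] = {}
    if multiple_outputs and isinstance(result, dict):
        # One row per dictionary key ...
        rows.update(result)
    # ... plus one row for the whole return value.
    rows[xcom_key or XCOM_RETURN_KEY] = result
    return rows


# Current behaviour with multiple_outputs=True: the data is stored twice.
current = push_result({"mykey": [3, 4, 5, 6]}, multiple_outputs=True)

# Proposed behaviour with xcom_key: a single row under the chosen key.
proposed = push_result([3, 4, 5, 6], xcom_key="mykey")
```

Here `current` contains both a `mykey` row and a `return_value` row, whereas `proposed` contains only the `mykey` row.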





[GitHub] [airflow] ashb commented on pull request #16823: Add ability to specify XCom key for operators and task decorator

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #16823:
URL: https://github.com/apache/airflow/pull/16823#issuecomment-875407947


   >  Ok. For this case I described where I want to know in the UI if XCom was uploaded to GCS and if not see the value, by customising the orm_deserialize_value.
   
   If that is the goal, then I would suggest we add an extra column to the XCom table (json?) that is usable by xcom backends for storing "whatever they like" -- and then _that_ is what we use to decide what to show.
   
   Because the XCom key should be entirely under user/dag author control -- and while we could put restrictions on it if we wanted to, it could lead to non-obvious behaviour as I mentioned earlier.
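
The extra-column idea could look roughly like the sketch below. Everything here is hypothetical: `XComRow`, the `backend_metadata` field, the `stored_in` entry, and the bucket path are illustrative names only, and no such column is confirmed to exist in Airflow.

```python
import json
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class XComRow:
    """Toy stand-in for a row in the XCom table (not Airflow's model)."""

    key: str
    value: str  # serialized payload, or a reference to external storage
    # Hypothetical extra JSON column that a backend may fill however it likes.
    backend_metadata: Dict[str, Any] = field(default_factory=dict)


def display_value(row: XComRow) -> str:
    """Decide what to show from the metadata, never from the key."""
    location = row.backend_metadata.get("stored_in", "db")
    if location != "db":
        return f"XCom stored in {location}: {row.value}"
    return str(json.loads(row.value))


small = XComRow(key="return_value", value=json.dumps([3, 4, 5, 6]))
large = XComRow(
    key="return_value",
    value="gs://example-bucket/xcom/abc.parquet",  # made-up path
    backend_metadata={"stored_in": "gcs"},
)
```

Both rows keep the ordinary `return_value` key, so the key stays entirely under the DAG author's control while the backend still records where the data lives.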





[GitHub] [airflow] ephraimbuddy closed pull request #16823: Add ability to specify XCom key for operators and task decorator

Posted by GitBox <gi...@apache.org>.
ephraimbuddy closed pull request #16823:
URL: https://github.com/apache/airflow/pull/16823


   





[GitHub] [airflow] ephraimbuddy edited a comment on pull request #16823: Add ability to specify XCom key for operators and task decorator

Posted by GitBox <gi...@apache.org>.
ephraimbuddy edited a comment on pull request #16823:
URL: https://github.com/apache/airflow/pull/16823#issuecomment-875098890


   > > I want to be able to control the key used in pushing xcom when a value is returned from a task
   > 
   > Step back further 😁 Why? Please explain what you are trying to achieve, not the implementation you have already thought about.
   
   😀😀
   Ok. This is the case I described, where I want to know in the UI whether an XCom was uploaded to GCS and, if not, see the value, by customising `orm_deserialize_value`.
   
   For instance, I'm building a custom XCom backend that stores XComs in GCS if the value is large, or in the db if the value is small. When I visit the XCom tab in the UI, I want to know whether a given XCom is in the db or in GCS.
   
   Is there another way to accomplish this, other than prefixing the keys of XComs stored in GCS with, say, `df` and doing something like:
   ```
   def orm_deserialize_value(self):
       if self.key.startswith('df'):
           return 'Xcom uploaded to GCS'
       return BaseXCom.deserialize_value(self)
   ```
   
   I don't have a strong opinion on this feature though; we can close it if we don't want it.
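
One possible alternative, sketched under assumptions, is to mark the stored value instead of the key. Nothing below is Airflow API: `serialize_for_storage`, `render_for_ui`, the `gcs://` marker, the 1024-byte threshold, and the `upload` callback are all hypothetical names chosen for illustration.

```python
import json
from typing import Any, Callable, Dict

MAX_DB_BYTES = 1024  # hypothetical size threshold for keeping a value in the db
GCS_MARKER = "gcs://"  # hypothetical marker for values stored externally


def serialize_for_storage(value: Any, upload: Callable[[bytes], str]) -> str:
    """Keep small values inline; upload large ones and store only a reference."""
    payload = json.dumps(value).encode("utf-8")
    if len(payload) > MAX_DB_BYTES:
        return GCS_MARKER + upload(payload)
    return payload.decode("utf-8")


def render_for_ui(stored: str) -> Any:
    """Show a notice for external values, the decoded value otherwise."""
    # Inline JSON never starts with the marker: a JSON string begins with a quote.
    if stored.startswith(GCS_MARKER):
        return "XCom uploaded to GCS"
    return json.loads(stored)


uploads: Dict[str, bytes] = {}


def fake_upload(data: bytes) -> str:
    # Stand-in for a real object-store upload; returns the object name.
    uploads["obj-1"] = data
    return "obj-1"


small = serialize_for_storage([3, 4, 5, 6], fake_upload)
large = serialize_for_storage(list(range(1000)), fake_upload)
```

With this approach the XCom key is left untouched, and the decision about what to display depends only on what the backend wrote.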





[GitHub] [airflow] ephraimbuddy commented on pull request #16823: Add ability to specify XCom key for operators and task decorator

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #16823:
URL: https://github.com/apache/airflow/pull/16823#issuecomment-874271273


   Tested with these two dags:
   
   ```
   from airflow import DAG
   from airflow.utils.dates import days_ago
   from airflow.operators.python import PythonOperator
   
   dag =  DAG(
       "example_xcom",
       schedule_interval="@once",
       start_date=days_ago(2)
   ) 
   
   def push_xcom(ti):
       # returning a value pushes xcom
       return [3,4,5,6]
   
   def pull_xcom(ti):
       # Pull an xcom
       xcom = ti.xcom_pull(task_ids='push_xcom', key='mykey')
       print("Xcom pulled: ", xcom)
   
   
   with dag:
       task1 = PythonOperator(
           task_id="push_xcom",
           python_callable=push_xcom,
           xcom_key='mykey',
       )
       task2 = PythonOperator(
           task_id="pull_xcom",
           python_callable=pull_xcom
       )
       task1 >> task2
   ```
   
   ```
   from airflow import DAG
   from airflow.utils.dates import days_ago
   
   dag = DAG(dag_id="test-xcom-key", schedule_interval="@once", start_date=days_ago(2))
   
   @dag.task(xcom_key="mykey")
   def push_xcom():
       return {"mydata":[3,4,5,6]}
   
   @dag.task()
   def push_another_xcom(data):
       print(data)
       return {'mydata':"mydata"}
   
   
   push_another_xcom(push_xcom())
   ```





[GitHub] [airflow] ephraimbuddy commented on pull request #16823: Add ability to specify XCom key for operators and task decorator

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #16823:
URL: https://github.com/apache/airflow/pull/16823#issuecomment-875058937


   > FWIW that feels like a bug itself -- it shouldn't show up "twice" in xcom, which is especially important for custom xcom backends and larger data (i.e. DFs)
   
   The above is the current behaviour when using multiple outputs. With this change, the only output is `key=return_value, value={'mykey':[3,4,5,6]}`.
   
   > For instance:
   >
   > ```
   > @task
   > def my_task():
   >     return some_large_df()
   > ```
   >
   > That is very likely to be error-prone, and the error hard to spot, for anyone not intimately familiar with XComs -- the opposite direction we want to head in :)
   
   The above code will output the result of `some_large_df()` as the XCom value with key `return_value`. If we use this change and write `@task(xcom_key='somekey')`, the only thing that changes is the XCom key: the value will still be the result of `some_large_df()`, while the key will be `somekey`.
   
   > So let's take a big step back: what feature/functionality are you trying to achieve?
   
   I want to be able to control the key used when pushing an XCom for a value returned from a task.
   








[GitHub] [airflow] ashb commented on pull request #16823: Add ability to specify XCom key for operators and task decorator

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #16823:
URL: https://github.com/apache/airflow/pull/16823#issuecomment-875023903


   Can you explain _why_ this is desirable behaviour?
   
   Doesn't returning a dictionary (and setting multiple outputs?) already set different xcom keys, or am I making that up?





[GitHub] [airflow] ephraimbuddy commented on pull request #16823: Add ability to specify XCom key for operators and task decorator

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #16823:
URL: https://github.com/apache/airflow/pull/16823#issuecomment-875041504


   One thing you can do if you're able to control the xcom_key:
   
   When overriding `orm_deserialize_value`, we can do something like:
   
   ```
   def orm_deserialize_value(self):
       if self.key.startswith('df'):
           return 'Xcom uploaded to GCS'
       return BaseXCom.deserialize_value(self)
   ```





[GitHub] [airflow] ashb commented on pull request #16823: Add ability to specify XCom key for operators and task decorator

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #16823:
URL: https://github.com/apache/airflow/pull/16823#issuecomment-875089706


   > I want to be able to control the key used in pushing xcom when a value is returned from a task
   
   Step back further 😁 Why? Please explain what you are trying to achieve, not the implementation you have already thought about.





[GitHub] [airflow] ashb commented on pull request #16823: Add ability to specify XCom key for operators and task decorator

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #16823:
URL: https://github.com/apache/airflow/pull/16823#issuecomment-875051253


   > Then Xcoms pushed will be
   > 
   > ```
   > key=mykey, value=[3,4,5,6]
   > key=return_value, value={'mykey':[3,4,5,6]}
   > ```
   
   FWIW that feels like a bug itself -- it shouldn't show up "twice" in xcom, which is especially important for custom xcom backends and larger data (i.e. DFs)
   
   
   > One thing you can do if you're able to control the xcom_key:
   > 
   > When overriding `orm_deserialize_value`, we can do something like:
   > 
   > ```
   > def orm_deserialize_value(self):
   >     if self.key.startswith('df'):
   >         return 'Xcom uploaded to GCS'
   >     return BaseXCom.deserialize_value(self)
   > ```
   
   I need convincing that a) this is the right thing to trigger this behaviour on, and b) that the interface you have proposed is right for users.
   
   For instance:
   
   ```python
   
   @task
   def my_task():
       return some_large_df()
   ```
   
   That is very likely to be error-prone, and the error hard to spot, for anyone not intimately familiar with XComs -- the opposite direction we want to head in :)
   
   So let's take a big step back: what feature/functionality are you trying to achieve?





[GitHub] [airflow] ldacey commented on pull request #16823: Add ability to specify XCom key for operators and task decorator

Posted by GitBox <gi...@apache.org>.
ldacey commented on pull request #16823:
URL: https://github.com/apache/airflow/pull/16823#issuecomment-981147088


   @ephraimbuddy Can I overwrite the default "return_value" key in custom operators? 
   
   I have some historical custom keys for certain DAGs. For example, in one operator I push a list of files with the key `extract_files` and I also push the maximum ID from the database table which is a default `return_value`. Can I have my operator return the list of files (extract_files) as an xcom with the key `extract_files`? I want to make use of the `.output` feature.
   
   Not a huge deal because I could potentially just rename all of the old xcom keys in the database, and replace any references to `extract_files` key in my code.





[GitHub] [airflow] ldacey commented on pull request #16823: Add ability to specify XCom key for operators and task decorator

Posted by GitBox <gi...@apache.org>.
ldacey commented on pull request #16823:
URL: https://github.com/apache/airflow/pull/16823#issuecomment-981776479


   This worked for me.
   
   - extract.output can be used because the key is `return_value`
   - I added a template_field called `input_files` in my custom DatasetToDatasetOperator
   - I used XComArg to overwrite the key from `return_value` to `filter_list`
   - The input_files argument reads the XComArg
   
   ```
       transform = PrepareParquetOperator(
           task_id="transform",
           input_files=extract.output,
       )
   
       transformed_files = XComArg(transform, "filter_list")
   
       finalize = DatasetToDatasetOperator(
           task_id="finalize",
           input_files=transformed_files,
       )
       extract >> transformed_files >> finalize
   ```
   
   It seems like I have to use `transformed_files` as my upstream task instead of `transform`. Everything works though.
   
   It _would_ be nice if we could just overwrite the `return_value` key with anything we want, without requiring intermediate tasks or XComArgs. Most likely, though, I will just replace all of my custom keys in the database with `return_value` and remove my custom `xcom_push` code in favor of simply returning results.
   
   Thanks @ephraimbuddy





[GitHub] [airflow] ashb commented on a change in pull request #16823: Add ability to specify XCom key for operators and task decorator

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16823:
URL: https://github.com/apache/airflow/pull/16823#discussion_r664822803



##########
File path: airflow/serialization/schema.json
##########
@@ -161,6 +161,7 @@
         "weight_rule": { "type": "string" },
         "executor_config": { "$ref": "#/definitions/dict" },
         "do_xcom_push": { "type": "boolean" },
+        "xcom_key": { "type": "string" },

Review comment:
       This attribute isn't needed for webserver or scheduler, so we should exclude it from the serialization. 

##########
File path: tests/models/test_taskinstance.py
##########
@@ -1163,6 +1163,33 @@ def test_xcom_push_flag(self):
         ti.run()
         assert ti.xcom_pull(task_ids=task_id, key=models.XCOM_RETURN_KEY) is None
 
+    def test_xcom_push_with_a_specified_key_in_operator(self):
+        """
+        Tests the option for Operators to push XComs with a given key
+        """
+        key = 'mykey'
+        value = 'hello'
+        task_id = 'test_no_xcom_push'
+        dag = models.DAG(dag_id='test_xcom')
+
+        # XCom pushed with key=mykey
+        task = PythonOperator(
+            task_id=task_id,
+            dag=dag,
+            python_callable=lambda: value,
+            xcom_key=key,

Review comment:
       I don't think this should be a property of BaseOperator -- _if_ we have this feature (and I'm still not sure why we need it) then it should be solely on the TaskFlow decorator, not all operators.







[GitHub] [airflow] ephraimbuddy commented on pull request #16823: Add ability to specify XCom key for operators and task decorator

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #16823:
URL: https://github.com/apache/airflow/pull/16823#issuecomment-875100326


   Also, I feel the way multiple_outputs works is wrong, and this can be used instead of it.








[GitHub] [airflow] ephraimbuddy edited a comment on pull request #16823: Add ability to specify XCom key for operators and task decorator

Posted by GitBox <gi...@apache.org>.
ephraimbuddy edited a comment on pull request #16823:
URL: https://github.com/apache/airflow/pull/16823#issuecomment-874271273


   Tested with these two dags:
   
   ```
   from airflow import DAG
   from airflow.utils.dates import days_ago
   from airflow.operators.python import PythonOperator
   
   dag =  DAG(
       "example_xcom",
       schedule_interval="@once",
       start_date=days_ago(2)
   ) 
   
   def push_xcom(ti):
       return [3,4,5,6]
   
   def pull_xcom(ti):
       xcom = ti.xcom_pull(task_ids='push_xcom', key='mykey')
       print("Xcom pulled: ", xcom)
   
   
   with dag:
       task1 = PythonOperator(
           task_id="push_xcom",
           python_callable=push_xcom,
           xcom_key='mykey',
       )
       task2 = PythonOperator(
           task_id="pull_xcom",
           python_callable=pull_xcom
       )
       task1 >> task2
   ```
   Another one:
   ```
   from airflow import DAG
   from airflow.utils.dates import days_ago
   
   dag = DAG(dag_id="test-xcom-key", schedule_interval="@once", start_date=days_ago(2))
   
   @dag.task(xcom_key="mykey")
   def push_xcom():
       return {"mydata":[3,4,5,6]}
   
   @dag.task()
   def push_another_xcom(data):
       print(data)
       return {'mydata':"mydata"}
   
   
   push_another_xcom(push_xcom())
   ```





[GitHub] [airflow] ephraimbuddy commented on pull request #16823: Add ability to specify XCom key for operators and task decorator

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #16823:
URL: https://github.com/apache/airflow/pull/16823#issuecomment-874271273


   Tested with these two dags:
   
   ```
   from airflow import DAG
   from airflow.utils.dates import days_ago
   from airflow.operators.python import PythonOperator
   
   dag =  DAG(
       "example_xcom",
       schedule_interval="@once",
       start_date=days_ago(2)
   ) 
   
   def push_xcom(ti):
       # returning a value pushes xcom
       return [3,4,5,6]
   
   def pull_xcom(ti):
       # Pull an xcom
       xcom = ti.xcom_pull(task_ids='push_xcom', key='mykey')
       print("Xcom pulled: ", xcom)
   
   
   with dag:
       task1 = PythonOperator(
           task_id="push_xcom",
           python_callable=push_xcom,
           xcom_key='mykey',
       )
       task2 = PythonOperator(
           task_id="pull_xcom",
           python_callable=pull_xcom
       )
       task1 >> task2
   ```
   
   ```
   from airflow import DAG
   from airflow.utils.dates import days_ago
   
   dag = DAG(dag_id="test-xcom-key", schedule_interval="@once", start_date=days_ago(2))
   
   @dag.task(xcom_key="mykey")
   def push_xcom():
       return {"mydata":[3,4,5,6]}
   
   @dag.task()
   def push_another_xcom(data):
       print(data)
       return {'mydata':"mydata"}
   
   
   push_another_xcom(push_xcom())
   ```





[GitHub] [airflow] ephraimbuddy commented on pull request #16823: Add ability to specify XCom key for operators and task decorator

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #16823:
URL: https://github.com/apache/airflow/pull/16823#issuecomment-875098890


   > > I want to be able to control the key used in pushing xcom when a value is returned from a task
   > 
   > Step back further 😁 Why? Please explain what you are trying to achieve, not the implementation you have already thought about.
   
   😀😀
   Ok. This is for the case I described, where I want to see in the UI whether an XCom was uploaded to GCS and, if not, see its value, by customising `orm_deserialize_value`.
   
   For instance, I'm building a custom XCom backend that stores XComs in GCS if the value is large and in the db if the value is small. When I visit the XCom tab in the UI, I want to know whether a given XCom is in the db or in GCS.
   
   Is there another way to accomplish this, other than prefixing the keys of XComs stored in GCS with, say, `df` and doing something like:
   ```
   def orm_deserialize_value(self):
       if self.key.startswith('df'):
           return 'Xcom uploaded to GCS'
       return BaseXCom.deserialize_value(self)
   ```
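
The prefix idea above could be sketched, independently of Airflow, roughly as follows. Everything here is an assumption made for illustration: the `df_` prefix, the 1024-byte threshold, JSON as the serialization format, and the helper names `choose_key` and `display_value`. No GCS or database calls are made.

```python
import json

GCS_KEY_PREFIX = "df_"        # hypothetical marker for values stored in GCS
SIZE_THRESHOLD_BYTES = 1024   # hypothetical cut-off between "small" and "large"


def choose_key(key, value):
    """Return the key to push under, prefixed when the serialized value is large."""
    size = len(json.dumps(value).encode("utf-8"))
    if size > SIZE_THRESHOLD_BYTES:
        return GCS_KEY_PREFIX + key
    return key


def display_value(key, value):
    """Mimic what a customised orm_deserialize_value might show in the UI."""
    if key.startswith(GCS_KEY_PREFIX):
        return "XCom uploaded to GCS"
    return value


small = [3, 4, 5, 6]
large = list(range(1000))
small_key = choose_key("return_value", small)
large_key = choose_key("return_value", large)
```

One limitation of this approach is visible in the sketch: a small value that a user happens to push under a key starting with the prefix would be reported as stored in GCS.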





[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16823: Add ability to specify XCom key for operators and task decorator

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16823:
URL: https://github.com/apache/airflow/pull/16823#discussion_r664837950



##########
File path: airflow/serialization/schema.json
##########
@@ -161,6 +161,7 @@
         "weight_rule": { "type": "string" },
         "executor_config": { "$ref": "#/definitions/dict" },
         "do_xcom_push": { "type": "boolean" },
+        "xcom_key": { "type": "string" },

Review comment:
       I think that we need it since it's an additional field in the base operator. It's actually a test failure that pointed me to it







[GitHub] [airflow] ephraimbuddy commented on pull request #16823: Add ability to specify XCom key for operators and task decorator

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #16823:
URL: https://github.com/apache/airflow/pull/16823#issuecomment-981329153


   @Idacey, for custom operators, if you are pushing XComs with `ti.xcom_push`, you can specify a key. Then you can wrap your operator instance with `XComArg`, specifying the key to pull, e.g.
   ```python
   from airflow.models.xcom_arg import XComArg
   xcom_arg = XComArg(custom_op, key="extract_files")
   ```
   instead of 
   ```python
   xcom_arg = custom_op.output
   ```

