Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/09/07 19:48:04 UTC

[GitHub] jlowin closed pull request #2092: [AIRFLOW-888] Don't automatically push XComs

URL: https://github.com/apache/incubator-airflow/pull/2092

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/UPDATING.md b/UPDATING.md
index 6fd7afbe5c..e0acc49a4c 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -7,18 +7,19 @@ assists people when migrating to a new version.
 
 ### New Features
 
-#### Dask Executor
-
-A new DaskExecutor allows Airflow tasks to be run in Dask Distributed clusters.
+- [AIRFLOW-862] A new DaskExecutor allows Airflow tasks to be run in Dask Distributed clusters
 
 ### Deprecated Features
 These features are marked for deprecation. They may still work (and raise a `DeprecationWarning`), but are no longer
-supported and will be removed entirely in Airflow 2.0
+supported and will be removed entirely in a future version of Airflow.
 
-- `post_execute()` hooks now take two arguments, `context` and `result`
-  (AIRFLOW-886)
+- [AIRFLOW-886] `post_execute()` hooks now take two arguments, `context` and `result`. Previously, `post_execute()` only took one argument, `context`.
 
-  Previously, post_execute() only took one argument, `context`.
+### Breaking Changes
+These changes are not backwards-compatible with previous versions of Airflow.
+
+- [AIRFLOW-888] Operators no longer automatically push XComs. This behavior can be re-enabled globally
+  by setting `auto_xcom_push = True` in the `[operators]` section of `airflow.cfg`, or on a per-Operator basis by passing `auto_xcom_push=True` when creating the Operator.
 
 ## Airflow 1.8
 
@@ -47,8 +48,8 @@ interfere.
 Please read through these options; defaults have changed since 1.7.1.
 
 #### child_process_log_directory
-In order the increase the robustness of the scheduler, DAGS our now processed in their own process. Therefore each 
-DAG has its own log file for the scheduler. These are placed in `child_process_log_directory` which defaults to 
+In order to increase the robustness of the scheduler, DAGs are now processed in their own process. Therefore each
+DAG has its own log file for the scheduler. These are placed in `child_process_log_directory` which defaults to
 `<AIRFLOW_HOME>/scheduler/latest`. You will need to make sure these log files are removed.
 
 > DAG logs or processor logs ignore any command line settings for log file locations.
@@ -58,7 +59,7 @@ Previously the command line option `num_runs` was used to let the scheduler term
 loops. This is now time bound and defaults to `-1`, which means run continuously. See also num_runs.
 
 #### num_runs
-Previously `num_runs` was used to let the scheduler terminate after a certain amount of loops. Now num_runs specifies 
+Previously `num_runs` was used to let the scheduler terminate after a certain number of loops. Now `num_runs` specifies
 the number of times to try to schedule each DAG file within `run_duration` time. Defaults to `-1`, which means try
 indefinitely. This is only available on the command line.
 
@@ -71,7 +72,7 @@ dags are not being picked up, have a look at this number and decrease it when ne
 
 #### catchup_by_default
 By default the scheduler will fill any missing interval DAG Runs between the last execution date and the current date.
-This setting changes that behavior to only execute the latest interval. This can also be specified per DAG as 
+This setting changes that behavior to only execute the latest interval. This can also be specified per DAG as
 `catchup = False / True`. Command line backfills will still work.
 
 ### Faulty Dags do not show an error in the Web UI
@@ -95,33 +96,33 @@ convenience variables to the config. In case you run a secure Hadoop setup it m
 required to whitelist these variables by adding the following to your configuration:
 
 ```
-<property> 
+<property>
      <name>hive.security.authorization.sqlstd.confwhitelist.append</name>
      <value>airflow\.ctx\..*</value>
 </property>
 ```
 ### Google Cloud Operator and Hook alignment
 
-All Google Cloud Operators and Hooks are aligned and use the same client library. Now you have a single connection 
+All Google Cloud Operators and Hooks are aligned and use the same client library. Now you have a single connection
 type for all kinds of Google Cloud Operators.
 
 If you experience problems connecting with your operator make sure you set the connection type "Google Cloud Platform".
 
-Also the old P12 key file type is not supported anymore and only the new JSON key files are supported as a service 
+Also, the old P12 key file type is no longer supported; only the new JSON key files are accepted for a service
 account.
 
 ### Deprecated Features
-These features are marked for deprecation. They may still work (and raise a `DeprecationWarning`), but are no longer 
+These features are marked for deprecation. They may still work (and raise a `DeprecationWarning`), but are no longer
 supported and will be removed entirely in Airflow 2.0
 
 - Hooks and operators must be imported from their respective submodules
 
-  `airflow.operators.PigOperator` is no longer supported; `from airflow.operators.pig_operator import PigOperator` is. 
+  `airflow.operators.PigOperator` is no longer supported; `from airflow.operators.pig_operator import PigOperator` is.
   (AIRFLOW-31, AIRFLOW-200)
 
 - Operators no longer accept arbitrary arguments
 
-  Previously, `Operator.__init__()` accepted any arguments (either positional `*args` or keyword `**kwargs`) without 
+  Previously, `Operator.__init__()` accepted any arguments (either positional `*args` or keyword `**kwargs`) without
   complaint. Now, invalid arguments will be rejected. (https://github.com/apache/incubator-airflow/pull/1285)
 
 ### Known Issues
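
For DAG authors picking up this change, a minimal sketch of the per-Operator opt-in described in the breaking-changes note above (the dag_id, task_id, and callable are illustrative, not part of the PR):

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator

    dag = DAG(dag_id='xcom_opt_in_example', start_date=datetime(2017, 1, 1))

    push = PythonOperator(
        task_id='push_by_returning',
        python_callable=lambda: 'some value',  # the return value becomes the XCom
        auto_xcom_push=True,  # required after AIRFLOW-888; the default is now False
        dag=dag)
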
diff --git a/airflow/configuration.py b/airflow/configuration.py
index fb3c11ef38..5844e55da2 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -180,6 +180,7 @@ def run_command(command):
 # values at runtime)
 unit_test_mode = False
 
+
 [cli]
 # In what way should the cli access the API. The LocalClient will use the
 # database directly, while the json_client will use the api running on the
@@ -187,10 +188,12 @@ def run_command(command):
 api_client = airflow.api.client.local_client
 endpoint_url = http://localhost:8080
 
+
 [api]
 # How to authenticate users of the API
 auth_backend = airflow.api.auth.backend.default
 
+
 [operators]
 # The default owner assigned to each new operator, unless
 # provided explicitly or passed via `default_args`
@@ -200,6 +203,11 @@ def run_command(command):
 default_disk = 512
 default_gpus = 0
 
+# Whether Operators should automatically push XComs containing their results.
+# This sets the default value for a parameter of the same name that can be
+# passed to individual Operators.
+auto_xcom_push = False
+
 
 [webserver]
 # The base url of your website as airflow cannot guess what domain or
@@ -280,6 +288,7 @@ def run_command(command):
 # DAGs by default
 hide_paused_dags_by_default = False
 
+
 [email]
 email_backend = airflow.utils.email.send_email_smtp
 
diff --git a/airflow/example_dags/example_xcom.py b/airflow/example_dags/example_xcom.py
index b41421bc0b..0aa0047b0b 100644
--- a/airflow/example_dags/example_xcom.py
+++ b/airflow/example_dags/example_xcom.py
@@ -36,6 +36,7 @@ def push(**kwargs):
     kwargs['ti'].xcom_push(key='value from pusher 1', value=value_1)
 
 
+# returning a value only pushes an XCom when auto_xcom_push is enabled (see push2 below)
 def push_by_returning(**kwargs):
     # pushes an XCom without a specific target, just by returning it
     return value_2
@@ -61,7 +62,10 @@ def puller(**kwargs):
     task_id='push', dag=dag, python_callable=push)
 
 push2 = PythonOperator(
-    task_id='push_by_returning', dag=dag, python_callable=push_by_returning)
+    task_id='push_by_returning',
+    dag=dag,
+    python_callable=push_by_returning,
+    auto_xcom_push=True)
 
 pull = PythonOperator(
     task_id='puller', dag=dag, python_callable=puller)
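
For context, the value returned by `push_by_returning` is stored under the standard return-value XCom key, so `puller` can fetch it with a plain `xcom_pull`; a rough sketch assuming the task ids above:

    def puller(**kwargs):
        ti = kwargs['ti']
        # xcom_pull defaults to the return-value key, so this retrieves the
        # value returned by push_by_returning -- but only because that task
        # was created with auto_xcom_push=True
        pulled = ti.xcom_pull(task_ids='push_by_returning')
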
diff --git a/airflow/models.py b/airflow/models.py
index 27a567029e..83654fdc0f 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -1374,7 +1374,7 @@ def signal_handler(signum, frame):
                     result = task_copy.execute(context=context)
 
                 # If the task returns a result, push an XCom containing it
-                if result is not None:
+                if result is not None and task_copy.auto_xcom_push:
                     self.xcom_push(key=XCOM_RETURN_KEY, value=result)
 
                 # TODO remove deprecated behavior in Airflow 2.0
@@ -1886,6 +1886,10 @@ class derived from this one results in the creation of a task object,
     :type resources: dict
     :param run_as_user: unix username to impersonate while running the task
     :type run_as_user: str
+    :param auto_xcom_push: if True, an XCom is automatically pushed containing
+        the Operator's result. The default value for this parameter is taken
+        from airflow.cfg.
+    :type auto_xcom_push: bool
     """
 
     # For derived classes to define which fields will get jinjaified
@@ -1928,6 +1932,7 @@ def __init__(
             trigger_rule=TriggerRule.ALL_SUCCESS,
             resources=None,
             run_as_user=None,
+            auto_xcom_push=None,
             *args,
             **kwargs):
 
@@ -1992,6 +1997,10 @@ def __init__(
         self.priority_weight = priority_weight
         self.resources = Resources(**(resources or {}))
         self.run_as_user = run_as_user
+        if auto_xcom_push is None:
+            auto_xcom_push = configuration.getboolean(
+                'operators', 'auto_xcom_push')
+        self.auto_xcom_push = auto_xcom_push
 
         # Private attributes
         self._upstream_task_ids = []
@@ -2022,6 +2031,7 @@ def __init__(
             'on_failure_callback',
             'on_success_callback',
             'on_retry_callback',
+            'auto_xcom_push',
         }
 
     def __eq__(self, other):
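
Because the default is resolved once in `BaseOperator.__init__`, every Operator subclass picks up the new behavior automatically; a hedged sketch with a hypothetical custom operator (class name and return value are illustrative):

    from airflow.models import BaseOperator

    class ReportOperator(BaseOperator):
        def execute(self, context):
            # After this change, the return value is discarded unless the task
            # was created with auto_xcom_push=True or the airflow.cfg default
            # under [operators] is set to True
            return {'rows_processed': 100}
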
diff --git a/tests/models.py b/tests/models.py
index 6fbbf3ea8b..6470864bb2 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -23,7 +23,7 @@
 import unittest
 import time
 
-from airflow import models, settings, AirflowException
+from airflow import configuration, models, settings, AirflowException
 from airflow.exceptions import AirflowSkipException
 from airflow.models import DAG, TaskInstance as TI
 from airflow.models import State as ST
@@ -825,3 +825,41 @@ def post_execute(self, context, result):
 
         with self.assertRaises(TestError):
             ti.run()
+
+    def test_auto_xcom_push(self):
+        """
+        Tests that Operators do not push XComs by default, only when auto_xcom_push=True is passed
+        """
+        value = 'hello'
+
+        dag = models.DAG(dag_id='test_xcom')
+        task = PythonOperator(
+            task_id='test_no_auto_xcom_push',
+            dag=dag,
+            python_callable=lambda: value,
+            owner='airflow',
+            start_date=datetime.datetime(2017, 1, 1))
+        ti = TI(task=task, execution_date=datetime.datetime(2017, 1, 1))
+        ti.run()
+
+        # no XCom pushed by default
+        self.assertEqual(
+            ti.xcom_pull(
+                task_ids='test_no_auto_xcom_push', key=models.XCOM_RETURN_KEY),
+            None)
+
+        task2 = PythonOperator(
+            task_id='test_auto_xcom_push',
+            dag=dag,
+            python_callable=lambda: value,
+            auto_xcom_push=True,
+            owner='airflow',
+            start_date=datetime.datetime(2017, 1, 1))
+        ti = TI(task=task2, execution_date=datetime.datetime(2017, 1, 2))
+        ti.run()
+
+        # now an XCom should have been pushed
+        self.assertEqual(
+            ti.xcom_pull(
+                task_ids='test_auto_xcom_push', key=models.XCOM_RETURN_KEY),
+            value)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services