You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/12/24 19:33:51 UTC

[GitHub] marengaz closed pull request #4345: [AIRFLOW-3249] unify (and fix) pushing result to xcom

marengaz closed pull request #4345: [AIRFLOW-3249] unify (and fix) pushing result to xcom
URL: https://github.com/apache/incubator-airflow/pull/4345
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/UPDATING.md b/UPDATING.md
index 575fc0a3c5..a82e656a9b 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -109,6 +109,12 @@ session.add(admin)
 session.commit()
 ```
 
+### Unification of `do_xcom_push` flag
+The `do_xcom_push` flag (a switch to push the result of an operator to xcom or not) was appearing in different incarnations in different operators. Its function has been unified under a common name (`do_xcom_push`) on `BaseOperator`. This way it is also easy to globally disable pushing results to xcom.
+
+See [AIRFLOW-3249](https://jira.apache.org/jira/browse/AIRFLOW-3249) to check if your operator was affected.
+
+
 ## Airflow 1.10.1
 
 ### StatsD Metrics
diff --git a/airflow/contrib/operators/datastore_export_operator.py b/airflow/contrib/operators/datastore_export_operator.py
index 9d95eadc74..2e6da4ada6 100644
--- a/airflow/contrib/operators/datastore_export_operator.py
+++ b/airflow/contrib/operators/datastore_export_operator.py
@@ -54,8 +54,6 @@ class DatastoreExportOperator(BaseOperator):
     :param overwrite_existing: if the storage bucket + namespace is not empty, it will be
         emptied prior to exports. This enables overwriting existing backups.
     :type overwrite_existing: bool
-    :param xcom_push: push operation name to xcom for reference
-    :type xcom_push: bool
     """
 
     @apply_defaults
@@ -69,7 +67,6 @@ def __init__(self,
                  labels=None,
                  polling_interval_in_seconds=10,
                  overwrite_existing=False,
-                 xcom_push=False,
                  *args,
                  **kwargs):
         super(DatastoreExportOperator, self).__init__(*args, **kwargs)
@@ -82,7 +79,8 @@ def __init__(self,
         self.labels = labels
         self.polling_interval_in_seconds = polling_interval_in_seconds
         self.overwrite_existing = overwrite_existing
-        self.xcom_push = xcom_push
+        if kwargs.get('xcom_push') is not None:
+            raise AirflowException("'xcom_push' was deprecated, use 'BaseOperator.do_xcom_push' instead")
 
     def execute(self, context):
         self.log.info('Exporting data to Cloud Storage bucket ' + self.bucket)
@@ -106,5 +104,4 @@ def execute(self, context):
         if state != 'SUCCESSFUL':
             raise AirflowException('Operation failed: result={}'.format(result))
 
-        if self.xcom_push:
-            return result
+        return result
diff --git a/airflow/contrib/operators/datastore_import_operator.py b/airflow/contrib/operators/datastore_import_operator.py
index c79767f35e..b7897cf051 100644
--- a/airflow/contrib/operators/datastore_import_operator.py
+++ b/airflow/contrib/operators/datastore_import_operator.py
@@ -50,8 +50,6 @@ class DatastoreImportOperator(BaseOperator):
     :param polling_interval_in_seconds: number of seconds to wait before polling for
         execution status again
     :type polling_interval_in_seconds: int
-    :param xcom_push: push operation name to xcom for reference
-    :type xcom_push: bool
     """
 
     @apply_defaults
@@ -64,7 +62,6 @@ def __init__(self,
                  datastore_conn_id='google_cloud_default',
                  delegate_to=None,
                  polling_interval_in_seconds=10,
-                 xcom_push=False,
                  *args,
                  **kwargs):
         super(DatastoreImportOperator, self).__init__(*args, **kwargs)
@@ -76,7 +73,8 @@ def __init__(self,
         self.entity_filter = entity_filter
         self.labels = labels
         self.polling_interval_in_seconds = polling_interval_in_seconds
-        self.xcom_push = xcom_push
+        if kwargs.get('xcom_push') is not None:
+            raise AirflowException("'xcom_push' was deprecated, use 'BaseOperator.do_xcom_push' instead")
 
     def execute(self, context):
         self.log.info('Importing data from Cloud Storage bucket %s', self.bucket)
@@ -94,5 +92,4 @@ def execute(self, context):
         if state != 'SUCCESSFUL':
             raise AirflowException('Operation failed: result={}'.format(result))
 
-        if self.xcom_push:
-            return result
+        return result
diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py
index f6c1f9d45e..3347f94e1b 100644
--- a/airflow/contrib/operators/kubernetes_pod_operator.py
+++ b/airflow/contrib/operators/kubernetes_pod_operator.py
@@ -80,10 +80,10 @@ class KubernetesPodOperator(BaseOperator):
     :type node_selectors: dict
     :param config_file: The path to the Kubernetes config file
     :type config_file: str
-    :param xcom_push: If xcom_push is True, the content of the file
+    :param do_xcom_push: If True, the content of the file
         /airflow/xcom/return.json in the container will also be pushed to an
         XCom when the container completes.
-    :type xcom_push: bool
+    :type do_xcom_push: bool
     :param tolerations: Kubernetes tolerations
     :type list of tolerations
     """
@@ -123,7 +123,7 @@ def execute(self, context):
             pod.tolerations = self.tolerations
 
             launcher = pod_launcher.PodLauncher(kube_client=client,
-                                                extract_xcom=self.xcom_push)
+                                                extract_xcom=self.do_xcom_push)
             try:
                 (final_state, result) = launcher.run_pod(
                     pod,
@@ -137,8 +137,8 @@ def execute(self, context):
                 raise AirflowException(
                     'Pod returned a failure: {state}'.format(state=final_state)
                 )
-            if self.xcom_push:
-                return result
+
+            return result
         except AirflowException as ex:
             raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
 
@@ -163,7 +163,7 @@ def __init__(self,
                  resources=None,
                  affinity=None,
                  config_file=None,
-                 xcom_push=False,
+                 do_xcom_push=False,
                  node_selectors=None,
                  image_pull_secrets=None,
                  service_account_name="default",
@@ -191,7 +191,10 @@ def __init__(self,
         self.node_selectors = node_selectors or {}
         self.annotations = annotations or {}
         self.affinity = affinity or {}
-        self.xcom_push = xcom_push
+        self.do_xcom_push = do_xcom_push
+        if kwargs.get('xcom_push') is not None:
+            raise AirflowException("'xcom_push' was deprecated, use 'do_xcom_push' instead")
+
         self.resources = resources or Resources()
         self.config_file = config_file
         self.image_pull_secrets = image_pull_secrets
diff --git a/airflow/contrib/operators/ssh_operator.py b/airflow/contrib/operators/ssh_operator.py
index 2bf342935d..3e740bf1c8 100644
--- a/airflow/contrib/operators/ssh_operator.py
+++ b/airflow/contrib/operators/ssh_operator.py
@@ -45,8 +45,6 @@ class SSHOperator(BaseOperator):
     :type command: str
     :param timeout: timeout (in seconds) for executing the command.
     :type timeout: int
-    :param do_xcom_push: return the stdout which also get set in xcom by airflow platform
-    :type do_xcom_push: bool
     """
 
     template_fields = ('command', 'remote_host')
@@ -59,7 +57,6 @@ def __init__(self,
                  remote_host=None,
                  command=None,
                  timeout=10,
-                 do_xcom_push=False,
                  *args,
                  **kwargs):
         super(SSHOperator, self).__init__(*args, **kwargs)
@@ -68,7 +65,6 @@ def __init__(self,
         self.remote_host = remote_host
         self.command = command
         self.timeout = timeout
-        self.do_xcom_push = do_xcom_push
 
     def execute(self, context):
         try:
@@ -148,15 +144,13 @@ def execute(self, context):
 
                 exit_status = stdout.channel.recv_exit_status()
                 if exit_status is 0:
-                    # returning output if do_xcom_push is set
-                    if self.do_xcom_push:
-                        enable_pickling = configuration.conf.getboolean(
-                            'core', 'enable_xcom_pickling'
-                        )
-                        if enable_pickling:
-                            return agg_stdout
-                        else:
-                            return b64encode(agg_stdout).decode('utf-8')
+                    enable_pickling = configuration.conf.getboolean(
+                        'core', 'enable_xcom_pickling'
+                    )
+                    if enable_pickling:
+                        return agg_stdout
+                    else:
+                        return b64encode(agg_stdout).decode('utf-8')
 
                 else:
                     error_msg = agg_stderr.decode('utf-8')
diff --git a/airflow/contrib/operators/winrm_operator.py b/airflow/contrib/operators/winrm_operator.py
index c81acac44f..5788ea5bea 100644
--- a/airflow/contrib/operators/winrm_operator.py
+++ b/airflow/contrib/operators/winrm_operator.py
@@ -48,8 +48,6 @@ class WinRMOperator(BaseOperator):
     :type command: str
     :param timeout: timeout for executing the command.
     :type timeout: int
-    :param do_xcom_push: return the stdout which also get set in xcom by airflow platform
-    :type do_xcom_push: bool
     """
     template_fields = ('command',)
 
@@ -60,7 +58,6 @@ def __init__(self,
                  remote_host=None,
                  command=None,
                  timeout=10,
-                 do_xcom_push=False,
                  *args,
                  **kwargs):
         super(WinRMOperator, self).__init__(*args, **kwargs)
@@ -69,7 +66,6 @@ def __init__(self,
         self.remote_host = remote_host
         self.command = command
         self.timeout = timeout
-        self.do_xcom_push = do_xcom_push
 
     def execute(self, context):
         if self.ssh_conn_id and not self.winrm_hook:
@@ -128,14 +124,13 @@ def execute(self, context):
 
         if return_code is 0:
             # returning output if do_xcom_push is set
-            if self.do_xcom_push:
-                enable_pickling = configuration.conf.getboolean(
-                    'core', 'enable_xcom_pickling'
-                )
-                if enable_pickling:
-                    return stdout_buffer
-                else:
-                    return b64encode(b''.join(stdout_buffer)).decode('utf-8')
+            enable_pickling = configuration.conf.getboolean(
+                'core', 'enable_xcom_pickling'
+            )
+            if enable_pickling:
+                return stdout_buffer
+            else:
+                return b64encode(b''.join(stdout_buffer)).decode('utf-8')
         else:
             error_msg = "Error running cmd: {0}, return code: {1}, error: {2}".format(
                 self.command,
@@ -143,7 +138,3 @@ def execute(self, context):
                 b''.join(stderr_buffer).decode('utf-8')
             )
             raise AirflowException(error_msg)
-
-        self.log.info("Finished!")
-
-        return True
diff --git a/airflow/example_dags/docker_copy_data.py b/airflow/example_dags/docker_copy_data.py
index b037df9c79..6ea940e0dd 100644
--- a/airflow/example_dags/docker_copy_data.py
+++ b/airflow/example_dags/docker_copy_data.py
@@ -58,7 +58,7 @@
 # t_view = BashOperator(
 #         task_id='view_file',
 #         bash_command=locate_file_cmd,
-#         xcom_push=True,
+#         do_xcom_push=True,
 #         params={'source_location': '/your/input_dir/path'},
 #         dag=dag)
 #
@@ -84,7 +84,7 @@
 #                  '/your/host/output_dir/path:/your/output_dir/path'],
 #         command='./entrypoint.sh',
 #         task_id='move_data',
-#         xcom_push=True,
+#         do_xcom_push=True,
 #         params={'source_location': '/your/input_dir/path',
 #                 'target_location': '/your/output_dir/path'},
 #         dag=dag)
diff --git a/airflow/operators/bash_operator.py b/airflow/operators/bash_operator.py
index 13aa44fc85..11923edbd8 100644
--- a/airflow/operators/bash_operator.py
+++ b/airflow/operators/bash_operator.py
@@ -39,9 +39,6 @@ class BashOperator(BaseOperator):
     :param bash_command: The command, set of commands or reference to a
         bash script (must be '.sh') to be executed. (templated)
     :type bash_command: str
-    :param xcom_push: If xcom_push is True, the last line written to stdout
-        will also be pushed to an XCom when the bash command completes.
-    :type xcom_push: bool
     :param env: If env is not None, it must be a mapping that defines the
         environment variables for the new process; these are used instead
         of inheriting the current process environment, which is the default
@@ -69,7 +66,6 @@ class BashOperator(BaseOperator):
     def __init__(
             self,
             bash_command,
-            xcom_push=False,
             env=None,
             output_encoding='utf-8',
             *args, **kwargs):
@@ -77,8 +73,9 @@ def __init__(
         super(BashOperator, self).__init__(*args, **kwargs)
         self.bash_command = bash_command
         self.env = env
-        self.xcom_push_flag = xcom_push
         self.output_encoding = output_encoding
+        if kwargs.get('xcom_push') is not None:
+            raise AirflowException("'xcom_push' was deprecated, use 'BaseOperator.do_xcom_push' instead")
 
     def execute(self, context):
         """
@@ -142,8 +139,7 @@ def pre_exec():
                 if sp.returncode:
                     raise AirflowException("Bash command failed")
 
-        if self.xcom_push_flag:
-            return line
+        return line
 
     def on_kill(self):
         self.log.info('Sending SIGTERM signal to bash process group')
diff --git a/airflow/operators/docker_operator.py b/airflow/operators/docker_operator.py
index cf9838593f..f063cda798 100644
--- a/airflow/operators/docker_operator.py
+++ b/airflow/operators/docker_operator.py
@@ -100,9 +100,6 @@ class DockerOperator(BaseOperator):
     :param working_dir: Working directory to
         set on the container (equivalent to the -w switch the docker client)
     :type working_dir: str
-    :param xcom_push: Does the stdout will be pushed to the next step using XCom.
-        The default is False.
-    :type xcom_push: bool
     :param xcom_all: Push all the stdout or just the last line.
         The default is False (last line).
     :type xcom_all: bool
@@ -136,7 +133,6 @@ def __init__(
             user=None,
             volumes=None,
             working_dir=None,
-            xcom_push=False,
             xcom_all=False,
             docker_conn_id=None,
             dns=None,
@@ -168,10 +164,11 @@ def __init__(
         self.user = user
         self.volumes = volumes or []
         self.working_dir = working_dir
-        self.xcom_push_flag = xcom_push
         self.xcom_all = xcom_all
         self.docker_conn_id = docker_conn_id
         self.shm_size = shm_size
+        if kwargs.get('xcom_push') is not None:
+            raise AirflowException("'xcom_push' was deprecated, use 'BaseOperator.do_xcom_push' instead")
 
         self.cli = None
         self.container = None
@@ -238,9 +235,8 @@ def execute(self, context):
             if result['StatusCode'] != 0:
                 raise AirflowException('docker container failed: ' + repr(result))
 
-            if self.xcom_push_flag:
-                return self.cli.logs(container=self.container['Id']) \
-                    if self.xcom_all else str(line)
+            return self.cli.logs(container=self.container['Id']) \
+                if self.xcom_all else line.encode('utf-8')
 
     def get_command(self):
         if self.command is not None and self.command.strip().find('[') == 0:
diff --git a/airflow/operators/http_operator.py b/airflow/operators/http_operator.py
index d26743f6f1..3170857f9f 100644
--- a/airflow/operators/http_operator.py
+++ b/airflow/operators/http_operator.py
@@ -46,10 +46,6 @@ class SimpleHttpOperator(BaseOperator):
         'requests' documentation (options to modify timeout, ssl, etc.)
     :type extra_options: A dictionary of options, where key is string and value
         depends on the option that's being modified.
-    :param xcom_push: Push the response to Xcom (default: False).
-        If xcom_push is True, response of an HTTP request will also
-        be pushed to an XCom.
-    :type xcom_push: bool
     :param log_response: Log the response (default: False)
     :type log_response: bool
     """
@@ -66,7 +62,6 @@ def __init__(self,
                  headers=None,
                  response_check=None,
                  extra_options=None,
-                 xcom_push=False,
                  http_conn_id='http_default',
                  log_response=False,
                  *args, **kwargs):
@@ -78,8 +73,9 @@ def __init__(self,
         self.data = data or {}
         self.response_check = response_check
         self.extra_options = extra_options or {}
-        self.xcom_push_flag = xcom_push
         self.log_response = log_response
+        if kwargs.get('xcom_push') is not None:
+            raise AirflowException("'xcom_push' was deprecated, use 'BaseOperator.do_xcom_push' instead")
 
     def execute(self, context):
         http = HttpHook(self.method, http_conn_id=self.http_conn_id)
@@ -93,7 +89,6 @@ def execute(self, context):
         if self.response_check:
             if not self.response_check(response):
                 raise AirflowException("Response check returned False.")
-        if self.xcom_push_flag:
-            return response.text
         if self.log_response:
             self.log.info(response.text)
+        return response.text
diff --git a/tests/contrib/minikube/test_kubernetes_pod_operator.py b/tests/contrib/minikube/test_kubernetes_pod_operator.py
index e1dfce28c4..72150b02fa 100644
--- a/tests/contrib/minikube/test_kubernetes_pod_operator.py
+++ b/tests/contrib/minikube/test_kubernetes_pod_operator.py
@@ -322,7 +322,7 @@ def test_xcom_push(self):
             labels={"foo": "bar"},
             name="test",
             task_id="task",
-            xcom_push=True
+            do_xcom_push=True
         )
         self.assertEqual(k.execute(None), json.loads(return_value))
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services