You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/12/24 19:33:51 UTC
[GitHub] marengaz closed pull request #4345: [AIRFLOW-3249] unify (and fix) pushing result to xcom
marengaz closed pull request #4345: [AIRFLOW-3249] unify (and fix) pushing result to xcom
URL: https://github.com/apache/incubator-airflow/pull/4345
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/UPDATING.md b/UPDATING.md
index 575fc0a3c5..a82e656a9b 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -109,6 +109,12 @@ session.add(admin)
session.commit()
```
+### Unification of `do_xcom_push` flag
+The `do_xcom_push` flag (a switch that controls whether an operator's result is pushed to XCom) previously appeared under different names in different operators. Its function has been unified under a common name (`do_xcom_push`) on `BaseOperator`. This also makes it easy to disable pushing results to XCom globally.
+
+See [AIRFLOW-3249](https://jira.apache.org/jira/browse/AIRFLOW-3249) to check if your operator was affected.
+
+
## Airflow 1.10.1
### StatsD Metrics
diff --git a/airflow/contrib/operators/datastore_export_operator.py b/airflow/contrib/operators/datastore_export_operator.py
index 9d95eadc74..2e6da4ada6 100644
--- a/airflow/contrib/operators/datastore_export_operator.py
+++ b/airflow/contrib/operators/datastore_export_operator.py
@@ -54,8 +54,6 @@ class DatastoreExportOperator(BaseOperator):
:param overwrite_existing: if the storage bucket + namespace is not empty, it will be
emptied prior to exports. This enables overwriting existing backups.
:type overwrite_existing: bool
- :param xcom_push: push operation name to xcom for reference
- :type xcom_push: bool
"""
@apply_defaults
@@ -69,7 +67,6 @@ def __init__(self,
labels=None,
polling_interval_in_seconds=10,
overwrite_existing=False,
- xcom_push=False,
*args,
**kwargs):
super(DatastoreExportOperator, self).__init__(*args, **kwargs)
@@ -82,7 +79,8 @@ def __init__(self,
self.labels = labels
self.polling_interval_in_seconds = polling_interval_in_seconds
self.overwrite_existing = overwrite_existing
- self.xcom_push = xcom_push
+ if kwargs.get('xcom_push') is not None:
+ raise AirflowException("'xcom_push' was deprecated, use 'BaseOperator.do_xcom_push' instead")
def execute(self, context):
self.log.info('Exporting data to Cloud Storage bucket ' + self.bucket)
@@ -106,5 +104,4 @@ def execute(self, context):
if state != 'SUCCESSFUL':
raise AirflowException('Operation failed: result={}'.format(result))
- if self.xcom_push:
- return result
+ return result
diff --git a/airflow/contrib/operators/datastore_import_operator.py b/airflow/contrib/operators/datastore_import_operator.py
index c79767f35e..b7897cf051 100644
--- a/airflow/contrib/operators/datastore_import_operator.py
+++ b/airflow/contrib/operators/datastore_import_operator.py
@@ -50,8 +50,6 @@ class DatastoreImportOperator(BaseOperator):
:param polling_interval_in_seconds: number of seconds to wait before polling for
execution status again
:type polling_interval_in_seconds: int
- :param xcom_push: push operation name to xcom for reference
- :type xcom_push: bool
"""
@apply_defaults
@@ -64,7 +62,6 @@ def __init__(self,
datastore_conn_id='google_cloud_default',
delegate_to=None,
polling_interval_in_seconds=10,
- xcom_push=False,
*args,
**kwargs):
super(DatastoreImportOperator, self).__init__(*args, **kwargs)
@@ -76,7 +73,8 @@ def __init__(self,
self.entity_filter = entity_filter
self.labels = labels
self.polling_interval_in_seconds = polling_interval_in_seconds
- self.xcom_push = xcom_push
+ if kwargs.get('xcom_push') is not None:
+ raise AirflowException("'xcom_push' was deprecated, use 'BaseOperator.do_xcom_push' instead")
def execute(self, context):
self.log.info('Importing data from Cloud Storage bucket %s', self.bucket)
@@ -94,5 +92,4 @@ def execute(self, context):
if state != 'SUCCESSFUL':
raise AirflowException('Operation failed: result={}'.format(result))
- if self.xcom_push:
- return result
+ return result
diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py
index f6c1f9d45e..3347f94e1b 100644
--- a/airflow/contrib/operators/kubernetes_pod_operator.py
+++ b/airflow/contrib/operators/kubernetes_pod_operator.py
@@ -80,10 +80,10 @@ class KubernetesPodOperator(BaseOperator):
:type node_selectors: dict
:param config_file: The path to the Kubernetes config file
:type config_file: str
- :param xcom_push: If xcom_push is True, the content of the file
+ :param do_xcom_push: If True, the content of the file
/airflow/xcom/return.json in the container will also be pushed to an
XCom when the container completes.
- :type xcom_push: bool
+ :type do_xcom_push: bool
:param tolerations: Kubernetes tolerations
:type list of tolerations
"""
@@ -123,7 +123,7 @@ def execute(self, context):
pod.tolerations = self.tolerations
launcher = pod_launcher.PodLauncher(kube_client=client,
- extract_xcom=self.xcom_push)
+ extract_xcom=self.do_xcom_push)
try:
(final_state, result) = launcher.run_pod(
pod,
@@ -137,8 +137,8 @@ def execute(self, context):
raise AirflowException(
'Pod returned a failure: {state}'.format(state=final_state)
)
- if self.xcom_push:
- return result
+
+ return result
except AirflowException as ex:
raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
@@ -163,7 +163,7 @@ def __init__(self,
resources=None,
affinity=None,
config_file=None,
- xcom_push=False,
+ do_xcom_push=False,
node_selectors=None,
image_pull_secrets=None,
service_account_name="default",
@@ -191,7 +191,10 @@ def __init__(self,
self.node_selectors = node_selectors or {}
self.annotations = annotations or {}
self.affinity = affinity or {}
- self.xcom_push = xcom_push
+ self.do_xcom_push = do_xcom_push
+ if kwargs.get('xcom_push') is not None:
+ raise AirflowException("'xcom_push' was deprecated, use 'do_xcom_push' instead")
+
self.resources = resources or Resources()
self.config_file = config_file
self.image_pull_secrets = image_pull_secrets
diff --git a/airflow/contrib/operators/ssh_operator.py b/airflow/contrib/operators/ssh_operator.py
index 2bf342935d..3e740bf1c8 100644
--- a/airflow/contrib/operators/ssh_operator.py
+++ b/airflow/contrib/operators/ssh_operator.py
@@ -45,8 +45,6 @@ class SSHOperator(BaseOperator):
:type command: str
:param timeout: timeout (in seconds) for executing the command.
:type timeout: int
- :param do_xcom_push: return the stdout which also get set in xcom by airflow platform
- :type do_xcom_push: bool
"""
template_fields = ('command', 'remote_host')
@@ -59,7 +57,6 @@ def __init__(self,
remote_host=None,
command=None,
timeout=10,
- do_xcom_push=False,
*args,
**kwargs):
super(SSHOperator, self).__init__(*args, **kwargs)
@@ -68,7 +65,6 @@ def __init__(self,
self.remote_host = remote_host
self.command = command
self.timeout = timeout
- self.do_xcom_push = do_xcom_push
def execute(self, context):
try:
@@ -148,15 +144,13 @@ def execute(self, context):
exit_status = stdout.channel.recv_exit_status()
if exit_status is 0:
- # returning output if do_xcom_push is set
- if self.do_xcom_push:
- enable_pickling = configuration.conf.getboolean(
- 'core', 'enable_xcom_pickling'
- )
- if enable_pickling:
- return agg_stdout
- else:
- return b64encode(agg_stdout).decode('utf-8')
+ enable_pickling = configuration.conf.getboolean(
+ 'core', 'enable_xcom_pickling'
+ )
+ if enable_pickling:
+ return agg_stdout
+ else:
+ return b64encode(agg_stdout).decode('utf-8')
else:
error_msg = agg_stderr.decode('utf-8')
diff --git a/airflow/contrib/operators/winrm_operator.py b/airflow/contrib/operators/winrm_operator.py
index c81acac44f..5788ea5bea 100644
--- a/airflow/contrib/operators/winrm_operator.py
+++ b/airflow/contrib/operators/winrm_operator.py
@@ -48,8 +48,6 @@ class WinRMOperator(BaseOperator):
:type command: str
:param timeout: timeout for executing the command.
:type timeout: int
- :param do_xcom_push: return the stdout which also get set in xcom by airflow platform
- :type do_xcom_push: bool
"""
template_fields = ('command',)
@@ -60,7 +58,6 @@ def __init__(self,
remote_host=None,
command=None,
timeout=10,
- do_xcom_push=False,
*args,
**kwargs):
super(WinRMOperator, self).__init__(*args, **kwargs)
@@ -69,7 +66,6 @@ def __init__(self,
self.remote_host = remote_host
self.command = command
self.timeout = timeout
- self.do_xcom_push = do_xcom_push
def execute(self, context):
if self.ssh_conn_id and not self.winrm_hook:
@@ -128,14 +124,13 @@ def execute(self, context):
if return_code is 0:
# returning output if do_xcom_push is set
- if self.do_xcom_push:
- enable_pickling = configuration.conf.getboolean(
- 'core', 'enable_xcom_pickling'
- )
- if enable_pickling:
- return stdout_buffer
- else:
- return b64encode(b''.join(stdout_buffer)).decode('utf-8')
+ enable_pickling = configuration.conf.getboolean(
+ 'core', 'enable_xcom_pickling'
+ )
+ if enable_pickling:
+ return stdout_buffer
+ else:
+ return b64encode(b''.join(stdout_buffer)).decode('utf-8')
else:
error_msg = "Error running cmd: {0}, return code: {1}, error: {2}".format(
self.command,
@@ -143,7 +138,3 @@ def execute(self, context):
b''.join(stderr_buffer).decode('utf-8')
)
raise AirflowException(error_msg)
-
- self.log.info("Finished!")
-
- return True
diff --git a/airflow/example_dags/docker_copy_data.py b/airflow/example_dags/docker_copy_data.py
index b037df9c79..6ea940e0dd 100644
--- a/airflow/example_dags/docker_copy_data.py
+++ b/airflow/example_dags/docker_copy_data.py
@@ -58,7 +58,7 @@
# t_view = BashOperator(
# task_id='view_file',
# bash_command=locate_file_cmd,
-# xcom_push=True,
+# do_xcom_push=True,
# params={'source_location': '/your/input_dir/path'},
# dag=dag)
#
@@ -84,7 +84,7 @@
# '/your/host/output_dir/path:/your/output_dir/path'],
# command='./entrypoint.sh',
# task_id='move_data',
-# xcom_push=True,
+# do_xcom_push=True,
# params={'source_location': '/your/input_dir/path',
# 'target_location': '/your/output_dir/path'},
# dag=dag)
diff --git a/airflow/operators/bash_operator.py b/airflow/operators/bash_operator.py
index 13aa44fc85..11923edbd8 100644
--- a/airflow/operators/bash_operator.py
+++ b/airflow/operators/bash_operator.py
@@ -39,9 +39,6 @@ class BashOperator(BaseOperator):
:param bash_command: The command, set of commands or reference to a
bash script (must be '.sh') to be executed. (templated)
:type bash_command: str
- :param xcom_push: If xcom_push is True, the last line written to stdout
- will also be pushed to an XCom when the bash command completes.
- :type xcom_push: bool
:param env: If env is not None, it must be a mapping that defines the
environment variables for the new process; these are used instead
of inheriting the current process environment, which is the default
@@ -69,7 +66,6 @@ class BashOperator(BaseOperator):
def __init__(
self,
bash_command,
- xcom_push=False,
env=None,
output_encoding='utf-8',
*args, **kwargs):
@@ -77,8 +73,9 @@ def __init__(
super(BashOperator, self).__init__(*args, **kwargs)
self.bash_command = bash_command
self.env = env
- self.xcom_push_flag = xcom_push
self.output_encoding = output_encoding
+ if kwargs.get('xcom_push') is not None:
+ raise AirflowException("'xcom_push' was deprecated, use 'BaseOperator.do_xcom_push' instead")
def execute(self, context):
"""
@@ -142,8 +139,7 @@ def pre_exec():
if sp.returncode:
raise AirflowException("Bash command failed")
- if self.xcom_push_flag:
- return line
+ return line
def on_kill(self):
self.log.info('Sending SIGTERM signal to bash process group')
diff --git a/airflow/operators/docker_operator.py b/airflow/operators/docker_operator.py
index cf9838593f..f063cda798 100644
--- a/airflow/operators/docker_operator.py
+++ b/airflow/operators/docker_operator.py
@@ -100,9 +100,6 @@ class DockerOperator(BaseOperator):
:param working_dir: Working directory to
set on the container (equivalent to the -w switch the docker client)
:type working_dir: str
- :param xcom_push: Does the stdout will be pushed to the next step using XCom.
- The default is False.
- :type xcom_push: bool
:param xcom_all: Push all the stdout or just the last line.
The default is False (last line).
:type xcom_all: bool
@@ -136,7 +133,6 @@ def __init__(
user=None,
volumes=None,
working_dir=None,
- xcom_push=False,
xcom_all=False,
docker_conn_id=None,
dns=None,
@@ -168,10 +164,11 @@ def __init__(
self.user = user
self.volumes = volumes or []
self.working_dir = working_dir
- self.xcom_push_flag = xcom_push
self.xcom_all = xcom_all
self.docker_conn_id = docker_conn_id
self.shm_size = shm_size
+ if kwargs.get('xcom_push') is not None:
+ raise AirflowException("'xcom_push' was deprecated, use 'BaseOperator.do_xcom_push' instead")
self.cli = None
self.container = None
@@ -238,9 +235,8 @@ def execute(self, context):
if result['StatusCode'] != 0:
raise AirflowException('docker container failed: ' + repr(result))
- if self.xcom_push_flag:
- return self.cli.logs(container=self.container['Id']) \
- if self.xcom_all else str(line)
+ return self.cli.logs(container=self.container['Id']) \
+ if self.xcom_all else line.encode('utf-8')
def get_command(self):
if self.command is not None and self.command.strip().find('[') == 0:
diff --git a/airflow/operators/http_operator.py b/airflow/operators/http_operator.py
index d26743f6f1..3170857f9f 100644
--- a/airflow/operators/http_operator.py
+++ b/airflow/operators/http_operator.py
@@ -46,10 +46,6 @@ class SimpleHttpOperator(BaseOperator):
'requests' documentation (options to modify timeout, ssl, etc.)
:type extra_options: A dictionary of options, where key is string and value
depends on the option that's being modified.
- :param xcom_push: Push the response to Xcom (default: False).
- If xcom_push is True, response of an HTTP request will also
- be pushed to an XCom.
- :type xcom_push: bool
:param log_response: Log the response (default: False)
:type log_response: bool
"""
@@ -66,7 +62,6 @@ def __init__(self,
headers=None,
response_check=None,
extra_options=None,
- xcom_push=False,
http_conn_id='http_default',
log_response=False,
*args, **kwargs):
@@ -78,8 +73,9 @@ def __init__(self,
self.data = data or {}
self.response_check = response_check
self.extra_options = extra_options or {}
- self.xcom_push_flag = xcom_push
self.log_response = log_response
+ if kwargs.get('xcom_push') is not None:
+ raise AirflowException("'xcom_push' was deprecated, use 'BaseOperator.do_xcom_push' instead")
def execute(self, context):
http = HttpHook(self.method, http_conn_id=self.http_conn_id)
@@ -93,7 +89,6 @@ def execute(self, context):
if self.response_check:
if not self.response_check(response):
raise AirflowException("Response check returned False.")
- if self.xcom_push_flag:
- return response.text
if self.log_response:
self.log.info(response.text)
+ return response.text
diff --git a/tests/contrib/minikube/test_kubernetes_pod_operator.py b/tests/contrib/minikube/test_kubernetes_pod_operator.py
index e1dfce28c4..72150b02fa 100644
--- a/tests/contrib/minikube/test_kubernetes_pod_operator.py
+++ b/tests/contrib/minikube/test_kubernetes_pod_operator.py
@@ -322,7 +322,7 @@ def test_xcom_push(self):
labels={"foo": "bar"},
name="test",
task_id="task",
- xcom_push=True
+ do_xcom_push=True
)
self.assertEqual(k.execute(None), json.loads(return_value))
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services