You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2018/01/19 17:56:31 UTC
incubator-airflow git commit: [AIRFLOW-1950] Optionally pass xcom_pull task_ids
Repository: incubator-airflow
Updated Branches:
refs/heads/master 1e36b37b6 -> e7c118da2
[AIRFLOW-1950] Optionally pass xcom_pull task_ids
Changes the `task_ids` parameter of xcom_pull from
required to optional.
This parameter has always accepted None, but because it was declared as a
required parameter, callers still had to pass None explicitly.
With this change, callers are no longer forced to pass it.
Closes #2902 from bcb/make-xcom-pull-task-ids-optional
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/e7c118da
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/e7c118da
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/e7c118da
Branch: refs/heads/master
Commit: e7c118da2229f508d1f7c9423488aa5df96a2081
Parents: 1e36b37
Author: Beau Barker <be...@gmail.com>
Authored: Fri Jan 19 18:56:16 2018 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Fri Jan 19 18:56:16 2018 +0100
----------------------------------------------------------------------
airflow/models.py | 4 ++--
tests/models.py | 37 +++++++++++++++++++++++++++++++++++++
2 files changed, 39 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e7c118da/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 5de18b2..edb3b67 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -1828,7 +1828,7 @@ class TaskInstance(Base, LoggingMixin):
def xcom_pull(
self,
- task_ids,
+ task_ids=None,
dag_id=None,
key=XCOM_RETURN_KEY,
include_prior_dates=False):
@@ -2830,7 +2830,7 @@ class BaseOperator(LoggingMixin):
def xcom_pull(
self,
context,
- task_ids,
+ task_ids=None,
dag_id=None,
key=XCOM_RETURN_KEY,
include_prior_dates=None):
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e7c118da/tests/models.py
----------------------------------------------------------------------
diff --git a/tests/models.py b/tests/models.py
index 11bf7c9..6586177 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -1290,6 +1290,43 @@ class TaskInstanceTest(unittest.TestCase):
self.assertEqual(completed, expect_completed)
self.assertEqual(ti.state, expect_state)
+    def test_xcom_pull(self):
+        """
+        Test xcom_pull with no filter, by key, by one task id and by several.
+        """
+        dag = models.DAG(
+            dag_id='test_xcom', schedule_interval='@monthly',
+            start_date=timezone.datetime(2016, 6, 1, 0, 0, 0))
+
+        exec_date = timezone.utcnow()
+
+        # Push a value
+        task1 = DummyOperator(task_id='test_xcom_1', dag=dag, owner='airflow')
+        ti1 = TI(task=task1, execution_date=exec_date)
+        ti1.xcom_push(key='foo', value='bar')
+
+        # Push another value with the same key (but by a different task)
+        task2 = DummyOperator(task_id='test_xcom_2', dag=dag, owner='airflow')
+        ti2 = TI(task=task2, execution_date=exec_date)
+        ti2.xcom_push(key='foo', value='baz')
+
+        # Pull with no arguments: nothing was pushed under the default key
+        result = ti1.xcom_pull()
+        self.assertIsNone(result)
+        # Pull the value pushed most recently by any task (exact match,
+        # not a substring check).
+        result = ti1.xcom_pull(key='foo')
+        self.assertEqual(result, 'baz')
+        # Pull the value pushed by the first task
+        result = ti1.xcom_pull(task_ids='test_xcom_1', key='foo')
+        self.assertEqual(result, 'bar')
+        # Pull the value pushed by the second task
+        result = ti1.xcom_pull(task_ids='test_xcom_2', key='foo')
+        self.assertEqual(result, 'baz')
+        # Pull both values; order follows the requested task_ids list
+        result = ti1.xcom_pull(
+            task_ids=['test_xcom_1', 'test_xcom_2'], key='foo')
+        self.assertEqual(result, ('bar', 'baz'))
def test_xcom_pull_after_success(self):
"""
tests xcom set/clear relative to a task in a 'success' rerun scenario