You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2018/01/19 17:56:31 UTC

incubator-airflow git commit: [AIRFLOW-1950] Optionally pass xcom_pull task_ids

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 1e36b37b6 -> e7c118da2


[AIRFLOW-1950] Optionally pass xcom_pull task_ids

Changes the `task_ids` parameter of xcom_pull from
required to optional.

This parameter has always accepted None, but
because it was a required parameter, callers had
to pass None explicitly.

With this change, we're no longer forced to pass
it.

Closes #2902 from bcb/make-xcom-pull-task-ids-optional


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/e7c118da
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/e7c118da
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/e7c118da

Branch: refs/heads/master
Commit: e7c118da2229f508d1f7c9423488aa5df96a2081
Parents: 1e36b37
Author: Beau Barker <be...@gmail.com>
Authored: Fri Jan 19 18:56:16 2018 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Fri Jan 19 18:56:16 2018 +0100

----------------------------------------------------------------------
 airflow/models.py |  4 ++--
 tests/models.py   | 37 +++++++++++++++++++++++++++++++++++++
 2 files changed, 39 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e7c118da/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 5de18b2..edb3b67 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -1828,7 +1828,7 @@ class TaskInstance(Base, LoggingMixin):
 
     def xcom_pull(
             self,
-            task_ids,
+            task_ids=None,
             dag_id=None,
             key=XCOM_RETURN_KEY,
             include_prior_dates=False):
@@ -2830,7 +2830,7 @@ class BaseOperator(LoggingMixin):
     def xcom_pull(
             self,
             context,
-            task_ids,
+            task_ids=None,
             dag_id=None,
             key=XCOM_RETURN_KEY,
             include_prior_dates=None):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e7c118da/tests/models.py
----------------------------------------------------------------------
diff --git a/tests/models.py b/tests/models.py
index 11bf7c9..6586177 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -1290,6 +1290,43 @@ class TaskInstanceTest(unittest.TestCase):
         self.assertEqual(completed, expect_completed)
         self.assertEqual(ti.state, expect_state)
 
+    def test_xcom_pull(self):
+        """
+        Test xcom_pull with no args, key only, one task_id and a task_ids list.
+        """
+        dag = models.DAG(
+            dag_id='test_xcom', schedule_interval='@monthly',
+            start_date=timezone.datetime(2016, 6, 1, 0, 0, 0))
+
+        exec_date = timezone.utcnow()
+
+        # Push a value
+        task1 = DummyOperator(task_id='test_xcom_1', dag=dag, owner='airflow')
+        ti1 = TI(task=task1, execution_date=exec_date)
+        ti1.xcom_push(key='foo', value='bar')
+
+        # Push another value with the same key (but by a different task)
+        task2 = DummyOperator(task_id='test_xcom_2', dag=dag, owner='airflow')
+        ti2 = TI(task=task2, execution_date=exec_date)
+        ti2.xcom_push(key='foo', value='baz')
+
+        # Pull with no arguments
+        result = ti1.xcom_pull()
+        self.assertIsNone(result)
+        # Pull the value pushed most recently by any task (exact match).
+        result = ti1.xcom_pull(key='foo')
+        self.assertEqual(result, 'baz')
+        # Pull the value pushed by the first task
+        result = ti1.xcom_pull(task_ids='test_xcom_1', key='foo')
+        self.assertEqual(result, 'bar')
+        # Pull the value pushed by the second task
+        result = ti1.xcom_pull(task_ids='test_xcom_2', key='foo')
+        self.assertEqual(result, 'baz')
+        # Pull the values pushed by both tasks
+        result = ti1.xcom_pull(
+            task_ids=['test_xcom_1', 'test_xcom_2'], key='foo')
+        self.assertEqual(result, ('bar', 'baz'))
+
     def test_xcom_pull_after_success(self):
         """
         tests xcom set/clear relative to a task in a 'success' rerun scenario