You are viewing a plain-text version of this content. The canonical link to the original was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/12/08 13:01:24 UTC

[airflow] branch master updated: Make xcom_pull results order deterministic (#12905)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new ff25bd6  Make xcom_pull results order deterministic (#12905)
ff25bd6 is described below

commit ff25bd6ffed53de651254f7c3e6d254e4191bfc7
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Tue Dec 8 13:00:06 2020 +0000

    Make xcom_pull results order deterministic (#12905)
    
    closes https://github.com/apache/airflow/issues/11858
---
 airflow/models/taskinstance.py    | 13 +++++++++----
 tests/models/test_taskinstance.py |  4 ++--
 2 files changed, 11 insertions(+), 6 deletions(-)

diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 82bfc66..f8d126e 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -1883,16 +1883,21 @@ class TaskInstance(Base, LoggingMixin):  # pylint: disable=R0902,R0904
             task_ids=task_ids,
             include_prior_dates=include_prior_dates,
             session=session,
-        ).with_entities(XCom.value)
+        )
 
         # Since we're only fetching the values field, and not the
         # whole class, the @recreate annotation does not kick in.
         # Therefore we need to deserialize the fields by ourselves.
-
         if is_container(task_ids):
-            return [XCom.deserialize_value(xcom) for xcom in query]
+            vals_kv = {
+                result.task_id: XCom.deserialize_value(result)
+                for result in query.with_entities(XCom.task_id, XCom.value)
+            }
+
+            values_ordered_by_id = [vals_kv.get(task_id) for task_id in task_ids]
+            return values_ordered_by_id
         else:
-            xcom = query.first()
+            xcom = query.with_entities(XCom.value).first()
             if xcom:
                 return XCom.deserialize_value(xcom)
 
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index ab9040d..0fd8a85 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -1030,9 +1030,9 @@ class TestTaskInstance(unittest.TestCase):
         # Pull the value pushed by the second task
         result = ti1.xcom_pull(task_ids='test_xcom_2', key='foo')
         self.assertEqual(result, 'baz')
-        # Pull the values pushed by both tasks
+        # Pull the values pushed by both tasks & Verify Order of task_ids pass & values returned
         result = ti1.xcom_pull(task_ids=['test_xcom_1', 'test_xcom_2'], key='foo')
-        self.assertEqual(result, ['baz', 'bar'])
+        self.assertEqual(result, ['bar', 'baz'])
 
     def test_xcom_pull_after_success(self):
         """