You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/12/08 13:01:24 UTC
[airflow] branch master updated: Make xcom_pull results order deterministic (#12905)
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new ff25bd6 Make xcom_pull results order deterministic (#12905)
ff25bd6 is described below
commit ff25bd6ffed53de651254f7c3e6d254e4191bfc7
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Tue Dec 8 13:00:06 2020 +0000
Make xcom_pull results order deterministic (#12905)
closes https://github.com/apache/airflow/issues/11858
---
airflow/models/taskinstance.py | 13 +++++++++----
tests/models/test_taskinstance.py | 4 ++--
2 files changed, 11 insertions(+), 6 deletions(-)
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 82bfc66..f8d126e 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -1883,16 +1883,21 @@ class TaskInstance(Base, LoggingMixin): # pylint: disable=R0902,R0904
task_ids=task_ids,
include_prior_dates=include_prior_dates,
session=session,
- ).with_entities(XCom.value)
+ )
# Since we're only fetching the values field, and not the
# whole class, the @recreate annotation does not kick in.
# Therefore we need to deserialize the fields by ourselves.
-
if is_container(task_ids):
- return [XCom.deserialize_value(xcom) for xcom in query]
+ vals_kv = {
+ result.task_id: XCom.deserialize_value(result)
+ for result in query.with_entities(XCom.task_id, XCom.value)
+ }
+
+ values_ordered_by_id = [vals_kv.get(task_id) for task_id in task_ids]
+ return values_ordered_by_id
else:
- xcom = query.first()
+ xcom = query.with_entities(XCom.value).first()
if xcom:
return XCom.deserialize_value(xcom)
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index ab9040d..0fd8a85 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -1030,9 +1030,9 @@ class TestTaskInstance(unittest.TestCase):
# Pull the value pushed by the second task
result = ti1.xcom_pull(task_ids='test_xcom_2', key='foo')
self.assertEqual(result, 'baz')
- # Pull the values pushed by both tasks
+ # Pull the values pushed by both tasks & verify that the returned values follow the order of the task_ids passed
result = ti1.xcom_pull(task_ids=['test_xcom_1', 'test_xcom_2'], key='foo')
- self.assertEqual(result, ['baz', 'bar'])
+ self.assertEqual(result, ['bar', 'baz'])
def test_xcom_pull_after_success(self):
"""