You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ur...@apache.org on 2022/04/12 06:12:08 UTC
[airflow] branch main updated: Do not clear XCom when resuming from deferral (#22932)
This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 8b687ec82a Do not clear XCom when resuming from deferral (#22932)
8b687ec82a is described below
commit 8b687ec82a7047fc35410f5c5bb0726de434e749
Author: Rocco Pascale <75...@users.noreply.github.com>
AuthorDate: Tue Apr 12 02:12:01 2022 -0400
Do not clear XCom when resuming from deferral (#22932)
---
airflow/models/taskinstance.py | 6 ++++--
tests/models/test_taskinstance.py | 25 +++++++++++++++++++++++++
2 files changed, 29 insertions(+), 2 deletions(-)
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 2f3d75436d..5b53916acf 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -1510,8 +1510,10 @@ class TaskInstance(Base, LoggingMixin):
         signal.signal(signal.SIGTERM, signal_handler)
-        # Don't clear Xcom until the task is certain to execute
-        self.clear_xcom_data()
+        # Don't clear Xcom until the task is certain to execute, and check if we are resuming from deferral.
+        if not self.next_method:
+            self.clear_xcom_data()
+
         with Stats.timer(f'dag.{self.task.dag_id}.{self.task.task_id}.duration'):
             # Set the validated/merged params on the task object.
             self.task.params = context['params']
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 2a1ea0889e..40190c1f00 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -1121,6 +1121,31 @@ class TestTaskInstance:
         ti.run(ignore_all_deps=True)
         assert ti.xcom_pull(task_ids='test_xcom', key=key) is None
+    def test_xcom_pull_after_deferral(self, create_task_instance, session):
+        """
+        tests xcom will not clear before a task runs its next method after deferral.
+        """
+
+        key = 'xcom_key'
+        value = 'xcom_value'
+
+        ti = create_task_instance(
+            dag_id='test_xcom',
+            schedule_interval='@monthly',
+            task_id='test_xcom',
+            pool='test_xcom',
+        )
+
+        ti.run(mark_success=True)
+        ti.xcom_push(key=key, value=value)
+
+        ti.next_method = "execute"
+        session.merge(ti)
+        session.commit()
+
+        ti.run(ignore_all_deps=True)
+        assert ti.xcom_pull(task_ids='test_xcom', key=key) == value
+
     def test_xcom_pull_different_execution_date(self, create_task_instance):
         """
         tests xcom fetch behavior with different execution dates, using