You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ur...@apache.org on 2022/04/12 06:12:08 UTC

[airflow] branch main updated: Do not clear XCom when resuming from deferral (#22932)

This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 8b687ec82a Do not clear XCom when resuming from deferral (#22932)
8b687ec82a is described below

commit 8b687ec82a7047fc35410f5c5bb0726de434e749
Author: Rocco Pascale <75...@users.noreply.github.com>
AuthorDate: Tue Apr 12 02:12:01 2022 -0400

    Do not clear XCom when resuming from deferral (#22932)
---
 airflow/models/taskinstance.py    |  6 ++++--
 tests/models/test_taskinstance.py | 25 +++++++++++++++++++++++++
 2 files changed, 29 insertions(+), 2 deletions(-)

diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 2f3d75436d..5b53916acf 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -1510,8 +1510,10 @@ class TaskInstance(Base, LoggingMixin):
 
         signal.signal(signal.SIGTERM, signal_handler)
 
-        # Don't clear Xcom until the task is certain to execute
-        self.clear_xcom_data()
+        # Don't clear Xcom until the task is certain to execute, and check if we are resuming from deferral.
+        if not self.next_method:
+            self.clear_xcom_data()
+
         with Stats.timer(f'dag.{self.task.dag_id}.{self.task.task_id}.duration'):
             # Set the validated/merged params on the task object.
             self.task.params = context['params']
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 2a1ea0889e..40190c1f00 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -1121,6 +1121,31 @@ class TestTaskInstance:
         ti.run(ignore_all_deps=True)
         assert ti.xcom_pull(task_ids='test_xcom', key=key) is None
 
+    def test_xcom_pull_after_deferral(self, create_task_instance, session):
+        """
+        tests xcom will not clear before a task runs its next method after deferral.
+        """
+
+        key = 'xcom_key'
+        value = 'xcom_value'
+
+        ti = create_task_instance(
+            dag_id='test_xcom',
+            schedule_interval='@monthly',
+            task_id='test_xcom',
+            pool='test_xcom',
+        )
+
+        ti.run(mark_success=True)
+        ti.xcom_push(key=key, value=value)
+
+        ti.next_method = "execute"
+        session.merge(ti)
+        session.commit()
+
+        ti.run(ignore_all_deps=True)
+        assert ti.xcom_pull(task_ids='test_xcom', key=key) == value
+
     def test_xcom_pull_different_execution_date(self, create_task_instance):
         """
         tests xcom fetch behavior with different execution dates, using