You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by jl...@apache.org on 2016/12/21 15:48:22 UTC

incubator-airflow git commit: [AIRFLOW-703][AIRFLOW-1] Stop Xcom being cleared too early

Repository: incubator-airflow
Updated Branches:
  refs/heads/master e8f91c623 -> 96c787f39


[AIRFLOW-703][AIRFLOW-1] Stop Xcom being cleared too early

XComs should only be cleared when it is certain
that the task will run. Previously, XComs were cleared
before it was determined whether tasks were runnable, queueable,
or just being marked as successful. Now XComs are cleared
immediately before the task actually starts.

Closes #1951 from blrnw3/fix/xcom_bug_AIRFLOW-703


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/96c787f3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/96c787f3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/96c787f3

Branch: refs/heads/master
Commit: 96c787f390ad9852ae6c0c0fbb0510e36df185b1
Parents: e8f91c6
Author: Ben Lee Rodgers <br...@quartethealth.com>
Authored: Wed Dec 21 10:46:05 2016 -0500
Committer: Jeremiah Lowin <jl...@iHal-Vader.local>
Committed: Wed Dec 21 10:46:14 2016 -0500

----------------------------------------------------------------------
 airflow/models.py | 4 +++-
 tests/models.py   | 8 ++++++++
 2 files changed, 11 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/96c787f3/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 5d7075d..55b855b 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -1200,7 +1200,6 @@ class TaskInstance(Base):
             session.commit()
             return
 
-        self.clear_xcom_data()
         hr = "\n" + ("-" * 80) + "\n"  # Line break
 
         # For reporting purposes, we report based on 1-indexed,
@@ -1285,6 +1284,9 @@ class TaskInstance(Base):
                     raise AirflowException("Task received SIGTERM signal")
                 signal.signal(signal.SIGTERM, signal_handler)
 
+                # Don't clear Xcom until the task is certain to execute
+                self.clear_xcom_data()
+
                 self.render_templates()
                 task_copy.pre_execute(context=context)
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/96c787f3/tests/models.py
----------------------------------------------------------------------
diff --git a/tests/models.py b/tests/models.py
index 74103fe..003fb21 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -588,6 +588,14 @@ class TaskInstanceTest(unittest.TestCase):
         # prior success)
         self.assertEqual(ti.xcom_pull(task_ids='test_xcom', key=key), value)
 
+        # Test AIRFLOW-703: Xcom shouldn't be cleared if the task doesn't
+        # execute, even if dependencies are ignored
+        ti.run(ignore_all_deps=True, mark_success=True)
+        self.assertEqual(ti.xcom_pull(task_ids='test_xcom', key=key), value)
+        # Xcom IS finally cleared once task has executed
+        ti.run(ignore_all_deps=True)
+        self.assertEqual(ti.xcom_pull(task_ids='test_xcom', key=key), None)
+
     def test_xcom_pull_different_execution_date(self):
         """
         tests xcom fetch behavior with different execution dates, using