You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/11/13 05:27:49 UTC

[airflow] branch master updated: Fix and Unquarantine test_change_state_for_tis_without_dagrun (#12323)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 75f25bd  Fix and Unquarantine test_change_state_for_tis_without_dagrun (#12323)
75f25bd is described below

commit 75f25bd8b96841689d1d9854a738db91c302bb63
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Fri Nov 13 05:18:44 2020 +0000

    Fix and Unquarantine test_change_state_for_tis_without_dagrun (#12323)
    
    
    
    The test was simply wrong and has been failing since the new logic was added in
    https://github.com/apache/airflow/commit/c9a97baa86762b9ba37ef71432573b7949e47e2b
---
 airflow/jobs/scheduler_job.py    |  2 +-
 tests/jobs/test_scheduler_job.py | 20 +++++++++++---------
 2 files changed, 12 insertions(+), 10 deletions(-)

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 7d1aa72..e28f39d 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -852,9 +852,9 @@ class SchedulerJob(BaseJob):  # pylint: disable=too-many-instance-attributes
                 )
                 .update(ti_prop_update, synchronize_session=False)
             )
-            session.flush()
 
         if tis_changed > 0:
+            session.flush()
             self.log.warning(
                 "Set %s task instances to state=%s as their associated DagRun was not in RUNNING state",
                 tis_changed,
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index d4786c4..91e2720 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -1691,7 +1691,6 @@ class TestSchedulerJob(unittest.TestCase):
             ti.refresh_from_db()
             self.assertEqual(State.QUEUED, ti.state)
 
-    @pytest.mark.quarantined
     def test_change_state_for_tis_without_dagrun(self):
         dag1 = DAG(dag_id='test_change_state_for_tis_without_dagrun', start_date=DEFAULT_DATE)
 
@@ -1739,12 +1738,12 @@ class TestSchedulerJob(unittest.TestCase):
         session.merge(ti3)
         session.commit()
 
-        with mock.patch.object(settings, "STORE_SERIALIZED_DAGS", True):
-            dagbag = DagBag("/dev/null", include_examples=False)
-            dagbag.bag_dag(dag1, root_dag=dag1)
-            dagbag.bag_dag(dag2, root_dag=dag2)
-            dagbag.bag_dag(dag3, root_dag=dag3)
-            dagbag.sync_to_db(session)
+        dagbag = DagBag("/dev/null", include_examples=False, read_dags_from_db=False)
+        dagbag.bag_dag(dag1, root_dag=dag1)
+        dagbag.bag_dag(dag2, root_dag=dag2)
+        dagbag.bag_dag(dag3, root_dag=dag3)
+        dagbag.sync_to_db(session)
+        session.commit()
 
         scheduler = SchedulerJob(num_runs=0)
         scheduler.dagbag.collect_dags_from_db()
@@ -1774,15 +1773,18 @@ class TestSchedulerJob(unittest.TestCase):
         dr1.refresh_from_db(session=session)
         dr1.state = State.FAILED
 
-        # why o why
+        # Push the changes to DB
         session.merge(dr1)
         session.commit()
 
         scheduler._change_state_for_tis_without_dagrun(
             old_states=[State.SCHEDULED, State.QUEUED], new_state=State.NONE, session=session
         )
+
+        # Clear the session objects
+        session.expunge_all()
         ti1a.refresh_from_db(session=session)
-        self.assertEqual(ti1a.state, State.SCHEDULED)
+        self.assertEqual(ti1a.state, State.NONE)
 
         # don't touch ti1b
         ti1b.refresh_from_db(session=session)