You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/11/13 05:27:49 UTC
[airflow] branch master updated: Fix and Unquarantine test_change_state_for_tis_without_dagrun (#12323)
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 75f25bd Fix and Unquarantine test_change_state_for_tis_without_dagrun (#12323)
75f25bd is described below
commit 75f25bd8b96841689d1d9854a738db91c302bb63
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Fri Nov 13 05:18:44 2020 +0000
Fix and Unquarantine test_change_state_for_tis_without_dagrun (#12323)
The test was simply wrong and has been failing since the new logic was added in
https://github.com/apache/airflow/commit/c9a97baa86762b9ba37ef71432573b7949e47e2b
---
airflow/jobs/scheduler_job.py | 2 +-
tests/jobs/test_scheduler_job.py | 20 +++++++++++---------
2 files changed, 12 insertions(+), 10 deletions(-)
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 7d1aa72..e28f39d 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -852,9 +852,9 @@ class SchedulerJob(BaseJob): # pylint: disable=too-many-instance-attributes
)
.update(ti_prop_update, synchronize_session=False)
)
- session.flush()
if tis_changed > 0:
+ session.flush()
self.log.warning(
"Set %s task instances to state=%s as their associated DagRun was not in RUNNING state",
tis_changed,
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index d4786c4..91e2720 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -1691,7 +1691,6 @@ class TestSchedulerJob(unittest.TestCase):
ti.refresh_from_db()
self.assertEqual(State.QUEUED, ti.state)
- @pytest.mark.quarantined
def test_change_state_for_tis_without_dagrun(self):
dag1 = DAG(dag_id='test_change_state_for_tis_without_dagrun', start_date=DEFAULT_DATE)
@@ -1739,12 +1738,12 @@ class TestSchedulerJob(unittest.TestCase):
session.merge(ti3)
session.commit()
- with mock.patch.object(settings, "STORE_SERIALIZED_DAGS", True):
- dagbag = DagBag("/dev/null", include_examples=False)
- dagbag.bag_dag(dag1, root_dag=dag1)
- dagbag.bag_dag(dag2, root_dag=dag2)
- dagbag.bag_dag(dag3, root_dag=dag3)
- dagbag.sync_to_db(session)
+ dagbag = DagBag("/dev/null", include_examples=False, read_dags_from_db=False)
+ dagbag.bag_dag(dag1, root_dag=dag1)
+ dagbag.bag_dag(dag2, root_dag=dag2)
+ dagbag.bag_dag(dag3, root_dag=dag3)
+ dagbag.sync_to_db(session)
+ session.commit()
scheduler = SchedulerJob(num_runs=0)
scheduler.dagbag.collect_dags_from_db()
@@ -1774,15 +1773,18 @@ class TestSchedulerJob(unittest.TestCase):
dr1.refresh_from_db(session=session)
dr1.state = State.FAILED
- # why o why
+ # Push the changes to DB
session.merge(dr1)
session.commit()
scheduler._change_state_for_tis_without_dagrun(
old_states=[State.SCHEDULED, State.QUEUED], new_state=State.NONE, session=session
)
+
+ # Clear the session objects
+ session.expunge_all()
ti1a.refresh_from_db(session=session)
- self.assertEqual(ti1a.state, State.SCHEDULED)
+ self.assertEqual(ti1a.state, State.NONE)
# don't touch ti1b
ti1b.refresh_from_db(session=session)