You are viewing a plain-text version of this content. The canonical link for it was a hyperlink on the original page and is not preserved in this text.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/11/27 19:43:47 UTC
[airflow] branch v1-10-stable updated: BugFix: Tasks with depends_on_past or task_concurrency are stuck (#12663)
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch v1-10-stable
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v1-10-stable by this push:
new 2a7944d BugFix: Tasks with depends_on_past or task_concurrency are stuck (#12663)
2a7944d is described below
commit 2a7944da85c3ec7cc913640ff89a66e06d486480
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Fri Nov 27 19:42:42 2020 +0000
BugFix: Tasks with depends_on_past or task_concurrency are stuck (#12663)
closes https://github.com/apache/airflow/issues/12659
---
airflow/models/dagrun.py | 2 +-
tests/jobs/test_scheduler_job.py | 98 ++++++++++++++++++++++++++++++++++++++++
2 files changed, 99 insertions(+), 1 deletion(-)
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 61ca6bd..9775c9f 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -273,7 +273,7 @@ class DagRun(Base, LoggingMixin):
none_task_concurrency = all(t.task.task_concurrency is None
for t in unfinished_tasks)
# small speed up
- if unfinished_tasks and none_depends_on_past and none_task_concurrency:
+ if unfinished_tasks:
scheduleable_tasks = [ut for ut in unfinished_tasks if ut.state in SCHEDULEABLE_STATES]
self.log.debug(
"number of scheduleable tasks for %s: %s task(s)",
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index e705067..ab406be 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -477,6 +477,104 @@ class SchedulerJobTest(unittest.TestCase):
states=[State.SCHEDULED],
session=session)))
+ @parameterized.expand([
+ [State.NONE, None, None],
+ [State.UP_FOR_RETRY, timezone.utcnow() - datetime.timedelta(minutes=30),
+ timezone.utcnow() - datetime.timedelta(minutes=15)],
+ [State.UP_FOR_RESCHEDULE, timezone.utcnow() - datetime.timedelta(minutes=30),
+ timezone.utcnow() - datetime.timedelta(minutes=15)],
+ ])
+ def test_process_task_instances_with_task_concurrency(
+ self, state, start_date, end_date,
+ ):
+ """
+ Test if _process_task_instances puts the right task instances into the
+ mock_list.
+ """
+ dag = DAG(
+ dag_id='test_scheduler_process_execute_task_with_task_concurrency',
+ start_date=DEFAULT_DATE)
+ dag_task1 = DummyOperator(
+ task_id='dummy',
+ task_concurrency=2,
+ dag=dag,
+ owner='airflow')
+
+ with create_session() as session:
+ orm_dag = DagModel(dag_id=dag.dag_id)
+ session.merge(orm_dag)
+
+ scheduler_job = SchedulerJob()
+ dag.clear()
+ dr = scheduler_job.create_dag_run(dag)
+ self.assertIsNotNone(dr)
+
+ with create_session() as session:
+ tis = dr.get_task_instances(session=session)
+ for ti in tis:
+ ti.state = state
+ ti.start_date = start_date
+ ti.end_date = end_date
+
+ ti_to_schedule = []
+ scheduler_job._process_task_instances(dag, task_instances_list=ti_to_schedule)
+
+ assert ti_to_schedule == [
+ (dag.dag_id, dag_task1.task_id, DEFAULT_DATE, TRY_NUMBER),
+ ]
+
+ @parameterized.expand([
+ [State.NONE, None, None],
+ [State.UP_FOR_RETRY, timezone.utcnow() - datetime.timedelta(minutes=30),
+ timezone.utcnow() - datetime.timedelta(minutes=15)],
+ [State.UP_FOR_RESCHEDULE, timezone.utcnow() - datetime.timedelta(minutes=30),
+ timezone.utcnow() - datetime.timedelta(minutes=15)],
+ ])
+ def test_process_task_instances_depends_on_past(self, state, start_date, end_date):
+ """
+ Test if _process_task_instances puts the right task instances into the
+ mock_list.
+ """
+ dag = DAG(
+ dag_id='test_scheduler_process_execute_task_depends_on_past',
+ start_date=DEFAULT_DATE,
+ default_args={
+ 'depends_on_past': True,
+ },
+ )
+ dag_task1 = DummyOperator(
+ task_id='dummy1',
+ dag=dag,
+ owner='airflow')
+ dag_task2 = DummyOperator(
+ task_id='dummy2',
+ dag=dag,
+ owner='airflow')
+
+ with create_session() as session:
+ orm_dag = DagModel(dag_id=dag.dag_id)
+ session.merge(orm_dag)
+
+ scheduler_job = SchedulerJob()
+ dag.clear()
+ dr = scheduler_job.create_dag_run(dag)
+ self.assertIsNotNone(dr)
+
+ with create_session() as session:
+ tis = dr.get_task_instances(session=session)
+ for ti in tis:
+ ti.state = state
+ ti.start_date = start_date
+ ti.end_date = end_date
+
+ ti_to_schedule = []
+ scheduler_job._process_task_instances(dag, task_instances_list=ti_to_schedule)
+
+ assert ti_to_schedule == [
+ (dag.dag_id, dag_task1.task_id, DEFAULT_DATE, TRY_NUMBER),
+ (dag.dag_id, dag_task2.task_id, DEFAULT_DATE, TRY_NUMBER),
+ ]
+
def test_find_executable_task_instances_concurrency(self):
dag_id = 'SchedulerJobTest.test_find_executable_task_instances_concurrency'
task_id_1 = 'dummy'