Posted to commits@airflow.apache.org by ka...@apache.org on 2020/11/27 19:43:47 UTC

[airflow] branch v1-10-stable updated: BugFix: Tasks with depends_on_past or task_concurrency are stuck (#12663)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-stable
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v1-10-stable by this push:
     new 2a7944d  BugFix: Tasks with depends_on_past or task_concurrency are stuck (#12663)
2a7944d is described below

commit 2a7944da85c3ec7cc913640ff89a66e06d486480
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Fri Nov 27 19:42:42 2020 +0000

    BugFix: Tasks with depends_on_past or task_concurrency are stuck (#12663)
    
    closes https://github.com/apache/airflow/issues/12659
---
 airflow/models/dagrun.py         |  2 +-
 tests/jobs/test_scheduler_job.py | 98 ++++++++++++++++++++++++++++++++++++++++
 2 files changed, 99 insertions(+), 1 deletion(-)
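
Illustrative only, not part of the commit: a minimal Airflow 1.10-style DAG sketch (all names hypothetical) of the kind affected by this bug. Before this fix, the scheduler skipped the per-DagRun scheduling block whenever any unfinished task used depends_on_past or task_concurrency, so task instances in a DAG like this could remain stuck.

    # Hypothetical example DAG, not from the Airflow codebase.
    from datetime import datetime

    from airflow import DAG
    from airflow.operators.dummy_operator import DummyOperator

    with DAG(
        dag_id='example_depends_on_past_stuck',   # hypothetical dag_id
        start_date=datetime(2020, 11, 1),
        schedule_interval='@daily',
        default_args={'depends_on_past': True},
    ) as dag:
        first = DummyOperator(task_id='first')
        # task_concurrency alone also triggered the old guard.
        second = DummyOperator(task_id='second', task_concurrency=2)
        first >> second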

diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 61ca6bd..9775c9f 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -273,7 +273,7 @@ class DagRun(Base, LoggingMixin):
         none_task_concurrency = all(t.task.task_concurrency is None
                                     for t in unfinished_tasks)
         # small speed up
-        if unfinished_tasks and none_depends_on_past and none_task_concurrency:
+        if unfinished_tasks:
             scheduleable_tasks = [ut for ut in unfinished_tasks if ut.state in SCHEDULEABLE_STATES]
             self.log.debug(
                 "number of scheduleable tasks for %s: %s task(s)",
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index e705067..ab406be 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -477,6 +477,104 @@ class SchedulerJobTest(unittest.TestCase):
             states=[State.SCHEDULED],
             session=session)))
 
+    @parameterized.expand([
+        [State.NONE, None, None],
+        [State.UP_FOR_RETRY, timezone.utcnow() - datetime.timedelta(minutes=30),
+         timezone.utcnow() - datetime.timedelta(minutes=15)],
+        [State.UP_FOR_RESCHEDULE, timezone.utcnow() - datetime.timedelta(minutes=30),
+         timezone.utcnow() - datetime.timedelta(minutes=15)],
+    ])
+    def test_process_task_instances_with_task_concurrency(
+        self, state, start_date, end_date,
+    ):
+        """
+        Test that _process_task_instances puts the right task instances into
+        task_instances_list when task_concurrency is set on the task.
+        """
+        dag = DAG(
+            dag_id='test_scheduler_process_execute_task_with_task_concurrency',
+            start_date=DEFAULT_DATE)
+        dag_task1 = DummyOperator(
+            task_id='dummy',
+            task_concurrency=2,
+            dag=dag,
+            owner='airflow')
+
+        with create_session() as session:
+            orm_dag = DagModel(dag_id=dag.dag_id)
+            session.merge(orm_dag)
+
+        scheduler_job = SchedulerJob()
+        dag.clear()
+        dr = scheduler_job.create_dag_run(dag)
+        self.assertIsNotNone(dr)
+
+        with create_session() as session:
+            tis = dr.get_task_instances(session=session)
+            for ti in tis:
+                ti.state = state
+                ti.start_date = start_date
+                ti.end_date = end_date
+
+        ti_to_schedule = []
+        scheduler_job._process_task_instances(dag, task_instances_list=ti_to_schedule)
+
+        assert ti_to_schedule == [
+            (dag.dag_id, dag_task1.task_id, DEFAULT_DATE, TRY_NUMBER),
+        ]
+
+    @parameterized.expand([
+        [State.NONE, None, None],
+        [State.UP_FOR_RETRY, timezone.utcnow() - datetime.timedelta(minutes=30),
+         timezone.utcnow() - datetime.timedelta(minutes=15)],
+        [State.UP_FOR_RESCHEDULE, timezone.utcnow() - datetime.timedelta(minutes=30),
+         timezone.utcnow() - datetime.timedelta(minutes=15)],
+    ])
+    def test_process_task_instances_depends_on_past(self, state, start_date, end_date):
+        """
+        Test that _process_task_instances puts the right task instances into
+        task_instances_list when depends_on_past is set on the DAG's tasks.
+        """
+        dag = DAG(
+            dag_id='test_scheduler_process_execute_task_depends_on_past',
+            start_date=DEFAULT_DATE,
+            default_args={
+                'depends_on_past': True,
+            },
+        )
+        dag_task1 = DummyOperator(
+            task_id='dummy1',
+            dag=dag,
+            owner='airflow')
+        dag_task2 = DummyOperator(
+            task_id='dummy2',
+            dag=dag,
+            owner='airflow')
+
+        with create_session() as session:
+            orm_dag = DagModel(dag_id=dag.dag_id)
+            session.merge(orm_dag)
+
+        scheduler_job = SchedulerJob()
+        dag.clear()
+        dr = scheduler_job.create_dag_run(dag)
+        self.assertIsNotNone(dr)
+
+        with create_session() as session:
+            tis = dr.get_task_instances(session=session)
+            for ti in tis:
+                ti.state = state
+                ti.start_date = start_date
+                ti.end_date = end_date
+
+        ti_to_schedule = []
+        scheduler_job._process_task_instances(dag, task_instances_list=ti_to_schedule)
+
+        assert ti_to_schedule == [
+            (dag.dag_id, dag_task1.task_id, DEFAULT_DATE, TRY_NUMBER),
+            (dag.dag_id, dag_task2.task_id, DEFAULT_DATE, TRY_NUMBER),
+        ]
+
     def test_find_executable_task_instances_concurrency(self):
         dag_id = 'SchedulerJobTest.test_find_executable_task_instances_concurrency'
         task_id_1 = 'dummy'