You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/11/27 20:19:53 UTC

[GitHub] [airflow] XD-DENG commented on a change in pull request #12663: BugFix: Tasks with depends_on_past or task_concurrency are stuck

XD-DENG commented on a change in pull request #12663:
URL: https://github.com/apache/airflow/pull/12663#discussion_r531771282



##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -477,6 +477,104 @@ def test_find_executable_task_instances_none(self):
             states=[State.SCHEDULED],
             session=session)))
 
+    @parameterized.expand([
+        [State.NONE, None, None],
+        [State.UP_FOR_RETRY, timezone.utcnow() - datetime.timedelta(minutes=30),
+         timezone.utcnow() - datetime.timedelta(minutes=15)],
+        [State.UP_FOR_RESCHEDULE, timezone.utcnow() - datetime.timedelta(minutes=30),
+         timezone.utcnow() - datetime.timedelta(minutes=15)],
+    ])
+    def test_process_task_instances_with_task_concurrency(
+        self, state, start_date, end_date,
+    ):
+        """
+        Test if _process_task_instances puts the right task instances into the
+        mock_list.
+        """
+        dag = DAG(
+            dag_id='test_scheduler_process_execute_task_with_task_concurrency',
+            start_date=DEFAULT_DATE)
+        dag_task1 = DummyOperator(
+            task_id='dummy',
+            task_concurrency=2,
+            dag=dag,
+            owner='airflow')
+
+        with create_session() as session:
+            orm_dag = DagModel(dag_id=dag.dag_id)
+            session.merge(orm_dag)
+
+        scheduler_job = SchedulerJob()
+        dag.clear()
+        dr = scheduler_job.create_dag_run(dag)
+        self.assertIsNotNone(dr)
+
+        with create_session() as session:
+            tis = dr.get_task_instances(session=session)
+            for ti in tis:
+                ti.state = state
+                ti.start_date = start_date
+                ti.end_date = end_date
+
+        ti_to_schedule = []
+        scheduler_job._process_task_instances(dag, task_instances_list=ti_to_schedule)
+
+        assert ti_to_schedule == [
+            (dag.dag_id, dag_task1.task_id, DEFAULT_DATE, TRY_NUMBER),
+        ]
+
+    @parameterized.expand([
+        [State.NONE, None, None],
+        [State.UP_FOR_RETRY, timezone.utcnow() - datetime.timedelta(minutes=30),
+         timezone.utcnow() - datetime.timedelta(minutes=15)],
+        [State.UP_FOR_RESCHEDULE, timezone.utcnow() - datetime.timedelta(minutes=30),
+         timezone.utcnow() - datetime.timedelta(minutes=15)],
+    ])
+    def test_process_task_instances_depends_on_past(self, state, start_date, end_date):
+        """
+        Test if _process_task_instances puts the right task instances into the
+        mock_list.
+        """
+        dag = DAG(
+            dag_id='test_scheduler_process_execute_task_depends_on_past',
+            start_date=DEFAULT_DATE,
+            default_args={
+                'depends_on_past': True,
+            },
+        )
+        dag_task1 = DummyOperator(
+            task_id='dummy1',
+            dag=dag,
+            owner='airflow')
+        dag_task2 = DummyOperator(
+            task_id='dummy2',
+            dag=dag,
+            owner='airflow')
+
+        with create_session() as session:
+            orm_dag = DagModel(dag_id=dag.dag_id)
+            session.merge(orm_dag)
+
+        scheduler_job = SchedulerJob()
+        dag.clear()
+        dr = scheduler_job.create_dag_run(dag)
+        self.assertIsNotNone(dr)
+
+        with create_session() as session:
+            tis = dr.get_task_instances(session=session)
+            for ti in tis:
+                ti.state = state
+                ti.start_date = start_date
+                ti.end_date = end_date
+
+        ti_to_schedule = []
+        scheduler_job._process_task_instances(dag, task_instances_list=ti_to_schedule)
+
+        assert ti_to_schedule == [
+            (dag.dag_id, dag_task1.task_id, DEFAULT_DATE, TRY_NUMBER),
+            (dag.dag_id, dag_task2.task_id, DEFAULT_DATE, TRY_NUMBER),
+        ]

Review comment:
       I'm observing a CI error after this was merged into 1-10-stable.
   
   I assume it's because the order of the two elements in `ti_to_schedule` is not guaranteed.
   
   We could possibly assert like this instead:
   
   ```python
           mock_list = Mock()
           scheduler._process_task_instances(dag, task_instances_list=mock_list)
   
           mock_list.append.assert_called_with(
               (dag.dag_id, dag_task1.task_id, DEFAULT_DATE, TRY_NUMBER)
           )
   ```
   
   (https://github.com/astronomer/airflow/blob/67807ee492482f57442239e271747a5acc69e15b/tests/jobs/test_scheduler_job.py#L1604-L1609)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org