You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/02/12 18:21:49 UTC

[GitHub] [airflow] ashb commented on a change in pull request #7402: [AIRFLOW-6779] fix scheduler bug related to task concurrency and depends on past

ashb commented on a change in pull request #7402: [AIRFLOW-6779] fix scheduler bug related to task concurrency and depends on past
URL: https://github.com/apache/airflow/pull/7402#discussion_r378429677
 
 

 ##########
 File path: tests/jobs/test_scheduler_job.py
 ##########
 @@ -469,6 +469,105 @@ def test_dag_file_processor_process_task_instances(self, state, start_date, end_
             (dag.dag_id, dag_task1.task_id, DEFAULT_DATE, TRY_NUMBER)
         )
 
+    @parameterized.expand([
+        [State.NONE, None, None],
+        [State.UP_FOR_RETRY, timezone.utcnow() - datetime.timedelta(minutes=30),
+         timezone.utcnow() - datetime.timedelta(minutes=15)],
+        [State.UP_FOR_RESCHEDULE, timezone.utcnow() - datetime.timedelta(minutes=30),
+         timezone.utcnow() - datetime.timedelta(minutes=15)],
+    ])
+    def test_dag_file_processor_process_task_instances_with_task_concurrency(
+        self, state, start_date, end_date,
+    ):
+        """
+        Test if _process_task_instances puts the right task instances into the
+        mock_list.
+        """
+        dag = DAG(
+            dag_id='test_scheduler_process_execute_task_with_task_concurrency',
+            start_date=DEFAULT_DATE)
+        dag_task1 = DummyOperator(
+            task_id='dummy',
+            task_concurrency=2,
+            dag=dag,
+            owner='airflow')
+
+        with create_session() as session:
+            orm_dag = DagModel(dag_id=dag.dag_id)
+            session.merge(orm_dag)
+
+        dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
+        dag.clear()
+        dr = dag_file_processor.create_dag_run(dag)
+        self.assertIsNotNone(dr)
+
+        with create_session() as session:
+            tis = dr.get_task_instances(session=session)
+            for ti in tis:
+                ti.state = state
+                ti.start_date = start_date
+                ti.end_date = end_date
+
+        ti_to_schedule = []
+        dag_file_processor._process_task_instances(dag, task_instances_list=ti_to_schedule)
+
+        assert ti_to_schedule == [
+            (dag.dag_id, dag_task1.task_id, DEFAULT_DATE, TRY_NUMBER),
+        ]
+
+    @parameterized.expand([
+        [State.NONE, None, None],
+        [State.UP_FOR_RETRY, timezone.utcnow() - datetime.timedelta(minutes=30),
+         timezone.utcnow() - datetime.timedelta(minutes=15)],
+        [State.UP_FOR_RESCHEDULE, timezone.utcnow() - datetime.timedelta(minutes=30),
+         timezone.utcnow() - datetime.timedelta(minutes=15)],
+    ])
+    def test_dag_file_processor_process_task_instances_depends_on_past(self, state, start_date, end_date):
+        """
+        Test if _process_task_instances puts the right task instances into the
+        mock_list.
+        """
+        dag = DAG(
+            dag_id='test_scheduler_process_execute_task_depends_on_past',
+            start_date=DEFAULT_DATE,
+            default_args={
+                'depends_on_past': True,
+            },
+        )
+        dag_task1 = DummyOperator(
+            task_id='dummy1',
+            dag=dag,
+            owner='airflow')
+        dag_task2 = DummyOperator(
+            task_id='dummy2',
+            depends_on_past=True,
 
 Review comment:
   Minor nit: there's no need to set `depends_on_past=True` both here on the operator and in the DAG's `default_args`; one of the two is enough.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services