Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/11/27 19:03:34 UTC

[GitHub] [airflow] kaxil opened a new pull request #12663: BugFix: Tasks with depends_on_past or task_concurrency are stuck

kaxil opened a new pull request #12663:
URL: https://github.com/apache/airflow/pull/12663


   closes https://github.com/apache/airflow/issues/12659
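   
   For context, the two settings involved are `depends_on_past` and `task_concurrency`; a minimal DAG using both (hypothetical dag_id/task_id, Airflow 1.10-era imports, the same API the tests in this PR exercise) would look roughly like this:
   
   ```python
   # Sketch only: a DAG whose task uses both settings affected by this bug.
   # The dag_id/task_id are made up for illustration.
   from datetime import datetime

   from airflow import DAG
   from airflow.operators.dummy_operator import DummyOperator

   with DAG(
       dag_id='example_stuck_tasks',
       start_date=datetime(2020, 11, 1),
       schedule_interval='@daily',
       default_args={'depends_on_past': True},  # each run's task waits on the previous run's task
   ) as dag:
       DummyOperator(
           task_id='limited_task',
           task_concurrency=2,  # at most 2 running instances of this task across dag runs
           owner='airflow',
       )
   ```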
   
   
   ---
   **^ Add meaningful description above**
   
   Read the **[Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines)** for more information.
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil commented on a change in pull request #12663: BugFix: Tasks with depends_on_past or task_concurrency are stuck

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #12663:
URL: https://github.com/apache/airflow/pull/12663#discussion_r531778387



##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -477,6 +477,104 @@ def test_find_executable_task_instances_none(self):
             states=[State.SCHEDULED],
             session=session)))
 
+    @parameterized.expand([
+        [State.NONE, None, None],
+        [State.UP_FOR_RETRY, timezone.utcnow() - datetime.timedelta(minutes=30),
+         timezone.utcnow() - datetime.timedelta(minutes=15)],
+        [State.UP_FOR_RESCHEDULE, timezone.utcnow() - datetime.timedelta(minutes=30),
+         timezone.utcnow() - datetime.timedelta(minutes=15)],
+    ])
+    def test_process_task_instances_with_task_concurrency(
+        self, state, start_date, end_date,
+    ):
+        """
+        Test that _process_task_instances puts the right task instances into
+        the task_instances_list it is given.
+        """
+        dag = DAG(
+            dag_id='test_scheduler_process_execute_task_with_task_concurrency',
+            start_date=DEFAULT_DATE)
+        dag_task1 = DummyOperator(
+            task_id='dummy',
+            task_concurrency=2,
+            dag=dag,
+            owner='airflow')
+
+        with create_session() as session:
+            orm_dag = DagModel(dag_id=dag.dag_id)
+            session.merge(orm_dag)
+
+        scheduler_job = SchedulerJob()
+        dag.clear()
+        dr = scheduler_job.create_dag_run(dag)
+        self.assertIsNotNone(dr)
+
+        with create_session() as session:
+            tis = dr.get_task_instances(session=session)
+            for ti in tis:
+                ti.state = state
+                ti.start_date = start_date
+                ti.end_date = end_date
+
+        ti_to_schedule = []
+        scheduler_job._process_task_instances(dag, task_instances_list=ti_to_schedule)
+
+        assert ti_to_schedule == [
+            (dag.dag_id, dag_task1.task_id, DEFAULT_DATE, TRY_NUMBER),
+        ]
+
+    @parameterized.expand([
+        [State.NONE, None, None],
+        [State.UP_FOR_RETRY, timezone.utcnow() - datetime.timedelta(minutes=30),
+         timezone.utcnow() - datetime.timedelta(minutes=15)],
+        [State.UP_FOR_RESCHEDULE, timezone.utcnow() - datetime.timedelta(minutes=30),
+         timezone.utcnow() - datetime.timedelta(minutes=15)],
+    ])
+    def test_process_task_instances_depends_on_past(self, state, start_date, end_date):
+        """
+        Test that _process_task_instances puts the right task instances into
+        the task_instances_list it is given.
+        """
+        dag = DAG(
+            dag_id='test_scheduler_process_execute_task_depends_on_past',
+            start_date=DEFAULT_DATE,
+            default_args={
+                'depends_on_past': True,
+            },
+        )
+        dag_task1 = DummyOperator(
+            task_id='dummy1',
+            dag=dag,
+            owner='airflow')
+        dag_task2 = DummyOperator(
+            task_id='dummy2',
+            dag=dag,
+            owner='airflow')
+
+        with create_session() as session:
+            orm_dag = DagModel(dag_id=dag.dag_id)
+            session.merge(orm_dag)
+
+        scheduler_job = SchedulerJob()
+        dag.clear()
+        dr = scheduler_job.create_dag_run(dag)
+        self.assertIsNotNone(dr)
+
+        with create_session() as session:
+            tis = dr.get_task_instances(session=session)
+            for ti in tis:
+                ti.state = state
+                ti.start_date = start_date
+                ti.end_date = end_date
+
+        ti_to_schedule = []
+        scheduler_job._process_task_instances(dag, task_instances_list=ti_to_schedule)
+
+        assert ti_to_schedule == [
+            (dag.dag_id, dag_task1.task_id, DEFAULT_DATE, TRY_NUMBER),
+            (dag.dag_id, dag_task2.task_id, DEFAULT_DATE, TRY_NUMBER),
+        ]

Review comment:
       argghh, Python 2.7 & 3.5 I guess, because of the ordering
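   
   For what it's worth, one order-insensitive way to compare on both Python 2.7 and 3.5 is to sort the two lists before asserting; a standalone sketch of the idea (placeholder tuples, not necessarily the fix that will be applied):
   
   ```python
   # Standalone illustration: compare the scheduled-TI keys as sorted lists so the
   # assertion does not depend on the order in which they were appended.
   actual = [('dag', 'dummy2', '2020-11-27', 1), ('dag', 'dummy1', '2020-11-27', 1)]
   expected = [('dag', 'dummy1', '2020-11-27', 1), ('dag', 'dummy2', '2020-11-27', 1)]

   assert actual != expected                   # order-sensitive comparison fails
   assert sorted(actual) == sorted(expected)   # order-insensitive comparison passes
   ```
   
   In the tests above this would amount to sorting both `ti_to_schedule` and the expected list before comparing.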







[GitHub] [airflow] XD-DENG commented on a change in pull request #12663: BugFix: Tasks with depends_on_past or task_concurrency are stuck

Posted by GitBox <gi...@apache.org>.
XD-DENG commented on a change in pull request #12663:
URL: https://github.com/apache/airflow/pull/12663#discussion_r531775127



##########
File path: tests/jobs/test_scheduler_job.py
##########

Review comment:
       I will prepare a PR shortly.







[GitHub] [airflow] XD-DENG commented on a change in pull request #12663: BugFix: Tasks with depends_on_past or task_concurrency are stuck

Posted by GitBox <gi...@apache.org>.
XD-DENG commented on a change in pull request #12663:
URL: https://github.com/apache/airflow/pull/12663#discussion_r531778580



##########
File path: tests/jobs/test_scheduler_job.py
##########

Review comment:
       Yes, the order is not guaranteed, so asserting equality of the two lists would fail.
   
   I have the change ready, and will raise the PR now.







[GitHub] [airflow] kaxil commented on pull request #12663: BugFix: Tasks with depends_on_past or task_concurrency are stuck

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #12663:
URL: https://github.com/apache/airflow/pull/12663#issuecomment-734955526


   cc @nathadfield 





[GitHub] [airflow] XD-DENG commented on a change in pull request #12663: BugFix: Tasks with depends_on_past or task_concurrency are stuck

Posted by GitBox <gi...@apache.org>.
XD-DENG commented on a change in pull request #12663:
URL: https://github.com/apache/airflow/pull/12663#discussion_r531771282



##########
File path: tests/jobs/test_scheduler_job.py
##########

Review comment:
       I'm observing a CI error after this was merged into 1-10-stable.
   
   I assume it's because the order of the two elements in `ti_to_schedule` is not guaranteed.
   
   Possibly we could instead assert like this:
   
   ```python
           mock_list = Mock()
           scheduler._process_task_instances(dag, task_instances_list=mock_list)
   
           mock_list.append.assert_called_with(
               (dag.dag_id, dag_task1.task_id, DEFAULT_DATE, TRY_NUMBER)
           )
   ```
   
   (https://github.com/astronomer/airflow/blob/67807ee492482f57442239e271747a5acc69e15b/tests/jobs/test_scheduler_job.py#L1604-L1609)
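   
   As a side note on that approach (a sketch with placeholder values, using stock `unittest.mock` behaviour): `assert_called_with` only inspects the most recent call, while `assert_has_calls(..., any_order=True)` can check that both task instances were appended without depending on their order.
   
   ```python
   from unittest.mock import Mock, call  # on Python 2.7 this comes from the `mock` backport

   mock_list = Mock()
   # Simulate _process_task_instances appending the two TI keys in either order.
   mock_list.append(('dag', 'dummy2', '2020-11-27', 1))
   mock_list.append(('dag', 'dummy1', '2020-11-27', 1))

   # Passes regardless of which task instance was appended first.
   mock_list.append.assert_has_calls(
       [
           call(('dag', 'dummy1', '2020-11-27', 1)),
           call(('dag', 'dummy2', '2020-11-27', 1)),
       ],
       any_order=True,
   )
   ```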







[GitHub] [airflow] github-actions[bot] commented on pull request #12663: BugFix: Tasks with depends_on_past or task_concurrency are stuck

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #12663:
URL: https://github.com/apache/airflow/pull/12663#issuecomment-734959938


   The PR needs to run all tests because it modifies the core of Airflow! Please rebase it onto the latest master or ask a committer to re-run it!





[GitHub] [airflow] kaxil merged pull request #12663: BugFix: Tasks with depends_on_past or task_concurrency are stuck

Posted by GitBox <gi...@apache.org>.
kaxil merged pull request #12663:
URL: https://github.com/apache/airflow/pull/12663


   

