Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/05/27 03:35:06 UTC

[GitHub] [airflow] turbaszek opened a new pull request #9018: Improve SchedulerJob code style

turbaszek opened a new pull request #9018:
URL: https://github.com/apache/airflow/pull/9018


   ---
   Make sure to mark the boxes below before creating PR: [x]
   
   - [x] Description above provides context of the change
   - [x] Unit tests coverage for changes (not needed for documentation changes)
   - [x] Target Github ISSUE in description if exists
   - [x] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)"
   - [x] Relevant documentation is updated including usage instructions.
   - [x] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] turbaszek commented on a change in pull request #9018: Improve SchedulerJob code style

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #9018:
URL: https://github.com/apache/airflow/pull/9018#discussion_r431013108



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1440,94 +1436,89 @@ def _change_state_for_tasks_failed_to_execute(self, session=None):
 
         :param session: session for ORM operations
         """
-        if self.executor.queued_tasks:
-            TI = models.TaskInstance
-            filter_for_ti_state_change = (
-                [and_(
-                    TI.dag_id == dag_id,
-                    TI.task_id == task_id,
-                    TI.execution_date == execution_date,
-                    # The TI.try_number will return raw try_number+1 since the
-                    # ti is not running. And we need to -1 to match the DB record.
-                    TI._try_number == try_number - 1,  # pylint: disable=protected-access
-                    TI.state == State.QUEUED)
-                    for dag_id, task_id, execution_date, try_number
-                    in self.executor.queued_tasks.keys()])
-            ti_query = (session.query(TI)
-                        .filter(or_(*filter_for_ti_state_change)))
-            tis_to_set_to_scheduled = (ti_query
-                                       .with_for_update()
-                                       .all())
-            if len(tis_to_set_to_scheduled) == 0:
-                session.commit()
-                return
-
-            # set TIs to queued state
-            filter_for_tis = TI.filter_for_tis(tis_to_set_to_scheduled)
-            session.query(TI).filter(filter_for_tis).update(
-                {TI.state: State.SCHEDULED, TI.queued_dttm: None}, synchronize_session=False
-            )
+        if not self.executor.queued_tasks:
+            return
 
-            for task_instance in tis_to_set_to_scheduled:
-                self.executor.queued_tasks.pop(task_instance.key)
+        filter_for_ti_state_change = (
+            [and_(
+                TI.dag_id == dag_id,
+                TI.task_id == task_id,
+                TI.execution_date == execution_date,
+                # The TI.try_number will return raw try_number+1 since the
+                # ti is not running. And we need to -1 to match the DB record.
+                TI._try_number == try_number - 1,  # pylint: disable=protected-access
+                TI.state == State.QUEUED)
+                for dag_id, task_id, execution_date, try_number
+                in self.executor.queued_tasks.keys()])
+        ti_query = session.query(TI).filter(or_(*filter_for_ti_state_change))
+        tis_to_set_to_scheduled = ti_query.with_for_update().all()
+        if not tis_to_set_to_scheduled:
+            return
 
-            task_instance_str = "\n\t".join(
-                [repr(x) for x in tis_to_set_to_scheduled])
+        # set TIs to queued state
+        filter_for_tis = TI.filter_for_tis(tis_to_set_to_scheduled)
+        session.query(TI).filter(filter_for_tis).update(
+            {TI.state: State.SCHEDULED, TI.queued_dttm: None}, synchronize_session=False
+        )
 
-            session.commit()

Review comment:
       You are correct: this method is private and is called only from the scheduler job.







[GitHub] [airflow] olchas commented on a change in pull request #9018: Improve SchedulerJob code style

Posted by GitBox <gi...@apache.org>.
olchas commented on a change in pull request #9018:
URL: https://github.com/apache/airflow/pull/9018#discussion_r430996595



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1440,94 +1436,89 @@ def _change_state_for_tasks_failed_to_execute(self, session=None):
 
         :param session: session for ORM operations
         """
-        if self.executor.queued_tasks:
-            TI = models.TaskInstance
-            filter_for_ti_state_change = (
-                [and_(
-                    TI.dag_id == dag_id,
-                    TI.task_id == task_id,
-                    TI.execution_date == execution_date,
-                    # The TI.try_number will return raw try_number+1 since the
-                    # ti is not running. And we need to -1 to match the DB record.
-                    TI._try_number == try_number - 1,  # pylint: disable=protected-access
-                    TI.state == State.QUEUED)
-                    for dag_id, task_id, execution_date, try_number
-                    in self.executor.queued_tasks.keys()])
-            ti_query = (session.query(TI)
-                        .filter(or_(*filter_for_ti_state_change)))
-            tis_to_set_to_scheduled = (ti_query
-                                       .with_for_update()
-                                       .all())
-            if len(tis_to_set_to_scheduled) == 0:
-                session.commit()
-                return
-
-            # set TIs to queued state
-            filter_for_tis = TI.filter_for_tis(tis_to_set_to_scheduled)
-            session.query(TI).filter(filter_for_tis).update(
-                {TI.state: State.SCHEDULED, TI.queued_dttm: None}, synchronize_session=False
-            )
+        if not self.executor.queued_tasks:
+            return
 
-            for task_instance in tis_to_set_to_scheduled:
-                self.executor.queued_tasks.pop(task_instance.key)
+        filter_for_ti_state_change = (
+            [and_(
+                TI.dag_id == dag_id,
+                TI.task_id == task_id,
+                TI.execution_date == execution_date,
+                # The TI.try_number will return raw try_number+1 since the
+                # ti is not running. And we need to -1 to match the DB record.
+                TI._try_number == try_number - 1,  # pylint: disable=protected-access
+                TI.state == State.QUEUED)
+                for dag_id, task_id, execution_date, try_number
+                in self.executor.queued_tasks.keys()])
+        ti_query = session.query(TI).filter(or_(*filter_for_ti_state_change))
+        tis_to_set_to_scheduled = ti_query.with_for_update().all()
+        if not tis_to_set_to_scheduled:
+            return
 
-            task_instance_str = "\n\t".join(
-                [repr(x) for x in tis_to_set_to_scheduled])
+        # set TIs to queued state
+        filter_for_tis = TI.filter_for_tis(tis_to_set_to_scheduled)
+        session.query(TI).filter(filter_for_tis).update(
+            {TI.state: State.SCHEDULED, TI.queued_dttm: None}, synchronize_session=False
+        )
 
-            session.commit()

Review comment:
       As I understand it, this could be removed because the commit will be executed anyway by the `@provide_session` decorator? But doesn't that happen only if `session` is not provided explicitly?
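
The question above can be illustrated with a simplified sketch. This is not Airflow's actual `provide_session`: `FakeSession`, `create_session`, and `change_state` are hypothetical stand-ins, and the sketch assumes a decorator that commits only when it opens the session itself. It handles only a keyword `session` argument, for brevity.

```python
from contextlib import contextmanager
from functools import wraps


class FakeSession:
    """Hypothetical stand-in for an ORM session; it only counts commits."""

    def __init__(self):
        self.commits = 0

    def commit(self):
        self.commits += 1

    def rollback(self):
        pass

    def close(self):
        pass


@contextmanager
def create_session():
    """Open a session, commit on success, roll back on error, always close."""
    session = FakeSession()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()


def provide_session(func):
    """Inject a session only when the caller did not pass one (assumed behaviour)."""

    @wraps(func)
    def wrapper(*args, **kwargs):
        if kwargs.get("session") is not None:
            # Caller owns the session, so no commit is issued here.
            return func(*args, **kwargs)
        with create_session() as session:
            kwargs["session"] = session
            return func(*args, **kwargs)

    return wrapper


@provide_session
def change_state(session=None):
    # Hypothetical method body; returns the session so it can be inspected.
    return session


implicit = change_state()              # decorator created the session
explicit = FakeSession()
change_state(session=explicit)         # caller supplied the session
print(implicit.commits, explicit.commits)
```

Under these assumptions, dropping an explicit `session.commit()` is only safe when every caller that passes its own session also commits it.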







[GitHub] [airflow] olchas commented on a change in pull request #9018: Improve SchedulerJob code style

Posted by GitBox <gi...@apache.org>.
olchas commented on a change in pull request #9018:
URL: https://github.com/apache/airflow/pull/9018#discussion_r430993095



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1440,94 +1436,89 @@ def _change_state_for_tasks_failed_to_execute(self, session=None):
 
         :param session: session for ORM operations
         """
-        if self.executor.queued_tasks:
-            TI = models.TaskInstance
-            filter_for_ti_state_change = (
-                [and_(
-                    TI.dag_id == dag_id,
-                    TI.task_id == task_id,
-                    TI.execution_date == execution_date,
-                    # The TI.try_number will return raw try_number+1 since the
-                    # ti is not running. And we need to -1 to match the DB record.
-                    TI._try_number == try_number - 1,  # pylint: disable=protected-access
-                    TI.state == State.QUEUED)
-                    for dag_id, task_id, execution_date, try_number
-                    in self.executor.queued_tasks.keys()])
-            ti_query = (session.query(TI)
-                        .filter(or_(*filter_for_ti_state_change)))
-            tis_to_set_to_scheduled = (ti_query
-                                       .with_for_update()
-                                       .all())
-            if len(tis_to_set_to_scheduled) == 0:
-                session.commit()
-                return
-
-            # set TIs to queued state
-            filter_for_tis = TI.filter_for_tis(tis_to_set_to_scheduled)
-            session.query(TI).filter(filter_for_tis).update(
-                {TI.state: State.SCHEDULED, TI.queued_dttm: None}, synchronize_session=False
-            )
+        if not self.executor.queued_tasks:
+            return
 
-            for task_instance in tis_to_set_to_scheduled:
-                self.executor.queued_tasks.pop(task_instance.key)
+        filter_for_ti_state_change = (
+            [and_(
+                TI.dag_id == dag_id,
+                TI.task_id == task_id,
+                TI.execution_date == execution_date,
+                # The TI.try_number will return raw try_number+1 since the
+                # ti is not running. And we need to -1 to match the DB record.
+                TI._try_number == try_number - 1,  # pylint: disable=protected-access
+                TI.state == State.QUEUED)
+                for dag_id, task_id, execution_date, try_number
+                in self.executor.queued_tasks.keys()])
+        ti_query = session.query(TI).filter(or_(*filter_for_ti_state_change))
+        tis_to_set_to_scheduled = ti_query.with_for_update().all()
+        if not tis_to_set_to_scheduled:
+            return
 
-            task_instance_str = "\n\t".join(
-                [repr(x) for x in tis_to_set_to_scheduled])
+        # set TIs to queued state
+        filter_for_tis = TI.filter_for_tis(tis_to_set_to_scheduled)
+        session.query(TI).filter(filter_for_tis).update(
+            {TI.state: State.SCHEDULED, TI.queued_dttm: None}, synchronize_session=False
+        )
 
-            session.commit()
-            self.log.info("Set the following tasks to scheduled state:\n\t%s", task_instance_str)
+        task_instance_str = ""
+        for task_instance in tis_to_set_to_scheduled:
+            self.executor.queued_tasks.pop(task_instance.key)
+            task_instance_str += f"\n\t{repr(task_instance)}"

Review comment:
       How about:
   ```suggestion
           task_instance_str_list = []
           for task_instance in tis_to_set_to_scheduled:
               self.executor.queued_tasks.pop(task_instance.key)
               task_instance_str_list += [f"{repr(task_instance)}"]
           task_instance_str = "\n\t".join(task_instance_str_list)
   ```
   
   In your solution, `task_instance_str` is rebuilt on every loop iteration because strings are immutable. It also leaves `task_instance_str` with an extra "\n\t" at the beginning.
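
A minimal sketch of the difference, using made-up placeholder strings instead of real task instances (`build_by_concatenation` and `build_by_join` are hypothetical helper names). Conceptually each `+=` produces a new string, although CPython can sometimes optimize this, whereas `str.join` builds the result once and puts the separator only between items.

```python
def build_by_concatenation(items):
    """Prefix every item with the separator, as in the loop under review."""
    result = ""
    for item in items:
        result += f"\n\t{item}"
    return result


def build_by_join(items):
    """Place the separator only between items."""
    return "\n\t".join(items)


# Placeholder values standing in for repr(task_instance).
items = ["<TI a>", "<TI b>", "<TI c>"]

concatenated = build_by_concatenation(items)
joined = build_by_join(items)

# The concatenated form carries one extra leading separator.
print(repr(concatenated))
print(repr(joined))
```

Because the log format string in the diff already contains `\n\t` before `%s`, the concatenated form would likely produce a blank line before the first task.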







[GitHub] [airflow] turbaszek merged pull request #9018: Improve SchedulerJob code style

Posted by GitBox <gi...@apache.org>.
turbaszek merged pull request #9018:
URL: https://github.com/apache/airflow/pull/9018


   





[GitHub] [airflow] turbaszek commented on a change in pull request #9018: Improve SchedulerJob code style

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #9018:
URL: https://github.com/apache/airflow/pull/9018#discussion_r431013369



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1440,94 +1436,89 @@ def _change_state_for_tasks_failed_to_execute(self, session=None):
 
         :param session: session for ORM operations
         """
-        if self.executor.queued_tasks:
-            TI = models.TaskInstance
-            filter_for_ti_state_change = (
-                [and_(
-                    TI.dag_id == dag_id,
-                    TI.task_id == task_id,
-                    TI.execution_date == execution_date,
-                    # The TI.try_number will return raw try_number+1 since the
-                    # ti is not running. And we need to -1 to match the DB record.
-                    TI._try_number == try_number - 1,  # pylint: disable=protected-access
-                    TI.state == State.QUEUED)
-                    for dag_id, task_id, execution_date, try_number
-                    in self.executor.queued_tasks.keys()])
-            ti_query = (session.query(TI)
-                        .filter(or_(*filter_for_ti_state_change)))
-            tis_to_set_to_scheduled = (ti_query
-                                       .with_for_update()
-                                       .all())
-            if len(tis_to_set_to_scheduled) == 0:
-                session.commit()
-                return
-
-            # set TIs to queued state
-            filter_for_tis = TI.filter_for_tis(tis_to_set_to_scheduled)
-            session.query(TI).filter(filter_for_tis).update(
-                {TI.state: State.SCHEDULED, TI.queued_dttm: None}, synchronize_session=False
-            )
+        if not self.executor.queued_tasks:
+            return
 
-            for task_instance in tis_to_set_to_scheduled:
-                self.executor.queued_tasks.pop(task_instance.key)
+        filter_for_ti_state_change = (
+            [and_(
+                TI.dag_id == dag_id,
+                TI.task_id == task_id,
+                TI.execution_date == execution_date,
+                # The TI.try_number will return raw try_number+1 since the
+                # ti is not running. And we need to -1 to match the DB record.
+                TI._try_number == try_number - 1,  # pylint: disable=protected-access
+                TI.state == State.QUEUED)
+                for dag_id, task_id, execution_date, try_number
+                in self.executor.queued_tasks.keys()])
+        ti_query = session.query(TI).filter(or_(*filter_for_ti_state_change))
+        tis_to_set_to_scheduled = ti_query.with_for_update().all()
+        if not tis_to_set_to_scheduled:
+            return
 
-            task_instance_str = "\n\t".join(
-                [repr(x) for x in tis_to_set_to_scheduled])
+        # set TIs to queued state
+        filter_for_tis = TI.filter_for_tis(tis_to_set_to_scheduled)
+        session.query(TI).filter(filter_for_tis).update(
+            {TI.state: State.SCHEDULED, TI.queued_dttm: None}, synchronize_session=False
+        )
 
-            session.commit()
-            self.log.info("Set the following tasks to scheduled state:\n\t%s", task_instance_str)
+        task_instance_str = ""
+        for task_instance in tis_to_set_to_scheduled:
+            self.executor.queued_tasks.pop(task_instance.key)
+            task_instance_str += f"\n\t{repr(task_instance)}"

Review comment:
       Thanks for your suggestion! However, I will revert this change.







[GitHub] [airflow] olchas commented on pull request #9018: Improve SchedulerJob code style

Posted by GitBox <gi...@apache.org>.
olchas commented on pull request #9018:
URL: https://github.com/apache/airflow/pull/9018#issuecomment-634583902


   LGTM





[GitHub] [airflow] olchas removed a comment on pull request #9018: Improve SchedulerJob code style

Posted by GitBox <gi...@apache.org>.
olchas removed a comment on pull request #9018:
URL: https://github.com/apache/airflow/pull/9018#issuecomment-634583902


   LGTM

