You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/05/27 09:47:10 UTC

[GitHub] [airflow] olchas commented on a change in pull request #9018: Improve SchedulerJob code style

olchas commented on a change in pull request #9018:
URL: https://github.com/apache/airflow/pull/9018#discussion_r430993095



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1440,94 +1436,89 @@ def _change_state_for_tasks_failed_to_execute(self, session=None):
 
         :param session: session for ORM operations
         """
-        if self.executor.queued_tasks:
-            TI = models.TaskInstance
-            filter_for_ti_state_change = (
-                [and_(
-                    TI.dag_id == dag_id,
-                    TI.task_id == task_id,
-                    TI.execution_date == execution_date,
-                    # The TI.try_number will return raw try_number+1 since the
-                    # ti is not running. And we need to -1 to match the DB record.
-                    TI._try_number == try_number - 1,  # pylint: disable=protected-access
-                    TI.state == State.QUEUED)
-                    for dag_id, task_id, execution_date, try_number
-                    in self.executor.queued_tasks.keys()])
-            ti_query = (session.query(TI)
-                        .filter(or_(*filter_for_ti_state_change)))
-            tis_to_set_to_scheduled = (ti_query
-                                       .with_for_update()
-                                       .all())
-            if len(tis_to_set_to_scheduled) == 0:
-                session.commit()
-                return
-
-            # set TIs to queued state
-            filter_for_tis = TI.filter_for_tis(tis_to_set_to_scheduled)
-            session.query(TI).filter(filter_for_tis).update(
-                {TI.state: State.SCHEDULED, TI.queued_dttm: None}, synchronize_session=False
-            )
+        if not self.executor.queued_tasks:
+            return
 
-            for task_instance in tis_to_set_to_scheduled:
-                self.executor.queued_tasks.pop(task_instance.key)
+        filter_for_ti_state_change = (
+            [and_(
+                TI.dag_id == dag_id,
+                TI.task_id == task_id,
+                TI.execution_date == execution_date,
+                # The TI.try_number will return raw try_number+1 since the
+                # ti is not running. And we need to -1 to match the DB record.
+                TI._try_number == try_number - 1,  # pylint: disable=protected-access
+                TI.state == State.QUEUED)
+                for dag_id, task_id, execution_date, try_number
+                in self.executor.queued_tasks.keys()])
+        ti_query = session.query(TI).filter(or_(*filter_for_ti_state_change))
+        tis_to_set_to_scheduled = ti_query.with_for_update().all()
+        if not tis_to_set_to_scheduled:
+            return
 
-            task_instance_str = "\n\t".join(
-                [repr(x) for x in tis_to_set_to_scheduled])
+        # set TIs to queued state
+        filter_for_tis = TI.filter_for_tis(tis_to_set_to_scheduled)
+        session.query(TI).filter(filter_for_tis).update(
+            {TI.state: State.SCHEDULED, TI.queued_dttm: None}, synchronize_session=False
+        )
 
-            session.commit()
-            self.log.info("Set the following tasks to scheduled state:\n\t%s", task_instance_str)
+        task_instance_str = ""
+        for task_instance in tis_to_set_to_scheduled:
+            self.executor.queued_tasks.pop(task_instance.key)
+            task_instance_str += f"\n\t{repr(task_instance)}"

Review comment:
       How about:
   ```suggestion
           task_instance_str_list = []
           for task_instance in tis_to_set_to_scheduled:
               self.executor.queued_tasks.pop(task_instance.key)
               task_instance_str_list += [f"{repr(task_instance)}"]
           task_instance_str = "\n\t".join(task_instance_str_list)
   ```
   
   In your solution, task_instance_str is recreated on every loop iteration, because strings are immutable. It also causes task_instance_str to have an additional leading "\n\t".




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org