You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by xd...@apache.org on 2020/11/01 12:02:29 UTC

[airflow] branch master updated: Code smell fixes for BackfillJob (#12005)

This is an automated email from the ASF dual-hosted git repository.

xddeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new e032301  Code smell fixes for BackfillJob (#12005)
e032301 is described below

commit e032301a0a0cd1cfa271ecfa87ef71dbe92e107d
Author: Xiaodong DENG <xd...@hotmail.com>
AuthorDate: Sun Nov 1 13:01:27 2020 +0100

    Code smell fixes for BackfillJob (#12005)
    
    - merge IFs when we can;
    - One of the IF condition check should reuse STATES_COUNT_AS_RUNNING which has been already defined;
    - Remove unused parameter for intermediate function _per_task_process()
---
 airflow/jobs/backfill_job.py | 17 ++++++++---------
 1 file changed, 8 insertions(+), 9 deletions(-)

diff --git a/airflow/jobs/backfill_job.py b/airflow/jobs/backfill_job.py
index ed51596..f594b26 100644
--- a/airflow/jobs/backfill_job.py
+++ b/airflow/jobs/backfill_job.py
@@ -271,13 +271,12 @@ class BackfillJob(BaseJob):
 
             self.log.debug("Executor state: %s task %s", state, ti)
 
-            if state in (State.FAILED, State.SUCCESS):
-                if ti.state == State.RUNNING or ti.state == State.QUEUED:
-                    msg = ("Executor reports task instance {} finished ({}) "
-                           "although the task says its {}. Was the task "
-                           "killed externally? Info: {}".format(ti, state, ti.state, info))
-                    self.log.error(msg)
-                    ti.handle_failure(msg)
+            if state in (State.FAILED, State.SUCCESS) and ti.state in self.STATES_COUNT_AS_RUNNING:
+                msg = ("Executor reports task instance {} finished ({}) "
+                       "although the task says its {}. Was the task "
+                       "killed externally? Info: {}".format(ti, state, ti.state, info))
+                self.log.error(msg)
+                ti.handle_failure(msg)
 
     @provide_session
     def _get_dag_run(self, run_date: datetime, dag: DAG, session: Session = None):
@@ -425,7 +424,7 @@ class BackfillJob(BaseJob):
             # determined deadlocked while they are actually
             # waiting for their upstream to finish
             @provide_session
-            def _per_task_process(task, key, ti, session=None):  # pylint: disable=too-many-return-statements
+            def _per_task_process(key, ti, session=None):  # pylint: disable=too-many-return-statements
                 ti.refresh_from_db(lock_for_update=True, session=session)
 
                 task = self.dag.get_task(ti.task_id, include_subdags=True)
@@ -600,7 +599,7 @@ class BackfillJob(BaseJob):
                                     "is reached."
                                 )
 
-                        _per_task_process(task, key, ti)
+                        _per_task_process(key, ti)
             except (NoAvailablePoolSlot, DagConcurrencyLimitReached, TaskConcurrencyLimitReached) as e:
                 self.log.debug(e)