You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by xd...@apache.org on 2020/11/01 12:02:29 UTC
[airflow] branch master updated: Code smell fixes for BackfillJob
(#12005)
This is an automated email from the ASF dual-hosted git repository.
xddeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new e032301 Code smell fixes for BackfillJob (#12005)
e032301 is described below
commit e032301a0a0cd1cfa271ecfa87ef71dbe92e107d
Author: Xiaodong DENG <xd...@hotmail.com>
AuthorDate: Sun Nov 1 13:01:27 2020 +0100
Code smell fixes for BackfillJob (#12005)
- merge IFs when we can;
- One of the IF condition check should reuse STATES_COUNT_AS_RUNNING which has been already defined;
- Remove unused parameter for intermediate function _per_task_process()
---
airflow/jobs/backfill_job.py | 17 ++++++++---------
1 file changed, 8 insertions(+), 9 deletions(-)
diff --git a/airflow/jobs/backfill_job.py b/airflow/jobs/backfill_job.py
index ed51596..f594b26 100644
--- a/airflow/jobs/backfill_job.py
+++ b/airflow/jobs/backfill_job.py
@@ -271,13 +271,12 @@ class BackfillJob(BaseJob):
self.log.debug("Executor state: %s task %s", state, ti)
- if state in (State.FAILED, State.SUCCESS):
- if ti.state == State.RUNNING or ti.state == State.QUEUED:
- msg = ("Executor reports task instance {} finished ({}) "
- "although the task says its {}. Was the task "
- "killed externally? Info: {}".format(ti, state, ti.state, info))
- self.log.error(msg)
- ti.handle_failure(msg)
+ if state in (State.FAILED, State.SUCCESS) and ti.state in self.STATES_COUNT_AS_RUNNING:
+ msg = ("Executor reports task instance {} finished ({}) "
+ "although the task says its {}. Was the task "
+ "killed externally? Info: {}".format(ti, state, ti.state, info))
+ self.log.error(msg)
+ ti.handle_failure(msg)
@provide_session
def _get_dag_run(self, run_date: datetime, dag: DAG, session: Session = None):
@@ -425,7 +424,7 @@ class BackfillJob(BaseJob):
# determined deadlocked while they are actually
# waiting for their upstream to finish
@provide_session
- def _per_task_process(task, key, ti, session=None): # pylint: disable=too-many-return-statements
+ def _per_task_process(key, ti, session=None): # pylint: disable=too-many-return-statements
ti.refresh_from_db(lock_for_update=True, session=session)
task = self.dag.get_task(ti.task_id, include_subdags=True)
@@ -600,7 +599,7 @@ class BackfillJob(BaseJob):
"is reached."
)
- _per_task_process(task, key, ti)
+ _per_task_process(key, ti)
except (NoAvailablePoolSlot, DagConcurrencyLimitReached, TaskConcurrencyLimitReached) as e:
self.log.debug(e)