You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2023/09/01 16:06:17 UTC
[airflow] branch main updated: Refactor unneeded 'continue' jumps in api (#33842)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 3b58a38e67 Refactor unneeded 'continue' jumps in api (#33842)
3b58a38e67 is described below
commit 3b58a38e67c6645b04b8c2a1cc3bd95f32da6f12
Author: Miroslav Šedivý <67...@users.noreply.github.com>
AuthorDate: Fri Sep 1 16:06:10 2023 +0000
Refactor unneeded 'continue' jumps in api (#33842)
---
airflow/api/common/delete_dag.py | 4 +---
airflow/api/common/mark_tasks.py | 26 ++++++++++++--------------
2 files changed, 13 insertions(+), 17 deletions(-)
diff --git a/airflow/api/common/delete_dag.py b/airflow/api/common/delete_dag.py
index c94b3c39df..1a33467755 100644
--- a/airflow/api/common/delete_dag.py
+++ b/airflow/api/common/delete_dag.py
@@ -82,9 +82,7 @@ def delete_dag(dag_id: str, keep_records_in_log: bool = True, session: Session =
count = 0
for model in get_sqla_model_classes():
- if hasattr(model, "dag_id"):
- if keep_records_in_log and model.__name__ == "Log":
- continue
+ if hasattr(model, "dag_id") and (not keep_records_in_log or model.__name__ != "Log"):
count += session.execute(
delete(model)
.where(model.dag_id.in_(dags_to_delete))
diff --git a/airflow/api/common/mark_tasks.py b/airflow/api/common/mark_tasks.py
index d71957f86b..cfd7471d24 100644
--- a/airflow/api/common/mark_tasks.py
+++ b/airflow/api/common/mark_tasks.py
@@ -67,16 +67,15 @@ def _create_dagruns(
}
for info in infos:
- if info.logical_date in dag_runs:
- continue
- dag_runs[info.logical_date] = dag.create_dagrun(
- execution_date=info.logical_date,
- data_interval=info.data_interval,
- start_date=timezone.utcnow(),
- external_trigger=False,
- state=state,
- run_type=run_type,
- )
+ if info.logical_date not in dag_runs:
+ dag_runs[info.logical_date] = dag.create_dagrun(
+ execution_date=info.logical_date,
+ data_interval=info.data_interval,
+ start_date=timezone.utcnow(),
+ external_trigger=False,
+ state=state,
+ run_type=run_type,
+ )
return dag_runs.values()
@@ -493,10 +492,9 @@ def set_dag_run_state_to_failed(
tasks = []
for task in dag.tasks:
- if task.task_id not in task_ids_of_running_tis:
- continue
- task.dag = dag
- tasks.append(task)
+ if task.task_id in task_ids_of_running_tis:
+ task.dag = dag
+ tasks.append(task)
# Mark non-finished tasks as SKIPPED.
tis = session.scalars(