You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2023/09/01 16:06:17 UTC

[airflow] branch main updated: Refactor unneeded 'continue' jumps in api (#33842)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 3b58a38e67 Refactor unneeded 'continue' jumps in api (#33842)
3b58a38e67 is described below

commit 3b58a38e67c6645b04b8c2a1cc3bd95f32da6f12
Author: Miroslav Šedivý <67...@users.noreply.github.com>
AuthorDate: Fri Sep 1 16:06:10 2023 +0000

    Refactor unneeded 'continue' jumps in api (#33842)
---
 airflow/api/common/delete_dag.py |  4 +---
 airflow/api/common/mark_tasks.py | 26 ++++++++++++--------------
 2 files changed, 13 insertions(+), 17 deletions(-)

diff --git a/airflow/api/common/delete_dag.py b/airflow/api/common/delete_dag.py
index c94b3c39df..1a33467755 100644
--- a/airflow/api/common/delete_dag.py
+++ b/airflow/api/common/delete_dag.py
@@ -82,9 +82,7 @@ def delete_dag(dag_id: str, keep_records_in_log: bool = True, session: Session =
     count = 0
 
     for model in get_sqla_model_classes():
-        if hasattr(model, "dag_id"):
-            if keep_records_in_log and model.__name__ == "Log":
-                continue
+        if hasattr(model, "dag_id") and (not keep_records_in_log or model.__name__ != "Log"):
             count += session.execute(
                 delete(model)
                 .where(model.dag_id.in_(dags_to_delete))
diff --git a/airflow/api/common/mark_tasks.py b/airflow/api/common/mark_tasks.py
index d71957f86b..cfd7471d24 100644
--- a/airflow/api/common/mark_tasks.py
+++ b/airflow/api/common/mark_tasks.py
@@ -67,16 +67,15 @@ def _create_dagruns(
     }
 
     for info in infos:
-        if info.logical_date in dag_runs:
-            continue
-        dag_runs[info.logical_date] = dag.create_dagrun(
-            execution_date=info.logical_date,
-            data_interval=info.data_interval,
-            start_date=timezone.utcnow(),
-            external_trigger=False,
-            state=state,
-            run_type=run_type,
-        )
+        if info.logical_date not in dag_runs:
+            dag_runs[info.logical_date] = dag.create_dagrun(
+                execution_date=info.logical_date,
+                data_interval=info.data_interval,
+                start_date=timezone.utcnow(),
+                external_trigger=False,
+                state=state,
+                run_type=run_type,
+            )
     return dag_runs.values()
 
 
@@ -493,10 +492,9 @@ def set_dag_run_state_to_failed(
 
     tasks = []
     for task in dag.tasks:
-        if task.task_id not in task_ids_of_running_tis:
-            continue
-        task.dag = dag
-        tasks.append(task)
+        if task.task_id in task_ids_of_running_tis:
+            task.dag = dag
+            tasks.append(task)
 
     # Mark non-finished tasks as SKIPPED.
     tis = session.scalars(