You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2021/08/16 08:20:49 UTC
[airflow] branch main updated: Do not delete running DAG from the
UI (#17630)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 5a64c1c Do not delete running DAG from the UI (#17630)
5a64c1c is described below
commit 5a64c1c7cb1161f4db79b3dd47dc8881f23a61b3
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Mon Aug 16 09:20:35 2021 +0100
Do not delete running DAG from the UI (#17630)
When the DAG appears again in the UI and we rerun it (say, with catchup set to True),
the running task instances that were not deleted would be rerun, and the LocalTaskJob would detect
an external state change of those task instances and send SIGTERM to the task runner.
This change resolves this by making sure that DAGs are not deleted while their task instances are still
running.
---
airflow/api/common/experimental/delete_dag.py | 8 ++++++-
airflow/www/views.py | 8 +++++++
tests/api/common/experimental/test_delete_dag.py | 29 ++++++++++++++++++++++--
3 files changed, 42 insertions(+), 3 deletions(-)
diff --git a/airflow/api/common/experimental/delete_dag.py b/airflow/api/common/experimental/delete_dag.py
index 4462070..eb45384 100644
--- a/airflow/api/common/experimental/delete_dag.py
+++ b/airflow/api/common/experimental/delete_dag.py
@@ -21,10 +21,11 @@ import logging
from sqlalchemy import or_
from airflow import models
-from airflow.exceptions import DagNotFound
+from airflow.exceptions import AirflowException, DagNotFound
from airflow.models import DagModel, TaskFail
from airflow.models.serialized_dag import SerializedDagModel
from airflow.utils.session import provide_session
+from airflow.utils.state import State
log = logging.getLogger(__name__)
@@ -40,6 +41,11 @@ def delete_dag(dag_id: str, keep_records_in_log: bool = True, session=None) -> i
:return count of deleted dags
"""
log.info("Deleting DAG: %s", dag_id)
+ running_tis = (
+ session.query(models.TaskInstance.state).filter(models.TaskInstance.state.in_(State.unfinished)).all()
+ )
+ if running_tis:
+ raise AirflowException("TaskInstances still running")
dag = session.query(DagModel).filter(DagModel.dag_id == dag_id).first()
if dag is None:
raise DagNotFound(f"Dag id {dag_id} not found")
diff --git a/airflow/www/views.py b/airflow/www/views.py
index f444f2d..64a26a9 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -1523,6 +1523,14 @@ class Airflow(AirflowBaseView):
except DagFileExists:
flash(f"Dag id {dag_id} is still in DagBag. Remove the DAG file first.", 'error')
return redirect(request.referrer)
+ except AirflowException:
+ flash(
+ f"Cannot delete DAG with id {dag_id} because some task instances of the DAG "
+ "are still running. Please mark the task instances as "
+ "failed/succeeded before deleting the DAG",
+ "error",
+ )
+ return redirect(request.referrer)
flash(f"Deleting DAG with id {dag_id}. May take a couple minutes to fully disappear.")
diff --git a/tests/api/common/experimental/test_delete_dag.py b/tests/api/common/experimental/test_delete_dag.py
index 7570cb8..58bcd37 100644
--- a/tests/api/common/experimental/test_delete_dag.py
+++ b/tests/api/common/experimental/test_delete_dag.py
@@ -20,14 +20,16 @@ import unittest
import pytest
-from airflow import models
+from airflow import models, settings
from airflow.api.common.experimental.delete_dag import delete_dag
-from airflow.exceptions import DagNotFound
+from airflow.exceptions import AirflowException, DagNotFound
from airflow.operators.dummy import DummyOperator
+from airflow.utils import timezone
from airflow.utils.dates import days_ago
from airflow.utils.session import create_session
from airflow.utils.state import State
from airflow.utils.types import DagRunType
+from tests.test_utils.db import clear_db_dags, clear_db_runs
DM = models.DagModel
DR = models.DagRun
@@ -52,6 +54,29 @@ class TestDeleteDAGCatchError(unittest.TestCase):
delete_dag("non-existent DAG")
+class TestDeleteDAGErrorsOnRunningTI:
+ def setup_method(self):
+ clear_db_dags()
+ clear_db_runs()
+
+ def teardown_method(self):
+ clear_db_dags()
+ clear_db_runs()
+
+ def test_delete_dag_running_taskinstances(self, create_dummy_dag):
+ dag_id = 'test-dag'
+ _, task = create_dummy_dag(dag_id)
+
+ ti = TI(task, execution_date=timezone.utcnow())
+ ti.refresh_from_db()
+ session = settings.Session()
+ ti.state = State.RUNNING
+ session.merge(ti)
+ session.commit()
+ with pytest.raises(AirflowException):
+ delete_dag(dag_id)
+
+
class TestDeleteDAGSuccessfulDelete(unittest.TestCase):
dag_file_path = "/usr/local/airflow/dags/test_dag_8.py"
key = "test_dag_id"