You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2021/09/10 12:29:22 UTC

[airflow] 01/03: Do not delete running DAG from the UI (#17630)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 59bf5b5167192ebc60e2c9235d0dc1e999bf228a
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Mon Aug 16 09:20:35 2021 +0100

    Do not delete running DAG from the UI (#17630)
    
    When a deleted DAG appears again in the UI and we rerun it (say, with catchup set to True),
    the running task instances that were not deleted would be rerun, and the LocalTaskJob would detect
    an external state change of those task instances, thereby sending SIGTERM to the task runner.
    
    This change resolves the problem by making sure that DAGs are not deleted while task instances
    are still running.
    
    (cherry picked from commit 5a64c1c7cb1161f4db79b3dd47dc8881f23a61b3)
---
 airflow/api/common/experimental/delete_dag.py    |  8 ++++++-
 airflow/www/views.py                             |  8 +++++++
 tests/api/common/experimental/test_delete_dag.py | 29 ++++++++++++++++++++++--
 3 files changed, 42 insertions(+), 3 deletions(-)

diff --git a/airflow/api/common/experimental/delete_dag.py b/airflow/api/common/experimental/delete_dag.py
index 4462070..eb45384 100644
--- a/airflow/api/common/experimental/delete_dag.py
+++ b/airflow/api/common/experimental/delete_dag.py
@@ -21,10 +21,11 @@ import logging
 from sqlalchemy import or_
 
 from airflow import models
-from airflow.exceptions import DagNotFound
+from airflow.exceptions import AirflowException, DagNotFound
 from airflow.models import DagModel, TaskFail
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.utils.session import provide_session
+from airflow.utils.state import State
 
 log = logging.getLogger(__name__)
 
@@ -40,6 +41,11 @@ def delete_dag(dag_id: str, keep_records_in_log: bool = True, session=None) -> i
     :return count of deleted dags
     """
     log.info("Deleting DAG: %s", dag_id)
+    running_tis = (
+        session.query(models.TaskInstance.state).filter(models.TaskInstance.state.in_(State.unfinished)).all()
+    )
+    if running_tis:
+        raise AirflowException("TaskInstances still running")
     dag = session.query(DagModel).filter(DagModel.dag_id == dag_id).first()
     if dag is None:
         raise DagNotFound(f"Dag id {dag_id} not found")
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 830047b..db8dbed 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -1456,6 +1456,14 @@ class Airflow(AirflowBaseView):
         except DagFileExists:
             flash(f"Dag id {dag_id} is still in DagBag. Remove the DAG file first.", 'error')
             return redirect(request.referrer)
+        except AirflowException:
+            flash(
+                f"Cannot delete DAG with id {dag_id} because some task instances of the DAG "
+                "are still running. Please mark the  task instances as "
+                "failed/succeeded before deleting the DAG",
+                "error",
+            )
+            return redirect(request.referrer)
 
         flash(f"Deleting DAG with id {dag_id}. May take a couple minutes to fully disappear.")
 
diff --git a/tests/api/common/experimental/test_delete_dag.py b/tests/api/common/experimental/test_delete_dag.py
index 7570cb8..58bcd37 100644
--- a/tests/api/common/experimental/test_delete_dag.py
+++ b/tests/api/common/experimental/test_delete_dag.py
@@ -20,14 +20,16 @@ import unittest
 
 import pytest
 
-from airflow import models
+from airflow import models, settings
 from airflow.api.common.experimental.delete_dag import delete_dag
-from airflow.exceptions import DagNotFound
+from airflow.exceptions import AirflowException, DagNotFound
 from airflow.operators.dummy import DummyOperator
+from airflow.utils import timezone
 from airflow.utils.dates import days_ago
 from airflow.utils.session import create_session
 from airflow.utils.state import State
 from airflow.utils.types import DagRunType
+from tests.test_utils.db import clear_db_dags, clear_db_runs
 
 DM = models.DagModel
 DR = models.DagRun
@@ -52,6 +54,29 @@ class TestDeleteDAGCatchError(unittest.TestCase):
             delete_dag("non-existent DAG")
 
 
+class TestDeleteDAGErrorsOnRunningTI:
+    def setup_method(self):
+        clear_db_dags()
+        clear_db_runs()
+
+    def teardown_method(self):
+        clear_db_dags()
+        clear_db_runs()
+
+    def test_delete_dag_running_taskinstances(self, create_dummy_dag):
+        dag_id = 'test-dag'
+        _, task = create_dummy_dag(dag_id)
+
+        ti = TI(task, execution_date=timezone.utcnow())
+        ti.refresh_from_db()
+        session = settings.Session()
+        ti.state = State.RUNNING
+        session.merge(ti)
+        session.commit()
+        with pytest.raises(AirflowException):
+            delete_dag(dag_id)
+
+
 class TestDeleteDAGSuccessfulDelete(unittest.TestCase):
     dag_file_path = "/usr/local/airflow/dags/test_dag_8.py"
     key = "test_dag_id"