You are viewing a plain-text version of this content. The canonical link to it ("here" in the original) is not preserved in this plain-text copy.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/12/08 01:14:45 UTC

[GitHub] kaxil closed pull request #4287: [AIRFLOW-3479] Keeps records in Log Table when delete DAG & refine related tests

kaxil closed pull request #4287: [AIRFLOW-3479] Keeps records in Log Table when delete DAG & refine related tests
URL: https://github.com/apache/incubator-airflow/pull/4287
 
 
   

This PR was merged from a forked repository.
Because GitHub hides the original diff on merge, the diff is reproduced
below for the sake of provenance:

Because this is a foreign pull request (from a fork), the diff is supplied
below; otherwise it would not be shown, owing to how GitHub handles merged fork PRs:

diff --git a/airflow/api/common/experimental/delete_dag.py b/airflow/api/common/experimental/delete_dag.py
index b9ce736b48..e7d15772c9 100644
--- a/airflow/api/common/experimental/delete_dag.py
+++ b/airflow/api/common/experimental/delete_dag.py
@@ -23,7 +23,15 @@
 from airflow.exceptions import DagNotFound, DagFileExists
 
 
-def delete_dag(dag_id):
+def delete_dag(dag_id, keep_records_in_log=True):
+    """
+    :param dag_id: the dag_id of the DAG to delete
+    :type dag_id: str
+    :param keep_records_in_log: whether keep records of the given dag_id
+        in the Log table in the backend database (for reasons like auditing).
+        The default value is True.
+    :type keep_records_in_log: bool
+    """
     session = settings.Session()
 
     DM = models.DagModel
@@ -41,6 +49,8 @@ def delete_dag(dag_id):
     # noinspection PyUnresolvedReferences,PyProtectedMember
     for m in models.Base._decl_class_registry.values():
         if hasattr(m, "dag_id"):
+            if keep_records_in_log and m.__name__ == 'Log':
+                continue
             cond = or_(m.dag_id == dag_id, m.dag_id.like(dag_id + ".%"))
             count += session.query(m).filter(cond).delete(synchronize_session='fetch')
 
diff --git a/airflow/www/templates/airflow/dag.html b/airflow/www/templates/airflow/dag.html
index b4c09de0b3..c34ec375b6 100644
--- a/airflow/www/templates/airflow/dag.html
+++ b/airflow/www/templates/airflow/dag.html
@@ -332,7 +332,7 @@ <h4 class="modal-title" id="dagModalLabel">
 
     function confirmDeleteDag(dag_id){
         return confirm("Are you sure you want to delete '"+dag_id+"' now?\n\
-          This option will delete ALL metadata, DAG runs, etc.\n\
+          This option will delete ALL metadata, DAG runs, etc., EXCEPT Log\n\
           This cannot be undone.");
     }
 
diff --git a/tests/api/common/experimental/test_delete_dag.py b/tests/api/common/experimental/test_delete_dag.py
new file mode 100644
index 0000000000..a012e5d3d0
--- /dev/null
+++ b/tests/api/common/experimental/test_delete_dag.py
@@ -0,0 +1,141 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+
+from airflow import models
+from airflow import settings
+from airflow.api.common.experimental.delete_dag import delete_dag
+from airflow.exceptions import DagNotFound, DagFileExists
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.utils.dates import days_ago
+from airflow.utils.state import State
+
+DM = models.DagModel
+DS = models.DagStat
+DR = models.DagRun
+TI = models.TaskInstance
+LOG = models.Log
+
+
+class TestDeleteDAGCatchError(unittest.TestCase):
+
+    def setUp(self):
+        self.session = settings.Session()
+        self.dagbag = models.DagBag(include_examples=True)
+        self.dag_id = 'example_bash_operator'
+        self.dag = self.dagbag.dags[self.dag_id]
+
+    def tearDown(self):
+        self.dag.clear()
+        self.session.close()
+
+    def test_delete_dag_non_existent_dag(self):
+        with self.assertRaises(DagNotFound):
+            delete_dag("non-existent DAG")
+
+    def test_delete_dag_dag_still_in_dagbag(self):
+        models_to_check = ['DagModel', 'DagStat', 'DagRun', 'TaskInstance']
+        record_counts = {}
+
+        for model_name in models_to_check:
+            m = getattr(models, model_name)
+            record_counts[model_name] = self.session.query(m).filter(m.dag_id == self.dag_id).count()
+
+        with self.assertRaises(DagFileExists):
+            delete_dag(self.dag_id)
+
+        # No change should happen in DB
+        for model_name in models_to_check:
+            m = getattr(models, model_name)
+            self.assertEqual(
+                self.session.query(m).filter(
+                    m.dag_id == self.dag_id
+                ).count(),
+                record_counts[model_name]
+            )
+
+
+class TestDeleteDAGSuccessfulDelete(unittest.TestCase):
+
+    def setUp(self):
+        self.session = settings.Session()
+        self.key = "test_dag_id"
+
+        task = DummyOperator(task_id='dummy',
+                             dag=models.DAG(dag_id=self.key,
+                                            default_args={'start_date': days_ago(2)}),
+                             owner='airflow')
+
+        self.session.add(DM(dag_id=self.key))
+        self.session.add(DS(dag_id=self.key, state=State.SUCCESS))
+        self.session.add(DR(dag_id=self.key))
+        self.session.add(TI(task=task,
+                            execution_date=days_ago(1),
+                            state=State.SUCCESS))
+        self.session.add(LOG(dag_id=self.key, task_id=None, task_instance=None,
+                             execution_date=days_ago(1), event="varimport"))
+
+        self.session.commit()
+
+    def tearDown(self):
+        self.session.query(DM).filter(DM.dag_id == self.key).delete()
+        self.session.query(DS).filter(DS.dag_id == self.key).delete()
+        self.session.query(DR).filter(DR.dag_id == self.key).delete()
+        self.session.query(TI).filter(TI.dag_id == self.key).delete()
+        self.session.query(LOG).filter(LOG.dag_id == self.key).delete()
+        self.session.commit()
+
+        self.session.close()
+
+    def test_delete_dag_successful_delete(self):
+
+        self.assertEqual(self.session.query(DM).filter(DM.dag_id == self.key).count(), 1)
+        self.assertEqual(self.session.query(DS).filter(DS.dag_id == self.key).count(), 1)
+        self.assertEqual(self.session.query(DR).filter(DR.dag_id == self.key).count(), 1)
+        self.assertEqual(self.session.query(TI).filter(TI.dag_id == self.key).count(), 1)
+        self.assertEqual(self.session.query(LOG).filter(LOG.dag_id == self.key).count(), 1)
+
+        delete_dag(dag_id=self.key)
+
+        self.assertEqual(self.session.query(DM).filter(DM.dag_id == self.key).count(), 0)
+        self.assertEqual(self.session.query(DS).filter(DS.dag_id == self.key).count(), 0)
+        self.assertEqual(self.session.query(DR).filter(DR.dag_id == self.key).count(), 0)
+        self.assertEqual(self.session.query(TI).filter(TI.dag_id == self.key).count(), 0)
+        self.assertEqual(self.session.query(LOG).filter(LOG.dag_id == self.key).count(), 1)
+
+    def test_delete_dag_successful_delete_not_keeping_records_in_log(self):
+
+        self.assertEqual(self.session.query(DM).filter(DM.dag_id == self.key).count(), 1)
+        self.assertEqual(self.session.query(DS).filter(DS.dag_id == self.key).count(), 1)
+        self.assertEqual(self.session.query(DR).filter(DR.dag_id == self.key).count(), 1)
+        self.assertEqual(self.session.query(TI).filter(TI.dag_id == self.key).count(), 1)
+        self.assertEqual(self.session.query(LOG).filter(LOG.dag_id == self.key).count(), 1)
+
+        delete_dag(dag_id=self.key, keep_records_in_log=False)
+
+        self.assertEqual(self.session.query(DM).filter(DM.dag_id == self.key).count(), 0)
+        self.assertEqual(self.session.query(DS).filter(DS.dag_id == self.key).count(), 0)
+        self.assertEqual(self.session.query(DR).filter(DR.dag_id == self.key).count(), 0)
+        self.assertEqual(self.session.query(TI).filter(TI.dag_id == self.key).count(), 0)
+        self.assertEqual(self.session.query(LOG).filter(LOG.dag_id == self.key).count(), 0)
+
+
+if __name__ == '__main__':
+    unittest.main()


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log in to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services