Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/07/21 03:04:29 UTC

[GitHub] [airflow] SamWheating opened a new pull request #17121: [WIP] Deactivating DAGs which have been removed from files

SamWheating opened a new pull request #17121:
URL: https://github.com/apache/airflow/pull/17121


   
   Closes: https://github.com/apache/airflow/issues/11901
   
   This PR ensures that the active DAGs in the DB are all actually present in their corresponding Python files, by reconciling the DB state with the contents of a given DAG file on every parse operation.
   
   This _should_ prevent a lot of issues encountered when writing multiple DAGs per file, renaming DAGs, or dynamically generating DAGs based on a config file read at parse time.
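
The reconciliation described above boils down to a set difference. Below is a minimal, DB-free sketch, assuming the DB rows for one file can be reduced to a set of active DAG IDs. `find_stale_dag_ids` is a hypothetical helper, not part of Airflow; the PR expresses the same idea as a `NOT IN` filter on `DagModel`.

```python
from typing import Iterable, List


def find_stale_dag_ids(active_in_db: Iterable[str], parsed_from_file: Iterable[str]) -> List[str]:
    """Return DAG IDs still marked active for a file but missing from its latest parse.

    Hypothetical helper used only to illustrate the reconciliation step.
    """
    # Active in the DB, but absent from the fresh parse result
    stale = set(active_in_db) - set(parsed_from_file)
    # Sort so the result (e.g. for logging) is deterministic
    return sorted(stale)


# Decreasing NUM_DAGS from 3 to 1 leaves dag-1 and dag-2 stale
print(find_stale_dag_ids(["dag-0", "dag-1", "dag-2"], ["dag-0"]))  # ['dag-1', 'dag-2']
# A file that now defines zero DAGs makes every previously active DAG stale
print(find_stale_dag_ids(["dag-0"], []))  # ['dag-0']
```

The second call is why the check has to run even when a parse finds no DAGs at all.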
   
   #### Validation:
   
   I have validated these changes in a local breeze environment with the following DAG:
   
   ```python
   from airflow.models import DAG
   from airflow.operators.python import PythonOperator
   from airflow.utils.dates import days_ago
   
   NUM_DAGS = 1
   
   def message():
       print('Hello, world.')
   
   for i in range(NUM_DAGS):
       with DAG(f'dag-{i}', schedule_interval=None, start_date=days_ago(1)) as dag:
           PythonOperator(
               task_id='task',
               python_callable=message
           )
       # Bind each DAG to a unique module-level name so the DagBag finds all of them,
       # not just the last one assigned to `dag`
       globals()[f"dag_{i}"] = dag
   
   ```
   
   By changing the value of `NUM_DAGS` I can quickly change the number of DAG objects present in this file. 
   
   Before this change, decreasing the value of `NUM_DAGS` would leave a bunch of stale DAGs in the UI. These could be triggered but would then fail as the executor was not able to load the specified task from the file.
   
   After implementing this change, stale DAGs disappear from the UI shortly after decreasing the value of `NUM_DAGS`.
   
   (I will add some tests as well once I'm confident that this is the correct approach to fix the issue).
   
   #### Questions:
   
   1. Is there a good reason for Airflow to mark inactive DAGs as active if the file still exists? I looked through the [original PR which introduced this](https://github.com/apache/airflow/pull/5743/files) but couldn't find an explanation.
   
   2. How significant is the performance hit incurred by updating the DAG table on every parse operation?
    


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil commented on a change in pull request #17121: [WIP] Deactivating DAGs which have been removed from files

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #17121:
URL: https://github.com/apache/airflow/pull/17121#discussion_r673974267



##########
File path: airflow/dag_processing/processor.py
##########
@@ -647,3 +650,17 @@ def process_file(
             self.log.exception("Error logging import errors!")
 
         return len(dagbag.dags), len(dagbag.import_errors)
+
+    def _deactivate_missing_dags(self, session: Session, dagbag: DagBag, file_path: str) -> None:
+        for dag in (
+            session.query(DagModel)
+            .filter(DagModel.fileloc == file_path)
+            .filter(DagModel.is_active)
+            .filter(~DagModel.dag_id.in_(dagbag.dag_ids))
+            .all()
+        ):
+            self.log.warning(
+                "Deactivating DAG ID %s since it no longer exists in file %s", dag.dag_id, file_path
+            )
+            dag.is_active = False
+            session.merge(dag)

Review comment:
       ```suggestion
       session.query(DagModel).filter(
           DagModel.fileloc == file_path,
           DagModel.is_active,
           ~DagModel.dag_id.in_(dagbag.dag_ids)
       ).update({"is_active": False})
   ```
   
   Something like this should work!
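
To make the single-statement approach concrete, here is a stdlib-only sketch using `sqlite3`. The table and column names mirror `DagModel` (`dag`, `dag_id`, `fileloc`, `is_active`), but the schema is a simplified assumption, not Airflow's real one, and `deactivate_missing_dags` here is a hypothetical standalone function. One `UPDATE ... WHERE ... NOT IN (...)` flips every stale row, and `cursor.rowcount` reports how many rows changed, analogous to the matched-row count returned by SQLAlchemy's `Query.update()`.

```python
import sqlite3
from typing import List, Sequence


def deactivate_missing_dags(conn: sqlite3.Connection, file_path: str, dag_ids: Sequence[str]) -> int:
    """Mark active DAGs for file_path inactive unless their ID is in dag_ids.

    Hypothetical helper; returns the number of rows the UPDATE changed.
    """
    sql = "UPDATE dag SET is_active = 0 WHERE fileloc = ? AND is_active = 1"
    params: List[str] = [file_path]
    # With no parsed IDs, every active DAG for this file is stale, so the
    # NOT IN clause is only added when there is something to exclude
    if dag_ids:
        sql += " AND dag_id NOT IN (" + ",".join("?" for _ in dag_ids) + ")"
        params.extend(dag_ids)
    cur = conn.execute(sql, params)
    return cur.rowcount


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag (dag_id TEXT PRIMARY KEY, fileloc TEXT, is_active INTEGER)")
conn.executemany(
    "INSERT INTO dag VALUES (?, ?, 1)",
    [("dag-0", "/dags/gen.py"), ("dag-1", "/dags/gen.py"), ("dag-2", "/dags/gen.py"), ("other", "/dags/other.py")],
)
deactivated = deactivate_missing_dags(conn, "/dags/gen.py", ["dag-0"])
print(deactivated)  # 2
conn.commit()
```

Because the `is_active = 1` condition is part of the `WHERE` clause, running the same call again on an unchanged file matches nothing and returns 0.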







[GitHub] [airflow] kaxil commented on a change in pull request #17121: [WIP] Deactivating DAGs which have been removed from files

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #17121:
URL: https://github.com/apache/airflow/pull/17121#discussion_r673959161



##########
File path: airflow/dag_processing/processor.py
##########
@@ -647,3 +650,17 @@ def process_file(
             self.log.exception("Error logging import errors!")
 
         return len(dagbag.dags), len(dagbag.import_errors)
+
+    def _deactivate_missing_dags(self, session: Session, dagbag: DagBag, file_path: str) -> None:
+        for dag in (
+            session.query(DagModel)
+            .filter(DagModel.fileloc == file_path)
+            .filter(DagModel.is_active)
+            .filter(~DagModel.dag_id.in_(dagbag.dag_ids))
+            .all()

Review comment:
       You can combine these filters into a single `filter()` call; the criteria are joined with logical AND.
   
   ```suggestion
               session.query(DagModel)
               .filter(DagModel.fileloc == file_path, DagModel.is_active, ~DagModel.dag_id.in_(dagbag.dag_ids))
               .all()
   ```







[GitHub] [airflow] uranusjr commented on a change in pull request #17121: [WIP] Deactivating DAGs which have been removed from files

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #17121:
URL: https://github.com/apache/airflow/pull/17121#discussion_r698263618



##########
File path: tests/dag_processing/test_processor.py
##########
@@ -404,7 +404,7 @@ def test_process_file_should_failure_callback(self):
 
     @conf_vars({("core", "dagbag_import_error_tracebacks"): "False"})
     def test_add_unparseable_file_before_sched_start_creates_import_error(self, tmpdir):
-        unparseable_filename = tmpdir / TEMP_DAG_FILENAME
+        unparseable_filename = os.path.join(tmpdir, TEMP_DAG_FILENAME)

Review comment:
       `tmpdir` is not a `pathlib.Path` but a `py._path.LocalPath`, which is a path object implementation in `pytest` (actually from a package `py` that backs most of `pytest`) that pre-dates the stdlib `pathlib`.
   
   But your fix is correct: `py._path.LocalPath` implements the [fspath protocol](https://docs.python.org/3/library/os.html#os.PathLike) introduced in Python 3.6 (PEP 519), so it can be consumed by `os.path.join` and coerced into a str.
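
A small stdlib-only illustration of the PEP 519 behaviour described above. `FakeLocalPath` is a hypothetical stand-in for pytest's `tmpdir` object, not the real `py.path.local`; it only shows that any object implementing `__fspath__()` is accepted by `os.fspath()` and `os.path.join()`, and that both hand back a plain `str`, whereas `/` keeps returning a path object.

```python
import os


class FakeLocalPath:
    """Hypothetical stand-in for py.path.local: not a str, but os.PathLike."""

    def __init__(self, path: str) -> None:
        self._path = path

    def __fspath__(self) -> str:
        # PEP 519 hook: return the filesystem representation as a str
        return self._path

    def __truediv__(self, other: str) -> "FakeLocalPath":
        # Mimics the `tmpdir / name` syntax: returns another path object, not a str
        return FakeLocalPath(os.path.join(self._path, other))


tmpdir = FakeLocalPath("/tmp/pytest-0")
print(type(tmpdir / "dag.py").__name__)  # FakeLocalPath
print(type(os.path.join(tmpdir, "dag.py")).__name__)  # str
print(os.fspath(tmpdir / "dag.py") == os.path.join(tmpdir, "dag.py"))  # True
```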







[GitHub] [airflow] SamWheating commented on a change in pull request #17121: Deactivating DAGs which have been removed from files

Posted by GitBox <gi...@apache.org>.
SamWheating commented on a change in pull request #17121:
URL: https://github.com/apache/airflow/pull/17121#discussion_r705882789



##########
File path: airflow/dag_processing/processor.py
##########
@@ -645,3 +648,12 @@ def process_file(
             self.log.exception("Error logging import errors!")
 
         return len(dagbag.dags), len(dagbag.import_errors)
+
+    def _deactivate_missing_dags(self, session: Session, dagbag: DagBag, file_path: str) -> None:
+        deactivated = (
+            session.query(DagModel)
+            .filter(DagModel.fileloc == file_path, DagModel.is_active, ~DagModel.dag_id.in_(dagbag.dag_ids))
+            .update({DagModel.is_active: False}, synchronize_session="fetch")
+        )
+        if deactivated:
+            self.log.info("Deactivated %i DAGs which are no longer present in %s", deactivated, file_path)

Review comment:
       > I am touching same code, to do something similar (though not same at #18120)
   
   Interesting. I'll follow along with the conversation there, as I think there's some functional overlap with this PR and a lot of that discussion is probably relevant here too.
   
   There's also another recently opened PR which takes a different approach to the same problem (https://github.com/apache/airflow/pull/18087). We should definitely compare the different approaches and agree upon the best way to fix this issue.







[GitHub] [airflow] SamWheating commented on a change in pull request #17121: [WIP] Deactivating DAGs which have been removed from files

Posted by GitBox <gi...@apache.org>.
SamWheating commented on a change in pull request #17121:
URL: https://github.com/apache/airflow/pull/17121#discussion_r701386688



##########
File path: airflow/dag_processing/processor.py
##########
@@ -645,3 +648,12 @@ def process_file(
             self.log.exception("Error logging import errors!")
 
         return len(dagbag.dags), len(dagbag.import_errors)
+
+    def _deactivate_missing_dags(self, session: Session, dagbag: DagBag, file_path: str) -> None:
+        deactivated = (
+            session.query(DagModel)
+            .filter(DagModel.fileloc == file_path, DagModel.is_active, ~DagModel.dag_id.in_(dagbag.dag_ids))
+            .update({DagModel.is_active: False}, synchronize_session="fetch")
+        )
+        if deactivated:
+            self.log.info("Deactivated %i DAGs which are no longer present in %s", deactivated, file_path)

Review comment:
       Ah, thanks for confirming. If it's not supported in SQLite and MySQL then I don't think we can rely on this feature.
   
   We could split this into two separate queries, like:
   
   ```python
           query = session.query(DagModel).filter(
               DagModel.fileloc == file_path, DagModel.is_active, ~DagModel.dag_id.in_(dagbag.dag_ids)
           )
           # Materialize the IDs first so the check below tests whether any rows matched
           deactivated_dags = [dag.dag_id for dag in query]
           if deactivated_dags:
               query.update({DagModel.is_active: False}, synchronize_session="fetch")
               self.log.info("Deactivated missing DAGs: %s", ",".join(deactivated_dags))
   ```
   
   But I think that will introduce potential race conditions and consistency issues, so I'd rather keep things as-is. 
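
One possible way to log the DAG IDs without that mismatch, sketched with stdlib `sqlite3` and a simplified `dag` table (an assumption, not Airflow's model; `deactivate_and_report` is a hypothetical name): select the stale IDs and update exactly those IDs inside one explicit transaction, so the log always matches the rows that were updated. In SQLite, `BEGIN IMMEDIATE` takes the write lock before the `SELECT`, so no other connection can change those rows in between; on PostgreSQL or MySQL the analogous tool would be a row lock such as `SELECT ... FOR UPDATE`, which this sketch does not cover.

```python
import sqlite3
from typing import List, Sequence


def deactivate_and_report(conn: sqlite3.Connection, file_path: str, dag_ids: Sequence[str]) -> List[str]:
    """Deactivate stale DAGs for file_path and return exactly the IDs that were updated.

    Hypothetical helper; assumes conn was opened with isolation_level=None so
    transactions are controlled explicitly below.
    """
    where = "fileloc = ? AND is_active = 1"
    params: List[str] = [file_path]
    if dag_ids:
        where += " AND dag_id NOT IN (" + ",".join("?" for _ in dag_ids) + ")"
        params.extend(dag_ids)
    # Acquire SQLite's write lock up front so the SELECT and UPDATE see the same rows
    conn.execute("BEGIN IMMEDIATE")
    try:
        stale = [row[0] for row in conn.execute(f"SELECT dag_id FROM dag WHERE {where} ORDER BY dag_id", params)]
        if stale:
            id_list = ",".join("?" for _ in stale)
            conn.execute(f"UPDATE dag SET is_active = 0 WHERE dag_id IN ({id_list})", stale)
        conn.execute("COMMIT")
    except Exception:
        conn.execute("ROLLBACK")
        raise
    return stale


conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE dag (dag_id TEXT PRIMARY KEY, fileloc TEXT, is_active INTEGER)")
conn.executemany(
    "INSERT INTO dag VALUES (?, ?, 1)",
    [("dag-0", "/dags/gen.py"), ("dag-1", "/dags/gen.py"), ("dag-2", "/dags/gen.py")],
)
deactivated_dags = deactivate_and_report(conn, "/dags/gen.py", ["dag-0"])
print("Deactivated missing DAGs: %s" % ",".join(deactivated_dags))  # Deactivated missing DAGs: dag-1,dag-2
```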







[GitHub] [airflow] SamWheating commented on a change in pull request #17121: Deactivating DAGs which have been removed from files

Posted by GitBox <gi...@apache.org>.
SamWheating commented on a change in pull request #17121:
URL: https://github.com/apache/airflow/pull/17121#discussion_r705882789



##########
File path: airflow/dag_processing/processor.py
##########
@@ -645,3 +648,12 @@ def process_file(
             self.log.exception("Error logging import errors!")
 
         return len(dagbag.dags), len(dagbag.import_errors)
+
+    def _deactivate_missing_dags(self, session: Session, dagbag: DagBag, file_path: str) -> None:
+        deactivated = (
+            session.query(DagModel)
+            .filter(DagModel.fileloc == file_path, DagModel.is_active, ~DagModel.dag_id.in_(dagbag.dag_ids))
+            .update({DagModel.is_active: False}, synchronize_session="fetch")
+        )
+        if deactivated:
+            self.log.info("Deactivated %i DAGs which are no longer present in %s", deactivated, file_path)

Review comment:
       > I am touching same code, to do something similar (though not same at #18120)
   
   Interesting. I'll follow along with that PR as well. I think there's some overlap between the functionality of that PR and this one, but there's room for both.
   
   There's also another recently opened PR which takes a different approach to the same problem (https://github.com/apache/airflow/pull/18087). We should definitely compare the different approaches and agree upon the best way to fix this issue.







[GitHub] [airflow] SamWheating commented on a change in pull request #17121: [WIP] Deactivating DAGs which have been removed from files

Posted by GitBox <gi...@apache.org>.
SamWheating commented on a change in pull request #17121:
URL: https://github.com/apache/airflow/pull/17121#discussion_r698150805



##########
File path: tests/dag_processing/test_processor.py
##########
@@ -404,7 +404,7 @@ def test_process_file_should_failure_callback(self):
 
     @conf_vars({("core", "dagbag_import_error_tracebacks"): "False"})
     def test_add_unparseable_file_before_sched_start_creates_import_error(self, tmpdir):
-        unparseable_filename = tmpdir / TEMP_DAG_FILENAME
+        unparseable_filename = os.path.join(tmpdir, TEMP_DAG_FILENAME)

Review comment:
       These changes are required because the previous `tmpdir / "child_path"` syntax would return a PathLib object rather than a string, and the `_process_file` function expects a string. 
   
   Providing the wrong type was causing some of the tests to fail due to SQLAlchemy errors.
   
   `os.path.join` will always return a string and thus fixes this issue.  
   
   







[GitHub] [airflow] uranusjr commented on a change in pull request #17121: [WIP] Deactivating DAGs which have been removed from files

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #17121:
URL: https://github.com/apache/airflow/pull/17121#discussion_r673904932



##########
File path: airflow/dag_processing/processor.py
##########
@@ -647,3 +650,17 @@ def process_file(
             self.log.exception("Error logging import errors!")
 
         return len(dagbag.dags), len(dagbag.import_errors)
+
+    def _deactivate_missing_dags(self, session: Session, dagbag: DagBag, file_path: str) -> None:
+        for dag in (
+            session.query(DagModel)
+            .filter(DagModel.fileloc == file_path)
+            .filter(DagModel.is_active)
+            .filter(~DagModel.dag_id.in_(dagbag.dag_ids))
+            .all()
+        ):
+            self.log.warning(
+                "Deactivating DAG ID %s since it no longer exists in file %s", dag.dag_id, file_path
+            )
+            dag.is_active = False
+            session.merge(dag)

Review comment:
       Performance here can be improved with `UPDATE dag SET is_active = FALSE WHERE ...`. I’m not sure what the right SQLAlchemy syntax is for this though.
   
   An even more significant improvement would be to somehow propagate the file paths and DAG IDs out of `DagFileProcessorProcess` and batch-update all the file paths, but I don’t think that’s viable without _significant_ refactoring, so let’s not do that.
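
To illustrate the performance point, here is a stdlib-only sketch (simplified `dag` schema assumed) that uses `sqlite3.Connection.set_trace_callback()` to count the `UPDATE` statements each approach sends to the database: the per-row loop issues one statement per stale DAG, while the bulk form issues one in total. Statement count is only a rough proxy for round trips to a networked database, and an ORM may batch some per-row updates on its own, so treat this as an illustration rather than a benchmark.

```python
import sqlite3
from typing import List


def make_db(n: int) -> sqlite3.Connection:
    """In-memory DB with n active DAGs, all defined in gen.py (simplified schema)."""
    # isolation_level=None (autocommit) keeps implicit BEGIN/COMMIT out of the picture
    conn = sqlite3.connect(":memory:", isolation_level=None)
    conn.execute("CREATE TABLE dag (dag_id TEXT PRIMARY KEY, fileloc TEXT, is_active INTEGER)")
    conn.executemany("INSERT INTO dag VALUES (?, 'gen.py', 1)", [(f"dag-{i}",) for i in range(n)])
    return conn


def trace_updates(conn: sqlite3.Connection) -> List[str]:
    """Start recording every UPDATE statement executed on conn."""
    seen: List[str] = []

    def on_statement(sql: str) -> None:
        if sql.lstrip().upper().startswith("UPDATE"):
            seen.append(sql)

    conn.set_trace_callback(on_statement)
    return seen


STALE = "fileloc = 'gen.py' AND is_active = 1 AND dag_id NOT IN ('dag-0')"

# Per-row approach: SELECT the stale rows, then one UPDATE per row
row_conn = make_db(100)
row_updates = trace_updates(row_conn)
stale_ids = [r[0] for r in row_conn.execute(f"SELECT dag_id FROM dag WHERE {STALE}")]
for dag_id in stale_ids:
    row_conn.execute("UPDATE dag SET is_active = 0 WHERE dag_id = ?", (dag_id,))

# Bulk approach: a single UPDATE ... WHERE ...
bulk_conn = make_db(100)
bulk_updates = trace_updates(bulk_conn)
bulk_conn.execute(f"UPDATE dag SET is_active = 0 WHERE {STALE}")

print(len(row_updates), len(bulk_updates))  # 99 1
```

Both approaches leave the table in the same state; only the number of statements differs.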







[GitHub] [airflow] kaxil edited a comment on pull request #17121: [WIP] Deactivating DAGs which have been removed from files

Posted by GitBox <gi...@apache.org>.
kaxil edited a comment on pull request #17121:
URL: https://github.com/apache/airflow/pull/17121#issuecomment-903026207


   Tests are still failing





[GitHub] [airflow] SamWheating commented on a change in pull request #17121: [WIP] Deactivating DAGs which have been removed from files

Posted by GitBox <gi...@apache.org>.
SamWheating commented on a change in pull request #17121:
URL: https://github.com/apache/airflow/pull/17121#discussion_r698150805



##########
File path: tests/dag_processing/test_processor.py
##########
@@ -404,7 +404,7 @@ def test_process_file_should_failure_callback(self):
 
     @conf_vars({("core", "dagbag_import_error_tracebacks"): "False"})
     def test_add_unparseable_file_before_sched_start_creates_import_error(self, tmpdir):
-        unparseable_filename = tmpdir / TEMP_DAG_FILENAME
+        unparseable_filename = os.path.join(tmpdir, TEMP_DAG_FILENAME)

Review comment:
       These changes are required because the previous `tmpdir / "child_path"` syntax would return a PathLib object rather than a string, and the `_process_file` function expects a string. 
   
   Providing the wrong type was causing some of the tests to fail due to SQLAlchemy errors.
   
   Using `os.path.join` will explicitly return a string and thus fixes this issue.  
   
   







[GitHub] [airflow] kaxil commented on a change in pull request #17121: Deactivating DAGs which have been removed from files

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #17121:
URL: https://github.com/apache/airflow/pull/17121#discussion_r708739971



##########
File path: airflow/models/dag.py
##########
@@ -2656,11 +2655,6 @@ def deactivate_deleted_dags(cls, alive_dag_filelocs: List[str], session=None):
             if dag_model.fileloc is not None:
                 if correct_maybe_zipped(dag_model.fileloc) not in alive_dag_filelocs:
                     dag_model.is_active = False
-                else:
-                    # If is_active is set as False and the DAG File still exists
-                    # Change is_active=True
-                    if not dag_model.is_active:
-                        dag_model.is_active = True

Review comment:
       Why this change? What will reactivate the DAG if it is re-added?







[GitHub] [airflow] SamWheating commented on a change in pull request #17121: [WIP] Deactivating DAGs which have been removed from files

Posted by GitBox <gi...@apache.org>.
SamWheating commented on a change in pull request #17121:
URL: https://github.com/apache/airflow/pull/17121#discussion_r674009622



##########
File path: airflow/dag_processing/processor.py
##########
@@ -618,6 +619,8 @@ def process_file(
             Stats.incr('dag_file_refresh_error', 1, 1)
             return 0, 0
 
+        self._deactivate_missing_dags(session, dagbag, file_path)

Review comment:
       I think that we _do_ want to call the DB if there are no DAGs in a file: if a file used to have one DAG and now has zero, then we'll need to mark that DAG as inactive, correct?







[GitHub] [airflow] kaxil commented on pull request #17121: Deactivating DAGs which have been removed from files

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #17121:
URL: https://github.com/apache/airflow/pull/17121#issuecomment-922364612


   Kubernetes test failures are unrelated





[GitHub] [airflow] SamWheating commented on a change in pull request #17121: Deactivating DAGs which have been removed from files

Posted by GitBox <gi...@apache.org>.
SamWheating commented on a change in pull request #17121:
URL: https://github.com/apache/airflow/pull/17121#discussion_r711407972



##########
File path: airflow/dag_processing/processor.py
##########
@@ -645,3 +648,12 @@ def process_file(
             self.log.exception("Error logging import errors!")
 
         return len(dagbag.dags), len(dagbag.import_errors)
+
+    def _deactivate_missing_dags(self, session: Session, dagbag: DagBag, file_path: str) -> None:
+        deactivated = (
+            session.query(DagModel)
+            .filter(DagModel.fileloc == file_path, DagModel.is_active, ~DagModel.dag_id.in_(dagbag.dag_ids))
+            .update({DagModel.is_active: False}, synchronize_session="fetch")
+        )
+        if deactivated:
+            self.log.info("Deactivated %i DAGs which are no longer present in %s", deactivated, file_path)

Review comment:
       Ah thanks for the reminder - had a busy week. I'll try to get this changed and update the tests ASAP 😄 







[GitHub] [airflow] kaxil commented on a change in pull request #17121: Deactivating DAGs which have been removed from files

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #17121:
URL: https://github.com/apache/airflow/pull/17121#discussion_r705788322



##########
File path: airflow/dag_processing/processor.py
##########
@@ -645,3 +648,12 @@ def process_file(
             self.log.exception("Error logging import errors!")
 
         return len(dagbag.dags), len(dagbag.import_errors)
+
+    def _deactivate_missing_dags(self, session: Session, dagbag: DagBag, file_path: str) -> None:
+        deactivated = (
+            session.query(DagModel)
+            .filter(DagModel.fileloc == file_path, DagModel.is_active, ~DagModel.dag_id.in_(dagbag.dag_ids))
+            .update({DagModel.is_active: False}, synchronize_session="fetch")
+        )
+        if deactivated:
+            self.log.info("Deactivated %i DAGs which are no longer present in %s", deactivated, file_path)

Review comment:
       We already do things like that :)
   
   https://github.com/apache/airflow/blob/969b239113a08f4c03ab138a0ca536ddd08a9245/airflow/jobs/scheduler_job.py#L808-L821
   
   https://github.com/apache/airflow/blob/969b239113a08f4c03ab138a0ca536ddd08a9245/airflow/jobs/scheduler_job.py#L131-L132







[GitHub] [airflow] uranusjr commented on a change in pull request #17121: [WIP] Deactivating DAGs which have been removed from files

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #17121:
URL: https://github.com/apache/airflow/pull/17121#discussion_r673904932



##########
File path: airflow/dag_processing/processor.py
##########
@@ -647,3 +650,17 @@ def process_file(
             self.log.exception("Error logging import errors!")
 
         return len(dagbag.dags), len(dagbag.import_errors)
+
+    def _deactivate_missing_dags(self, session: Session, dagbag: DagBag, file_path: str) -> None:
+        for dag in (
+            session.query(DagModel)
+            .filter(DagModel.fileloc == file_path)
+            .filter(DagModel.is_active)
+            .filter(~DagModel.dag_id.in_(dagbag.dag_ids))
+            .all()
+        ):
+            self.log.warning(
+                "Deactivating DAG ID %s since it no longer exists in file %s", dag.dag_id, file_path
+            )
+            dag.is_active = False
+            session.merge(dag)

Review comment:
       Performance here can be significantly improved with `UPDATE dag SET is_active = FALSE WHERE ...`. I’m not sure what the right SQLAlchemy syntax is for this though.







[GitHub] [airflow] SamWheating commented on pull request #17121: [WIP] Deactivating DAGs which have been removed from files

Posted by GitBox <gi...@apache.org>.
SamWheating commented on pull request #17121:
URL: https://github.com/apache/airflow/pull/17121#issuecomment-908814611


   Fixing the remaining test case will have to wait until https://github.com/apache/airflow/pull/17924 is merged. 





[GitHub] [airflow] SamWheating commented on a change in pull request #17121: Deactivating DAGs which have been removed from files

Posted by GitBox <gi...@apache.org>.
SamWheating commented on a change in pull request #17121:
URL: https://github.com/apache/airflow/pull/17121#discussion_r708743677



##########
File path: airflow/models/dag.py
##########
@@ -2656,11 +2655,6 @@ def deactivate_deleted_dags(cls, alive_dag_filelocs: List[str], session=None):
             if dag_model.fileloc is not None:
                 if correct_maybe_zipped(dag_model.fileloc) not in alive_dag_filelocs:
                     dag_model.is_active = False
-                else:
-                    # If is_active is set as False and the DAG File still exists
-                    # Change is_active=True
-                    if not dag_model.is_active:
-                        dag_model.is_active = True

Review comment:
       > Why this change?
   
   With this code in place, any time a DAG is no longer present in a file that still exists, it will be continually reactivated, so this code needs to be removed in order to fix this issue.
   
   I could not find a good explanation for why this was added in the first place ([here's the PR which introduced it](https://github.com/apache/airflow/pull/5743/files)), so I think it's safe to remove.
   
   > what will re-activate the DAG if it is readded
   
   I believe that the next time a DAG is found in a file, it will be marked as `active`. 
   
   https://github.com/apache/airflow/blob/c73004d0cd33d76b82b172078f572e8d4eecab39/airflow/models/dag.py#L2406
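
A rough sketch of the reactivation behaviour described above, assuming (as the linked line suggests, but not verified here) that writing a parsed DAG to the DB always sets it back to active. `write_parsed_dags` is a hypothetical helper using stdlib `sqlite3` and a simplified `dag` table; it is not Airflow's actual implementation.

```python
import sqlite3
from typing import Sequence


def write_parsed_dags(conn: sqlite3.Connection, file_path: str, dag_ids: Sequence[str]) -> None:
    """Upsert every DAG found in a parse and mark it active (hypothetical helper)."""
    for dag_id in dag_ids:
        cur = conn.execute(
            "UPDATE dag SET fileloc = ?, is_active = 1 WHERE dag_id = ?", (file_path, dag_id)
        )
        if cur.rowcount == 0:
            # First time this DAG ID has been seen: insert it as active
            conn.execute(
                "INSERT INTO dag (dag_id, fileloc, is_active) VALUES (?, ?, 1)", (dag_id, file_path)
            )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag (dag_id TEXT PRIMARY KEY, fileloc TEXT, is_active INTEGER)")
write_parsed_dags(conn, "/dags/gen.py", ["dag-0", "dag-1"])
# Simulate this PR's cleanup after dag-1 disappears from the file
conn.execute("UPDATE dag SET is_active = 0 WHERE dag_id = 'dag-1'")
# dag-1 is added back to the file; the next parse marks it active again
write_parsed_dags(conn, "/dags/gen.py", ["dag-0", "dag-1"])
print(conn.execute("SELECT is_active FROM dag WHERE dag_id = 'dag-1'").fetchone())  # (1,)
conn.commit()
```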







[GitHub] [airflow] kaxil edited a comment on pull request #17121: [WIP] Deactivating DAGs which have been removed from files

Posted by GitBox <gi...@apache.org>.
kaxil edited a comment on pull request #17121:
URL: https://github.com/apache/airflow/pull/17121#issuecomment-895287711


   @SamWheating -- I just returned from holiday. Can you resolve the conflicts, please?





[GitHub] [airflow] kaxil commented on pull request #17121: [WIP] Deactivating DAGs which have been removed from files

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #17121:
URL: https://github.com/apache/airflow/pull/17121#issuecomment-895287711


   @SamWheating -- I just returned from holidays. Can you resolve the conflicts, please?





[GitHub] [airflow] kaxil commented on a change in pull request #17121: [WIP] Deactivating DAGs which have been removed from files

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #17121:
URL: https://github.com/apache/airflow/pull/17121#discussion_r673891843



##########
File path: airflow/dag_processing/processor.py
##########
@@ -618,6 +619,8 @@ def process_file(
             Stats.incr('dag_file_refresh_error', 1, 1)
             return 0, 0
 
+        self._deactivate_missing_dags(session, dagbag, file_path)

Review comment:
       We should move this line to the end of the function (or at the very least after the `len(dagbag.dags)` check) so we don't call the DB if there are no DAGs.







[GitHub] [airflow] kaxil commented on a change in pull request #17121: Deactivating DAGs which have been removed from files

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #17121:
URL: https://github.com/apache/airflow/pull/17121#discussion_r705789095



##########
File path: airflow/dag_processing/processor.py
##########
@@ -645,3 +648,12 @@ def process_file(
             self.log.exception("Error logging import errors!")
 
         return len(dagbag.dags), len(dagbag.import_errors)
+
+    def _deactivate_missing_dags(self, session: Session, dagbag: DagBag, file_path: str) -> None:
+        deactivated = (
+            session.query(DagModel)
+            .filter(DagModel.fileloc == file_path, DagModel.is_active, ~DagModel.dag_id.in_(dagbag.dag_ids))
+            .update({DagModel.is_active: False}, synchronize_session="fetch")
+        )
+        if deactivated:
+            self.log.info("Deactivated %i DAGs which are no longer present in %s", deactivated, file_path)

Review comment:
       I am touching the same code to do something similar (though not the same) in https://github.com/apache/airflow/pull/18120.





[GitHub] [airflow] uranusjr commented on a change in pull request #17121: [WIP] Deactivating DAGs which have been removed from files

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #17121:
URL: https://github.com/apache/airflow/pull/17121#discussion_r673904932



##########
File path: airflow/dag_processing/processor.py
##########
@@ -647,3 +650,17 @@ def process_file(
             self.log.exception("Error logging import errors!")
 
         return len(dagbag.dags), len(dagbag.import_errors)
+
+    def _deactivate_missing_dags(self, session: Session, dagbag: DagBag, file_path: str) -> None:
+        for dag in (
+            session.query(DagModel)
+            .filter(DagModel.fileloc == file_path)
+            .filter(DagModel.is_active)
+            .filter(~DagModel.dag_id.in_(dagbag.dag_ids))
+            .all()
+        ):
+            self.log.warning(
+                "Deactivating DAG ID %s since it no longer exists in file %s", dag.dag_id, file_path
+            )
+            dag.is_active = False
+            session.merge(dag)

Review comment:
       Performance here can be significantly improved with a single `UPDATE dag SET is_active = ... WHERE ...` statement. I’m not sure what the right SQLAlchemy syntax is for this, though.
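A minimal sketch of the difference, assuming a hypothetical, simplified `dag` table (`dag_id`, `fileloc`, `is_active`) in an in-memory SQLite database through Python's built-in `sqlite3` module, not Airflow's real `DagModel` or session. The per-row version issues one `UPDATE` per stale DAG, while the bulk version is a single `UPDATE ... SET ... WHERE ...` whose `rowcount` reports how many rows it matched. SQLAlchemy's `Query.update()` likewise returns a matched-row count, which is what the later revision of `_deactivate_missing_dags` logs.

```python
import sqlite3
from typing import List


def make_db() -> sqlite3.Connection:
    # Hypothetical, simplified stand-in for Airflow's `dag` table.
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE dag (dag_id TEXT PRIMARY KEY, fileloc TEXT, is_active INTEGER)")
    conn.executemany(
        "INSERT INTO dag VALUES (?, ?, 1)",
        [("dag-0", "/dags/a.py"), ("dag-1", "/dags/a.py"),
         ("dag-2", "/dags/a.py"), ("other", "/dags/b.py")],
    )
    return conn


def deactivate_row_by_row(conn: sqlite3.Connection, file_path: str, present: List[str]) -> int:
    # One SELECT, then one UPDATE round trip per stale DAG (roughly what the loop above does).
    rows = conn.execute("SELECT dag_id FROM dag WHERE fileloc = ? AND is_active", (file_path,))
    stale = [dag_id for (dag_id,) in rows.fetchall() if dag_id not in present]
    for dag_id in stale:
        conn.execute("UPDATE dag SET is_active = 0 WHERE dag_id = ?", (dag_id,))
    return len(stale)


def deactivate_bulk(conn: sqlite3.Connection, file_path: str, present: List[str]) -> int:
    # A single UPDATE ... WHERE: the database filters and updates in one statement.
    sql = "UPDATE dag SET is_active = 0 WHERE fileloc = ? AND is_active"
    params: list = [file_path]
    if present:  # skip NOT IN entirely when the file has no DAGs left
        sql += " AND dag_id NOT IN (%s)" % ",".join("?" * len(present))
        params.extend(present)
    return conn.execute(sql, params).rowcount  # number of rows matched and updated


conn = make_db()
print(deactivate_bulk(conn, "/dags/a.py", ["dag-0"]))  # 2
```

The `NOT IN` clause is dropped when no DAGs remain in the file, so an emptied file deactivates all of its DAGs instead of producing an empty `IN ()` list.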





[GitHub] [airflow] kaxil merged pull request #17121: Deactivating DAGs which have been removed from files

Posted by GitBox <gi...@apache.org>.
kaxil merged pull request #17121:
URL: https://github.com/apache/airflow/pull/17121


   



[GitHub] [airflow] uranusjr commented on pull request #17121: [WIP] Deactivating DAGs which have been removed from files

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #17121:
URL: https://github.com/apache/airflow/pull/17121#issuecomment-905055788


   No hard deadline yet, but features probably need to be finished in ~2 weeks or so to be included.



[GitHub] [airflow] SamWheating edited a comment on pull request #17121: [WIP] Deactivating DAGs which have been removed from files

Posted by GitBox <gi...@apache.org>.
SamWheating edited a comment on pull request #17121:
URL: https://github.com/apache/airflow/pull/17121#issuecomment-908814611


   Fixing the remaining broken test case will have to wait until https://github.com/apache/airflow/pull/17924 is merged. 



[GitHub] [airflow] kaxil commented on pull request #17121: [WIP] Deactivating DAGs which have been removed from files

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #17121:
URL: https://github.com/apache/airflow/pull/17121#issuecomment-904855730


   @SamWheating Can you take a look at the failing tests, please?



[GitHub] [airflow] SamWheating commented on pull request #17121: [WIP] Deactivating DAGs which have been removed from files

Posted by GitBox <gi...@apache.org>.
SamWheating commented on pull request #17121:
URL: https://github.com/apache/airflow/pull/17121#issuecomment-904960763


   I'm away on vacation for the next week; I can get these tests fixed afterwards. 
   
   Is there a deadline for the next release? 



[GitHub] [airflow] uranusjr edited a comment on pull request #17121: [WIP] Deactivating DAGs which have been removed from files

Posted by GitBox <gi...@apache.org>.
uranusjr edited a comment on pull request #17121:
URL: https://github.com/apache/airflow/pull/17121#issuecomment-908111192


   This seems to break `test_home_importerrors_filtered_singledag_user`. I’m guessing that’s because that DAG is now disabled and thus no longer triggers the `ImportError`? You’ll need to investigate this and maybe fix the test (to use another DAG or something).



[GitHub] [airflow] SamWheating edited a comment on pull request #17121: [WIP] Deactivating DAGs which have been removed from files

Posted by GitBox <gi...@apache.org>.
SamWheating edited a comment on pull request #17121:
URL: https://github.com/apache/airflow/pull/17121#issuecomment-908506290







[GitHub] [airflow] SamWheating commented on a change in pull request #17121: [WIP] Deactivating DAGs which have been removed from files

Posted by GitBox <gi...@apache.org>.
SamWheating commented on a change in pull request #17121:
URL: https://github.com/apache/airflow/pull/17121#discussion_r701386688



##########
File path: airflow/dag_processing/processor.py
##########
@@ -645,3 +648,12 @@ def process_file(
             self.log.exception("Error logging import errors!")
 
         return len(dagbag.dags), len(dagbag.import_errors)
+
+    def _deactivate_missing_dags(self, session: Session, dagbag: DagBag, file_path: str) -> None:
+        deactivated = (
+            session.query(DagModel)
+            .filter(DagModel.fileloc == file_path, DagModel.is_active, ~DagModel.dag_id.in_(dagbag.dag_ids))
+            .update({DagModel.is_active: False}, synchronize_session="fetch")
+        )
+        if deactivated:
+            self.log.info("Deactivated %i DAGs which are no longer present in %s", deactivated, file_path)

Review comment:
       Ah, thanks for confirming. If it's not supported in SQLite and MySQL, then I don't think we can rely on this feature.
   
   We could split this into a two-part query like:
   
   ```python
           deactivated_query = session.query(DagModel).filter(
               DagModel.fileloc == file_path,
               DagModel.is_active,
               ~DagModel.dag_id.in_(dagbag.dag_ids),
           )
           # A Query object is always truthy, so test the fetched IDs instead.
           deactivated_dags = [dag.dag_id for dag in deactivated_query]
           if deactivated_dags:
               deactivated_query.update({DagModel.is_active: False}, synchronize_session="fetch")
               self.log.info("Deactivated missing DAGs: %s", ",".join(deactivated_dags))
   ```
   
   But I think that will introduce potential race conditions and consistency issues, so I'd rather keep things as-is. 
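One way to narrow the race mentioned above, sketched under assumptions: Python's `sqlite3` module, a hypothetical simplified `dag` table, and SQLite's `BEGIN IMMEDIATE` to take the write lock before reading. The `UPDATE` then targets exactly the `dag_id`s that were selected, so the list that gets logged and the rows that get changed come from the same locked snapshot. This is not Airflow code, and other backends would need their own locking (for example `SELECT ... FOR UPDATE`), which the sketch does not cover.

```python
import sqlite3
from typing import List


def make_db() -> sqlite3.Connection:
    # isolation_level=None: autocommit mode, so BEGIN/COMMIT are issued explicitly below.
    conn = sqlite3.connect(":memory:", isolation_level=None)
    conn.execute("CREATE TABLE dag (dag_id TEXT PRIMARY KEY, fileloc TEXT, is_active INTEGER)")
    conn.executemany(
        "INSERT INTO dag VALUES (?, ?, 1)",
        [("dag-0", "/dags/a.py"), ("dag-1", "/dags/a.py"), ("dag-2", "/dags/a.py")],
    )
    return conn


def deactivate_and_report(conn: sqlite3.Connection, file_path: str, present: List[str]) -> List[str]:
    conn.execute("BEGIN IMMEDIATE")  # take SQLite's write lock before reading
    try:
        rows = conn.execute(
            "SELECT dag_id FROM dag WHERE fileloc = ? AND is_active ORDER BY dag_id", (file_path,)
        ).fetchall()
        stale = [dag_id for (dag_id,) in rows if dag_id not in present]
        if stale:
            # Update exactly the IDs that were selected (and will be logged), nothing else.
            placeholders = ",".join("?" * len(stale))
            conn.execute(f"UPDATE dag SET is_active = 0 WHERE dag_id IN ({placeholders})", stale)
        conn.execute("COMMIT")
    except Exception:
        conn.execute("ROLLBACK")
        raise
    return stale


conn = make_db()
print(deactivate_and_report(conn, "/dags/a.py", ["dag-0"]))  # ['dag-1', 'dag-2']
```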





[GitHub] [airflow] SamWheating commented on a change in pull request #17121: Deactivating DAGs which have been removed from files

Posted by GitBox <gi...@apache.org>.
SamWheating commented on a change in pull request #17121:
URL: https://github.com/apache/airflow/pull/17121#discussion_r711414411



##########
File path: airflow/dag_processing/processor.py
##########
@@ -645,3 +648,12 @@ def process_file(
             self.log.exception("Error logging import errors!")
 
         return len(dagbag.dags), len(dagbag.import_errors)
+
+    def _deactivate_missing_dags(self, session: Session, dagbag: DagBag, file_path: str) -> None:
+        deactivated = (
+            session.query(DagModel)
+            .filter(DagModel.fileloc == file_path, DagModel.is_active, ~DagModel.dag_id.in_(dagbag.dag_ids))
+            .update({DagModel.is_active: False}, synchronize_session="fetch")
+        )
+        if deactivated:
+            self.log.info("Deactivated %i DAGs which are no longer present in %s", deactivated, file_path)

Review comment:
       So I just went to go and make the requested changes, and it doesn't look like `update().returning()` is supported in the SQLAlchemy ORM query API, only in direct (Core) table operations.
   
   https://docs.sqlalchemy.org/en/14/core/dml.html#sqlalchemy.sql.expression.Update.returning
   
   I'm not an expert in SQLAlchemy, but it looks like we use the ORM-friendly `session.query().update()` methods everywhere, in which case we can't use `.returning()`.
   
   Does my understanding sound correct? If so then we'll have to just mark this one as resolved.
   





[GitHub] [airflow] uranusjr commented on a change in pull request #17121: [WIP] Deactivating DAGs which have been removed from files

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #17121:
URL: https://github.com/apache/airflow/pull/17121#discussion_r700963529



##########
File path: airflow/dag_processing/processor.py
##########
@@ -645,3 +648,12 @@ def process_file(
             self.log.exception("Error logging import errors!")
 
         return len(dagbag.dags), len(dagbag.import_errors)
+
+    def _deactivate_missing_dags(self, session: Session, dagbag: DagBag, file_path: str) -> None:
+        deactivated = (
+            session.query(DagModel)
+            .filter(DagModel.fileloc == file_path, DagModel.is_active, ~DagModel.dag_id.in_(dagbag.dag_ids))
+            .update({DagModel.is_active: False}, synchronize_session="fetch")
+        )
+        if deactivated:
+            self.log.info("Deactivated %i DAGs which are no longer present in %s", deactivated, file_path)

Review comment:
       I wonder if it’d be worthwhile to log the DAG IDs here. (Is it easy, though? I know in SQL you can do `UPDATE ... RETURNING`, but I don’t know which DBs can handle that nor how it’s done in SQLAlchemy.)





[GitHub] [airflow] SamWheating commented on pull request #17121: [WIP] Deactivating DAGs which have been removed from files

Posted by GitBox <gi...@apache.org>.
SamWheating commented on pull request #17121:
URL: https://github.com/apache/airflow/pull/17121#issuecomment-909555444


   I don't think that the remaining failure is related to my changes 🤔 



[GitHub] [airflow] SamWheating commented on a change in pull request #17121: Deactivating DAGs which have been removed from files

Posted by GitBox <gi...@apache.org>.
SamWheating commented on a change in pull request #17121:
URL: https://github.com/apache/airflow/pull/17121#discussion_r705882789



##########
File path: airflow/dag_processing/processor.py
##########
@@ -645,3 +648,12 @@ def process_file(
             self.log.exception("Error logging import errors!")
 
         return len(dagbag.dags), len(dagbag.import_errors)
+
+    def _deactivate_missing_dags(self, session: Session, dagbag: DagBag, file_path: str) -> None:
+        deactivated = (
+            session.query(DagModel)
+            .filter(DagModel.fileloc == file_path, DagModel.is_active, ~DagModel.dag_id.in_(dagbag.dag_ids))
+            .update({DagModel.is_active: False}, synchronize_session="fetch")
+        )
+        if deactivated:
+            self.log.info("Deactivated %i DAGs which are no longer present in %s", deactivated, file_path)

Review comment:
       > I am touching same code, to do something similar (though not same at #18120)
   
   Interesting, I'll follow along with that PR as well. I think there's some overlap with this one, but that's alright.
   
   There's also another recently opened PR which takes a different approach to the same problem (https://github.com/apache/airflow/pull/18087). We should definitely compare the different approaches and agree upon the best way to fix this issue. 





[GitHub] [airflow] kaxil commented on pull request #17121: [WIP] Deactivating DAGs which have been removed from files

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #17121:
URL: https://github.com/apache/airflow/pull/17121#issuecomment-889549280


   Can you rebase on the latest main, please?



[GitHub] [airflow] kaxil commented on pull request #17121: [WIP] Deactivating DAGs which have been removed from files

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #17121:
URL: https://github.com/apache/airflow/pull/17121#issuecomment-902340103


   One of the tests is failing, can you take a look please: https://github.com/apache/airflow/pull/17121/checks?check_run_id=3284172968#step:6:10746



[GitHub] [airflow] SamWheating commented on a change in pull request #17121: [WIP] Deactivating DAGs which have been removed from files

Posted by GitBox <gi...@apache.org>.
SamWheating commented on a change in pull request #17121:
URL: https://github.com/apache/airflow/pull/17121#discussion_r701386688



##########
File path: airflow/dag_processing/processor.py
##########
@@ -645,3 +648,12 @@ def process_file(
             self.log.exception("Error logging import errors!")
 
         return len(dagbag.dags), len(dagbag.import_errors)
+
+    def _deactivate_missing_dags(self, session: Session, dagbag: DagBag, file_path: str) -> None:
+        deactivated = (
+            session.query(DagModel)
+            .filter(DagModel.fileloc == file_path, DagModel.is_active, ~DagModel.dag_id.in_(dagbag.dag_ids))
+            .update({DagModel.is_active: False}, synchronize_session="fetch")
+        )
+        if deactivated:
+            self.log.info("Deactivated %i DAGs which are no longer present in %s", deactivated, file_path)

Review comment:
       Ah, thanks for confirming. If it's not supported in SQLite and MySQL, then I don't think we can rely on this feature.
   
   We could split this into a separated query + update like:
   
   ```python
           deactivated_query = session.query(DagModel).filter(
               DagModel.fileloc == file_path,
               DagModel.is_active,
               ~DagModel.dag_id.in_(dagbag.dag_ids),
           )
           # A Query object is always truthy, so test the fetched IDs instead.
           deactivated_dags = [dag.dag_id for dag in deactivated_query]
           if deactivated_dags:
               deactivated_query.update({DagModel.is_active: False}, synchronize_session="fetch")
               self.log.info("Deactivated missing DAGs: %s", ",".join(deactivated_dags))
   ```
   
   But I think that will introduce potential race conditions and consistency issues, so I'd rather keep things as-is. 





[GitHub] [airflow] github-actions[bot] commented on pull request #17121: [WIP] Deactivating DAGs which have been removed from files

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #17121:
URL: https://github.com/apache/airflow/pull/17121#issuecomment-911532511


   The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.



[GitHub] [airflow] kaxil commented on a change in pull request #17121: [WIP] Deactivating DAGs which have been removed from files

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #17121:
URL: https://github.com/apache/airflow/pull/17121#discussion_r674205923



##########
File path: airflow/dag_processing/processor.py
##########
@@ -618,6 +619,8 @@ def process_file(
             Stats.incr('dag_file_refresh_error', 1, 1)
             return 0, 0
 
+        self._deactivate_missing_dags(session, dagbag, file_path)

Review comment:
       Hmm, yes, true.





[GitHub] [airflow] kaxil commented on a change in pull request #17121: Deactivating DAGs which have been removed from files

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #17121:
URL: https://github.com/apache/airflow/pull/17121#discussion_r708745286



##########
File path: airflow/models/dag.py
##########
@@ -2656,11 +2655,6 @@ def deactivate_deleted_dags(cls, alive_dag_filelocs: List[str], session=None):
             if dag_model.fileloc is not None:
                 if correct_maybe_zipped(dag_model.fileloc) not in alive_dag_filelocs:
                     dag_model.is_active = False
-                else:
-                    # If is_active is set as False and the DAG File still exists
-                    # Change is_active=True
-                    if not dag_model.is_active:
-                        dag_model.is_active = True

Review comment:
       Yup, makes sense.





[GitHub] [airflow] SamWheating commented on a change in pull request #17121: Deactivating DAGs which have been removed from files

Posted by GitBox <gi...@apache.org>.
SamWheating commented on a change in pull request #17121:
URL: https://github.com/apache/airflow/pull/17121#discussion_r708743677



##########
File path: airflow/models/dag.py
##########
@@ -2656,11 +2655,6 @@ def deactivate_deleted_dags(cls, alive_dag_filelocs: List[str], session=None):
             if dag_model.fileloc is not None:
                 if correct_maybe_zipped(dag_model.fileloc) not in alive_dag_filelocs:
                     dag_model.is_active = False
-                else:
-                    # If is_active is set as False and the DAG File still exists
-                    # Change is_active=True
-                    if not dag_model.is_active:
-                        dag_model.is_active = True

Review comment:
       > Why this change?
   
   With this code in place, any time a DAG is no longer present in a file, it will be continually reactivated, so this code needs to be removed in order to fix this issue. 
   
   I could not find a good explanation for why this was added in the first place ([here's the PR which introduced it](https://github.com/apache/airflow/pull/5743/files)), so I think it's safe to remove. 
   
   > what will re-activate the DAG if it is readded
   
   I believe that the next time a DAG is found in a file, it will be marked as `active` when syncing the DAGs from the file processor to the DB.  
   
   https://github.com/apache/airflow/blob/c73004d0cd33d76b82b172078f572e8d4eecab39/airflow/models/dag.py#L2406
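The deactivate/reactivate cycle described above can be sketched end to end with `sqlite3` and a hypothetical simplified `dag` table. `sync_file` is an illustrative stand-in, not Airflow's API, for what happens on each parse: DAGs found in the file are inserted or marked active (the role the comment attributes to the DB sync), and DAGs with that `fileloc` that were not found are marked inactive. Changing the list of DAG IDs mimics changing `NUM_DAGS` in the validation DAG from the PR description.

```python
import sqlite3
from typing import List, Set


def make_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE dag (dag_id TEXT PRIMARY KEY, fileloc TEXT, is_active INTEGER)")
    return conn


def sync_file(conn: sqlite3.Connection, file_path: str, found: List[str]) -> None:
    with conn:  # commit on success, roll back on error
        for dag_id in found:
            # Insert newly seen DAGs, then (re)activate every DAG found in the file.
            conn.execute("INSERT OR IGNORE INTO dag VALUES (?, ?, 1)", (dag_id, file_path))
            conn.execute("UPDATE dag SET is_active = 1, fileloc = ? WHERE dag_id = ?", (file_path, dag_id))
        # Deactivate DAGs recorded for this file that the parse did not find.
        sql = "UPDATE dag SET is_active = 0 WHERE fileloc = ? AND is_active"
        params: list = [file_path]
        if found:
            sql += " AND dag_id NOT IN (%s)" % ",".join("?" * len(found))
            params.extend(found)
        conn.execute(sql, params)


def active_dags(conn: sqlite3.Connection) -> Set[str]:
    return {dag_id for (dag_id,) in conn.execute("SELECT dag_id FROM dag WHERE is_active")}


conn = make_db()
sync_file(conn, "/dags/gen.py", [f"dag-{i}" for i in range(3)])  # NUM_DAGS = 3
sync_file(conn, "/dags/gen.py", ["dag-0"])                      # NUM_DAGS = 1
print(sorted(active_dags(conn)))  # ['dag-0']
sync_file(conn, "/dags/gen.py", [f"dag-{i}" for i in range(3)])  # NUM_DAGS = 3 again
print(sorted(active_dags(conn)))  # ['dag-0', 'dag-1', 'dag-2']
```

Because reactivation happens only for DAGs the parser actually found, the old "file still exists, so set `is_active = True`" branch is no longer needed for re-added DAGs.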





[GitHub] [airflow] SamWheating commented on pull request #17121: [WIP] Deactivating DAGs which have been removed from files

Posted by GitBox <gi...@apache.org>.
SamWheating commented on pull request #17121:
URL: https://github.com/apache/airflow/pull/17121#issuecomment-908506290







[GitHub] [airflow] SamWheating commented on pull request #17121: [WIP] Deactivating DAGs which have been removed from files

Posted by GitBox <gi...@apache.org>.
SamWheating commented on pull request #17121:
URL: https://github.com/apache/airflow/pull/17121#issuecomment-902959117


   > One of the tests is failing, can you take a look please: [#17121 (checks)](https://github.com/apache/airflow/pull/17121/checks?check_run_id=3284172968#step:6:10746)
   
   I rebased my branch and have been unable to recreate this failure locally. If this failure persists in CI after a rebase, then I can investigate further 👀 



[GitHub] [airflow] SamWheating commented on a change in pull request #17121: Deactivating DAGs which have been removed from files

Posted by GitBox <gi...@apache.org>.
SamWheating commented on a change in pull request #17121:
URL: https://github.com/apache/airflow/pull/17121#discussion_r704563433



##########
File path: airflow/dag_processing/processor.py
##########
@@ -645,3 +648,12 @@ def process_file(
             self.log.exception("Error logging import errors!")
 
         return len(dagbag.dags), len(dagbag.import_errors)
+
+    def _deactivate_missing_dags(self, session: Session, dagbag: DagBag, file_path: str) -> None:
+        deactivated = (
+            session.query(DagModel)
+            .filter(DagModel.fileloc == file_path, DagModel.is_active, ~DagModel.dag_id.in_(dagbag.dag_ids))
+            .update({DagModel.is_active: False}, synchronize_session="fetch")
+        )
+        if deactivated:
+            self.log.info("Deactivated %i DAGs which are no longer present in %s", deactivated, file_path)

Review comment:
       > Or we can only show the IDs on Postgres and MSSQL.
   
   Do you think it's worth doing that? I feel like it's a bit of an anti-pattern to have code like this around:
   ```python
   from airflow.configuration import conf
   
   if conf.get('core', 'sql_alchemy_conn').startswith(('mssql', 'postgresql')):
       ...  # run one query
   else:
       ...  # run a different query
   ```
   
   So I'd be inclined to just keep it as-is (until someone complains). Thoughts?
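If a dialect-dependent branch is kept, a small helper could at least centralise the check. This is a hypothetical sketch: `backend_name` and `supports_update_returning` are not Airflow or SQLAlchemy APIs, and the backend set is taken from the SQLAlchemy glossary entry quoted in this thread (PostgreSQL, SQL Server, Oracle, Firebird), so newer database or SQLAlchemy versions may support more. It parses the URL scheme and drops any `+driver` suffix instead of matching raw string prefixes; SQLAlchemy's own `sqlalchemy.engine.url.make_url(...).get_backend_name()` does similar parsing.

```python
from urllib.parse import urlsplit

# Assumption: SQLAlchemy dialect names for the backends listed in the quoted glossary entry.
RETURNING_BACKENDS = frozenset({"postgresql", "mssql", "oracle", "firebird"})


def backend_name(conn_uri: str) -> str:
    # "postgresql+psycopg2://user@host/db" -> "postgresql"
    scheme = urlsplit(conn_uri).scheme
    return scheme.split("+", 1)[0].lower()


def supports_update_returning(conn_uri: str) -> bool:
    return backend_name(conn_uri) in RETURNING_BACKENDS


print(backend_name("postgresql+psycopg2://airflow@localhost/airflow"))  # postgresql
print(supports_update_returning("mysql://airflow@localhost/airflow"))   # False
```

Inside Airflow the argument would be the configured `sql_alchemy_conn` value, read as in the snippet above.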
   	





[GitHub] [airflow] SamWheating commented on a change in pull request #17121: Deactivating DAGs which have been removed from files

Posted by GitBox <gi...@apache.org>.
SamWheating commented on a change in pull request #17121:
URL: https://github.com/apache/airflow/pull/17121#discussion_r705877189



##########
File path: airflow/dag_processing/processor.py
##########
@@ -645,3 +648,12 @@ def process_file(
             self.log.exception("Error logging import errors!")
 
         return len(dagbag.dags), len(dagbag.import_errors)
+
+    def _deactivate_missing_dags(self, session: Session, dagbag: DagBag, file_path: str) -> None:
+        deactivated = (
+            session.query(DagModel)
+            .filter(DagModel.fileloc == file_path, DagModel.is_active, ~DagModel.dag_id.in_(dagbag.dag_ids))
+            .update({DagModel.is_active: False}, synchronize_session="fetch")
+        )
+        if deactivated:
+            self.log.info("Deactivated %i DAGs which are no longer present in %s", deactivated, file_path)

Review comment:
       > We already do things like that :)
   
   Neat! Thanks for the examples - I can refactor this PR tomorrow to use different queries and logging depending on the dialect
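A minimal sketch of "different queries depending on the dialect", using `sqlite3` so it runs without a database server. SQLite itself has supported `UPDATE ... RETURNING` since 3.35.0, so the sketch uses it when the linked SQLite library is new enough and otherwise falls back to a SELECT followed by an UPDATE with the same predicate. The table, the `deactivate_missing` function and its `use_returning` switch are hypothetical; inside Airflow the branch would depend on the configured backend rather than on `sqlite3.sqlite_version_info`.

```python
import sqlite3
from typing import List, Optional


def make_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE dag (dag_id TEXT PRIMARY KEY, fileloc TEXT, is_active INTEGER)")
    conn.executemany(
        "INSERT INTO dag VALUES (?, ?, 1)",
        [("dag-0", "/dags/a.py"), ("dag-1", "/dags/a.py"), ("dag-2", "/dags/a.py")],
    )
    conn.commit()
    return conn


def deactivate_missing(
    conn: sqlite3.Connection, file_path: str, present: List[str], use_returning: Optional[bool] = None
) -> List[str]:
    if use_returning is None:
        use_returning = sqlite3.sqlite_version_info >= (3, 35, 0)
    where = "fileloc = ? AND is_active"
    params: list = [file_path]
    if present:
        where += " AND dag_id NOT IN (%s)" % ",".join("?" * len(present))
        params.extend(present)
    with conn:  # commit on success, roll back on error
        if use_returning:
            # Single statement: the database reports which rows it changed.
            sql = f"UPDATE dag SET is_active = 0 WHERE {where} RETURNING dag_id"
            rows = conn.execute(sql, params).fetchall()
        else:
            # Fallback: read the matching IDs, then update with the same predicate.
            # Another writer could change rows between the two statements.
            rows = conn.execute(f"SELECT dag_id FROM dag WHERE {where}", params).fetchall()
            conn.execute(f"UPDATE dag SET is_active = 0 WHERE {where}", params)
    return sorted(dag_id for (dag_id,) in rows)  # RETURNING order is not guaranteed


print(deactivate_missing(make_db(), "/dags/a.py", ["dag-0"]))  # ['dag-1', 'dag-2']
```

The returned IDs are what would be passed to the log message in either branch.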





[GitHub] [airflow] SamWheating commented on a change in pull request #17121: [WIP] Deactivating DAGs which have been removed from files

Posted by GitBox <gi...@apache.org>.
SamWheating commented on a change in pull request #17121:
URL: https://github.com/apache/airflow/pull/17121#discussion_r701344776



##########
File path: airflow/dag_processing/processor.py
##########
@@ -645,3 +648,12 @@ def process_file(
             self.log.exception("Error logging import errors!")
 
         return len(dagbag.dags), len(dagbag.import_errors)
+
+    def _deactivate_missing_dags(self, session: Session, dagbag: DagBag, file_path: str) -> None:
+        deactivated = (
+            session.query(DagModel)
+            .filter(DagModel.fileloc == file_path, DagModel.is_active, ~DagModel.dag_id.in_(dagbag.dag_ids))
+            .update({DagModel.is_active: False}, synchronize_session="fetch")
+        )
+        if deactivated:
+            self.log.info("Deactivated %i DAGs which are no longer present in %s", deactivated, file_path)

Review comment:
       That's a great idea. It looks like it's supported:
   https://docs.sqlalchemy.org/en/14/core/dml.html#sqlalchemy.sql.expression.Update.returning
   
   But I'm not sure if this also means that all dialects are supported. 





[GitHub] [airflow] siddharthvp commented on a change in pull request #17121: [WIP] Deactivating DAGs which have been removed from files

Posted by GitBox <gi...@apache.org>.
siddharthvp commented on a change in pull request #17121:
URL: https://github.com/apache/airflow/pull/17121#discussion_r701382534



##########
File path: airflow/dag_processing/processor.py
##########
@@ -645,3 +648,12 @@ def process_file(
             self.log.exception("Error logging import errors!")
 
         return len(dagbag.dags), len(dagbag.import_errors)
+
+    def _deactivate_missing_dags(self, session: Session, dagbag: DagBag, file_path: str) -> None:
+        deactivated = (
+            session.query(DagModel)
+            .filter(DagModel.fileloc == file_path, DagModel.is_active, ~DagModel.dag_id.in_(dagbag.dag_ids))
+            .update({DagModel.is_active: False}, synchronize_session="fetch")
+        )
+        if deactivated:
+            self.log.info("Deactivated %i DAGs which are no longer present in %s", deactivated, file_path)

Review comment:
       From https://docs.sqlalchemy.org/en/14/glossary.html#term-RETURNING:
   > The backends that currently support RETURNING or a similar construct are PostgreSQL, SQL Server, Oracle, and Firebird. 





[GitHub] [airflow] uranusjr commented on a change in pull request #17121: [WIP] Deactivating DAGs which have been removed from files

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #17121:
URL: https://github.com/apache/airflow/pull/17121#discussion_r701554837



##########
File path: airflow/dag_processing/processor.py
##########
@@ -645,3 +648,12 @@ def process_file(
             self.log.exception("Error logging import errors!")
 
         return len(dagbag.dags), len(dagbag.import_errors)
+
+    def _deactivate_missing_dags(self, session: Session, dagbag: DagBag, file_path: str) -> None:
+        deactivated = (
+            session.query(DagModel)
+            .filter(DagModel.fileloc == file_path, DagModel.is_active, ~DagModel.dag_id.in_(dagbag.dag_ids))
+            .update({DagModel.is_active: False}, synchronize_session="fetch")
+        )
+        if deactivated:
+            self.log.info("Deactivated %i DAGs which are no longer present in %s", deactivated, file_path)

Review comment:
       Or we can only show the IDs on Postgres and MSSQL. Or do that some other time when someone complains 😛 
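Showing the IDs only on PostgreSQL and MSSQL means the log line would have two shapes. The sketch below uses a hypothetical helper that is not part of the PR. It keeps the diff's message as the fallback and appends the IDs only when the backend could report them. It returns a format string plus args so the caller can keep lazy `%`-style logging.

```python
from typing import Optional, Sequence, Tuple


def deactivation_log_args(
    file_path: str, count: int, dag_ids: Optional[Sequence[str]] = None
) -> Optional[Tuple[str, tuple]]:
    """Return (format, args) for logger.info(), or None if nothing was deactivated.

    dag_ids is only expected on backends that can report them (e.g. via RETURNING);
    elsewhere the message falls back to the count from the UPDATE.
    """
    if not count:
        return None
    if dag_ids:
        return (
            "Deactivated %i DAGs which are no longer present in %s: %s",
            (count, file_path, ", ".join(sorted(dag_ids))),
        )
    return ("Deactivated %i DAGs which are no longer present in %s", (count, file_path))


fmt, args = deactivation_log_args("/dags/gen.py", 2, ["dag-2", "dag-1"])
print(fmt % args)  # Deactivated 2 DAGs which are no longer present in /dags/gen.py: dag-1, dag-2
```

In the processor this would be used as `self.log.info(fmt, *args)` whenever the result isn't `None`.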





[GitHub] [airflow] SamWheating commented on pull request #17121: [WIP] Deactivating DAGs which have been removed from files

Posted by GitBox <gi...@apache.org>.
SamWheating commented on pull request #17121:
URL: https://github.com/apache/airflow/pull/17121#issuecomment-908506290


   Yeah, this might require some more changes. Here's a shortened version of the relevant code that renders the home page:
   
   ```python
   dags_query = session.query(DagModel).filter(~DagModel.is_subdag, DagModel.is_active)  # deactivated DAGs filtered out here
   import_errors = session.query(errors.ImportError).order_by(errors.ImportError.id).all()
   all_dags = dags_query
   current_dags = all_dags

   dags = (
       current_dags.order_by(DagModel.dag_id)
       .options(joinedload(DagModel.tags))
       .offset(start)
       .limit(dags_per_page)
       .all()
   )

   if import_errors:
       dag_filenames = {dag.fileloc for dag in dags}
       all_dags_readable = (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG) in user_permissions

       for import_error in import_errors:
           if all_dags_readable or import_error.filename in dag_filenames:  # dag_filenames only includes active DAGs
               flash(
                   "Broken DAG: [{ie.filename}] {ie.stacktrace}".format(ie=import_error),
                   "dag_import_error",
               )
   ```
   [source](https://github.com/apache/airflow/blob/e18b6a6d19f9ea0d8fe760ba00adf38810f0e510/airflow/www/views.py#L541)
   
   So if a user doesn't have permission to read all DAGs, they won't see import errors for any DAG file that fails to import. The DAGs in a file that starts raising import errors get marked inactive, so they no longer appear in `dag_filenames` 🤔 
   
   I guess the fix here would be to populate `dag_filenames` from a separate query which includes deactivated DAGs? Let me know if you have any suggestions. 
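A sketch of that idea builds `dag_filenames` from the DAGs the user can read without filtering on `is_active`. That way, a file whose DAGs were just deactivated because it stopped importing still matches its import error. It assumes simplified, hypothetical `dag` and `import_error` tables. It takes the set of readable DAG IDs as an input rather than guessing how the webserver computes it. Unlike the view, which only looks at the current page of DAGs, this checks every readable DAG, and whether that is desirable is a judgment call.

```python
import sqlite3
from typing import Iterable, List, Tuple


def visible_import_errors(
    conn: sqlite3.Connection, readable_dag_ids: Iterable[str], all_dags_readable: bool
) -> List[Tuple[str, str]]:
    """Return (filename, stacktrace) pairs the user should see as 'Broken DAG' banners."""
    errors = conn.execute("SELECT filename, stacktrace FROM import_error ORDER BY id").fetchall()
    if all_dags_readable:
        return errors
    ids = list(readable_dag_ids)
    if not ids:
        return []
    # Deliberately no is_active filter: DAGs from a file that now fails to import
    # have just been deactivated, but their fileloc should still unlock the error.
    rows = conn.execute(
        "SELECT DISTINCT fileloc FROM dag WHERE dag_id IN ({})".format(", ".join("?" for _ in ids)),
        ids,
    )
    dag_filenames = {row[0] for row in rows}
    return [err for err in errors if err[0] in dag_filenames]


conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE dag (dag_id TEXT PRIMARY KEY, fileloc TEXT, is_active INTEGER);
    CREATE TABLE import_error (id INTEGER PRIMARY KEY, filename TEXT, stacktrace TEXT);
    INSERT INTO dag VALUES ('broken_dag', '/dags/broken.py', 0), ('ok_dag', '/dags/ok.py', 1);
    INSERT INTO import_error (filename, stacktrace) VALUES
        ('/dags/broken.py', 'SyntaxError: invalid syntax'),
        ('/dags/secret.py', 'NameError: name x is not defined');
    """
)

# broken_dag was deactivated after its file started failing, but the user can read it
print(visible_import_errors(conn, ["broken_dag"], all_dags_readable=False))
```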



[GitHub] [airflow] kaxil commented on a change in pull request #17121: Deactivating DAGs which have been removed from files

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #17121:
URL: https://github.com/apache/airflow/pull/17121#discussion_r708745692



##########
File path: airflow/dag_processing/processor.py
##########
@@ -645,3 +648,12 @@ def process_file(
             self.log.exception("Error logging import errors!")
 
         return len(dagbag.dags), len(dagbag.import_errors)
+
+    def _deactivate_missing_dags(self, session: Session, dagbag: DagBag, file_path: str) -> None:
+        deactivated = (
+            session.query(DagModel)
+            .filter(DagModel.fileloc == file_path, DagModel.is_active, ~DagModel.dag_id.in_(dagbag.dag_ids))
+            .update({DagModel.is_active: False}, synchronize_session="fetch")
+        )
+        if deactivated:
+            self.log.info("Deactivated %i DAGs which are no longer present in %s", deactivated, file_path)

Review comment:
       This one is still pending; otherwise it looks good and is ready to be merged.





[GitHub] [airflow] kaxil commented on pull request #17121: [WIP] Deactivating DAGs which have been removed from files

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #17121:
URL: https://github.com/apache/airflow/pull/17121#issuecomment-903026207


   Tests are still failing.



[GitHub] [airflow] uranusjr commented on pull request #17121: [WIP] Deactivating DAGs which have been removed from files

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #17121:
URL: https://github.com/apache/airflow/pull/17121#issuecomment-908111192


   This seems to break `test_home_importerrors_filtered_singledag_user`. I’m guessing that’s because that DAG is not disabled and thus isn’t triggering the `ImportError`? You’ll need to investigate this and maybe fix the test (to use another DAG or something).



[GitHub] [airflow] SamWheating commented on a change in pull request #17121: Deactivating DAGs which have been removed from files

Posted by GitBox <gi...@apache.org>.
SamWheating commented on a change in pull request #17121:
URL: https://github.com/apache/airflow/pull/17121#discussion_r704563433



##########
File path: airflow/dag_processing/processor.py
##########
@@ -645,3 +648,12 @@ def process_file(
             self.log.exception("Error logging import errors!")
 
         return len(dagbag.dags), len(dagbag.import_errors)
+
+    def _deactivate_missing_dags(self, session: Session, dagbag: DagBag, file_path: str) -> None:
+        deactivated = (
+            session.query(DagModel)
+            .filter(DagModel.fileloc == file_path, DagModel.is_active, ~DagModel.dag_id.in_(dagbag.dag_ids))
+            .update({DagModel.is_active: False}, synchronize_session="fetch")
+        )
+        if deactivated:
+            self.log.info("Deactivated %i DAGs which are no longer present in %s", deactivated, file_path)

Review comment:
       > Or we can only show the IDs on Postgres and MSSQL.
   
   Do you think it's worth doing that? I feel like it's a bit of an anti-pattern to have code like this around:
   ```python
   if conf.get('core', 'sql_alchemy_conn').startswith(('mssql', 'postgresql')):
       ...  # run one query
   else:
       ...  # run a different query
   ```
   
   So I'd be inclined to just keep it as-is. Thoughts?
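If the branch were kept, one way to soften the anti-pattern is to isolate the dialect check in a single helper instead of scattering `startswith` checks around. The sketch below uses hypothetical helper names. It parses the URI scheme, so driver suffixes such as `+psycopg2` or `+pyodbc` don't matter. With a live SQLAlchemy session bound to an engine, the dialect name should also be available as `session.get_bind().dialect.name`, which avoids re-reading the config at all.

```python
from urllib.parse import urlsplit

# Backends named in this thread as able to return the updated IDs; an assumption, not an exhaustive list.
RETURNING_DIALECTS = frozenset({"postgresql", "mssql"})


def dialect_name(conn_uri: str) -> str:
    """Return the SQLAlchemy dialect part of a connection URI, dropping any '+driver' suffix."""
    return urlsplit(conn_uri).scheme.split("+", 1)[0]


def can_report_deactivated_ids(conn_uri: str) -> bool:
    return dialect_name(conn_uri) in RETURNING_DIALECTS


print(dialect_name("postgresql+psycopg2://airflow:airflow@localhost/airflow"))  # postgresql
print(can_report_deactivated_ids("sqlite:////tmp/airflow.db"))  # False
```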
   	





[GitHub] [airflow] SamWheating edited a comment on pull request #17121: [WIP] Deactivating DAGs which have been removed from files

Posted by GitBox <gi...@apache.org>.
SamWheating edited a comment on pull request #17121:
URL: https://github.com/apache/airflow/pull/17121#issuecomment-908506290


   Yeah exactly, this might require some more changes. Here's a shortened version of the relevant code that renders the home page:
   
   ```python
   dags_query = session.query(DagModel).filter(~DagModel.is_subdag, DagModel.is_active)  # deactivated DAGs filtered out here
   import_errors = session.query(errors.ImportError).order_by(errors.ImportError.id).all()
   all_dags = dags_query
   current_dags = all_dags

   dags = (
       current_dags.order_by(DagModel.dag_id)
       .options(joinedload(DagModel.tags))
       .offset(start)
       .limit(dags_per_page)
       .all()
   )

   if import_errors:
       dag_filenames = {dag.fileloc for dag in dags}
       all_dags_readable = (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG) in user_permissions

       for import_error in import_errors:
           if all_dags_readable or import_error.filename in dag_filenames:  # dag_filenames only includes active DAGs
               flash(
                   "Broken DAG: [{ie.filename}] {ie.stacktrace}".format(ie=import_error),
                   "dag_import_error",
               )
   ```
   [source](https://github.com/apache/airflow/blob/e18b6a6d19f9ea0d8fe760ba00adf38810f0e510/airflow/www/views.py#L541)
   
   So if a user doesn't have permission to read all DAGs, they won't see import errors for any DAG file that fails to import. The DAGs in a file that starts raising import errors get marked inactive, so they no longer appear in `dag_filenames` 🤔 This definitely isn't the desired behaviour.
   
   I guess the fix here would be to populate `dag_filenames` from a separate query which includes deactivated DAGs? Let me know if you have any suggestions. 



[GitHub] [airflow] SamWheating commented on pull request #17121: [WIP] Deactivating DAGs which have been removed from files

Posted by GitBox <gi...@apache.org>.
SamWheating commented on pull request #17121:
URL: https://github.com/apache/airflow/pull/17121#issuecomment-909555444


   I don't think that the remaining failure is related to my changes 🤔 

