Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/06/09 11:57:26 UTC

[GitHub] [airflow] ephraimbuddy opened a new pull request #16182: Do not queue tasks when the DAG file goes missing

ephraimbuddy opened a new pull request #16182:
URL: https://github.com/apache/airflow/pull/16182


   Currently, if a DAG goes missing, the scheduler continues to queue its task instances
   until the executor reports the tasks as failed, at which point the scheduler sets their state to failed.
   
   This change ensures that tasks are not queued when the DAG file goes missing. Instead of waiting for the
   executor to fail the task without an explicit reason, the task is failed here together with the reason why.
   As a result, the Pool's queued slots are freed for other tasks to be queued.
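   
   For reference, a sketch of how the new scheduler option could look in airflow.cfg (the option name changed several times during review; `clean_tis_without_dag_interval` was a later candidate, and the value shown is illustrative):
   
   ```
   [scheduler]
   # How often (in seconds) to check for and fail task instances and DagRuns
   # whose corresponding DAG no longer exists (default shown is illustrative)
   clean_tis_without_dag_interval = 15.0
   ```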
   
   Closes: https://github.com/apache/airflow/issues/15488
   
   
   ---
   **^ Add meaningful description above**
   
   Read the **[Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines)** for more information.
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16182: Do not queue tasks when the DAG file goes missing

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r643681695



##########
File path: airflow/config_templates/config.yml
##########
@@ -1704,6 +1704,14 @@
       type: float
       example: ~
       default: "15.0"
+    - name: fail_tis_with_missing_dag_file_interval

Review comment:
       I can't think of a better name here... please suggest.







[GitHub] [airflow] kaxil commented on a change in pull request #16182: Do not queue tasks when the DAG file goes missing

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r645497177



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1457,6 +1460,31 @@ def _clean_tis_without_dagrun(self, session):
                     raise
             guard.commit()
 
+    @provide_session
+    def _missing_dag_file_cleanup(self, session: Session = None):
+        """Fails task instances and DagRuns of DAGs that no longer exist in the dag folder"""
+        states_to_check = [State.SCHEDULED, State.QUEUED, State.RUNNING]

Review comment:
       We might be able to use `State.unfinished - None - SHUTDOWN` here.
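    
    A minimal sketch of that set arithmetic, assuming `State.unfinished` is a frozenset (which the later diffs in this thread rely on):
    
    ```python
    from airflow.utils.state import State
    
    # Drop NONE and SHUTDOWN from the unfinished states before querying;
    # frozensets support plain set subtraction.
    states_to_check = State.unfinished - frozenset([State.NONE, State.SHUTDOWN])
    ```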







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16182: Do not queue tasks when the DAG goes missing

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r657505551



##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -2124,6 +2124,59 @@ def test_scheduler_loop_should_change_state_for_tis_without_dagrun(
             assert ti.start_date == ti.end_date
             assert ti.duration is not None
 
+    def test_scheduler_loop_should_fail_tasks_with_missing_dag_in_dagbag_dags(self):
+        """This tests that if dags are missing in dagbag, then it should be failed"""
+        session = settings.Session()
+        dag_id = 'test_execute_helper_should_fail_tasks_with_missing_dag'
+        dag = DAG(dag_id, start_date=DEFAULT_DATE, default_args={'owner': 'owner1'})
+
+        with dag:
+            op1 = BashOperator(task_id='op1', bash_command='sleep 5')
+
+        # Write Dag to DB
+        dagbag = DagBag(dag_folder="/dev/null", include_examples=False, read_dags_from_db=False)
+        dagbag.bag_dag(dag, root_dag=dag)
+        dagbag.sync_to_db()
+
+        # Mock dagbag.has_dag
+        mock_has_dag = mock.MagicMock(return_value=False)
+        dagbag.has_dag = mock_has_dag
+
+        dag = DagBag(read_dags_from_db=True, include_examples=False).get_dag(dag_id)
+        # Create DAG run with FAILED state
+        dag.clear()
+        dr = dag.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            state=State.RUNNING,
+            execution_date=DEFAULT_DATE + timedelta(days=1),
+            start_date=DEFAULT_DATE + timedelta(days=1),
+            session=session,
+        )
+        ti = dr.get_task_instance(task_id=op1.task_id, session=session)
+        ti.state = State.RUNNING
+        session.commit()
+
+        # This poll interval is large, bug the scheduler doesn't sleep that
+        # long, instead we hit the missing_dag_cleanup_interval instead
+        self.scheduler_job = SchedulerJob(num_runs=2, processor_poll_interval=30)
+        self.scheduler_job.dagbag = dagbag
+        executor = MockExecutor(do_update=False)
+        executor.queued_tasks

Review comment:
       It's actually needed to simulate the executor. 







[GitHub] [airflow] ashb commented on a change in pull request #16182: Do not queue tasks when the DAG goes missing

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r663752836



##########
File path: airflow/config_templates/config.yml
##########
@@ -1709,6 +1709,14 @@
       type: float
       example: ~
       default: "15.0"
+    - name: clean_tis_without_dag_interval
+      description: |
+        How often (in seconds) to check and fail task instances and dagruns whose corresponding
+        DAG is missing
+      version_added: 2.1.1

Review comment:
       ```suggestion
         version_added: 2.1.2
   ```

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -807,6 +810,37 @@ def _clean_tis_without_dagrun(self, session):
                     raise
             guard.commit()
 
+    @provide_session
+    def _clean_tis_without_dag(self, session: Session = None):
+        """Fails task instances and DagRuns of DAGs that no longer exist in the dagbag/DB"""
+        states_to_check = State.unfinished - frozenset([State.NONE, State.SHUTDOWN])
+        tis = session.query(TI).filter(TI.state.in_(states_to_check)).all()
+        missing_dags = {}
+        dag_runs = set()
+
+        for ti in tis:
+            if ti.dag_id in missing_dags:
+                ti.set_state(State.FAILED, session=session)
+                continue
+            # Dag no longer in dagbag?
+            if not self.dagbag.has_dag(ti.dag_id, session=session):
+                ti.set_state(State.FAILED, session=session)
+                dag_runs.add(ti.dag_run)
+                missing_dags[ti.dag_id] = [ti]
+                continue
+        if missing_dags:
+            self.log.warning(
+                "The following Dags are missing, therefore the DAG's "
+                "task instances have been set to failed: \t\n%s",
+                [
+                    (f"Missing DAGs: {dag_name}", f"Failed TaskInstances: [{ti}]")
+                    for dag_name, ti in missing_dags.items()
+                ],
+            )
+            self.log.warning("Failing the corresponding DagRuns of the missing DAGs. DagRuns: %s", dag_runs)
+            for dr in dag_runs:
+                dr.set_state(State.FAILED)

Review comment:
       ```suggestion
                   dr.state = State.FAILED
   ```

##########
File path: airflow/models/dagbag.py
##########
@@ -166,6 +166,34 @@ def dag_ids(self) -> List[str]:
         """
         return list(self.dags.keys())
 
+    @provide_session
+    def has_dag(self, dag_id: str, session: Session = None):
+        """
+        Checks the "local" cache, if it exists, and is not older than the configured cache time, return True
+        else check in the DB if the dag exists, return True, False otherwise
+        """
+        from airflow.models.serialized_dag import SerializedDagModel
+
+        sd_last_updated_datetime = SerializedDagModel.get_last_updated_datetime(
+            dag_id=dag_id,
+            session=session,
+        )
+        sd_has_dag = sd_last_updated_datetime is not None
+        if dag_id not in self.dags:
+            return sd_has_dag
+        if dag_id not in self.dags_last_fetched:
+            return sd_has_dag
+        min_serialized_dag_fetch_secs = timedelta(seconds=settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL)
+        if timezone.utcnow() < self.dags_last_fetched[dag_id] + min_serialized_dag_fetch_secs:
+            return sd_has_dag
+        if sd_has_dag:
+            return True

Review comment:
       We should refactor this to delay the DB check until we need it -- for instance, if we have the dag locally and it was fetched less than the configured interval ago, we don't need to ask the DB at all.

##########
File path: airflow/models/dagbag.py
##########
@@ -166,6 +166,34 @@ def dag_ids(self) -> List[str]:
         """
         return list(self.dags.keys())
 
+    @provide_session
+    def has_dag(self, dag_id: str, session: Session = None):
+        """
+        Checks the "local" cache, if it exists, and is not older than the configured cache time, return True
+        else check in the DB if the dag exists, return True, False otherwise
+        """
+        from airflow.models.serialized_dag import SerializedDagModel
+
+        sd_last_updated_datetime = SerializedDagModel.get_last_updated_datetime(
+            dag_id=dag_id,
+            session=session,
+        )
+        sd_has_dag = sd_last_updated_datetime is not None
+        if dag_id not in self.dags:
+            return sd_has_dag
+        if dag_id not in self.dags_last_fetched:
+            return sd_has_dag
+        min_serialized_dag_fetch_secs = timedelta(seconds=settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL)
+        if timezone.utcnow() < self.dags_last_fetched[dag_id] + min_serialized_dag_fetch_secs:
+            return sd_has_dag
+        if sd_has_dag:
+            return True

Review comment:
       So something like this (pseudo-python):
   
   ```python
           if dag_id in self.dags and timezone.utcnow() < self.dags_last_fetched[dag_id] + min_serialized_dag_fetch_secs:
               return True
   
           sd_last_updated_datetime = SerializedDagModel.get_last_updated_datetime(
               dag_id=dag_id,
               session=session,
           )
           # etc ...
   ```

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -807,6 +810,37 @@ def _clean_tis_without_dagrun(self, session):
                     raise
             guard.commit()
 
+    @provide_session
+    def _clean_tis_without_dag(self, session: Session = None):
+        """Fails task instances and DagRuns of DAGs that no longer exist in the dagbag/DB"""
+        states_to_check = State.unfinished - frozenset([State.NONE, State.SHUTDOWN])
+        tis = session.query(TI).filter(TI.state.in_(states_to_check)).all()
+        missing_dags = {}
+        dag_runs = set()
+
+        for ti in tis:
+            if ti.dag_id in missing_dags:
+                ti.set_state(State.FAILED, session=session)
+                continue
+            # Dag no longer in dagbag?
+            if not self.dagbag.has_dag(ti.dag_id, session=session):
+                ti.set_state(State.FAILED, session=session)

Review comment:
       I wonder if this (and L823) should be set to State.REMOVED? It might be clearer for the user when debugging than a failure without any logs.

##########
File path: tests/models/test_dagbag.py
##########
@@ -859,6 +859,39 @@ def test_get_dag_with_dag_serialization(self):
         assert set(updated_ser_dag_1.tags) == {"example", "example2", "new_tag"}
         assert updated_ser_dag_1_update_time > ser_dag_1_update_time
 
+    @patch("airflow.models.dagbag.settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL", 5)
+    def test_has_dag_method(self):
+        """Test has_dag method"""
+        dag_id = "example_bash_operator"
+        with freeze_time(tz.datetime(2020, 1, 5, 0, 0, 0)):
+            example_bash_op_dag = DagBag(include_examples=True).dags.get(dag_id)
+            SerializedDagModel.write_dag(dag=example_bash_op_dag)
+
+            dag_bag = DagBag(read_dags_from_db=True)
+            dag_bag.get_dag(dag_id)  # Add dag to self.dags
+            assert dag_bag.has_dag(dag_id)

Review comment:
       We should put a query count assertion around this line to ensure it is 0
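    
    A sketch of what that assertion could look like, assuming the `assert_queries_count` helper from Airflow's test utilities:
    
    ```python
    from tests.test_utils.asserts import assert_queries_count
    
    # The dag was fetched within MIN_SERIALIZED_DAG_FETCH_INTERVAL, so
    # has_dag should be answered from the local cache with no DB queries.
    with assert_queries_count(0):
        assert dag_bag.has_dag(dag_id)
    ```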

##########
File path: tests/models/test_dagbag.py
##########
@@ -859,6 +859,39 @@ def test_get_dag_with_dag_serialization(self):
         assert set(updated_ser_dag_1.tags) == {"example", "example2", "new_tag"}
         assert updated_ser_dag_1_update_time > ser_dag_1_update_time
 
+    @patch("airflow.models.dagbag.settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL", 5)
+    def test_has_dag_method(self):
+        """Test has_dag method"""
+        dag_id = "example_bash_operator"
+        with freeze_time(tz.datetime(2020, 1, 5, 0, 0, 0)):
+            example_bash_op_dag = DagBag(include_examples=True).dags.get(dag_id)
+            SerializedDagModel.write_dag(dag=example_bash_op_dag)
+
+            dag_bag = DagBag(read_dags_from_db=True)
+            dag_bag.get_dag(dag_id)  # Add dag to self.dags
+            assert dag_bag.has_dag(dag_id)
+            assert not dag_bag.has_dag("non_added_dag_id")
+
+        # Check that min_serialized_dag_fetch_interval has passed but
+        # dag is in SerializedDagModel and we return True
+        with freeze_time(tz.datetime(2020, 1, 5, 0, 0, 6)):
+            assert dag_bag.has_dag(dag_id)
+
+        # We remove the dag from dag_bag.dags and ensure it returns True
+        dag_bag.dags.pop(dag_id)
+        assert not dag_bag.dags.get(dag_id)

Review comment:
       ```suggestion
   ```
   
    This line was just testing Python core semantics; no need to do that in our unit tests :)

##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -1475,6 +1475,55 @@ def test_scheduler_loop_should_change_state_for_tis_without_dagrun(
             assert ti.start_date == ti.end_date
             assert ti.duration is not None
 
+    def test_scheduler_loop_should_fail_tasks_with_missing_dag_in_dagbag_dags(self):
+        """This tests that if dags are missing in dagbag, then it should be failed"""
+        session = settings.Session()
+        dag_id = 'test_execute_helper_should_fail_tasks_with_missing_dag'
+        dag = DAG(dag_id, start_date=DEFAULT_DATE, default_args={'owner': 'owner1'})
+
+        with dag:
+            op1 = BashOperator(task_id='op1', bash_command='sleep 5')
+
+        # Write Dag to DB
+        dagbag = DagBag(dag_folder="/dev/null", include_examples=False, read_dags_from_db=False)
+        dagbag.bag_dag(dag, root_dag=dag)
+        dagbag.sync_to_db()
+
+        # Mock dagbag.has_dag
+        dagbag.has_dag = mock.MagicMock(return_value=False)
+
+        dag = DagBag(read_dags_from_db=True, include_examples=False).get_dag(dag_id)
+        dr = dag.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            state=State.RUNNING,
+            execution_date=DEFAULT_DATE + timedelta(days=1),
+            start_date=DEFAULT_DATE + timedelta(days=1),
+            session=session,
+        )
+        ti = dr.get_task_instance(task_id=op1.task_id, session=session)
+        ti.state = State.RUNNING
+        session.commit()
+
+        # This poll interval is large, bug the scheduler doesn't sleep that
+        # long, instead we hit the clean_tis_without_dag interval instead
+        self.scheduler_job = SchedulerJob(num_runs=2, processor_poll_interval=30)
+        self.scheduler_job.dagbag = dagbag
+        executor = MockExecutor(do_update=False)
+        self.scheduler_job.executor = executor
+        processor = mock.MagicMock()
+        processor.done = False
+        self.scheduler_job.processor_agent = processor
+
+        with mock.patch.object(settings, "USE_JOB_SCHEDULE", False), conf_vars(
+            {('scheduler', 'clean_tis_without_dag_interval'): '0.001'}
+        ):
+            self.scheduler_job._run_scheduler_loop()

Review comment:
       Not sure we need to run the whole scheduler loop here -- we could just call `self.scheduler_job._clean_tis_without_dag()` directly.
   
    By calling the scheduler loop, the only extra thing we check is that we've added this to the timer, and we can see that pretty easily.
   
   Dunno :)
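    
    A sketch of the narrower test, reusing the names from the diff above:
    
    ```python
    # Exercise the cleanup method directly instead of the whole scheduler loop.
    self.scheduler_job._clean_tis_without_dag()
    
    # The cleanup runs in its own session, so re-read the task instance state.
    session.refresh(ti)
    assert ti.state == State.FAILED
    ```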







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16182: Do not queue tasks when the DAG file goes missing

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r649175016



##########
File path: airflow/models/dagbag.py
##########
@@ -166,6 +166,36 @@ def dag_ids(self) -> List[str]:
         """
         return list(self.dags.keys())
 
+    @provide_session
+    def has_dag(self, dag_id: str, session: Session = None):
+        """
+        Checks the "local" cache, if it exists, and is not older than the confiured cache time, return True
+        else check in the DB if the dag exists, return True, False otherwise
+        """
+        if dag_id in self.dags:
+            min_serialized_dag_fetch_secs = timedelta(seconds=settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL)
+            if dag_id in self.dags_last_fetched:
+
+                return timezone.utcnow() < self.dags_last_fetched[dag_id] + min_serialized_dag_fetch_secs

Review comment:
       How do you suggest I resolve this? My thinking was that once the dag exists in the cache, it will still exist in `dags_last_fetched`?







[GitHub] [airflow] kaxil commented on a change in pull request #16182: Do not queue tasks when the DAG file goes missing

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r649563576



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1417,6 +1420,42 @@ def _clean_tis_without_dagrun(self, session):
                     raise
             guard.commit()
 
+    @provide_session
+    def _missing_dag_file_cleanup(self, session: Session = None):
+        """Fails task instances and DagRuns of DAGs that no longer exist in the dag folder/DB"""
+        states_to_check = State.unfinished - frozenset([State.NONE, State.SHUTDOWN])
+        tis = session.query(TI).filter(TI.state.in_(states_to_check)).all()
+        missing_dags = {}
+        dag_runs = set()
+
+        for ti in tis:
+            if ti.dag_id in missing_dags:
+                ti.set_state(State.FAILED, session=session)
+                continue
+            # Dag no longer in dagbag?
+            if not self.dagbag.has_dag(ti.dag_id, session=session):
+                ti.set_state(State.FAILED, session=session)
+                dag_runs.add(ti.dag_run)
+                missing_dags[ti.dag_id] = [ti]

Review comment:
       Do we need to store `ti` here?
   
   `missing_dags` can be just a set
   
   ```python
   missing_dags = set()
   ```
   
   and then this line can be:
   
   ```suggestion
                   missing_dags.add(ti.dag_id)
   ```
   







[GitHub] [airflow] ashb commented on a change in pull request #16182: Do not queue tasks when the DAG file goes missing

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r649073919



##########
File path: airflow/models/dagbag.py
##########
@@ -166,6 +166,36 @@ def dag_ids(self) -> List[str]:
         """
         return list(self.dags.keys())
 
+    @provide_session
+    def has_dag(self, dag_id: str, session: Session = None):
+        """
+        Checks the "local" cache, if it exists, and is not older than the confiured cache time, return True
+        else check in the DB if the dag exists, return True, False otherwise
+        """
+        if dag_id in self.dags:
+            min_serialized_dag_fetch_secs = timedelta(seconds=settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL)
+            if dag_id in self.dags_last_fetched:
+
+                return timezone.utcnow() < self.dags_last_fetched[dag_id] + min_serialized_dag_fetch_secs

Review comment:
       This is the wrong behaviour -- if the dag still exists but just hasn't been fetched in this DagBag instance, it would return False.







[GitHub] [airflow] kaxil commented on a change in pull request #16182: Do not queue tasks when the DAG goes missing

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r656498989



##########
File path: airflow/config_templates/default_airflow.cfg
##########
@@ -849,6 +849,10 @@ job_heartbeat_sec = 5
 # that no longer have a matching DagRun
 clean_tis_without_dagrun_interval = 15.0
 
+# How often (in seconds) to check and fail tis and dagruns whose corresponding

Review comment:
       ```suggestion
   # How often (in seconds) to check and fail task instances and dagruns whose corresponding
   ```

##########
File path: airflow/config_templates/config.yml
##########
@@ -1704,6 +1704,14 @@
       type: float
       example: ~
       default: "15.0"
+    - name: missing_dag_cleanup_interval
+      description: |
+        How often (in seconds) to check and fail tis and dagruns whose corresponding

Review comment:
       ```suggestion
           How often (in seconds) to check and fail task instances and dagruns whose corresponding
   ```

##########
File path: airflow/models/dagbag.py
##########
@@ -166,6 +166,31 @@ def dag_ids(self) -> List[str]:
         """
         return list(self.dags.keys())
 
+    @provide_session
+    def has_dag(self, dag_id: str, session: Session = None):
+        """
+        Checks the "local" cache, if it exists, and is not older than the configured cache time, return True
+        else check in the DB if the dag exists, return True, False otherwise
+        """
+        from airflow.models.serialized_dag import SerializedDagModel
+
+        if dag_id in self.dags:
+            min_serialized_dag_fetch_secs = timedelta(seconds=settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL)
+            if (
+                dag_id in self.dags_last_fetched
+                and timezone.utcnow() > self.dags_last_fetched[dag_id] + min_serialized_dag_fetch_secs
+            ):
+                sd_last_updated_datetime = SerializedDagModel.get_last_updated_datetime(
+                    dag_id=dag_id,
+                    session=session,
+                )
+                if not sd_last_updated_datetime:
+                    return False
+                return not sd_last_updated_datetime > self.dags_last_fetched[dag_id]

Review comment:
       Why do we want this case?
   
    This means that if we have a somewhat stale DAG, it returns False.
   
   

##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -2124,6 +2124,59 @@ def test_scheduler_loop_should_change_state_for_tis_without_dagrun(
             assert ti.start_date == ti.end_date
             assert ti.duration is not None
 
+    def test_scheduler_loop_should_fail_tasks_with_missing_dag_in_dagbag_dags(self):
+        """This tests that if dags are missing in dagbag, then it should be failed"""
+        session = settings.Session()
+        dag_id = 'test_execute_helper_should_fail_tasks_with_missing_dag'
+        dag = DAG(dag_id, start_date=DEFAULT_DATE, default_args={'owner': 'owner1'})
+
+        with dag:
+            op1 = BashOperator(task_id='op1', bash_command='sleep 5')
+
+        # Write Dag to DB
+        dagbag = DagBag(dag_folder="/dev/null", include_examples=False, read_dags_from_db=False)
+        dagbag.bag_dag(dag, root_dag=dag)
+        dagbag.sync_to_db()
+
+        # Mock dagbag.has_dag
+        mock_has_dag = mock.MagicMock(return_value=False)
+        dagbag.has_dag = mock_has_dag
+
+        dag = DagBag(read_dags_from_db=True, include_examples=False).get_dag(dag_id)
+        # Create DAG run with FAILED state

Review comment:
       The DagRun is not created with a FAILED state; the comment might need updating.

##########
File path: airflow/models/dagbag.py
##########
@@ -166,6 +166,31 @@ def dag_ids(self) -> List[str]:
         """
         return list(self.dags.keys())
 
+    @provide_session
+    def has_dag(self, dag_id: str, session: Session = None):
+        """
+        Checks the "local" cache, if it exists, and is not older than the configured cache time, return True
+        else check in the DB if the dag exists, return True, False otherwise
+        """
+        from airflow.models.serialized_dag import SerializedDagModel
+
+        if dag_id in self.dags:
+            min_serialized_dag_fetch_secs = timedelta(seconds=settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL)
+            if (
+                dag_id in self.dags_last_fetched
+                and timezone.utcnow() > self.dags_last_fetched[dag_id] + min_serialized_dag_fetch_secs
+            ):
+                sd_last_updated_datetime = SerializedDagModel.get_last_updated_datetime(
+                    dag_id=dag_id,
+                    session=session,
+                )
+                if not sd_last_updated_datetime:
+                    return False

Review comment:
       ```suggestion
                       self.log.warning("Serialized DAG %s no longer exists", dag_id)
                       del self.dags[dag_id]
                       del self.dags_last_fetched[dag_id]
                       del self.dags_hash[dag_id]
                       return False
   ```

##########
File path: tests/models/test_dagbag.py
##########
@@ -859,6 +859,39 @@ def test_get_dag_with_dag_serialization(self):
         assert set(updated_ser_dag_1.tags) == {"example", "example2", "new_tag"}
         assert updated_ser_dag_1_update_time > ser_dag_1_update_time
 
+    @patch("airflow.models.dagbag.settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL", 5)
+    def test_has_dag_method(self):
+        """Test has_dag method"""
+        dag_id = "example_bash_operator"
+        with freeze_time(tz.datetime(2020, 1, 5, 0, 0, 0)):
+            example_bash_op_dag = DagBag(include_examples=True).dags.get("example_bash_operator")
+            SerializedDagModel.write_dag(dag=example_bash_op_dag)
+
+            dag_bag = DagBag(read_dags_from_db=True)
+            dag_bag.get_dag(dag_id)  # Add dag to self.dags
+            assert dag_bag.has_dag(dag_id)
+            assert not dag_bag.has_dag("non_added_dag_id")
+
+        # Check that min_serialized_dag_fetch_interval has passed but
+        # dag is in SerializedDagModel and we return True
+        with freeze_time(tz.datetime(2020, 1, 5, 0, 0, 4)):
+            assert dag_bag.has_dag(dag_id)

Review comment:
       `min_serialized_dag_fetch_interval` hasn't passed though -- it has only been 4 seconds, but `min_serialized_dag_fetch_interval` is set to 5 seconds, no?

##########
File path: airflow/models/dagbag.py
##########
@@ -166,6 +166,31 @@ def dag_ids(self) -> List[str]:
         """
         return list(self.dags.keys())
 
+    @provide_session
+    def has_dag(self, dag_id: str, session: Session = None):
+        """
+        Checks the "local" cache, if it exists, and is not older than the configured cache time, return True
+        else check in the DB if the dag exists, return True, False otherwise
+        """
+        from airflow.models.serialized_dag import SerializedDagModel
+
+        if dag_id in self.dags:
+            min_serialized_dag_fetch_secs = timedelta(seconds=settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL)
+            if (
+                dag_id in self.dags_last_fetched
+                and timezone.utcnow() > self.dags_last_fetched[dag_id] + min_serialized_dag_fetch_secs
+            ):
+                sd_last_updated_datetime = SerializedDagModel.get_last_updated_datetime(
+                    dag_id=dag_id,
+                    session=session,
+                )
+                if not sd_last_updated_datetime:
+                    return False

Review comment:
       Removes the dag as it no longer exists, so that next time we return False faster, since the DAG won't be in the DagBag.

##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -2124,6 +2124,59 @@ def test_scheduler_loop_should_change_state_for_tis_without_dagrun(
             assert ti.start_date == ti.end_date
             assert ti.duration is not None
 
+    def test_scheduler_loop_should_fail_tasks_with_missing_dag_in_dagbag_dags(self):
+        """This tests that if dags are missing in dagbag, then it should be failed"""
+        session = settings.Session()
+        dag_id = 'test_execute_helper_should_fail_tasks_with_missing_dag'
+        dag = DAG(dag_id, start_date=DEFAULT_DATE, default_args={'owner': 'owner1'})
+
+        with dag:
+            op1 = BashOperator(task_id='op1', bash_command='sleep 5')
+
+        # Write Dag to DB
+        dagbag = DagBag(dag_folder="/dev/null", include_examples=False, read_dags_from_db=False)
+        dagbag.bag_dag(dag, root_dag=dag)
+        dagbag.sync_to_db()
+
+        # Mock dagbag.has_dag
+        mock_has_dag = mock.MagicMock(return_value=False)
+        dagbag.has_dag = mock_has_dag
+
+        dag = DagBag(read_dags_from_db=True, include_examples=False).get_dag(dag_id)
+        # Create DAG run with FAILED state
+        dag.clear()
+        dr = dag.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            state=State.RUNNING,
+            execution_date=DEFAULT_DATE + timedelta(days=1),
+            start_date=DEFAULT_DATE + timedelta(days=1),
+            session=session,
+        )
+        ti = dr.get_task_instance(task_id=op1.task_id, session=session)
+        ti.state = State.RUNNING
+        session.commit()
+
+        # This poll interval is large, bug the scheduler doesn't sleep that
+        # long, instead we hit the missing_dag_cleanup_interval instead
+        self.scheduler_job = SchedulerJob(num_runs=2, processor_poll_interval=30)
+        self.scheduler_job.dagbag = dagbag
+        executor = MockExecutor(do_update=False)
+        executor.queued_tasks

Review comment:
       This is a no-op -- not needed?

##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -2124,6 +2124,59 @@ def test_scheduler_loop_should_change_state_for_tis_without_dagrun(
             assert ti.start_date == ti.end_date
             assert ti.duration is not None
 
+    def test_scheduler_loop_should_fail_tasks_with_missing_dag_in_dagbag_dags(self):
+        """This tests that if dags are missing in dagbag, then it should be failed"""
+        session = settings.Session()
+        dag_id = 'test_execute_helper_should_fail_tasks_with_missing_dag'
+        dag = DAG(dag_id, start_date=DEFAULT_DATE, default_args={'owner': 'owner1'})
+
+        with dag:
+            op1 = BashOperator(task_id='op1', bash_command='sleep 5')
+
+        # Write Dag to DB
+        dagbag = DagBag(dag_folder="/dev/null", include_examples=False, read_dags_from_db=False)
+        dagbag.bag_dag(dag, root_dag=dag)
+        dagbag.sync_to_db()
+
+        # Mock dagbag.has_dag
+        mock_has_dag = mock.MagicMock(return_value=False)
+        dagbag.has_dag = mock_has_dag
+
+        dag = DagBag(read_dags_from_db=True, include_examples=False).get_dag(dag_id)
+        # Create DAG run with FAILED state
+        dag.clear()
+        dr = dag.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            state=State.RUNNING,
+            execution_date=DEFAULT_DATE + timedelta(days=1),
+            start_date=DEFAULT_DATE + timedelta(days=1),
+            session=session,
+        )
+        ti = dr.get_task_instance(task_id=op1.task_id, session=session)
+        ti.state = State.RUNNING
+        session.commit()
+
+        # This poll interval is large, bug the scheduler doesn't sleep that

Review comment:
       ```suggestion
           # This poll interval is large, but the scheduler doesn't sleep that
   ```

##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -2124,6 +2124,59 @@ def test_scheduler_loop_should_change_state_for_tis_without_dagrun(
             assert ti.start_date == ti.end_date
             assert ti.duration is not None
 
+    def test_scheduler_loop_should_fail_tasks_with_missing_dag_in_dagbag_dags(self):
+        """This tests that if dags are missing in dagbag, then it should be failed"""
+        session = settings.Session()
+        dag_id = 'test_execute_helper_should_fail_tasks_with_missing_dag'
+        dag = DAG(dag_id, start_date=DEFAULT_DATE, default_args={'owner': 'owner1'})
+
+        with dag:
+            op1 = BashOperator(task_id='op1', bash_command='sleep 5')
+
+        # Write Dag to DB
+        dagbag = DagBag(dag_folder="/dev/null", include_examples=False, read_dags_from_db=False)
+        dagbag.bag_dag(dag, root_dag=dag)
+        dagbag.sync_to_db()
+
+        # Mock dagbag.has_dag
+        mock_has_dag = mock.MagicMock(return_value=False)
+        dagbag.has_dag = mock_has_dag

Review comment:
       ```suggestion
           dagbag.has_dag = mock.MagicMock(return_value=False)
   ```
   
   as `mock_has_dag` isn't used anywhere else

##########
File path: tests/models/test_dagbag.py
##########
@@ -859,6 +859,39 @@ def test_get_dag_with_dag_serialization(self):
         assert set(updated_ser_dag_1.tags) == {"example", "example2", "new_tag"}
         assert updated_ser_dag_1_update_time > ser_dag_1_update_time
 
+    @patch("airflow.models.dagbag.settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL", 5)
+    def test_has_dag_method(self):
+        """Test has_dag method"""
+        dag_id = "example_bash_operator"
+        with freeze_time(tz.datetime(2020, 1, 5, 0, 0, 0)):
+            example_bash_op_dag = DagBag(include_examples=True).dags.get("example_bash_operator")

Review comment:
       ```suggestion
               example_bash_op_dag = DagBag(include_examples=True).dags.get(dag_id)
   ```

##########
File path: airflow/config_templates/config.yml
##########
@@ -1704,6 +1704,14 @@
       type: float
       example: ~
       default: "15.0"
+    - name: missing_dag_cleanup_interval
+      description: |
+        How often (in seconds) to check and fail tis and dagruns whose corresponding

Review comment:
       How about ``clean_tis_without_dags`` instead, to make it similar to `clean_tis_without_dagrun_interval`?
   
   No strong opinion though

##########
File path: airflow/models/dagbag.py
##########
@@ -166,6 +166,31 @@ def dag_ids(self) -> List[str]:
         """
         return list(self.dags.keys())
 
+    @provide_session
+    def has_dag(self, dag_id: str, session: Session = None):
+        """
+        Checks the "local" cache, if it exists, and is not older than the configured cache time, return True
+        else check in the DB if the dag exists, return True, False otherwise
+        """
+        from airflow.models.serialized_dag import SerializedDagModel
+
+        if dag_id in self.dags:
+            min_serialized_dag_fetch_secs = timedelta(seconds=settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL)
+            if (
+                dag_id in self.dags_last_fetched
+                and timezone.utcnow() > self.dags_last_fetched[dag_id] + min_serialized_dag_fetch_secs
+            ):
+                sd_last_updated_datetime = SerializedDagModel.get_last_updated_datetime(
+                    dag_id=dag_id,
+                    session=session,
+                )
+                if not sd_last_updated_datetime:
+                    return False
+                return not sd_last_updated_datetime > self.dags_last_fetched[dag_id]
+
+            return SerializedDagModel.has_dag(dag_id=dag_id, session=session)

Review comment:
       ```suggestion
   ```
   
    This is sort of redundant: if the function has not returned yet, the line below will be executed anyway.







[GitHub] [airflow] ashb commented on a change in pull request #16182: Do not queue tasks when the DAG file goes missing

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r643122857



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -901,6 +901,33 @@ def __get_concurrency_maps(
             task_map[(dag_id, task_id)] = count
         return dag_map, task_map
 
+    def _remove_tis_with_missing_dag(self, task_instances, session=None):
+        """
+        Fail task instances and the corresponding DagRun if the dag can't be found in
+        the dags folder but exists in SerializedDag table.
+        Return task instances that exists in SerializedDag table as well as dags folder.
+        If the dag can't be found in DagBag, just return the task instance. This is common in
+        unittest where subdir is os.devnull
+        """
+        tis = []
+        for ti in task_instances:
+            try:
+                dag = self.dagbag.get_dag(ti.dag_id, session=session)
+            except SerializedDagNotFound:
+                tis.append(ti)
+                continue
+            if os.path.exists(dag.fileloc):
+                tis.append(ti)
+            else:
+                dagrun = dag.get_dagrun(execution_date=ti.execution_date, session=session)
+                if ti.state not in State.finished:
+                    ti.set_state(State.FAILED, session=session)

Review comment:
       There is a "removed" state that would perhaps be more appropriate here.
   
   (Though the removed state is not in the `State.finished`, and probably should be.)

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -901,6 +901,33 @@ def __get_concurrency_maps(
             task_map[(dag_id, task_id)] = count
         return dag_map, task_map
 
+    def _remove_tis_with_missing_dag(self, task_instances, session=None):
+        """
+        Fail task instances and the corresponding DagRun if the dag can't be found in
+        the dags folder but exists in SerializedDag table.
+        Return task instances that exists in SerializedDag table as well as dags folder.
+        If the dag can't be found in DagBag, just return the task instance. This is common in
+        unittest where subdir is os.devnull
+        """
+        tis = []
+        for ti in task_instances:
+            try:
+                dag = self.dagbag.get_dag(ti.dag_id, session=session)
+            except SerializedDagNotFound:
+                tis.append(ti)
+                continue

Review comment:
       Loading the full dag is an expensive operation -- nothing in here actually needs the DAG object, so I think this calls for a new method on the DagBag, `has_dag`, that first checks whether the dag exists in its local cache and, if not, checks whether the DB row exists -- but crucially, without loading the full row!
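    
    A rough sketch of the shape such a method could take (hypothetical; the version eventually proposed in this PR adds cache-staleness checks on top):
    
    ```python
    from airflow.utils.session import provide_session
    
    @provide_session
    def has_dag(self, dag_id: str, session=None) -> bool:
        """Cheaply check whether a dag exists, without loading it."""
        # Local cache first -- no DB access at all.
        if dag_id in self.dags:
            return True
        # Otherwise fall back to a row-existence check on the serialized
        # table, which does not deserialize the full DAG.
        from airflow.models.serialized_dag import SerializedDagModel
    
        return SerializedDagModel.has_dag(dag_id=dag_id, session=session)
    ```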

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -901,6 +901,33 @@ def __get_concurrency_maps(
             task_map[(dag_id, task_id)] = count
         return dag_map, task_map
 
+    def _remove_tis_with_missing_dag(self, task_instances, session=None):
+        """
+        Fail task instances and the corresponding DagRun if the dag can't be found in
+        the dags folder but exists in SerializedDag table.
+        Return task instances that exists in SerializedDag table as well as dags folder.
+        If the dag can't be found in DagBag, just return the task instance. This is common in
+        unittest where subdir is os.devnull
+        """
+        tis = []
+        for ti in task_instances:
+            try:
+                dag = self.dagbag.get_dag(ti.dag_id, session=session)
+            except SerializedDagNotFound:
+                tis.append(ti)
+                continue
+            if os.path.exists(dag.fileloc):
+                tis.append(ti)
+            else:
+                dagrun = dag.get_dagrun(execution_date=ti.execution_date, session=session)

Review comment:
       No need to use the dag object here.
   
   ```suggestion
                   dagrun = ti.get_dagrun(session)
   ```

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -901,6 +901,33 @@ def __get_concurrency_maps(
             task_map[(dag_id, task_id)] = count
         return dag_map, task_map
 
+    def _remove_tis_with_missing_dag(self, task_instances, session=None):
+        """
+        Fail task instances and the corresponding DagRun if the dag can't be found in
+        the dags folder but exists in SerializedDag table.
+        Return task instances that exists in SerializedDag table as well as dags folder.

Review comment:
       ```suggestion
   
           Return task instances that exists in SerializedDag table as well as dags folder.
   ```

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -901,6 +901,33 @@ def __get_concurrency_maps(
             task_map[(dag_id, task_id)] = count
         return dag_map, task_map
 
+    def _remove_tis_with_missing_dag(self, task_instances, session=None):
+        """
+        Fail task instances and the corresponding DagRun if the dag can't be found in
+        the dags folder but exists in SerializedDag table.
+        Return task instances that exists in SerializedDag table as well as dags folder.
+        If the dag can't be found in DagBag, just return the task instance. This is common in
+        unittest where subdir is os.devnull
+        """
+        tis = []
+        for ti in task_instances:
+            try:
+                dag = self.dagbag.get_dag(ti.dag_id, session=session)
+            except SerializedDagNotFound:
+                tis.append(ti)
+                continue
+            if os.path.exists(dag.fileloc):
+                tis.append(ti)
+            else:
+                dagrun = dag.get_dagrun(execution_date=ti.execution_date, session=session)
+                if ti.state not in State.finished:
+                    ti.set_state(State.FAILED, session=session)
+                    self.log.error("Failing task: %s because DAG: %s is missing", ti.task_id, ti.dag_id)
+                if dagrun.state not in State.finished:
+                    dagrun.set_state(State.FAILED)

Review comment:
       If the final tasks in a DagRun are still actually running, it probably doesn't make sense to instantly fail this, as this run could still end up completing.
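    
    For illustration, a hedged sketch of a guard along those lines (not part of the diff above):
    
    ```python
    # Only fail the run once none of its task instances are still running;
    # a run whose last tasks are mid-flight could still complete normally.
    still_running = dagrun.get_task_instances(state=State.RUNNING, session=session)
    if not still_running:
        dagrun.set_state(State.FAILED)
    ```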







[GitHub] [airflow] ephraimbuddy closed pull request #16182: Do not queue tasks when the DAG goes missing

Posted by GitBox <gi...@apache.org>.
ephraimbuddy closed pull request #16182:
URL: https://github.com/apache/airflow/pull/16182


   





[GitHub] [airflow] jhtimmins commented on pull request #16182: Do not queue tasks when the DAG file goes missing

Posted by GitBox <gi...@apache.org>.
jhtimmins commented on pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#issuecomment-857926406


   @ashb can you take another look and sign off if your requested changes have been sufficiently addressed?





[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16182: Do not queue tasks when the DAG file goes missing

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r643680117



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -901,6 +901,33 @@ def __get_concurrency_maps(
             task_map[(dag_id, task_id)] = count
         return dag_map, task_map
 
+    def _remove_tis_with_missing_dag(self, task_instances, session=None):
+        """
+        Fail task instances and the corresponding DagRun if the dag can't be found in
+        the dags folder but exists in SerializedDag table.
+        Return task instances that exists in SerializedDag table as well as dags folder.
+        If the dag can't be found in DagBag, just return the task instance. This is common in
+        unittest where subdir is os.devnull
+        """
+        tis = []

Review comment:
       Thanks @uranusjr, I have changed this implementation to run at regular intervals. Please review







[GitHub] [airflow] ephraimbuddy commented on pull request #16182: Do not queue tasks when the DAG goes missing

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#issuecomment-874569681


   Hi @ashb @kaxil, I'm closing this as I can't reproduce the scenario manually anymore.
   This PR: https://github.com/apache/airflow/pull/16368 handles a missing dag file: any dag that goes missing is immediately failed in the executor, and subsequently in the scheduler via this PR: https://github.com/apache/airflow/pull/15929.
   
   Unlike before, I tried several times to reproduce this case in manual testing, but the two PRs above are hit first, so I was not able to reproduce it.
   
   I'm closing now as I believe it has been solved.





[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16182: Do not queue tasks when the DAG file goes missing

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r646414422



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1417,6 +1420,37 @@ def _clean_tis_without_dagrun(self, session):
                     raise
             guard.commit()
 
+    @provide_session
+    def _missing_dag_file_cleanup(self, session: Session = None):
+        """Fails task instances and DagRuns of DAGs that no longer exist in the dag folder"""
+        states_to_check = State.unfinished - frozenset([State.NONE, State.SHUTDOWN])
+        tis = session.query(TI).filter(TI.state.in_(states_to_check)).all()
+        missing_dags = {}
+        dag_runs = set()
+        for ti in tis:
+            dag = self.dagbag.dags.get(ti.dag_id, None)
+            if not dag:
+                continue

Review comment:
       > Loading the dag is an "expensive" operation, so this should be reworked to add a new method on dagbag: `has_dag` that does the following:
   > 
   > * Checks the "local" cache, if it exists, and is not older than the confiured cache time, return True
   > * Else check in the DB -- but crucially, only check for the row existing, don't load the full DagModel object, and crucially don't load the SerializedDag.
   
       I thought `self.dagbag.dags` is a dictionary holding all the already-processed dags? That's why we are not querying the DB this time?







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16182: Do not queue tasks when the DAG file goes missing

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r642681719



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -901,6 +901,32 @@ def __get_concurrency_maps(
             task_map[(dag_id, task_id)] = count
         return dag_map, task_map
 
+    # pylint: disable=too-many-nested-blocks
+    def _remove_tis_with_missing_dag(self, task_instances, session=None):
+        """
+        Fail task instances and the corresponding DagRun if the dag can't be found in
+        the dags folder but exists in SerializedDag table.
+        Return task instances that exists in SerializedDag table as well as dags folder.
+        If the dag can't be found in DagBag, just return the task instance. This is common in
+        unittest where subdir is os.devnull
+        """
+        tis = []
+        for ti in task_instances:
+            try:
+                dag = self.dagbag.get_dag(ti.dag_id, session=session)
+                if os.path.exists(dag.fileloc):
+                    tis.append(ti)
+                else:
+                    dagrun = dag.get_dagrun(execution_date=ti.execution_date, session=session)
+                    if ti.state not in State.finished:
+                        ti.set_state(State.FAILED, session=session)
+                        self.log.error("Failing task: %s because DAG: %s is missing", ti.task_id, ti.dag_id)
+                    if dagrun.state not in State.finished:
+                        dagrun.set_state(State.FAILED)
+            except SerializedDagNotFound:
+                tis.append(ti)
+        return tis

Review comment:
       Ok. I will try that.
   The `SerializedDagNotFound` is raised at `dag = self.dagbag.get_dag(ti.dag_id, session=session)`. 
   







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16182: Do not queue tasks when the DAG file goes missing

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r649173062



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1417,6 +1420,42 @@ def _clean_tis_without_dagrun(self, session):
                     raise
             guard.commit()
 
+    @provide_session
+    def _missing_dag_file_cleanup(self, session: Session = None):
+        """Fails task instances and DagRuns of DAGs that no longer exist in the dag folder/DB"""
+        states_to_check = State.unfinished - frozenset([State.NONE, State.SHUTDOWN])
+        tis = session.query(TI).filter(TI.state.in_(states_to_check)).all()
+        missing_dags = {}
+        dag_runs = set()
+
+        for ti in tis:
+            if ti.dag_id in missing_dags:
+                ti.set_state(State.FAILED, session=session)
+                continue
+            # Dag no longer in dagbag?
+            if not self.dagbag.has_dag(ti.dag_id, session=session):
+                ti.set_state(State.FAILED, session=session)
+                dag_runs.add(ti.dag_run)
+                missing_dags[ti.dag_id] = [ti]
+                continue
+            # Dag file no longer exists?
+            if not os.path.exists(ti.dag_model.fileloc):
+                ti.set_state(State.FAILED, session=session)
+                dag_runs.add(ti.dag_run)
+                missing_dags[ti.dag_id] = [ti]

Review comment:
       I added this after I manually removed the dag files of queued dags, which then got stuck. On testing again, the dag got stuck, the executor detected it and failed the task, which the scheduler then failed too, so I will remove it.







[GitHub] [airflow] ashb commented on a change in pull request #16182: Do not queue tasks when the DAG goes missing

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r663758644



##########
File path: airflow/models/dagbag.py
##########
@@ -166,6 +166,34 @@ def dag_ids(self) -> List[str]:
         """
         return list(self.dags.keys())
 
+    @provide_session
+    def has_dag(self, dag_id: str, session: Session = None):
+        """
+        Checks the "local" cache, if it exists, and is not older than the configured cache time, return True
+        else check in the DB if the dag exists, return True, False otherwise
+        """
+        from airflow.models.serialized_dag import SerializedDagModel
+
+        sd_last_updated_datetime = SerializedDagModel.get_last_updated_datetime(
+            dag_id=dag_id,
+            session=session,
+        )
+        sd_has_dag = sd_last_updated_datetime is not None
+        if dag_id not in self.dags:
+            return sd_has_dag
+        if dag_id not in self.dags_last_fetched:
+            return sd_has_dag
+        min_serialized_dag_fetch_secs = timedelta(seconds=settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL)
+        if timezone.utcnow() < self.dags_last_fetched[dag_id] + min_serialized_dag_fetch_secs:
+            return sd_has_dag
+        if sd_has_dag:
+            return True

Review comment:
       We should refactor this to delay the DB check until we need it -- for instance, if we have the dag locally and it was fetched less than the configured interval ago, we don't need to ask the DB at all.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on a change in pull request #16182: Do not queue tasks when the DAG goes missing

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r659635689



##########
File path: airflow/models/dagbag.py
##########
@@ -166,6 +166,33 @@ def dag_ids(self) -> List[str]:
         """
         return list(self.dags.keys())
 
+    @provide_session
+    def has_dag(self, dag_id: str, session: Session = None):
+        """
+        Checks the "local" cache, if it exists, and is not older than the configured cache time, return True
+        else check in the DB if the dag exists, return True, False otherwise
+        """
+        from airflow.models.serialized_dag import SerializedDagModel
+
+        if dag_id in self.dags:
+            min_serialized_dag_fetch_secs = timedelta(seconds=settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL)
+            if (
+                dag_id in self.dags_last_fetched
+                and timezone.utcnow() > self.dags_last_fetched[dag_id] + min_serialized_dag_fetch_secs
+            ):
+                sd_last_updated_datetime = SerializedDagModel.get_last_updated_datetime(
+                    dag_id=dag_id,
+                    session=session,
+                )
+                if not sd_last_updated_datetime:
+                    self.log.warning("Serialized DAG %s no longer exists", dag_id)
+                    del self.dags[dag_id]
+                    del self.dags_last_fetched[dag_id]
+                    del self.dags_hash[dag_id]
+                    return False
+                return True
+        return SerializedDagModel.has_dag(dag_id=dag_id, session=session)

Review comment:
       ```python
           sd_last_updated_datetime = SerializedDagModel.get_last_updated_datetime(
               dag_id=dag_id,
               session=session,
           )
           sd_has_dag = (sd_last_updated_datetime is not None)
           if dag_id not in self.dags:
               return sd_has_dag
           if dag_id not in self.dags_last_fetched:
               return sd_has_dag
           min_serialized_dag_fetch_secs = timedelta(seconds=settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL)
           if timezone.utcnow() < self.dags_last_fetched[dag_id] + min_serialized_dag_fetch_secs:
               return sd_has_dag
           if sd_has_dag:
               return True
           self.log.warning("Serialized DAG %s no longer exists", dag_id)
           del self.dags[dag_id]
           del self.dags_last_fetched[dag_id]
           del self.dags_hash[dag_id]
           return False
   ```
   
   `SerializedDagModel.last_updated` is not nullable, so if `SerializedDagModel.get_last_updated_datetime()` returns None, it means the dag does not have a corresponding row. This saves us one database query. Since we can calculate the `SerializedDagModel` row’s existence from that, early returns can be used to make the logic easier to follow.
   
   (Note: I wrote the code block directly in the text field, so please double check to make sure the code is correct.)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16182: Do not queue tasks when the DAG goes missing

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r657485367



##########
File path: airflow/models/dagbag.py
##########
@@ -166,6 +166,31 @@ def dag_ids(self) -> List[str]:
         """
         return list(self.dags.keys())
 
+    @provide_session
+    def has_dag(self, dag_id: str, session: Session = None):
+        """
+        Checks the "local" cache, if it exists, and is not older than the configured cache time, return True
+        else check in the DB if the dag exists, return True, False otherwise
+        """
+        from airflow.models.serialized_dag import SerializedDagModel
+
+        if dag_id in self.dags:
+            min_serialized_dag_fetch_secs = timedelta(seconds=settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL)
+            if (
+                dag_id in self.dags_last_fetched
+                and timezone.utcnow() > self.dags_last_fetched[dag_id] + min_serialized_dag_fetch_secs
+            ):
+                sd_last_updated_datetime = SerializedDagModel.get_last_updated_datetime(
+                    dag_id=dag_id,
+                    session=session,
+                )
+                if not sd_last_updated_datetime:
+                    return False
+                return not sd_last_updated_datetime > self.dags_last_fetched[dag_id]

Review comment:
       Oops, I didn't know that sd means stale dag :)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16182: Do not queue tasks when the DAG file goes missing

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r643214395



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -901,6 +901,33 @@ def __get_concurrency_maps(
             task_map[(dag_id, task_id)] = count
         return dag_map, task_map
 
+    def _remove_tis_with_missing_dag(self, task_instances, session=None):
+        """
+        Fail task instances and the corresponding DagRun if the dag can't be found in
+        the dags folder but exists in SerializedDag table.
+        Return task instances that exist in the SerializedDag table as well as the dags folder.

Review comment:
       Thanks for the reviews. I will work out something similar to `SchedulerJob._clean_tis_without_dagrun` and see how it works.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16182: Do not queue tasks when the DAG goes missing

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r660080989



##########
File path: airflow/config_templates/config.yml
##########
@@ -1709,6 +1709,14 @@
       type: float
       example: ~
       default: "15.0"
+    - name: clean_tis_without_dag

Review comment:
       Yep!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ephraimbuddy closed pull request #16182: Do not queue tasks when the DAG goes missing

Posted by GitBox <gi...@apache.org>.
ephraimbuddy closed pull request #16182:
URL: https://github.com/apache/airflow/pull/16182


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16182: Do not queue tasks when the DAG goes missing

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r657484800



##########
File path: airflow/config_templates/config.yml
##########
@@ -1704,6 +1704,14 @@
       type: float
       example: ~
       default: "15.0"
+    - name: missing_dag_cleanup_interval
+      description: |
+        How often (in seconds) to check and fail tis and dagruns whose corresponding

Review comment:
       Makes sense. Will do that




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16182: Do not queue tasks when the DAG goes missing

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r657489900



##########
File path: tests/models/test_dagbag.py
##########
@@ -859,6 +859,39 @@ def test_get_dag_with_dag_serialization(self):
         assert set(updated_ser_dag_1.tags) == {"example", "example2", "new_tag"}
         assert updated_ser_dag_1_update_time > ser_dag_1_update_time
 
+    @patch("airflow.models.dagbag.settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL", 5)
+    def test_has_dag_method(self):
+        """Test has_dag method"""
+        dag_id = "example_bash_operator"
+        with freeze_time(tz.datetime(2020, 1, 5, 0, 0, 0)):
+            example_bash_op_dag = DagBag(include_examples=True).dags.get("example_bash_operator")
+            SerializedDagModel.write_dag(dag=example_bash_op_dag)
+
+            dag_bag = DagBag(read_dags_from_db=True)
+            dag_bag.get_dag(dag_id)  # Add dag to self.dags
+            assert dag_bag.has_dag(dag_id)
+            assert not dag_bag.has_dag("non_added_dag_id")
+
+        # Check that min_serialized_dag_fetch_interval has passed but
+        # dag is in SerializedDagModel and we return True
+        with freeze_time(tz.datetime(2020, 1, 5, 0, 0, 4)):
+            assert dag_bag.has_dag(dag_id)

Review comment:
       Correct!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #16182: Do not queue tasks when the DAG file goes missing

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r646495372



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1417,6 +1420,37 @@ def _clean_tis_without_dagrun(self, session):
                     raise
             guard.commit()
 
+    @provide_session
+    def _missing_dag_file_cleanup(self, session: Session = None):
+        """Fails task instances and DagRuns of DAGs that no longer exist in the dag folder"""
+        states_to_check = State.unfinished - frozenset([State.NONE, State.SHUTDOWN])
+        tis = session.query(TI).filter(TI.state.in_(states_to_check)).all()
+        missing_dags = {}
+        dag_runs = set()
+        for ti in tis:
+            dag = self.dagbag.dags.get(ti.dag_id, None)
+            if not dag:
+                continue

Review comment:
       `dagbag.dags` is effectively a cache from the DB -- it only contains dags that _this process_ has accessed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16182: Do not queue tasks when the DAG file goes missing

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r642684378



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -901,6 +901,32 @@ def __get_concurrency_maps(
             task_map[(dag_id, task_id)] = count
         return dag_map, task_map
 
+    # pylint: disable=too-many-nested-blocks
+    def _remove_tis_with_missing_dag(self, task_instances, session=None):
+        """
+        Fail task instances and the corresponding DagRun if the dag can't be found in
+        the dags folder but exists in SerializedDag table.
+        Return task instances that exist in the SerializedDag table as well as the dags folder.
+        If the dag can't be found in DagBag, just return the task instance. This is common in
+        unittest where subdir is os.devnull
+        """
+        tis = []
+        for ti in task_instances:
+            try:
+                dag = self.dagbag.get_dag(ti.dag_id, session=session)
+                if os.path.exists(dag.fileloc):
+                    tis.append(ti)
+                else:
+                    dagrun = dag.get_dagrun(execution_date=ti.execution_date, session=session)
+                    if ti.state not in State.finished:
+                        ti.set_state(State.FAILED, session=session)
+                        self.log.error("Failing task: %s because DAG: %s is missing", ti.task_id, ti.dag_id)
+                    if dagrun.state not in State.finished:
+                        dagrun.set_state(State.FAILED)
+            except SerializedDagNotFound:
+                tis.append(ti)
+        return tis

Review comment:
       @uranusjr Done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16182: Do not queue tasks when the DAG goes missing

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r652433551



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1418,6 +1421,37 @@ def _clean_tis_without_dagrun(self, session):
                     raise
             guard.commit()
 
+    @provide_session
+    def _missing_dag_cleanup(self, session: Session = None):
+        """Fails task instances and DagRuns of DAGs that no longer exist in the dag folder/DB"""
+        states_to_check = State.unfinished - frozenset([State.NONE, State.SHUTDOWN])
+        tis = session.query(TI).filter(TI.state.in_(states_to_check)).all()
+        missing_dags = {}
+        dag_runs = set()
+
+        for ti in tis:
+            if ti.dag_id in missing_dags:
+                ti.set_state(State.FAILED, session=session)
+                continue
+            # Dag no longer in dagbag?
+            if not self.dagbag.has_dag(ti.dag_id, session=session):
+                ti.set_state(State.FAILED, session=session)
+                dag_runs.add(ti.dag_run)
+                missing_dags[ti.dag_id] = [ti]
+                continue

Review comment:
       ```suggestion
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil commented on a change in pull request #16182: Do not queue tasks when the DAG file goes missing

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r649116906



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1417,6 +1420,42 @@ def _clean_tis_without_dagrun(self, session):
                     raise
             guard.commit()
 
+    @provide_session
+    def _missing_dag_file_cleanup(self, session: Session = None):
+        """Fails task instances and DagRuns of DAGs that no longer exist in the dag folder/DB"""
+        states_to_check = State.unfinished - frozenset([State.NONE, State.SHUTDOWN])
+        tis = session.query(TI).filter(TI.state.in_(states_to_check)).all()
+        missing_dags = {}
+        dag_runs = set()
+
+        for ti in tis:
+            if ti.dag_id in missing_dags:
+                ti.set_state(State.FAILED, session=session)
+                continue
+            # Dag no longer in dagbag?
+            if not self.dagbag.has_dag(ti.dag_id, session=session):
+                ti.set_state(State.FAILED, session=session)
+                dag_runs.add(ti.dag_run)
+                missing_dags[ti.dag_id] = [ti]
+                continue
+            # Dag file no longer exists?
+            if not os.path.exists(ti.dag_model.fileloc):
+                ti.set_state(State.FAILED, session=session)
+                dag_runs.add(ti.dag_run)
+                missing_dags[ti.dag_id] = [ti]

Review comment:
       No, not yet at least. Will cover it as part of https://github.com/apache/airflow/pull/16368 or in a separate PR.
   
   On the webserver we rely on `worker_refresh_interval` as a `cache buster`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on a change in pull request #16182: Do not queue tasks when the DAG file goes missing

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r643951034



##########
File path: airflow/config_templates/config.yml
##########
@@ -1704,6 +1704,14 @@
       type: float
       example: ~
       default: "15.0"
+    - name: fail_tis_with_missing_dag_file_interval

Review comment:
       Maybe something like `missing_dag_file_cleanup_interval`? We are basically cleaning those DAGs up, and failing tasks is a result of that (because the task can no longer be run).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #16182: Do not queue tasks when the DAG file goes missing

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r649337490



##########
File path: airflow/models/dagbag.py
##########
@@ -166,6 +166,36 @@ def dag_ids(self) -> List[str]:
         """
         return list(self.dags.keys())
 
+    @provide_session
+    def has_dag(self, dag_id: str, session: Session = None):
+        """
+        Checks the "local" cache, if it exists, and is not older than the configured cache time, return True
+        else check in the DB if the dag exists, return True, False otherwise
+        """
+        if dag_id in self.dags:
+            min_serialized_dag_fetch_secs = timedelta(seconds=settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL)
+            if dag_id in self.dags_last_fetched:
+
+                return timezone.utcnow() < self.dags_last_fetched[dag_id] + min_serialized_dag_fetch_secs

Review comment:
       Looking at how that is used, nothing in the DagBag deals with the case where a dagmodel/serialized dag row is deleted -- and the cached version would be returned, or maybe an error would be thrown.
   
   Either way, it doesn't really handle this case from what I can tell.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ephraimbuddy commented on pull request #16182: Do not queue tasks when the DAG goes missing

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#issuecomment-874569681


   Hi @ashb @kaxil, I'm closing this as I can't reproduce this scenario manually anymore.
   This PR: https://github.com/apache/airflow/pull/16368 handles the missing dag file case, and any dag that goes missing is immediately failed in the executor, and subsequently in the scheduler by this PR: https://github.com/apache/airflow/pull/15929.
   
   Unlike before, I tried several times to reproduce this case in manual testing, but the two PRs above kick in, so I was unable to reproduce it.
   
   I'm closing now as I believe it has been solved.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16182: Do not queue tasks when the DAG file goes missing

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r646414422



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1417,6 +1420,37 @@ def _clean_tis_without_dagrun(self, session):
                     raise
             guard.commit()
 
+    @provide_session
+    def _missing_dag_file_cleanup(self, session: Session = None):
+        """Fails task instances and DagRuns of DAGs that no longer exist in the dag folder"""
+        states_to_check = State.unfinished - frozenset([State.NONE, State.SHUTDOWN])
+        tis = session.query(TI).filter(TI.state.in_(states_to_check)).all()
+        missing_dags = {}
+        dag_runs = set()
+        for ti in tis:
+            dag = self.dagbag.dags.get(ti.dag_id, None)
+            if not dag:
+                continue

Review comment:
       > Loading the dag is an "expensive" operation, so this should be reworked to add a new method on dagbag: `has_dag` that does the following:
   > 
   > * Checks the "local" cache, if it exists, and is not older than the configured cache time, return True
   > * Else check in the DB -- but crucially, only check for the row existing, don't load the full DagModel object, and crucially don't load the SerializedDag.
   
   I thought `self.dagbag.dags` is a dictionary containing all already-processed dags?
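
   For reference, the "row existing" check described above can be done without loading the payload; a sketch (assuming SQLAlchemy's `literal`, and shaped like the `SerializedDagModel.has_dag` helper referenced elsewhere in this thread):

    ```python
    from sqlalchemy import literal

    @classmethod
    @provide_session
    def has_dag(cls, dag_id: str, session: Session = None) -> bool:
        # SELECT a constant rather than the row itself, so the (potentially
        # large) serialized DAG blob is never pulled from the database.
        return session.query(literal(True)).filter(cls.dag_id == dag_id).first() is not None
    ```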




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on a change in pull request #16182: Do not queue tasks when the DAG goes missing

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r659596833



##########
File path: airflow/config_templates/config.yml
##########
@@ -1709,6 +1709,14 @@
       type: float
       example: ~
       default: "15.0"
+    - name: clean_tis_without_dag

Review comment:
       Should we add a `sec` or `interval` suffix like other configs?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #16182: Do not queue tasks when the DAG goes missing

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r663764180



##########
File path: tests/models/test_dagbag.py
##########
@@ -859,6 +859,39 @@ def test_get_dag_with_dag_serialization(self):
         assert set(updated_ser_dag_1.tags) == {"example", "example2", "new_tag"}
         assert updated_ser_dag_1_update_time > ser_dag_1_update_time
 
+    @patch("airflow.models.dagbag.settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL", 5)
+    def test_has_dag_method(self):
+        """Test has_dag method"""
+        dag_id = "example_bash_operator"
+        with freeze_time(tz.datetime(2020, 1, 5, 0, 0, 0)):
+            example_bash_op_dag = DagBag(include_examples=True).dags.get(dag_id)
+            SerializedDagModel.write_dag(dag=example_bash_op_dag)
+
+            dag_bag = DagBag(read_dags_from_db=True)
+            dag_bag.get_dag(dag_id)  # Add dag to self.dags
+            assert dag_bag.has_dag(dag_id)
+            assert not dag_bag.has_dag("non_added_dag_id")
+
+        # Check that min_serialized_dag_fetch_interval has passed but
+        # dag is in SerializedDagModel and we return True
+        with freeze_time(tz.datetime(2020, 1, 5, 0, 0, 6)):
+            assert dag_bag.has_dag(dag_id)
+
+        # We remove the dag from dag_bag.dags and ensure it returns True
+        dag_bag.dags.pop(dag_id)
+        assert not dag_bag.dags.get(dag_id)

Review comment:
       ```suggestion
   ```
   
       This line was just testing Python core semantics, no need to do that in our unit tests :) 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #16182: Do not queue tasks when the DAG goes missing

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r663765528



##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -1475,6 +1475,55 @@ def test_scheduler_loop_should_change_state_for_tis_without_dagrun(
             assert ti.start_date == ti.end_date
             assert ti.duration is not None
 
+    def test_scheduler_loop_should_fail_tasks_with_missing_dag_in_dagbag_dags(self):
+        """This tests that if dags are missing in dagbag, then it should be failed"""
+        session = settings.Session()
+        dag_id = 'test_execute_helper_should_fail_tasks_with_missing_dag'
+        dag = DAG(dag_id, start_date=DEFAULT_DATE, default_args={'owner': 'owner1'})
+
+        with dag:
+            op1 = BashOperator(task_id='op1', bash_command='sleep 5')
+
+        # Write Dag to DB
+        dagbag = DagBag(dag_folder="/dev/null", include_examples=False, read_dags_from_db=False)
+        dagbag.bag_dag(dag, root_dag=dag)
+        dagbag.sync_to_db()
+
+        # Mock dagbag.has_dag
+        dagbag.has_dag = mock.MagicMock(return_value=False)
+
+        dag = DagBag(read_dags_from_db=True, include_examples=False).get_dag(dag_id)
+        dr = dag.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            state=State.RUNNING,
+            execution_date=DEFAULT_DATE + timedelta(days=1),
+            start_date=DEFAULT_DATE + timedelta(days=1),
+            session=session,
+        )
+        ti = dr.get_task_instance(task_id=op1.task_id, session=session)
+        ti.state = State.RUNNING
+        session.commit()
+
+        # This poll interval is large, but the scheduler doesn't sleep that
+        # long; instead we hit the clean_tis_without_dag interval
+        self.scheduler_job = SchedulerJob(num_runs=2, processor_poll_interval=30)
+        self.scheduler_job.dagbag = dagbag
+        executor = MockExecutor(do_update=False)
+        self.scheduler_job.executor = executor
+        processor = mock.MagicMock()
+        processor.done = False
+        self.scheduler_job.processor_agent = processor
+
+        with mock.patch.object(settings, "USE_JOB_SCHEDULE", False), conf_vars(
+            {('scheduler', 'clean_tis_without_dag_interval'): '0.001'}
+        ):
+            self.scheduler_job._run_scheduler_loop()

Review comment:
       Not sure we need to run the whole scheduler loop here -- we could just call `self.scheduler_job._clean_tis_without_dag()` directly.
   
   By calling the scheduler loop, the only extra thing we check is that we've added this to the timer, but we can see that pretty easily.
   
   Dunno :)
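
   That narrower version would be something like this (a sketch, reusing the fixtures already set up earlier in this test):

    ```python
    # Exercise only the cleanup step instead of the whole scheduler loop.
    self.scheduler_job._clean_tis_without_dag(session=session)

    # Reload the TI from the DB and check it was failed.
    session.refresh(ti)
    assert ti.state == State.FAILED
    ```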




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil commented on a change in pull request #16182: Do not queue tasks when the DAG goes missing

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r657523165



##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -2124,6 +2124,59 @@ def test_scheduler_loop_should_change_state_for_tis_without_dagrun(
             assert ti.start_date == ti.end_date
             assert ti.duration is not None
 
+    def test_scheduler_loop_should_fail_tasks_with_missing_dag_in_dagbag_dags(self):
+        """This tests that if dags are missing in dagbag, then it should be failed"""
+        session = settings.Session()
+        dag_id = 'test_execute_helper_should_fail_tasks_with_missing_dag'
+        dag = DAG(dag_id, start_date=DEFAULT_DATE, default_args={'owner': 'owner1'})
+
+        with dag:
+            op1 = BashOperator(task_id='op1', bash_command='sleep 5')
+
+        # Write Dag to DB
+        dagbag = DagBag(dag_folder="/dev/null", include_examples=False, read_dags_from_db=False)
+        dagbag.bag_dag(dag, root_dag=dag)
+        dagbag.sync_to_db()
+
+        # Mock dagbag.has_dag
+        mock_has_dag = mock.MagicMock(return_value=False)
+        dagbag.has_dag = mock_has_dag
+
+        dag = DagBag(read_dags_from_db=True, include_examples=False).get_dag(dag_id)
+        # Create DAG run in RUNNING state
+        dag.clear()
+        dr = dag.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            state=State.RUNNING,
+            execution_date=DEFAULT_DATE + timedelta(days=1),
+            start_date=DEFAULT_DATE + timedelta(days=1),
+            session=session,
+        )
+        ti = dr.get_task_instance(task_id=op1.task_id, session=session)
+        ti.state = State.RUNNING
+        session.commit()
+
+        # This poll interval is large, but the scheduler doesn't sleep that
+        # long; instead we hit the missing_dag_cleanup_interval
+        self.scheduler_job = SchedulerJob(num_runs=2, processor_poll_interval=30)
+        self.scheduler_job.dagbag = dagbag
+        executor = MockExecutor(do_update=False)
+        executor.queued_tasks

Review comment:
       It is just a dictionary, so it does nothing, i.e. the statement has no effect.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ephraimbuddy commented on pull request #16182: Do not queue tasks when the DAG goes missing

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#issuecomment-862131185


   @kaxil, @ashb please take a look. I have updated the `has_dag` method


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16182: Do not queue tasks when the DAG goes missing

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r657498579



##########
File path: airflow/models/dagbag.py
##########
@@ -166,6 +166,31 @@ def dag_ids(self) -> List[str]:
         """
         return list(self.dags.keys())
 
+    @provide_session
+    def has_dag(self, dag_id: str, session: Session = None):
+        """
+        Checks the "local" cache, if it exists, and is not older than the configured cache time, return True
+        else check in the DB if the dag exists, return True, False otherwise
+        """
+        from airflow.models.serialized_dag import SerializedDagModel
+
+        if dag_id in self.dags:
+            min_serialized_dag_fetch_secs = timedelta(seconds=settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL)
+            if (
+                dag_id in self.dags_last_fetched
+                and timezone.utcnow() > self.dags_last_fetched[dag_id] + min_serialized_dag_fetch_secs
+            ):
+                sd_last_updated_datetime = SerializedDagModel.get_last_updated_datetime(
+                    dag_id=dag_id,
+                    session=session,
+                )
+                if not sd_last_updated_datetime:
+                    return False
+                return not sd_last_updated_datetime > self.dags_last_fetched[dag_id]

Review comment:
       I see. No need. I will just return True




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16182: Do not queue tasks when the DAG file goes missing

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r643681695



##########
File path: airflow/config_templates/config.yml
##########
@@ -1704,6 +1704,14 @@
       type: float
       example: ~
       default: "15.0"
+    - name: fail_tis_with_missing_dag_file_interval

Review comment:
       I couldn't think of a better name here...




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil closed pull request #16182: Do not queue tasks when the DAG file goes missing

Posted by GitBox <gi...@apache.org>.
kaxil closed pull request #16182:
URL: https://github.com/apache/airflow/pull/16182


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16182: Do not queue tasks when the DAG goes missing

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r657525376



##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -2124,6 +2124,59 @@ def test_scheduler_loop_should_change_state_for_tis_without_dagrun(
             assert ti.start_date == ti.end_date
             assert ti.duration is not None
 
+    def test_scheduler_loop_should_fail_tasks_with_missing_dag_in_dagbag_dags(self):
+        """This tests that if dags are missing in dagbag, then it should be failed"""
+        session = settings.Session()
+        dag_id = 'test_execute_helper_should_fail_tasks_with_missing_dag'
+        dag = DAG(dag_id, start_date=DEFAULT_DATE, default_args={'owner': 'owner1'})
+
+        with dag:
+            op1 = BashOperator(task_id='op1', bash_command='sleep 5')
+
+        # Write Dag to DB
+        dagbag = DagBag(dag_folder="/dev/null", include_examples=False, read_dags_from_db=False)
+        dagbag.bag_dag(dag, root_dag=dag)
+        dagbag.sync_to_db()
+
+        # Mock dagbag.has_dag
+        mock_has_dag = mock.MagicMock(return_value=False)
+        dagbag.has_dag = mock_has_dag
+
+        dag = DagBag(read_dags_from_db=True, include_examples=False).get_dag(dag_id)
+        # Create DAG run with FAILED state
+        dag.clear()
+        dr = dag.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            state=State.RUNNING,
+            execution_date=DEFAULT_DATE + timedelta(days=1),
+            start_date=DEFAULT_DATE + timedelta(days=1),
+            session=session,
+        )
+        ti = dr.get_task_instance(task_id=op1.task_id, session=session)
+        ti.state = State.RUNNING
+        session.commit()
+
+        # This poll interval is large, but the scheduler doesn't sleep that
+        # long; instead we hit the missing_dag_cleanup_interval
+        self.scheduler_job = SchedulerJob(num_runs=2, processor_poll_interval=30)
+        self.scheduler_job.dagbag = dagbag
+        executor = MockExecutor(do_update=False)
+        executor.queued_tasks

Review comment:
       Oh, I was thinking it did something in combination with executor=MockExecutor. Thanks




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16182: Do not queue tasks when the DAG file goes missing

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r646436504



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1417,6 +1420,37 @@ def _clean_tis_without_dagrun(self, session):
                     raise
             guard.commit()
 
+    @provide_session
+    def _missing_dag_file_cleanup(self, session: Session = None):
+        """Fails task instances and DagRuns of DAGs that no longer exist in the dag folder"""
+        states_to_check = State.unfinished - frozenset([State.NONE, State.SHUTDOWN])
+        tis = session.query(TI).filter(TI.state.in_(states_to_check)).all()
+        missing_dags = {}
+        dag_runs = set()
+        for ti in tis:
+            dag = self.dagbag.dags.get(ti.dag_id, None)
+            if not dag:
+                continue

Review comment:
       Got it now




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #16182: Do not queue tasks when the DAG file goes missing

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r649073075



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1417,6 +1420,42 @@ def _clean_tis_without_dagrun(self, session):
                     raise
             guard.commit()
 
+    @provide_session
+    def _missing_dag_file_cleanup(self, session: Session = None):
+        """Fails task instances and DagRuns of DAGs that no longer exist in the dag folder/DB"""
+        states_to_check = State.unfinished - frozenset([State.NONE, State.SHUTDOWN])
+        tis = session.query(TI).filter(TI.state.in_(states_to_check)).all()
+        missing_dags = {}
+        dag_runs = set()
+
+        for ti in tis:
+            if ti.dag_id in missing_dags:
+                ti.set_state(State.FAILED, session=session)
+                continue
+            # Dag no longer in dagbag?
+            if not self.dagbag.has_dag(ti.dag_id, session=session):
+                ti.set_state(State.FAILED, session=session)
+                dag_runs.add(ti.dag_run)
+                missing_dags[ti.dag_id] = [ti]
+                continue
+            # Dag file no longer exists?
+            if not os.path.exists(ti.dag_model.fileloc):
+                ti.set_state(State.FAILED, session=session)
+                dag_runs.add(ti.dag_run)
+                missing_dags[ti.dag_id] = [ti]

Review comment:
       I don't think this check is necessary, given that if the file doesn't exist, the dag will (eventually, after a little while) no longer be in the DagBag.
   
   @kaxil do we ever remove dags from the DagBag "cache"?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #16182: Do not queue tasks when the DAG goes missing

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r663752836



##########
File path: airflow/config_templates/config.yml
##########
@@ -1709,6 +1709,14 @@
       type: float
       example: ~
       default: "15.0"
+    - name: clean_tis_without_dag_interval
+      description: |
+        How often (in seconds) to check and fail task instances and dagruns whose corresponding
+        DAG is missing
+      version_added: 2.1.1

Review comment:
       ```suggestion
         version_added: 2.1.2
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #16182: Do not queue tasks when the DAG goes missing

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r663760971



##########
File path: airflow/models/dagbag.py
##########
@@ -166,6 +166,34 @@ def dag_ids(self) -> List[str]:
         """
         return list(self.dags.keys())
 
+    @provide_session
+    def has_dag(self, dag_id: str, session: Session = None):
+        """
+        Checks the "local" cache, if it exists, and is not older than the configured cache time, return True
+        else check in the DB if the dag exists, return True, False otherwise
+        """
+        from airflow.models.serialized_dag import SerializedDagModel
+
+        sd_last_updated_datetime = SerializedDagModel.get_last_updated_datetime(
+            dag_id=dag_id,
+            session=session,
+        )
+        sd_has_dag = sd_last_updated_datetime is not None
+        if dag_id not in self.dags:
+            return sd_has_dag
+        if dag_id not in self.dags_last_fetched:
+            return sd_has_dag
+        min_serialized_dag_fetch_secs = timedelta(seconds=settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL)
+        if timezone.utcnow() < self.dags_last_fetched[dag_id] + min_serialized_dag_fetch_secs:
+            return sd_has_dag
+        if sd_has_dag:
+            return True

Review comment:
       So something like this (pseudo-python):
   
   ```python
           if dag_id in self.dags and timezone.utcnow() < self.dags_last_fetched[dag_id] + min_serialized_dag_fetch_secs:
               return True
   
           sd_last_updated_datetime = SerializedDagModel.get_last_updated_datetime(
               dag_id=dag_id,
               session=session,
           )
           # etc ...
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on a change in pull request #16182: Do not queue tasks when the DAG file goes missing

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r642676994



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -901,6 +901,32 @@ def __get_concurrency_maps(
             task_map[(dag_id, task_id)] = count
         return dag_map, task_map
 
+    # pylint: disable=too-many-nested-blocks
+    def _remove_tis_with_missing_dag(self, task_instances, session=None):
+        """
+        Fail task instances and the corresponding DagRun if the dag can't be found in
+        the dags folder but exists in SerializedDag table.
+        Return task instances that exist in the SerializedDag table as well as the dags folder.
+        If the dag can't be found in DagBag, just return the task instance. This is common in
+        unittest where subdir is os.devnull
+        """
+        tis = []
+        for ti in task_instances:
+            try:
+                dag = self.dagbag.get_dag(ti.dag_id, session=session)
+                if os.path.exists(dag.fileloc):
+                    tis.append(ti)
+                else:
+                    dagrun = dag.get_dagrun(execution_date=ti.execution_date, session=session)
+                    if ti.state not in State.finished:
+                        ti.set_state(State.FAILED, session=session)
+                        self.log.error("Failing task: %s because DAG: %s is missing", ti.task_id, ti.dag_id)
+                    if dagrun.state not in State.finished:
+                        dagrun.set_state(State.FAILED)
+            except SerializedDagNotFound:
+                tis.append(ti)
+        return tis

Review comment:
       Pylint is sort of right here, the function could benefit from some refactoring with `continue` and more fine-grained try-except (when is `SerializedDagNotFound` raised?)
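
   For example, flattening the happy paths with early `continue`s and keeping the `try` tight around the only call that can raise (a sketch based on the diff above):

    ```python
    tis_to_queue = []
    for ti in task_instances:
        try:
            # Keep the try block tight: only the DAG lookup raises
            # SerializedDagNotFound.
            dag = self.dagbag.get_dag(ti.dag_id, session=session)
        except SerializedDagNotFound:
            tis_to_queue.append(ti)
            continue
        if os.path.exists(dag.fileloc):
            tis_to_queue.append(ti)
            continue
        # The DAG file is gone: fail the TI and its run instead of queuing it.
        if ti.state not in State.finished:
            ti.set_state(State.FAILED, session=session)
            self.log.error("Failing task: %s because DAG: %s is missing", ti.task_id, ti.dag_id)
        dagrun = dag.get_dagrun(execution_date=ti.execution_date, session=session)
        if dagrun.state not in State.finished:
            dagrun.set_state(State.FAILED)
    return tis_to_queue
    ```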




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on a change in pull request #16182: Do not queue tasks when the DAG file goes missing

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r643243739



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -901,6 +901,33 @@ def __get_concurrency_maps(
             task_map[(dag_id, task_id)] = count
         return dag_map, task_map
 
+    def _remove_tis_with_missing_dag(self, task_instances, session=None):
+        """
+        Fail task instances and the corresponding DagRun if the dag can't be found in
+        the dags folder but exists in SerializedDag table.
+        Return task instances that exist in the SerializedDag table as well as the dags folder.
+        If the dag can't be found in DagBag, just return the task instance. This is common in
+        unittest where subdir is os.devnull
+        """
+        tis = []

Review comment:
       This is the list of TaskInstances that *will* be queued, right? Could use a better variable name (and a `:returns:` line in the docstring).
   
   I may also turn this into an iterator or even filter function instead to get rid of this local list, so at the call site we can do
   
   ```python
   task_instances_to_examine = list(
       self._filter_task_instances_to_examine(
           task_instances_to_examine, session=session,
       )  # This is a generator that yields each ti.
   )
   ```
   
   or
   
   ```python
   task_instances_to_examine = [
       ti for ti in task_instances_to_examine
       if self._mark_task_instance_should_be_examined(ti, session=session)
   ]
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16182: Do not queue tasks when the DAG goes missing

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r657501138



##########
File path: airflow/models/dagbag.py
##########
@@ -166,6 +166,31 @@ def dag_ids(self) -> List[str]:
         """
         return list(self.dags.keys())
 
+    @provide_session
+    def has_dag(self, dag_id: str, session: Session = None):
+        """
+        Checks the "local" cache, if it exists, and is not older than the configured cache time, return True
+        else check in the DB if the dag exists, return True, False otherwise
+        """
+        from airflow.models.serialized_dag import SerializedDagModel
+
+        if dag_id in self.dags:
+            min_serialized_dag_fetch_secs = timedelta(seconds=settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL)
+            if (
+                dag_id in self.dags_last_fetched
+                and timezone.utcnow() > self.dags_last_fetched[dag_id] + min_serialized_dag_fetch_secs
+            ):
+                sd_last_updated_datetime = SerializedDagModel.get_last_updated_datetime(
+                    dag_id=dag_id,
+                    session=session,
+                )
+                if not sd_last_updated_datetime:
+                    return False
+                return not sd_last_updated_datetime > self.dags_last_fetched[dag_id]
+
+            return SerializedDagModel.has_dag(dag_id=dag_id, session=session)

Review comment:
       It seems like this is needed for a case where the dag is in `self.dags` but not in `self.dags_last_fetched`. Is it possible?
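
   One way that state could arise, if `dags_last_fetched` is indeed only populated on the read-from-DB path, `_add_dag_from_db` (an assumption; sketch only, given some DAG object `dag` built in-process):

    ```python
    dagbag = DagBag(dag_folder="/dev/null", include_examples=False)
    dagbag.bag_dag(dag, root_dag=dag)  # dag_id lands in dagbag.dags directly
    assert dag.dag_id in dagbag.dags
    assert dag.dag_id not in dagbag.dags_last_fetched  # never fetched from DB
    ```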




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #16182: Do not queue tasks when the DAG goes missing

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r663763775



##########
File path: tests/models/test_dagbag.py
##########
@@ -859,6 +859,39 @@ def test_get_dag_with_dag_serialization(self):
         assert set(updated_ser_dag_1.tags) == {"example", "example2", "new_tag"}
         assert updated_ser_dag_1_update_time > ser_dag_1_update_time
 
+    @patch("airflow.models.dagbag.settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL", 5)
+    def test_has_dag_method(self):
+        """Test has_dag method"""
+        dag_id = "example_bash_operator"
+        with freeze_time(tz.datetime(2020, 1, 5, 0, 0, 0)):
+            example_bash_op_dag = DagBag(include_examples=True).dags.get(dag_id)
+            SerializedDagModel.write_dag(dag=example_bash_op_dag)
+
+            dag_bag = DagBag(read_dags_from_db=True)
+            dag_bag.get_dag(dag_id)  # Add dag to self.dags
+            assert dag_bag.has_dag(dag_id)

Review comment:
       We should put a query count assertion around this line to ensure it is 0
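    For example (a sketch, assuming the `assert_queries_count` helper from `tests.test_utils.asserts` is available in this test module):
   
    ```python
    from tests.test_utils.asserts import assert_queries_count

    # The cached path of has_dag() should not need to touch the DB at all.
    with assert_queries_count(0):
        assert dag_bag.has_dag(dag_id)
    ```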




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil commented on a change in pull request #16182: Do not queue tasks when the DAG file goes missing

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r649564351



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1417,6 +1420,42 @@ def _clean_tis_without_dagrun(self, session):
                     raise
             guard.commit()
 
+    @provide_session
+    def _missing_dag_file_cleanup(self, session: Session = None):
+        """Fails task instances and DagRuns of DAGs that no longer exist in the dag folder/DB"""
+        states_to_check = State.unfinished - frozenset([State.NONE, State.SHUTDOWN])
+        tis = session.query(TI).filter(TI.state.in_(states_to_check)).all()
+        missing_dags = {}
+        dag_runs = set()
+
+        for ti in tis:
+            if ti.dag_id in missing_dags:
+                ti.set_state(State.FAILED, session=session)
+                continue
+            # Dag no longer in dagbag?
+            if not self.dagbag.has_dag(ti.dag_id, session=session):
+                ti.set_state(State.FAILED, session=session)
+                dag_runs.add(ti.dag_run)
+                missing_dags[ti.dag_id] = [ti]
+                continue
+            # Dag file no longer exists?
+            if not os.path.exists(ti.dag_model.fileloc):
+                ti.set_state(State.FAILED, session=session)
+                dag_runs.add(ti.dag_run)
+                missing_dags[ti.dag_id] = [ti]
+        if missing_dags:
+            self.log.warning(
+                "The following Dags are missing, therefore the DAG's "
+                "task instances have been failed: \t\n%s",

Review comment:
       ```suggestion
                   "task instances have been set to failed: \t\n%s",
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ephraimbuddy closed pull request #16182: Do not queue tasks when the DAG goes missing

Posted by GitBox <gi...@apache.org>.
ephraimbuddy closed pull request #16182:
URL: https://github.com/apache/airflow/pull/16182


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16182: Do not queue tasks when the DAG file goes missing

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r647021778



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1417,6 +1420,37 @@ def _clean_tis_without_dagrun(self, session):
                     raise
             guard.commit()
 
+    @provide_session
+    def _missing_dag_file_cleanup(self, session: Session = None):
+        """Fails task instances and DagRuns of DAGs that no longer exist in the dag folder"""
+        states_to_check = State.unfinished - frozenset([State.NONE, State.SHUTDOWN])
+        tis = session.query(TI).filter(TI.state.in_(states_to_check)).all()
+        missing_dags = {}
+        dag_runs = set()
+        for ti in tis:
+            dag = self.dagbag.dags.get(ti.dag_id, None)
+            if not dag:
+                continue
+            if not os.path.exists(dag.fileloc):

Review comment:
       I have updated this and tested it by renaming dags inside their files, so that the dag file still exists but the dag itself no longer does. In that case the executor immediately sees that the dag_id cannot be found and errors, then reports to the scheduler that the task instance has failed, and the scheduler fails the task instance right away.
   
    When this cleanup runs and there are still task instances whose dags no longer exist and which have not yet been failed, it fails them as well.
   
    Please check whether the implementation is alright. Thanks!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #16182: Do not queue tasks when the DAG goes missing

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r663762422



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -807,6 +810,37 @@ def _clean_tis_without_dagrun(self, session):
                     raise
             guard.commit()
 
+    @provide_session
+    def _clean_tis_without_dag(self, session: Session = None):
+        """Fails task instances and DagRuns of DAGs that no longer exist in the dagbag/DB"""
+        states_to_check = State.unfinished - frozenset([State.NONE, State.SHUTDOWN])
+        tis = session.query(TI).filter(TI.state.in_(states_to_check)).all()
+        missing_dags = {}
+        dag_runs = set()
+
+        for ti in tis:
+            if ti.dag_id in missing_dags:
+                ti.set_state(State.FAILED, session=session)
+                continue
+            # Dag no longer in dagbag?
+            if not self.dagbag.has_dag(ti.dag_id, session=session):
+                ti.set_state(State.FAILED, session=session)

Review comment:
       I wonder if this (and L823) should be set to State.REMOVED? It might be clearer for the user when debugging than a failure without any logs.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil commented on a change in pull request #16182: Do not queue tasks when the DAG file goes missing

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r649565887



##########
File path: airflow/models/dagbag.py
##########
@@ -166,6 +166,36 @@ def dag_ids(self) -> List[str]:
         """
         return list(self.dags.keys())
 
+    @provide_session
+    def has_dag(self, dag_id: str, session: Session = None):
+        """
+        Checks the "local" cache, if it exists, and is not older than the configured cache time, return True
+        else check in the DB if the dag exists, return True, False otherwise
+        """
+        if dag_id in self.dags:
+            min_serialized_dag_fetch_secs = timedelta(seconds=settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL)
+            if dag_id in self.dags_last_fetched:
+
+                return timezone.utcnow() < self.dags_last_fetched[dag_id] + min_serialized_dag_fetch_secs

Review comment:
       Once https://github.com/apache/airflow/pull/16368 is merged, you can just call `get_dag(dag_id)`; if it returns `None`, return False
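   
    i.e. roughly (a sketch, assuming `get_dag` keeps its `session` parameter and returns `None` for missing dags once that PR is in):
   
    ```python
    from sqlalchemy.orm import Session

    def has_dag(self, dag_id: str, session: Session = None) -> bool:
        # Delegate the cache/DB logic entirely to get_dag().
        return self.get_dag(dag_id, session=session) is not None
    ```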




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #16182: Do not queue tasks when the DAG file goes missing

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r649335793



##########
File path: airflow/models/dagbag.py
##########
@@ -166,6 +166,36 @@ def dag_ids(self) -> List[str]:
         """
         return list(self.dags.keys())
 
+    @provide_session
+    def has_dag(self, dag_id: str, session: Session = None):
+        """
+        Checks the "local" cache, if it exists, and is not older than the configured cache time, return True
+        else check in the DB if the dag exists, return True, False otherwise
+        """
+        if dag_id in self.dags:
+            min_serialized_dag_fetch_secs = timedelta(seconds=settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL)
+            if dag_id in self.dags_last_fetched:
+
+                return timezone.utcnow() < self.dags_last_fetched[dag_id] + min_serialized_dag_fetch_secs

Review comment:
       paging @kaxil 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil commented on a change in pull request #16182: Do not queue tasks when the DAG file goes missing

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r649565887



##########
File path: airflow/models/dagbag.py
##########
@@ -166,6 +166,36 @@ def dag_ids(self) -> List[str]:
         """
         return list(self.dags.keys())
 
+    @provide_session
+    def has_dag(self, dag_id: str, session: Session = None):
+        """
+        Checks the "local" cache, if it exists, and is not older than the configured cache time, return True
+        else check in the DB if the dag exists, return True, False otherwise
+        """
+        if dag_id in self.dags:
+            min_serialized_dag_fetch_secs = timedelta(seconds=settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL)
+            if dag_id in self.dags_last_fetched:
+
+                return timezone.utcnow() < self.dags_last_fetched[dag_id] + min_serialized_dag_fetch_secs

Review comment:
       Once https://github.com/apache/airflow/pull/16368 is merged, you can just call `get_dag(dag_id)`; if it returns `None`, return False.
   
    Or separate out of `get_dag` the code that just checks whether the dag is in the dagbag; have it check those 3 conditions and return None or True from that function.
   
    Can explain tomorrow
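   
    Rough sketch of the split (the helper name is illustrative, not an existing API):
   
    ```python
    from datetime import timedelta
    from typing import Optional

    from airflow import settings
    from airflow.utils import timezone

    def cached_dag_is_fresh(dagbag, dag_id: str) -> Optional[bool]:
        """True if the locally cached dag is still usable, None if the DB must be checked."""
        if dag_id not in dagbag.dags or dag_id not in dagbag.dags_last_fetched:
            return None
        expiry = dagbag.dags_last_fetched[dag_id] + timedelta(
            seconds=settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL
        )
        return True if timezone.utcnow() < expiry else None
    ```
   
    Both `get_dag` and `has_dag` could consult this, with `has_dag` falling back to `SerializedDagModel.has_dag` when it returns `None`.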




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil commented on a change in pull request #16182: Do not queue tasks when the DAG file goes missing

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r645497017



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1457,6 +1460,31 @@ def _clean_tis_without_dagrun(self, session):
                     raise
             guard.commit()
 
+    @provide_session
+    def _missing_dag_file_cleanup(self, session: Session = None):
+        """Fails task instances and DagRuns of DAGs that no longer exist in the dag folder"""
+        states_to_check = [State.SCHEDULED, State.QUEUED, State.RUNNING]

Review comment:
       Let's add `UP_FOR_RETRY`, `SENSING`, `UP_FOR_RESCHEDULE` too, WDYT?
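   
    For reference, that would make the filter (a sketch, assuming `State.unfinished` is a frozenset here):
   
    ```python
    from airflow.utils.state import State

    # i.e. SCHEDULED, QUEUED, RUNNING, UP_FOR_RETRY, UP_FOR_RESCHEDULE, SENSING
    states_to_check = State.unfinished - frozenset([State.NONE, State.SHUTDOWN])
    ```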




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil commented on a change in pull request #16182: Do not queue tasks when the DAG file goes missing

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r649563576



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1417,6 +1420,42 @@ def _clean_tis_without_dagrun(self, session):
                     raise
             guard.commit()
 
+    @provide_session
+    def _missing_dag_file_cleanup(self, session: Session = None):
+        """Fails task instances and DagRuns of DAGs that no longer exist in the dag folder/DB"""
+        states_to_check = State.unfinished - frozenset([State.NONE, State.SHUTDOWN])
+        tis = session.query(TI).filter(TI.state.in_(states_to_check)).all()
+        missing_dags = {}
+        dag_runs = set()
+
+        for ti in tis:
+            if ti.dag_id in missing_dags:
+                ti.set_state(State.FAILED, session=session)
+                continue
+            # Dag no longer in dagbag?
+            if not self.dagbag.has_dag(ti.dag_id, session=session):
+                ti.set_state(State.FAILED, session=session)
+                dag_runs.add(ti.dag_run)
+                missing_dags[ti.dag_id] = [ti]

Review comment:
       Do we need to store `ti` here?
   
   `missing_dags` can be just a set
   
   ```python
   missing_dags = set()
   ```
   
   and then this line can be:
   
   ```suggestion
                   missing_dags.add(ti.dag_id)
   ```
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on a change in pull request #16182: Do not queue tasks when the DAG goes missing

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r659635689



##########
File path: airflow/models/dagbag.py
##########
@@ -166,6 +166,33 @@ def dag_ids(self) -> List[str]:
         """
         return list(self.dags.keys())
 
+    @provide_session
+    def has_dag(self, dag_id: str, session: Session = None):
+        """
+        Checks the "local" cache, if it exists, and is not older than the configured cache time, return True
+        else check in the DB if the dag exists, return True, False otherwise
+        """
+        from airflow.models.serialized_dag import SerializedDagModel
+
+        if dag_id in self.dags:
+            min_serialized_dag_fetch_secs = timedelta(seconds=settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL)
+            if (
+                dag_id in self.dags_last_fetched
+                and timezone.utcnow() > self.dags_last_fetched[dag_id] + min_serialized_dag_fetch_secs
+            ):
+                sd_last_updated_datetime = SerializedDagModel.get_last_updated_datetime(
+                    dag_id=dag_id,
+                    session=session,
+                )
+                if not sd_last_updated_datetime:
+                    self.log.warning("Serialized DAG %s no longer exists", dag_id)
+                    del self.dags[dag_id]
+                    del self.dags_last_fetched[dag_id]
+                    del self.dags_hash[dag_id]
+                    return False
+                return True
+        return SerializedDagModel.has_dag(dag_id=dag_id, session=session)

Review comment:
       ```suggestion
           sd_last_updated_datetime = SerializedDagModel.get_last_updated_datetime(
               dag_id=dag_id,
               session=session,
           )
           sd_has_dag = (sd_last_updated_datetime is not None)
           if dag_id not in self.dags:
               return sd_has_dag
           if dag_id not in self.dags_last_fetched:
               return sd_has_dag
           min_serialized_dag_fetch_secs = timedelta(seconds=settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL)
           if timezone.utcnow() < self.dags_last_fetched[dag_id] + min_serialized_dag_fetch_secs:
               return sd_has_dag
           if sd_has_dag:
               return True
           self.log.warning("Serialized DAG %s no longer exists", dag_id)
           del self.dags[dag_id]
           del self.dags_last_fetched[dag_id]
           del self.dags_hash[dag_id]
           return False
   ```
   
   `SerializedDagModel.last_updated` is not nullable, so if `SerializedDagModel.get_last_updated_datetime()` returns None, it means the dag does not have a corresponding row. This saves us one database query. Since we can calculate the `SerializedDagModel` row’s existence from that, early returns can be used to make the logic easier to follow.
   
   (Note: I wrote the suggestion block directly in the text field, so please double check to make sure the code is correct.)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #16182: Do not queue tasks when the DAG goes missing

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r663757194



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -807,6 +810,37 @@ def _clean_tis_without_dagrun(self, session):
                     raise
             guard.commit()
 
+    @provide_session
+    def _clean_tis_without_dag(self, session: Session = None):
+        """Fails task instances and DagRuns of DAGs that no longer exist in the dagbag/DB"""
+        states_to_check = State.unfinished - frozenset([State.NONE, State.SHUTDOWN])
+        tis = session.query(TI).filter(TI.state.in_(states_to_check)).all()
+        missing_dags = {}
+        dag_runs = set()
+
+        for ti in tis:
+            if ti.dag_id in missing_dags:
+                ti.set_state(State.FAILED, session=session)
+                continue
+            # Dag no longer in dagbag?
+            if not self.dagbag.has_dag(ti.dag_id, session=session):
+                ti.set_state(State.FAILED, session=session)
+                dag_runs.add(ti.dag_run)
+                missing_dags[ti.dag_id] = [ti]
+                continue
+        if missing_dags:
+            self.log.warning(
+                "The following Dags are missing, therefore the DAG's "
+                "task instances have been set to failed: \t\n%s",
+                [
+                    (f"Missing DAGs: {dag_name}", f"Failed TaskInstances: [{ti}]")
+                    for dag_name, ti in missing_dags.items()
+                ],
+            )
+            self.log.warning("Failing the corresponding DagRuns of the missing DAGs. DagRuns: %s", dag_runs)
+            for dr in dag_runs:
+                dr.set_state(State.FAILED)

Review comment:
       ```suggestion
                   dr.state = State.FAILED
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16182: Do not queue tasks when the DAG file goes missing

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r643952249



##########
File path: airflow/config_templates/config.yml
##########
@@ -1704,6 +1704,14 @@
       type: float
       example: ~
       default: "15.0"
+    - name: fail_tis_with_missing_dag_file_interval

Review comment:
       Apt!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #16182: Do not queue tasks when the DAG file goes missing

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r646408397



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1417,6 +1420,37 @@ def _clean_tis_without_dagrun(self, session):
                     raise
             guard.commit()
 
+    @provide_session
+    def _missing_dag_file_cleanup(self, session: Session = None):
+        """Fails task instances and DagRuns of DAGs that no longer exist in the dag folder"""
+        states_to_check = State.unfinished - frozenset([State.NONE, State.SHUTDOWN])
+        tis = session.query(TI).filter(TI.state.in_(states_to_check)).all()
+        missing_dags = {}
+        dag_runs = set()
+        for ti in tis:
+            dag = self.dagbag.dags.get(ti.dag_id, None)
+            if not dag:
+                continue

Review comment:
       Loading the dag is an "expensive" operation, so this should be reworked to add a new method on dagbag: `has_dag` that does the following:
   
    - Checks the "local" cache; if the dag is there and the cache entry is not older than the configured cache time, return True
    - Else check in the DB -- but crucially, only check that the row exists; don't load the full DagModel object, and don't load the SerializedDag (see the sketch below).
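   
    A sketch of the DB-side existence check (the function name is illustrative; assumes SQLAlchemy's `exists()`):
   
    ```python
    from sqlalchemy import exists

    from airflow.models.serialized_dag import SerializedDagModel

    def serialized_dag_exists(session, dag_id: str) -> bool:
        # Queries only the row's presence; no model objects are loaded.
        return session.query(exists().where(SerializedDagModel.dag_id == dag_id)).scalar()
    ```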

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1417,6 +1420,37 @@ def _clean_tis_without_dagrun(self, session):
                     raise
             guard.commit()
 
+    @provide_session
+    def _missing_dag_file_cleanup(self, session: Session = None):
+        """Fails task instances and DagRuns of DAGs that no longer exist in the dag folder"""
+        states_to_check = State.unfinished - frozenset([State.NONE, State.SHUTDOWN])
+        tis = session.query(TI).filter(TI.state.in_(states_to_check)).all()
+        missing_dags = {}
+        dag_runs = set()
+        for ti in tis:
+            dag = self.dagbag.dags.get(ti.dag_id, None)
+            if not dag:
+                continue
+            if not os.path.exists(dag.fileloc):

Review comment:
       This isn't going to work for the case where the _file_ still exists, but that file no longer defines this DAG -- i.e. where you have more than one dag in a file and you delete one of them.
   
    To fix this case properly, we might have to implement https://github.com/apache/airflow/pull/11462 correctly.
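   
    To illustrate the failure mode above (hypothetical dags file, for illustration only):
   
    ```python
    from datetime import datetime

    from airflow import DAG

    with DAG("dag_kept", start_date=datetime(2021, 1, 1)):
        ...

    with DAG("dag_deleted", start_date=datetime(2021, 1, 1)):
        ...

    # Removing only dag_deleted from this file leaves the file on disk, so an
    # os.path.exists(fileloc) check for dag_deleted's task instances still passes.
    ```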
   

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1417,6 +1420,37 @@ def _clean_tis_without_dagrun(self, session):
                     raise
             guard.commit()
 
+    @provide_session
+    def _missing_dag_file_cleanup(self, session: Session = None):
+        """Fails task instances and DagRuns of DAGs that no longer exist in the dag folder"""
+        states_to_check = State.unfinished - frozenset([State.NONE, State.SHUTDOWN])
+        tis = session.query(TI).filter(TI.state.in_(states_to_check)).all()
+        missing_dags = {}
+        dag_runs = set()
+        for ti in tis:
+            dag = self.dagbag.dags.get(ti.dag_id, None)
+            if not dag:
+                continue

Review comment:
       And an optimization: if the dag is already in `missing_dags`, we don't need to check again here :)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org