Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/08/05 12:17:45 UTC

[GitHub] [airflow] ashb opened a new pull request, #25550: Ensure that zombie tasks for dags with errors get cleaned up

ashb opened a new pull request, #25550:
URL: https://github.com/apache/airflow/pull/25550

   If there is a parse error in a DAG, the zombie cleanup request never ran,
   which resulted in the TI never leaving the running state and just
   continually being detected as a zombie.
   
   (Prior to AIP-45 landing, this bug/behaviour resulted in a DAG with a
   parse error never actually leaving the queued state.)
   
   The fix here is to _always_ make sure we run `ti.handle_failure` when we
   are given a request, even if we can't load the DAG. To work as well as
   we can, we try to load the serialized DAG if possible, but in cases
   where we can't, for whatever reason, we also make sure
   TaskInstance.handle_failure is able to operate even when `self.task` is
   None.
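   
   As a rough illustration only (not the code in this PR; the lookup helper below is an assumption, though `handle_failure(error=..., session=...)` and the callback-request fields match the tests in this thread), the best-effort path looks roughly like this:
   
   ```python
   from typing import Iterable
   
   from sqlalchemy.orm import Session
   
   from airflow.models.taskinstance import TaskInstance as TI
   
   
   def handle_failure_best_effort(requests: Iterable, session: Session) -> None:
       """Best-effort failure handling when the DAG file cannot be loaded/parsed."""
       for request in requests:  # each request is a TaskCallbackRequest
           simple_ti = request.simple_task_instance
           # Look the TI up straight from the DB; we may have no DAG object at all.
           ti = (
               session.query(TI)
               .filter(
                   TI.dag_id == simple_ti.dag_id,
                   TI.task_id == simple_ti.task_id,
                   TI.run_id == simple_ti.run_id,
               )
               .one_or_none()
           )
           if ti is None:
               continue
           # handle_failure has to cope with ti.task being None here, so the TI
           # still leaves the running state instead of being re-detected as a zombie.
           ti.handle_failure(error=request.msg, session=session)
       session.flush()
   ```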


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] ashb commented on pull request #25550: Ensure that zombie tasks for dags with errors get cleaned up

Posted by GitBox <gi...@apache.org>.
ashb commented on PR #25550:
URL: https://github.com/apache/airflow/pull/25550#issuecomment-1206534873

   > This still baffles me why a TI would be created if we had a DAG parsing error, though :).
   
   In this case it was caused by a transient error from the secrets backend, so it was fine at the scheduler/parser, but by the time it got to the worker it ended up with a "Variable not found" error. The error was still around when the callback request was handled (and previously the callback was ignored by the processor, as the dagbag was empty for that one file).
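   
   For illustration, a hypothetical DAG file (not from the customer's deployment) that reads an Airflow Variable at parse time - the pattern where a transient secrets-backend error parses fine on the scheduler but blows up on the worker (assumes Airflow 2.3+ for `EmptyOperator`):
   
   ```python
   import pendulum
   
   from airflow import DAG
   from airflow.models import Variable
   from airflow.operators.empty import EmptyOperator
   
   # Top-level Variable access is re-evaluated on every parse of this file. If the
   # secrets backend hiccups, the scheduler's parse may succeed while the worker's
   # re-parse of the same file fails with a "Variable not found"-style error.
   target_table = Variable.get("target_table")  # hypothetical variable name
   
   with DAG(
       dag_id="example_parse_time_variable",
       start_date=pendulum.datetime(2022, 1, 1, tz="UTC"),
       schedule_interval="@daily",
   ) as dag:
       EmptyOperator(task_id="noop")
   ```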




[GitHub] [airflow] potiuk commented on pull request #25550: Ensure that zombie tasks for dags with errors get cleaned up

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #25550:
URL: https://github.com/apache/airflow/pull/25550#issuecomment-1206539038

   And yeah. In this case, "queued_by" is not a good hypothesis.




[GitHub] [airflow] ashb commented on pull request #25550: Ensure that zombie tasks for dags with errors get cleaned up

Posted by GitBox <gi...@apache.org>.
ashb commented on PR #25550:
URL: https://github.com/apache/airflow/pull/25550#issuecomment-1206645524

   OK, I looked at coverage; yes, the tests are good, but I did remove one change (`task = BaseOperator`) as it's not needed due to later changes I made.




[GitHub] [airflow] potiuk commented on pull request #25550: Ensure that zombie tasks for dags with errors get cleaned up

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #25550:
URL: https://github.com/apache/airflow/pull/25550#issuecomment-1206506914

   Yeah. As far as I looked, this "failure_handling" has not changed for quite a while, so I'd say we had the problem but were not aware of it.
   




[GitHub] [airflow] potiuk commented on a diff in pull request #25550: Ensure that zombie tasks for dags with errors get cleaned up

Posted by GitBox <gi...@apache.org>.
potiuk commented on code in PR #25550:
URL: https://github.com/apache/airflow/pull/25550#discussion_r938924935


##########
tests/dag_processing/test_processor.py:
##########
@@ -388,10 +389,71 @@ def test_execute_on_failure_callbacks(self, mock_ti_handle_failure):
                 full_filepath="A", simple_task_instance=SimpleTaskInstance.from_ti(ti), msg="Message"
             )
         ]
-        dag_file_processor.execute_callbacks(dagbag, requests)
+        dag_file_processor.execute_callbacks(dagbag, requests, session)
+        mock_ti_handle_failure.assert_called_once_with(
+            error="Message", test_mode=conf.getboolean('core', 'unit_test_mode'), session=session
+        )
+
+    @pytest.mark.parametrize(
+        ["has_serialized_dag"],
+        [pytest.param(True, id="dag_in_db"), pytest.param(False, id="no_dag_found")],
+    )
+    @patch.object(TaskInstance, 'handle_failure')
+    def test_execute_on_failure_callbacks_without_dag(self, mock_ti_handle_failure, has_serialized_dag):
+        dagbag = DagBag(dag_folder="/dev/null", include_examples=True, read_dags_from_db=False)
+        dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
+        with create_session() as session:
+            session.query(TaskInstance).delete()
+            dag = dagbag.get_dag('example_branch_operator')
+            dagrun = dag.create_dagrun(
+                state=State.RUNNING,
+                execution_date=DEFAULT_DATE,
+                run_type=DagRunType.SCHEDULED,
+                session=session,
+            )
+            task = dag.get_task(task_id='run_this_first')
+            ti = TaskInstance(task, run_id=dagrun.run_id, state=State.QUEUED)
+            session.add(ti)
+
+            if has_serialized_dag:
+                assert SerializedDagModel.write_dag(dag, session=session) is True
+                session.flush()
+
+        requests = [
+            TaskCallbackRequest(
+                full_filepath="A", simple_task_instance=SimpleTaskInstance.from_ti(ti), msg="Message"
+            )
+        ]
+        dag_file_processor.execute_callbacks_without_dag(requests, session)
+        mock_ti_handle_failure.assert_called_once_with(
+            error="Message", test_mode=conf.getboolean('core', 'unit_test_mode'), session=session
+        )
+
+    @patch.object(TaskInstance, 'handle_failure')
+    def test_execute_on_failure_callbacks_dag_parse_error(self, mock_ti_handle_failure):

Review Comment:
   (I believe)





[GitHub] [airflow] ashb commented on pull request #25550: Ensure that zombie tasks for dags with errors get cleaned up

Posted by GitBox <gi...@apache.org>.
ashb commented on PR #25550:
URL: https://github.com/apache/airflow/pull/25550#issuecomment-1206465040

   This isn't a problem with zombie detection, but with how the zombie is killed.
   
   
   (It also caused a problem when trying to run an on_failure/on_retry callback on 2.3, but main has changed that a bit due to AIP-45)




[GitHub] [airflow] ashb commented on pull request #25550: Ensure that zombie tasks for dags with errors get cleaned up

Posted by GitBox <gi...@apache.org>.
ashb commented on PR #25550:
URL: https://github.com/apache/airflow/pull/25550#issuecomment-1206527786

   I found this bug whilst digging into a report of stalled tasks in a customer's deployment (running on 2.3.x).
   
   It _may_ have been a problem for them on 2.2.5 as well (unsure, I didn't dig into the old version at all, but it's been a persistent problem for this team).




[GitHub] [airflow] potiuk commented on pull request #25550: Ensure that zombie tasks for dags with errors get cleaned up

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #25550:
URL: https://github.com/apache/airflow/pull/25550#issuecomment-1206537873

   > In this case it was caused by a transient error from the secrets backend, so it was fine at the scheduler/parser, but by the time it got to the worker it ended up with a "Variable not found" error. The error was still around when the callback request was handled (and previously the callback was ignored by the processor, as the dagbag was empty for that one file).
   
   Distributed systems FTW




[GitHub] [airflow] ashb commented on pull request #25550: Ensure that zombie tasks for dags with errors get cleaned up

Posted by GitBox <gi...@apache.org>.
ashb commented on PR #25550:
URL: https://github.com/apache/airflow/pull/25550#issuecomment-1206479652

   It's not really important where/when the change happened to be honest, I just cc'd you two as you touched the callback code most recently (I think) so hopefully have the context to review this PR.




[GitHub] [airflow] potiuk commented on pull request #25550: Ensure that zombie tasks for dags with errors get cleaned up

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #25550:
URL: https://github.com/apache/airflow/pull/25550#issuecomment-1206527331

   Yeah @ashb @jedcunningham - the more I look at it, the more I see that the problem was simply revealed by #24906.
   
   From what I understand, the TIs which had never gone through this part of the scheduler loop (eligible for execution) will have `queued_by_job_id=NULL`.
   
   I believe that if a TI was created and the DAG failed parsing, none of the TIs belonging to the DAG would ever get here.
   This still baffles me why a TI would be created if we had a DAG parsing error, though :).
   
   ```
     session.query(TI).filter(filter_for_tis).update(
         # TODO[ha]: should we use func.now()? How does that work with DB timezone
         # on mysql when it's not UTC?
         {
             TI.state: TaskInstanceState.QUEUED,
             TI.queued_dttm: timezone.utcnow(),
             TI.queued_by_job_id: self.id,
         },
         synchronize_session=False,
   ```
   
   As of #24906 such TIs are not selected during the _find_zombies(). 
   
   If that is correct, are there any other circumstances where we can have a TI created and not set to the QUEUED state? Should we rethink #24906 (and add `or NULL`), or am I missing something :)?
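   
   Roughly what "add or NULL" could mean for that filter - an illustrative SQLAlchemy sketch only, not a concrete patch to `_find_zombies()`:
   
   ```python
   from sqlalchemy import or_
   
   from airflow.models.taskinstance import TaskInstance as TI
   
   
   def zombie_queued_by_filter(scheduler_job_id: int):
       """Match TIs queued by this scheduler *or* TIs that were never queued at all."""
       # The condition added in #24906 was only: TI.queued_by_job_id == scheduler_job_id
       return or_(
           TI.queued_by_job_id == scheduler_job_id,
           TI.queued_by_job_id.is_(None),
       )
   ```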
   




[GitHub] [airflow] ashb merged pull request #25550: Ensure that zombie tasks for dags with errors get cleaned up

Posted by GitBox <gi...@apache.org>.
ashb merged PR #25550:
URL: https://github.com/apache/airflow/pull/25550




[GitHub] [airflow] potiuk commented on pull request #25550: Ensure that zombie tasks for dags with errors get cleaned up

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #25550:
URL: https://github.com/apache/airflow/pull/25550#issuecomment-1206456959

   Prior to that change, zombies were killed too aggressively by multiple schedulers at the same time - and I think https://github.com/apache/airflow/pull/24906 filtered too much.




[GitHub] [airflow] ashb commented on pull request #25550: Ensure that zombie tasks for dags with errors get cleaned up

Posted by GitBox <gi...@apache.org>.
ashb commented on PR #25550:
URL: https://github.com/apache/airflow/pull/25550#issuecomment-1206541554

   The behaviour we saw on 2.3 was:
   
   - the TI was in the queued state
   - an error happens, making the parse of the file fail
   - the error happened very early in the Celery worker (as it tried to parse the file in the LocalTask), which causes the Executor to send a fail event to the scheduler
   - the scheduler processes that event and sends a fail request via the Executor to the processor
   - the callback is ignored and the TI state is left in queued
   
   On main the TI was detected as a zombie in this case fine, but the callback never ran, so it didn't get the state set back.




[GitHub] [airflow] ashb commented on a diff in pull request #25550: Ensure that zombie tasks for dags with errors get cleaned up

Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #25550:
URL: https://github.com/apache/airflow/pull/25550#discussion_r938921182


##########
tests/dag_processing/test_processor.py:
##########
@@ -388,10 +389,71 @@ def test_execute_on_failure_callbacks(self, mock_ti_handle_failure):
                 full_filepath="A", simple_task_instance=SimpleTaskInstance.from_ti(ti), msg="Message"
             )
         ]
-        dag_file_processor.execute_callbacks(dagbag, requests)
+        dag_file_processor.execute_callbacks(dagbag, requests, session)
+        mock_ti_handle_failure.assert_called_once_with(
+            error="Message", test_mode=conf.getboolean('core', 'unit_test_mode'), session=session
+        )
+
+    @pytest.mark.parametrize(
+        ["has_serialized_dag"],
+        [pytest.param(True, id="dag_in_db"), pytest.param(False, id="no_dag_found")],
+    )
+    @patch.object(TaskInstance, 'handle_failure')
+    def test_execute_on_failure_callbacks_without_dag(self, mock_ti_handle_failure, has_serialized_dag):
+        dagbag = DagBag(dag_folder="/dev/null", include_examples=True, read_dags_from_db=False)
+        dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
+        with create_session() as session:
+            session.query(TaskInstance).delete()
+            dag = dagbag.get_dag('example_branch_operator')
+            dagrun = dag.create_dagrun(
+                state=State.RUNNING,
+                execution_date=DEFAULT_DATE,
+                run_type=DagRunType.SCHEDULED,
+                session=session,
+            )
+            task = dag.get_task(task_id='run_this_first')
+            ti = TaskInstance(task, run_id=dagrun.run_id, state=State.QUEUED)
+            session.add(ti)
+
+            if has_serialized_dag:
+                assert SerializedDagModel.write_dag(dag, session=session) is True
+                session.flush()
+
+        requests = [
+            TaskCallbackRequest(
+                full_filepath="A", simple_task_instance=SimpleTaskInstance.from_ti(ti), msg="Message"
+            )
+        ]
+        dag_file_processor.execute_callbacks_without_dag(requests, session)
+        mock_ti_handle_failure.assert_called_once_with(
+            error="Message", test_mode=conf.getboolean('core', 'unit_test_mode'), session=session
+        )
+
+    @patch.object(TaskInstance, 'handle_failure')
+    def test_execute_on_failure_callbacks_dag_parse_error(self, mock_ti_handle_failure):

Review Comment:
   Oh, I think this was a copy & paste of a previous test that I changed the name of.





[GitHub] [airflow] potiuk commented on a diff in pull request #25550: Ensure that zombie tasks for dags with errors get cleaned up

Posted by GitBox <gi...@apache.org>.
potiuk commented on code in PR #25550:
URL: https://github.com/apache/airflow/pull/25550#discussion_r938924695


##########
tests/dag_processing/test_processor.py:
##########
@@ -388,10 +389,71 @@ def test_execute_on_failure_callbacks(self, mock_ti_handle_failure):
                 full_filepath="A", simple_task_instance=SimpleTaskInstance.from_ti(ti), msg="Message"
             )
         ]
-        dag_file_processor.execute_callbacks(dagbag, requests)
+        dag_file_processor.execute_callbacks(dagbag, requests, session)
+        mock_ti_handle_failure.assert_called_once_with(
+            error="Message", test_mode=conf.getboolean('core', 'unit_test_mode'), session=session
+        )
+
+    @pytest.mark.parametrize(
+        ["has_serialized_dag"],
+        [pytest.param(True, id="dag_in_db"), pytest.param(False, id="no_dag_found")],
+    )
+    @patch.object(TaskInstance, 'handle_failure')
+    def test_execute_on_failure_callbacks_without_dag(self, mock_ti_handle_failure, has_serialized_dag):
+        dagbag = DagBag(dag_folder="/dev/null", include_examples=True, read_dags_from_db=False)
+        dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
+        with create_session() as session:
+            session.query(TaskInstance).delete()
+            dag = dagbag.get_dag('example_branch_operator')
+            dagrun = dag.create_dagrun(
+                state=State.RUNNING,
+                execution_date=DEFAULT_DATE,
+                run_type=DagRunType.SCHEDULED,
+                session=session,
+            )
+            task = dag.get_task(task_id='run_this_first')
+            ti = TaskInstance(task, run_id=dagrun.run_id, state=State.QUEUED)
+            session.add(ti)
+
+            if has_serialized_dag:
+                assert SerializedDagModel.write_dag(dag, session=session) is True
+                session.flush()
+
+        requests = [
+            TaskCallbackRequest(
+                full_filepath="A", simple_task_instance=SimpleTaskInstance.from_ti(ti), msg="Message"
+            )
+        ]
+        dag_file_processor.execute_callbacks_without_dag(requests, session)
+        mock_ti_handle_failure.assert_called_once_with(
+            error="Message", test_mode=conf.getboolean('core', 'unit_test_mode'), session=session
+        )
+
+    @patch.object(TaskInstance, 'handle_failure')
+    def test_execute_on_failure_callbacks_dag_parse_error(self, mock_ti_handle_failure):

Review Comment:
   ```
   ti.task = None
   ```





[GitHub] [airflow] jedcunningham commented on a diff in pull request #25550: Ensure that zombie tasks for dags with errors get cleaned up

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on code in PR #25550:
URL: https://github.com/apache/airflow/pull/25550#discussion_r938892509


##########
airflow/dag_processing/processor.py:
##########
@@ -637,7 +640,27 @@ def execute_callbacks(
                     request.full_filepath,
                 )
 
-        session.commit()
+        session.flush()
+
+    def execute_callbacks_without_dag(
+        self, callback_requests: List[CallbackRequest], session: Session
+    ) -> None:
+        """
+        Execute what callbacks we can as "best effort" when the dag cannot be found/had parse errors.
+
+        This is so important so that tasks that failed when there is a parse

Review Comment:
   ```suggestion
           This is important so that tasks that failed when there is a parse
   ```



##########
tests/dag_processing/test_processor.py:
##########
@@ -388,10 +389,71 @@ def test_execute_on_failure_callbacks(self, mock_ti_handle_failure):
                 full_filepath="A", simple_task_instance=SimpleTaskInstance.from_ti(ti), msg="Message"
             )
         ]
-        dag_file_processor.execute_callbacks(dagbag, requests)
+        dag_file_processor.execute_callbacks(dagbag, requests, session)
+        mock_ti_handle_failure.assert_called_once_with(
+            error="Message", test_mode=conf.getboolean('core', 'unit_test_mode'), session=session
+        )
+
+    @pytest.mark.parametrize(
+        ["has_serialized_dag"],
+        [pytest.param(True, id="dag_in_db"), pytest.param(False, id="no_dag_found")],
+    )
+    @patch.object(TaskInstance, 'handle_failure')
+    def test_execute_on_failure_callbacks_without_dag(self, mock_ti_handle_failure, has_serialized_dag):
+        dagbag = DagBag(dag_folder="/dev/null", include_examples=True, read_dags_from_db=False)
+        dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
+        with create_session() as session:
+            session.query(TaskInstance).delete()
+            dag = dagbag.get_dag('example_branch_operator')
+            dagrun = dag.create_dagrun(
+                state=State.RUNNING,
+                execution_date=DEFAULT_DATE,
+                run_type=DagRunType.SCHEDULED,
+                session=session,
+            )
+            task = dag.get_task(task_id='run_this_first')
+            ti = TaskInstance(task, run_id=dagrun.run_id, state=State.QUEUED)
+            session.add(ti)
+
+            if has_serialized_dag:
+                assert SerializedDagModel.write_dag(dag, session=session) is True
+                session.flush()
+
+        requests = [
+            TaskCallbackRequest(
+                full_filepath="A", simple_task_instance=SimpleTaskInstance.from_ti(ti), msg="Message"
+            )
+        ]
+        dag_file_processor.execute_callbacks_without_dag(requests, session)
+        mock_ti_handle_failure.assert_called_once_with(
+            error="Message", test_mode=conf.getboolean('core', 'unit_test_mode'), session=session
+        )
+
+    @patch.object(TaskInstance, 'handle_failure')
+    def test_execute_on_failure_callbacks_dag_parse_error(self, mock_ti_handle_failure):

Review Comment:
   How does this test simulate a parse error?





[GitHub] [airflow] ashb commented on pull request #25550: Ensure that zombie tasks for dags with errors get cleaned up

Posted by GitBox <gi...@apache.org>.
ashb commented on PR #25550:
URL: https://github.com/apache/airflow/pull/25550#issuecomment-1206585498

   Wait on merging this please.
   
   I want to recheck the test cases (busy right now on other work)




[GitHub] [airflow] potiuk commented on pull request #25550: Ensure that zombie tasks for dags with errors get cleaned up

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #25550:
URL: https://github.com/apache/airflow/pull/25550#issuecomment-1206486921

   > It's not really important where/when the change happened to be honest, I just cc'd you two as you touched the callback code most recently (I think) so hopefully have the context to review this PR.
   
   Yeah. Not a question of blame - but I think it is important to know what has been affected and when :) - that's why I am looking for when it could have happened. Did you see/get reports about those queued tasks/zombies in the wild? Because I think that #24906 might simply have revealed a problem that we had for a long time (this is my current hypothesis).
   
   Maybe previously those zombies were simply killed by `_find_zombies()` and were not noticed - seemingly regardless of whether they failed during parsing?
   
   The condition:
   
   ```
   .filter(TaskInstance.queued_by_job_id == self.id)
   ```
   
   I think this would only match if the task was actually queued by the scheduler?
   




[GitHub] [airflow] potiuk commented on pull request #25550: Ensure that zombie tasks for dags with errors get cleaned up

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #25550:
URL: https://github.com/apache/airflow/pull/25550#issuecomment-1206463816

   (At least when it came to zombie removal, that is.) But I am now reviewing your change to see the scope in detail.




[GitHub] [airflow] potiuk commented on pull request #25550: Ensure that zombie tasks for dags with errors get cleaned up

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #25550:
URL: https://github.com/apache/airflow/pull/25550#issuecomment-1206455238

   Highly doubt it. I think this one caused it: https://github.com/apache/airflow/pull/24906




[GitHub] [airflow] ashb commented on a diff in pull request #25550: Ensure that zombie tasks for dags with errors get cleaned up

Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #25550:
URL: https://github.com/apache/airflow/pull/25550#discussion_r938922969


##########
tests/dag_processing/test_processor.py:
##########
@@ -388,10 +389,71 @@ def test_execute_on_failure_callbacks(self, mock_ti_handle_failure):
                 full_filepath="A", simple_task_instance=SimpleTaskInstance.from_ti(ti), msg="Message"
             )
         ]
-        dag_file_processor.execute_callbacks(dagbag, requests)
+        dag_file_processor.execute_callbacks(dagbag, requests, session)
+        mock_ti_handle_failure.assert_called_once_with(
+            error="Message", test_mode=conf.getboolean('core', 'unit_test_mode'), session=session
+        )
+
+    @pytest.mark.parametrize(
+        ["has_serialized_dag"],
+        [pytest.param(True, id="dag_in_db"), pytest.param(False, id="no_dag_found")],
+    )
+    @patch.object(TaskInstance, 'handle_failure')
+    def test_execute_on_failure_callbacks_without_dag(self, mock_ti_handle_failure, has_serialized_dag):
+        dagbag = DagBag(dag_folder="/dev/null", include_examples=True, read_dags_from_db=False)
+        dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
+        with create_session() as session:
+            session.query(TaskInstance).delete()
+            dag = dagbag.get_dag('example_branch_operator')
+            dagrun = dag.create_dagrun(
+                state=State.RUNNING,
+                execution_date=DEFAULT_DATE,
+                run_type=DagRunType.SCHEDULED,
+                session=session,
+            )
+            task = dag.get_task(task_id='run_this_first')
+            ti = TaskInstance(task, run_id=dagrun.run_id, state=State.QUEUED)
+            session.add(ti)
+
+            if has_serialized_dag:
+                assert SerializedDagModel.write_dag(dag, session=session) is True
+                session.flush()
+
+        requests = [
+            TaskCallbackRequest(
+                full_filepath="A", simple_task_instance=SimpleTaskInstance.from_ti(ti), msg="Message"
+            )
+        ]
+        dag_file_processor.execute_callbacks_without_dag(requests, session)
+        mock_ti_handle_failure.assert_called_once_with(
+            error="Message", test_mode=conf.getboolean('core', 'unit_test_mode'), session=session
+        )
+
+    @patch.object(TaskInstance, 'handle_failure')
+    def test_execute_on_failure_callbacks_dag_parse_error(self, mock_ti_handle_failure):

Review Comment:
   Removing this (it's better handled by the parameterized test I added).





[GitHub] [airflow] ashb commented on pull request #25550: Ensure that zombie tasks for dags with errors get cleaned up

Posted by GitBox <gi...@apache.org>.
ashb commented on PR #25550:
URL: https://github.com/apache/airflow/pull/25550#issuecomment-1206384855

   /cc @potiuk @mhenc This _may_ have been a result of separating out the DagParser command in 2.3? Or it's possible this has been an edge case for a while.

