Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/08/24 23:13:44 UTC

[GitHub] [airflow] ephraimbuddy opened a new pull request #17819: Handle task callback inside the scheduler

ephraimbuddy opened a new pull request #17819:
URL: https://github.com/apache/airflow/pull/17819


   Task callbacks in the scheduler are sent to the DAG file processor for processing, which is quite problematic.
   Task instances that have retries are often not retried the required number of times, see #16625.
   Also, task instances get stuck in the up-for-retry state or in the queued state, which led
   to #15929.
   
   I believe this happens because the DAG file processor dies and a new one is created, which may not run the task callbacks.
   
   This PR fixes this by running the callbacks as soon as the task fails instead of passing them to the DAG file processor.
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #17819: Properly handle ti state difference between executor and scheduler

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17819:
URL: https://github.com/apache/airflow/pull/17819#discussion_r712852309



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -521,14 +521,24 @@ def _process_executor_events(self, session: Session = None) -> int:
                 )
                 self.log.error(msg, ti, state, ti.state, info)
 
-                request = TaskCallbackRequest(
-                    full_filepath=ti.dag_model.fileloc,
-                    simple_task_instance=SimpleTaskInstance(ti),
-                    msg=msg % (ti, state, ti.state, info),
-                )
-                self.log.info('Setting task instance %s state to %s as reported by executor', ti, state)
-                ti.set_state(state)
-                self.processor_agent.send_callback_to_execute(request)
+                # Get task from the Serialized DAG
+                try:
+                    dag = self.dagbag.get_dag(ti.dag_id)
+                    task = dag.get_task(ti.task_id)
+                except Exception as ex:
+                    self.log.exception("Marking task instance %s as failed. Reason: %s", ti, ex)

Review comment:
       `log.exception` already includes the exception, so we don't need to include the message again.
   ```suggestion
                       self.log.exception("Marking task instance %s as %s", ti, state)
   ```
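
To illustrate the point of this review comment, here is a minimal standalone sketch using only the standard `logging` module. The logger name `callback_demo` and the placeholder values `ti-1` and `failed` are made up for the example and are not part of the Airflow code. It shows that `log.exception` appends the traceback and the exception text by itself, so the exception does not need to be passed as a format argument.

```python
import io
import logging

# Capture log output in memory so it can be inspected afterwards.
stream = io.StringIO()
log = logging.getLogger("callback_demo")  # hypothetical logger name
log.setLevel(logging.INFO)
log.propagate = False
log.addHandler(logging.StreamHandler(stream))

try:
    raise KeyError("missing_task")
except Exception:
    # No "Reason: %s" argument: the active exception is attached automatically.
    log.exception("Marking task instance %s as %s", "ti-1", "failed")

output = stream.getvalue()
print(output)
```

The captured output contains the formatted message followed by the traceback that ends with the `KeyError` line.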







[GitHub] [airflow] Tonkonozhenko commented on pull request #17819: Properly handle ti state difference between executor and scheduler

Posted by GitBox <gi...@apache.org>.
Tonkonozhenko commented on pull request #17819:
URL: https://github.com/apache/airflow/pull/17819#issuecomment-920956854


   @ephraimbuddy https://github.com/apache/airflow/issues/18011
   I applied the patch on top of 2.1.2 and tasks are still stuck in the queued state.





[GitHub] [airflow] ephraimbuddy closed pull request #17819: Promptly handle task callback from _process_executor_events

Posted by GitBox <gi...@apache.org>.
ephraimbuddy closed pull request #17819:
URL: https://github.com/apache/airflow/pull/17819


   





[GitHub] [airflow] ephraimbuddy commented on pull request #17819: Promptly handle task callback from _process_executor_events

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #17819:
URL: https://github.com/apache/airflow/pull/17819#issuecomment-905524522


   This is how to reproduce the error: run this DAG and assert that it's successful. Then uncomment the `depend_on_past` argument, which is deliberately misspelled (`depend_on_past` instead of `depends_on_past`). You will see an import error in the UI. Run the DAG and it will enter the queued state and retry twice before failing.
   
   ```python
   
   import time
   from datetime import datetime, timedelta

   from airflow import DAG


   def on_failure(ctx):
       # Failure callback: print a marker and the task context.
       print('hello world')
       print(ctx)


   default_args = {'on_failure_callback': on_failure}

   dag = DAG(
       dag_id='Give-wrong-arg',
       schedule_interval=None,
       catchup=False,
       start_date=datetime(2021, 7, 12),
       default_args=default_args,
   )


   # To reproduce, uncomment the misspelled `depend_on_past` argument below.
   @dag.task(retries=2, retry_delay=timedelta(seconds=20))#, depend_on_past=False)
   def task_wrong_arg():
       time.sleep(5)


   @dag.task
   def myfunc():
       return 1


   task_wrong_arg() >> myfunc()
   ```





[GitHub] [airflow] ephraimbuddy commented on a change in pull request #17819: Properly handle ti state difference between executor and scheduler

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #17819:
URL: https://github.com/apache/airflow/pull/17819#discussion_r709902550



##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -207,7 +207,7 @@ def test_process_executor_events(self, mock_stats_incr, mock_task_callback, dag_
 
         self.scheduler_job._process_executor_events(session=session)
         ti1.refresh_from_db()
-        assert ti1.state == State.FAILED
+        assert ti1.state == State.QUEUED
         mock_task_callback.assert_called_once_with(

Review comment:
       Ok. But I think we may miss the logs?







[GitHub] [airflow] ephraimbuddy commented on pull request #17819: Properly handle ti state difference between executor and scheduler

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #17819:
URL: https://github.com/apache/airflow/pull/17819#issuecomment-920954901


   > @ephraimbuddy @ashb I tried the patch, unfortunately still not ok
   
   What was the problem, @Tonkonozhenko?





[GitHub] [airflow] ephraimbuddy commented on a change in pull request #17819: Properly handle ti state difference between executor and scheduler

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #17819:
URL: https://github.com/apache/airflow/pull/17819#discussion_r710262945



##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -207,7 +207,7 @@ def test_process_executor_events(self, mock_stats_incr, mock_task_callback, dag_
 
         self.scheduler_job._process_executor_events(session=session)
         ti1.refresh_from_db()
-        assert ti1.state == State.FAILED
+        assert ti1.state == State.QUEUED
         mock_task_callback.assert_called_once_with(

Review comment:
       The issue I face now is that to be able to check if there are still retries, we need the task object. `ti.is_eligible_to_retry` requires that `ti.task` exists
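
A rough sketch of why the task object is needed, under stated assumptions: `ToyTI` below is a hypothetical stand-in, not Airflow's `TaskInstance`, and the body of `is_eligible_to_retry` is an assumed simplification (retries configured on the task and tries not yet exhausted). It only demonstrates that the check cannot be evaluated until a task is attached.

```python
from types import SimpleNamespace


class ToyTI:
    """Hypothetical stand-in for a task instance loaded without its task."""

    def __init__(self, try_number, max_tries):
        self.try_number = try_number
        self.max_tries = max_tries
        # Deliberately no `task` attribute yet.

    def is_eligible_to_retry(self):
        # Assumed simplification: needs `self.task` to read the retry setting.
        return bool(self.task.retries) and self.try_number <= self.max_tries


ti = ToyTI(try_number=1, max_tries=2)

try:
    ti.is_eligible_to_retry()
    missing_task_error = None
except AttributeError as err:
    # Without a task attached, the check cannot be evaluated.
    missing_task_error = err

# Attaching a task (here a bare namespace with `retries`) makes the check work.
ti.task = SimpleNamespace(retries=2)
eligible = ti.is_eligible_to_retry()
```

This mirrors the `ti.task = task` assignment in the scheduler diff, where the task is looked up from the DAG before the failure is handled.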







[GitHub] [airflow] ashb commented on a change in pull request #17819: Properly handle ti state difference between executor and scheduler

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17819:
URL: https://github.com/apache/airflow/pull/17819#discussion_r712855669



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -521,14 +521,24 @@ def _process_executor_events(self, session: Session = None) -> int:
                 )
                 self.log.error(msg, ti, state, ti.state, info)
 
-                request = TaskCallbackRequest(
-                    full_filepath=ti.dag_model.fileloc,
-                    simple_task_instance=SimpleTaskInstance(ti),
-                    msg=msg % (ti, state, ti.state, info),
-                )
-                self.log.info('Setting task instance %s state to %s as reported by executor', ti, state)
-                ti.set_state(state)
-                self.processor_agent.send_callback_to_execute(request)
+                # Get task from the Serialized DAG
+                try:
+                    dag = self.dagbag.get_dag(ti.dag_id)
+                    task = dag.get_task(ti.task_id)
+                except Exception as ex:
+                    self.log.exception("Marking task instance %s as failed. Reason: %s", ti, ex)
+                    ti.set_state(state)
+                    continue
+                ti.task = task
+                if task.on_retry_callback or task.on_failure_callback:

Review comment:
       @kaxil Is this right for serialized tasks? 







[GitHub] [airflow] WattsInABox commented on pull request #17819: Properly handle ti state difference between executor and scheduler

Posted by GitBox <gi...@apache.org>.
WattsInABox commented on pull request #17819:
URL: https://github.com/apache/airflow/pull/17819#issuecomment-923022910


   So do we think this fix is worth it, or should we go with the Astronomer alternative? Specifically, we don't use the Kubernetes executor; we use the Celery executor. Has anyone tried this or the Astronomer patch at scale with Celery?





[GitHub] [airflow] ashb commented on a change in pull request #17819: Properly handle ti state difference between executor and scheduler

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17819:
URL: https://github.com/apache/airflow/pull/17819#discussion_r710086245



##########
File path: airflow/models/taskinstance.py
##########
@@ -1719,6 +1719,8 @@ def handle_failure(
             self.state = State.FAILED
             email_for_state = task.email_on_failure
         else:
+            if self.state == State.QUEUED:
+                self.try_number += 1

Review comment:
       Need a comment explaining why we need this please







[GitHub] [airflow] ashb commented on a change in pull request #17819: Properly handle ti state difference between executor and scheduler

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17819:
URL: https://github.com/apache/airflow/pull/17819#discussion_r712850317



##########
File path: tests/models/test_taskinstance.py
##########
@@ -1683,6 +1683,21 @@ def test_handle_failure(self, create_dummy_dag, session=None):
         assert context_arg_3 and "task_instance" in context_arg_3
         mock_on_retry_3.assert_not_called()
 
+    def test_handle_failure_updates_queued_task_try_number(self, dag_maker):
+        session = settings.Session()
+        with dag_maker():
+            task = DummyOperator(task_id="mytask", retries=1)
+        dr = dag_maker.create_dagrun()
+        ti = TI(task=task, run_id=dr.run_id)
+        ti.state = State.QUEUED
+        session.merge(ti)
+        session.commit()
+        assert ti.state == State.QUEUED
+        assert ti.try_number == 1
+        ti.handle_failure("test queued ti", test_mode=True)
+        assert ti.state == State.UP_FOR_RETRY
+        assert ti.try_number == 3

Review comment:
       This might be a case where accessing `ti._try_number` would be less confusing.







[GitHub] [airflow] github-actions[bot] commented on pull request #17819: Properly handle ti state difference between executor and scheduler

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #17819:
URL: https://github.com/apache/airflow/pull/17819#issuecomment-924334170


   The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] kaxil edited a comment on pull request #17819: Properly handle ti state difference between executor and scheduler

Posted by GitBox <gi...@apache.org>.
kaxil edited a comment on pull request #17819:
URL: https://github.com/apache/airflow/pull/17819#issuecomment-924333229


   All the reviews have been addressed





[GitHub] [airflow] ashb commented on a change in pull request #17819: Properly handle ti state difference between executor and scheduler

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17819:
URL: https://github.com/apache/airflow/pull/17819#discussion_r712856695



##########
File path: airflow/models/taskinstance.py
##########
@@ -1719,6 +1720,11 @@ def handle_failure(
             self.state = State.FAILED
             email_for_state = task.email_on_failure
         else:
+            if self.state == State.QUEUED:
+                # We increase the try_number so as
+                # to fail the task if it fails to start
+                # after sometime
+                self.try_number += 1

Review comment:
       ```suggestion
                   self._try_number += 1
   ```







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #17819: Promptly handle task callback from _process_executor_events

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #17819:
URL: https://github.com/apache/airflow/pull/17819#discussion_r695882206



##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -207,17 +207,18 @@ def test_process_executor_events(self, mock_stats_incr, mock_task_callback, dag_
         session.commit()
 
         executor.event_buffer[ti1.key] = State.FAILED, None
-
         self.scheduler_job._process_executor_events(session=session)
         ti1.refresh_from_db()
-        assert ti1.state == State.FAILED
+
+        assert ti1.state == State.QUEUED
         mock_task_callback.assert_called_once_with(
             full_filepath='/test_path1/',
             simple_task_instance=mock.ANY,
             msg='Executor reports task instance '
             '<TaskInstance: test_process_executor_events.dummy_task 2016-01-01 00:00:00+00:00 [queued]> '
             'finished (failed) although the task says its queued. (Info: None) '
             'Was the task killed externally?',
+            task=task1,

Review comment:
       I don't know why the `simple_task_instance` argument of `mock_task_callback` is no longer matched by `mock.ANY` and the call is instead made with the real `SimpleTaskInstance` object, causing the test to fail. Any ideas?







[GitHub] [airflow] ashb commented on pull request #17819: Promptly handle task callback from _process_executor_events

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #17819:
URL: https://github.com/apache/airflow/pull/17819#issuecomment-908276602


   By changing a DAG file to have a parse error and then triggering callbacks, you've hit a different problem too, so that isn't a good way of triggering the behaviour you are trying to test.
   
   
   (Because running the callbacks _might_ require running the `on_failure_callback`, we need the actual loaded DAG file in many cases.)





[GitHub] [airflow] kaxil commented on a change in pull request #17819: Properly handle ti state difference between executor and scheduler

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #17819:
URL: https://github.com/apache/airflow/pull/17819#discussion_r710539698



##########
File path: tests/models/test_taskinstance.py
##########
@@ -1683,6 +1683,21 @@ def test_handle_failure(self, create_dummy_dag, session=None):
         assert context_arg_3 and "task_instance" in context_arg_3
         mock_on_retry_3.assert_not_called()
 
+    def test_handle_failure_updates_queued_task_try_number(self, dag_maker):
+        session = settings.Session()
+        with dag_maker():
+            task = DummyOperator(task_id="mytask", retries=1)
+        dr = dag_maker.create_dagrun()
+        ti = TI(task=task, run_id=dr.run_id)
+        ti.state = State.QUEUED
+        session.merge(ti)
+        session.commit()
+        assert ti.state == State.QUEUED
+        assert ti.try_number == 1
+        ti.handle_failure("test queued ti", test_mode=True)
+        assert ti.state == State.UP_FOR_RETRY
+        assert ti.try_number == 3

Review comment:
       Yeah - `try_number` gets bumped to `2` in https://github.com/apache/airflow/blob/550ca654cd1b6c2e97a4174c3950a17093747880/airflow/models/taskinstance.py#L1722-L1723
   
   and when `ti.try_number` is accessed here: https://github.com/apache/airflow/blob/550ca654cd1b6c2e97a4174c3950a17093747880/airflow/models/taskinstance.py#L467-L480
   
   since it is not running and in  `UP_FOR_RETRY` state, the `try_number` is set to `3`.
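
The arithmetic described above can be traced with a small toy model. This is only a sketch under an assumption taken from the comment: the `try_number` getter reports the stored `_try_number` while the task is running and `_try_number + 1` otherwise. `ToyTaskInstance` and its string states are hypothetical and are not Airflow's real class.

```python
class ToyTaskInstance:
    """Simplified stand-in for a task instance; not Airflow's real class."""

    def __init__(self, state="queued"):
        self.state = state
        self._try_number = 0

    @property
    def try_number(self):
        # While running, report the stored value; otherwise report the next try.
        if self.state == "running":
            return self._try_number
        return self._try_number + 1

    @try_number.setter
    def try_number(self, value):
        self._try_number = value


# Bumping through the property: the getter already adds one before the store.
ti = ToyTaskInstance(state="queued")
before = ti.try_number                 # stored 0, reported as 0 + 1
ti.try_number += 1                     # reads 1, stores 2
ti.state = "up_for_retry"
after_property_bump = ti.try_number    # stored 2, reported as 2 + 1

# Bumping the private attribute directly: only the stored value changes.
other = ToyTaskInstance(state="queued")
other._try_number += 1                 # stores 1
other.state = "up_for_retry"
after_private_bump = other.try_number  # stored 1, reported as 1 + 1
```

In this model the property-based increment moves the reported value from 1 to 3, while incrementing `_try_number` directly moves it from 1 to 2, which is why the choice between the two spellings matters in the review.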







[GitHub] [airflow] ephraimbuddy commented on pull request #17819: Properly handle ti state difference between executor and scheduler

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #17819:
URL: https://github.com/apache/airflow/pull/17819#issuecomment-920997063


   > @ephraimbuddy #18011
   > I applied the patch on top of 2.1.2 and tasks still stuck in queued state
   
   This PR was updated yesterday. Is it the new change that you applied? For context, if the task was already running before it failed, then the issue is not related to this PR, and I don't think it's an issue at all: it's expected because your dependency rule is all_success, I suppose? And I think you mean stuck in up_for_retry?
   
   





[GitHub] [airflow] ephraimbuddy removed a comment on pull request #17819: Properly handle ti state difference between executor and scheduler

Posted by GitBox <gi...@apache.org>.
ephraimbuddy removed a comment on pull request #17819:
URL: https://github.com/apache/airflow/pull/17819#issuecomment-925275899


   If I understand correctly, throwing this exception is not a bug?





[GitHub] [airflow] kaxil merged pull request #17819: Properly handle ti state difference between executor and scheduler

Posted by GitBox <gi...@apache.org>.
kaxil merged pull request #17819:
URL: https://github.com/apache/airflow/pull/17819


   





[GitHub] [airflow] ephraimbuddy edited a comment on pull request #17819: Properly handle ti state difference between executor and scheduler

Posted by GitBox <gi...@apache.org>.
ephraimbuddy edited a comment on pull request #17819:
URL: https://github.com/apache/airflow/pull/17819#issuecomment-925293100


   I don't have an idea for a test case other than the one we have in the unit tests for this PR.
   
   To reproduce manually with a DAG, raise AirflowException after this line:
   https://github.com/apache/airflow/blob/13a558d658c3a1f6df4b2ee5d894fee65dc103db/airflow/cli/commands/task_command.py#L279.
   That is, add `raise AirflowException` after the above line and run a DAG with retries.





[GitHub] [airflow] ephraimbuddy commented on a change in pull request #17819: Properly handle ti state difference between executor and scheduler

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #17819:
URL: https://github.com/apache/airflow/pull/17819#discussion_r702052830



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -596,13 +597,26 @@ def _process_executor_events(self, session: Session = None) -> int:
                 )
                 self.log.error(msg, ti, state, ti.state, info)
 
+                try:
+                    get_dag(self.subdir, ti.dag_id)
+                except AirflowException:
+                    self.log.error(
+                        "Cannot load the dag bag to handle failure for %s"
+                        ". Setting task to FAILED without callbacks or "
+                        "retries. Do you have enough resources?",
+                        ti,
+                    )
+                    ti.set_state(State.FAILED)
+                    session.merge(ti)
+                    session.commit()
+                    continue
+

Review comment:
       Borrowed from 1.10.x







[GitHub] [airflow] kaxil commented on a change in pull request #17819: Properly handle ti state difference between executor and scheduler

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #17819:
URL: https://github.com/apache/airflow/pull/17819#discussion_r712528132



##########
File path: airflow/models/taskinstance.py
##########
@@ -1693,6 +1693,7 @@ def handle_failure(
         Stats.incr(f'operator_failures_{task.task_type}', 1, 1)
         Stats.incr('ti_failures')
         if not test_mode:
+            self.dag_run = self.get_dagrun(session=session)

Review comment:
       @ashb This had to be added; otherwise it gave a `sqlalchemy.orm.exc.DetachedInstanceError` (`lazy load operation of attribute 'dag_run' cannot proceed`), specifically:
   
   ```
   E           sqlalchemy.orm.exc.DetachedInstanceError: Parent instance <TaskInstance at 0x7fdfb98dd5d0> is not bound to a Session; lazy load operation of attribute 'dag_run' cannot proceed (Background on this error at: http://sqlalche.me/e/13/bhk3)
   
   /usr/local/lib/python3.7/site-packages/sqlalchemy/orm/strategies.py:720: DetachedInstanceError
   ```
   
   any ideas?

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -521,14 +521,26 @@ def _process_executor_events(self, session: Session = None) -> int:
                 )
                 self.log.error(msg, ti, state, ti.state, info)
 
-                request = TaskCallbackRequest(
-                    full_filepath=ti.dag_model.fileloc,
-                    simple_task_instance=SimpleTaskInstance(ti),
-                    msg=msg % (ti, state, ti.state, info),
-                )
-                self.log.info('Setting task instance %s state to %s as reported by executor', ti, state)
-                ti.set_state(state)
-                self.processor_agent.send_callback_to_execute(request)
+                # Get task from the Serialized DAG
+                try:
+                    dag = self.dagbag.get_dag(ti.dag_id)
+                    from airflow.models.baseoperator import BaseOperator
+
+                    task: BaseOperator = dag.get_task(ti.task_id)

Review comment:
       ```suggestion
                       task = dag.get_task(ti.task_id)
   ```







[GitHub] [airflow] ephraimbuddy commented on pull request #17819: Properly handle ti state difference between executor and scheduler

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #17819:
URL: https://github.com/apache/airflow/pull/17819#issuecomment-925272057


   Thanks for checking this @taylorfinnell. 
   Can you modify your patch and use `ti.try_number+=1` instead of `ti._try_number+=1` in this line:
   https://github.com/apache/airflow/blob/13a558d658c3a1f6df4b2ee5d894fee65dc103db/airflow/models/taskinstance.py#L1727





[GitHub] [airflow] ephraimbuddy edited a comment on pull request #17819: Properly handle ti state difference between executor and scheduler

Posted by GitBox <gi...@apache.org>.
ephraimbuddy edited a comment on pull request #17819:
URL: https://github.com/apache/airflow/pull/17819#issuecomment-925272057


   Thanks for checking this @taylorfinnell. 
   Can you modify your patch and use `ti.try_number+=1` instead of `ti._try_number+=1` in this line:
   https://github.com/apache/airflow/blob/13a558d658c3a1f6df4b2ee5d894fee65dc103db/airflow/models/taskinstance.py#L1727
   
   If I understand correctly, throwing the above exception is not a bug?





[GitHub] [airflow] ephraimbuddy commented on a change in pull request #17819: Promptly handle task callback from _process_executor_events

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #17819:
URL: https://github.com/apache/airflow/pull/17819#discussion_r695882206



##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -207,17 +207,18 @@ def test_process_executor_events(self, mock_stats_incr, mock_task_callback, dag_
         session.commit()
 
         executor.event_buffer[ti1.key] = State.FAILED, None
-
         self.scheduler_job._process_executor_events(session=session)
         ti1.refresh_from_db()
-        assert ti1.state == State.FAILED
+
+        assert ti1.state == State.QUEUED
         mock_task_callback.assert_called_once_with(
             full_filepath='/test_path1/',
             simple_task_instance=mock.ANY,
             msg='Executor reports task instance '
             '<TaskInstance: test_process_executor_events.dummy_task 2016-01-01 00:00:00+00:00 [queued]> '
             'finished (failed) although the task says its queued. (Info: None) '
             'Was the task killed externally?',
+            task=task1,

Review comment:
       I don't know why `mock_task_callback` is no longer called with `simple_task_instance=mock.ANY` but with the real `SimpleTaskInstance` object, which causes the test to fail. Any ideas?
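
One possible reading of this failure, sketched with the standard library only: `mock.ANY` compares equal to any value, so it cannot itself cause a mismatch, but when another keyword argument (here a hypothetical `task`) does not compare equal, the assertion message prints the real arguments of the recorded call, which can look as if `mock.ANY` was replaced. The objects below are placeholders, not Airflow classes.

```python
from unittest import mock

callback = mock.Mock()
real_simple_ti = object()   # placeholder for the SimpleTaskInstance actually passed
task_in_call = object()     # placeholder for the task object actually passed

callback(full_filepath="/test_path1/", simple_task_instance=real_simple_ti, task=task_in_call)

# mock.ANY is equal to anything, so this assertion passes.
callback.assert_called_once_with(
    full_filepath="/test_path1/", simple_task_instance=mock.ANY, task=mock.ANY
)

# A different (non-equal) task object makes the whole comparison fail,
# even though simple_task_instance is still matched by mock.ANY.
task_in_test = object()
try:
    callback.assert_called_once_with(
        full_filepath="/test_path1/", simple_task_instance=mock.ANY, task=task_in_test
    )
    mismatch_detected = False
except AssertionError:
    mismatch_detected = True
```

If this is what happens in the test, comparing `task` with `mock.ANY` (or dropping it from the expected call) would isolate which argument differs.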







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #17819: Properly handle ti state difference between executor and scheduler

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #17819:
URL: https://github.com/apache/airflow/pull/17819#discussion_r703413781



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -596,13 +597,24 @@ def _process_executor_events(self, session: Session = None) -> int:
                 )
                 self.log.error(msg, ti, state, ti.state, info)
 
+                try:
+                    get_dag(self.subdir, ti.dag_id)

Review comment:
       When there's an error parsing the DAG, perhaps due to resource constraints, the task callback is not run even though it is sent to the processor, and the task gets stuck in queued.
   But when there's a DB operational error while trying to execute a task, it results in this error and the task still retries.
   Another option is to add a message informing the user that the task can get stuck if there's a resource constraint, and then remove the parsing of DAGs.







[GitHub] [airflow] kaxil commented on a change in pull request #17819: Properly handle ti state difference between executor and scheduler

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #17819:
URL: https://github.com/apache/airflow/pull/17819#discussion_r709653017



##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -207,7 +207,7 @@ def test_process_executor_events(self, mock_stats_incr, mock_task_callback, dag_
 
         self.scheduler_job._process_executor_events(session=session)
         ti1.refresh_from_db()
-        assert ti1.state == State.FAILED
+        assert ti1.state == State.QUEUED
         mock_task_callback.assert_called_once_with(

Review comment:
       Hmmm... I wonder if we can just handle the failure in `_process_executor_events` itself if there are no callbacks. If there are no callbacks, we probably don't need to send it to the DAG parsing process.
   
   And we need a comment here to say that the state will remain queued here and will be set to failed in the DAG parsing process.







[GitHub] [airflow] WattsInABox commented on pull request #17819: Properly handle ti state difference between executor and scheduler

Posted by GitBox <gi...@apache.org>.
WattsInABox commented on pull request #17819:
URL: https://github.com/apache/airflow/pull/17819#issuecomment-925320638


   Thanks for that, we might be able to push a DAG like that to our staging environment. Theoretically also, we could write an integration test for this? Is there a good document to follow for integration testing other than the random blog articles I've found?





[GitHub] [airflow] ashb commented on a change in pull request #17819: Properly handle ti state difference between executor and scheduler

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17819:
URL: https://github.com/apache/airflow/pull/17819#discussion_r710088318



##########
File path: tests/models/test_taskinstance.py
##########
@@ -1683,6 +1683,21 @@ def test_handle_failure(self, create_dummy_dag, session=None):
         assert context_arg_3 and "task_instance" in context_arg_3
         mock_on_retry_3.assert_not_called()
 
+    def test_handle_failure_updates_queued_task_try_number(self, dag_maker):
+        session = settings.Session()
+        with dag_maker():
+            task = DummyOperator(task_id="mytask", retries=1)
+        dr = dag_maker.create_dagrun()
+        ti = TI(task=task, run_id=dr.run_id)
+        ti.state = State.QUEUED
+        session.merge(ti)
+        session.commit()
+        assert ti.state == State.QUEUED
+        assert ti.try_number == 1
+        ti.handle_failure("test queued ti", test_mode=True)
+        assert ti.state == State.UP_FOR_RETRY
+        assert ti.try_number == 3

Review comment:
       Hmmm, this is right, but my, is it confusing!







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #17819: Promptly handle task callback from _process_executor_events

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #17819:
URL: https://github.com/apache/airflow/pull/17819#discussion_r695735143



##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -207,30 +207,32 @@ def test_process_executor_events(self, mock_stats_incr, mock_task_callback, dag_
         session.commit()
 
         executor.event_buffer[ti1.key] = State.FAILED, None
-
         self.scheduler_job._process_executor_events(session=session)
         ti1.refresh_from_db()
-        assert ti1.state == State.FAILED
+
+        assert ti1.state == State.QUEUED
         mock_task_callback.assert_called_once_with(
             full_filepath='/test_path1/',
             simple_task_instance=mock.ANY,
             msg='Executor reports task instance '
             '<TaskInstance: test_process_executor_events.dummy_task 2016-01-01 00:00:00+00:00 [queued]> '
             'finished (failed) although the task says its queued. (Info: None) '
             'Was the task killed externally?',
+            task=task1,
         )
         self.scheduler_job.processor_agent.send_callback_to_execute.assert_called_once_with(task_callback)
         self.scheduler_job.processor_agent.reset_mock()
 
         # ti in success state
+        ti1 = TaskInstance(task2, DEFAULT_DATE)
         ti1.state = State.SUCCESS
         session.merge(ti1)
         session.commit()
         executor.event_buffer[ti1.key] = State.SUCCESS, None
 
         self.scheduler_job._process_executor_events(session=session)
         ti1.refresh_from_db()
-        assert ti1.state == State.SUCCESS
+        assert ti1.state == State.success_states

Review comment:
       ```suggestion
           assert ti1.state == State.SUCCESS
   ```







[GitHub] [airflow] ashb commented on a change in pull request #17819: Promptly handle task callback from _process_executor_events

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17819:
URL: https://github.com/apache/airflow/pull/17819#discussion_r698326582



##########
File path: airflow/utils/callback_requests.py
##########
@@ -59,8 +59,10 @@ def __init__(
         simple_task_instance: SimpleTaskInstance,
         is_failure_callback: Optional[bool] = True,
         msg: Optional[str] = None,
+        task: Optional[Any] = None,

Review comment:
       We can't pass the task here -- this is a "message" that goes between processes, and by doing this it will require serialization of the task, but the task here in the scheduler is Serialized and so won't have access to any of the code etc.
   
   Instead (if we need this sort of behaviour) we'll have to load the dag in the processor (which we already do) and _then_ execute it.
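
A minimal sketch of the constraint described above, assuming the callback request is pickled when it crosses the process boundary. The dictionaries are hypothetical stand-ins, not Airflow's `TaskCallbackRequest`: a message made only of identifiers round-trips cleanly, while anything carrying user code (here a lambda) does not.

```python
import pickle

# A message that only carries identifiers survives serialization unchanged.
request = {
    "full_filepath": "/test_path1/",
    "dag_id": "example_dag",
    "task_id": "mytask",
    "msg": "Executor reports task instance finished (failed)",
}
round_tripped = pickle.loads(pickle.dumps(request))

# A message that embeds user code cannot be pickled by the standard library.
request_with_code = dict(request, on_failure_callback=lambda context: None)
try:
    pickle.dumps(request_with_code)
    code_is_picklable = True
except (pickle.PicklingError, AttributeError):
    code_is_picklable = False
```

This is why the receiving process would look the task up again from `dag_id` and `task_id` after loading the DAG, rather than receiving the task object in the message.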







[GitHub] [airflow] ashb commented on a change in pull request #17819: Properly handle ti state difference between executor and scheduler

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17819:
URL: https://github.com/apache/airflow/pull/17819#discussion_r712859258



##########
File path: airflow/models/taskinstance.py
##########
@@ -1719,6 +1720,11 @@ def handle_failure(
             self.state = State.FAILED
             email_for_state = task.email_on_failure
         else:
+            if self.state == State.QUEUED:
+                # We increase the try_number so as
+                # to fail the task if it fails to start
+                # after sometime
+                self.try_number += 1

Review comment:
       (This might not have any effect but is more correct -- as that way we don't have to reason about "has the try_number property already added 1 to this value or not")







[GitHub] [airflow] taylorfinnell commented on pull request #17819: Properly handle ti state difference between executor and scheduler

Posted by GitBox <gi...@apache.org>.
taylorfinnell commented on pull request #17819:
URL: https://github.com/apache/airflow/pull/17819#issuecomment-925228694


   @ephraimbuddy @kaxil 
   
   We applied this patch directly on top of the 2.1.4 tag and noticed issues almost instantly. 
   
   - Tasks would queue
   - Tasks would throw an exception - the exception in this case was from our DAG code signaling we are not ready to process, so retry later. It was not an internal airflow exception
   - Task logs would indicate that state was being set to `UP_FOR_RETRY`
   - The UI would show the task still `RUNNING`. The task would have ~16 retries left to go and would sit on attempt 1
   - Clearing the task would result in `shutdown` state
   - To get the task to run again we first had to fail it, then clear it again
   
   We also tried the `main` branch up to this commit and saw very similar issues. 
   
   Here are some logs.
   
   ```
   ...
   [2021-09-22 04:47:15,214] {taskinstance.py:1463} ERROR - Task failed with exception
   ...
   raise FileNotFoundError(f"{file} does not exist")
   
   [2021-09-22 04:47:15,299] {logging_mixin.py:109} WARNING - /opt/app-root/lib64/python3.8/site-packages/sqlalchemy/orm/strategies.py:911 SAWarning: Multiple rows returned with uselist=False for lazily-loaded attribute 'DagRun.task_instances'
   [2021-09-22 04:47:15,301] {taskinstance.py:1512} INFO - Marking task as UP_FOR_RETRY. dag_id=foo_dag_split, task_id=foo_task, execution_date=20210921T043000, start_date=20210922T044714, end_date=20210922T044715
   [2021-09-22 04:47:15,302] {logging_mixin.py:109} WARNING - /opt/app-root/lib64/python3.8/site-packages/sqlalchemy/orm/session.py:2193 SAWarning: Instance <TaskInstance at 0x7f01f5adc070> is already pending in this Session yet is being merged again; this is probably not what you want to do
   [2021-09-22 04:47:15,367] {local_task_job.py:151} INFO - Task exited with return code 1
   [2021-09-22 04:47:15,517] {logging_mixin.py:109} WARNING - /opt/app-root/lib64/python3.8/site-packages/sqlalchemy/orm/strategies.py:911 SAWarning: Multiple rows returned with uselist=False for lazily-loaded attribute 'DagRun.task_instances'
   [2021-09-22 04:47:15,517] {taskinstance.py:1512} INFO - Marking task as UP_FOR_RETRY. dag_id=foo_dag_split, task_id=foo_task, execution_date=20210921T043000, start_date=20210922T044714, end_date=20210922T044715
   [2021-09-22 04:47:15,518] {logging_mixin.py:109} WARNING - /opt/app-root/lib64/python3.8/site-packages/sqlalchemy/orm/session.py:2193 SAWarning: Instance <TaskInstance at 0x7f01f5ad6580> is already pending in this Session yet is being merged again; this is probably not what you want to do
   
   ```
   





[GitHub] [airflow] ephraimbuddy commented on pull request #17819: Properly handle ti state difference between executor and scheduler

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #17819:
URL: https://github.com/apache/airflow/pull/17819#issuecomment-925293100


   I don't have an idea for a test case other than the one we have in the unit tests for this PR.
   
   





[GitHub] [airflow] ephraimbuddy commented on pull request #17819: Properly handle ti state difference between executor and scheduler

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #17819:
URL: https://github.com/apache/airflow/pull/17819#issuecomment-925323137


   There’s currently no community doc about integration testing that I’m aware of. 
   
   Let us know what happens when you apply the patch above for try_number





[GitHub] [airflow] ephraimbuddy commented on a change in pull request #17819: Promptly handle task callback from _process_executor_events

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #17819:
URL: https://github.com/apache/airflow/pull/17819#discussion_r696076214



##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -207,17 +207,18 @@ def test_process_executor_events(self, mock_stats_incr, mock_task_callback, dag_
         session.commit()
 
         executor.event_buffer[ti1.key] = State.FAILED, None
-
         self.scheduler_job._process_executor_events(session=session)
         ti1.refresh_from_db()
-        assert ti1.state == State.FAILED
+
+        assert ti1.state == State.QUEUED
         mock_task_callback.assert_called_once_with(
             full_filepath='/test_path1/',
             simple_task_instance=mock.ANY,
             msg='Executor reports task instance '
             '<TaskInstance: test_process_executor_events.dummy_task 2016-01-01 00:00:00+00:00 [queued]> '
             'finished (failed) although the task says its queued. (Info: None) '
             'Was the task killed externally?',
+            task=task1,

Review comment:
       I'm not sure why this is failing... I would appreciate help here.







[GitHub] [airflow] kaxil commented on pull request #17819: Properly handle ti state difference between executor and scheduler

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #17819:
URL: https://github.com/apache/airflow/pull/17819#issuecomment-924351634


   Well done @ephraimbuddy 👏 





[GitHub] [airflow] ashb commented on a change in pull request #17819: Properly handle ti state difference between executor and scheduler

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17819:
URL: https://github.com/apache/airflow/pull/17819#discussion_r710087197



##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -207,7 +207,7 @@ def test_process_executor_events(self, mock_stats_incr, mock_task_callback, dag_
 
         self.scheduler_job._process_executor_events(session=session)
         ti1.refresh_from_db()
-        assert ti1.state == State.FAILED
+        assert ti1.state == State.QUEUED
         mock_task_callback.assert_called_once_with(

Review comment:
       If there are no callbacks what logs did you expect?
   
   Oh, and sending it to the DAG file parser process still won't produce any logs in the "right" location to get picked up by the webserver.







[GitHub] [airflow] ephraimbuddy commented on pull request #17819: Properly handle ti state difference between executor and scheduler

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #17819:
URL: https://github.com/apache/airflow/pull/17819#issuecomment-921027537


   @Tonkonozhenko Let's discuss at #18011 so I can use the task IDs you reported to ask questions





[GitHub] [airflow] taylorfinnell commented on pull request #17819: Properly handle ti state difference between executor and scheduler

Posted by GitBox <gi...@apache.org>.
taylorfinnell commented on pull request #17819:
URL: https://github.com/apache/airflow/pull/17819#issuecomment-925278303


   That's correct: we expect that exception to be raised when data is not in place for the DAG to process. We then rely on the retry mechanism to try to process the data at a later time, when it is available.
   
   Do you have any suggestions on how we could reproduce this in a test case? If we can get a test that is fixed by your suggestion, I would feel more comfortable trying the change. Unfortunately, we didn't see the issue until we got to production scale.





[GitHub] [airflow] kaxil commented on pull request #17819: Properly handle ti state difference between executor and scheduler

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #17819:
URL: https://github.com/apache/airflow/pull/17819#issuecomment-922160488


   WDYT about https://github.com/astronomer/airflow/pull/17819/commits/136015c027d91c82179a67feb2e6b893077e5245 ?





[GitHub] [airflow] ashb commented on a change in pull request #17819: Properly handle ti state difference between executor and scheduler

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17819:
URL: https://github.com/apache/airflow/pull/17819#discussion_r712856204



##########
File path: airflow/models/taskinstance.py
##########
@@ -1693,6 +1693,7 @@ def handle_failure(
         Stats.incr(f'operator_failures_{task.task_type}', 1, 1)
         Stats.incr('ti_failures')
         if not test_mode:
+            self.dag_run = self.get_dagrun(session=session)

Review comment:
       Not sure why this only came up now, but this change looks okay







[GitHub] [airflow] kaxil commented on pull request #17819: Properly handle ti state difference between executor and scheduler

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #17819:
URL: https://github.com/apache/airflow/pull/17819#issuecomment-924333229


   Addressed all the reviews.





[GitHub] [airflow] ephraimbuddy commented on pull request #17819: Properly handle ti state difference between executor and scheduler

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #17819:
URL: https://github.com/apache/airflow/pull/17819#issuecomment-926821807


   @WattsInABox, I have tested this in a deployment and it works as expected. I think you should create an issue for the behaviour you are seeing.





[GitHub] [airflow] ephraimbuddy commented on a change in pull request #17819: Properly handle ti state difference between executor and scheduler

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #17819:
URL: https://github.com/apache/airflow/pull/17819#discussion_r710177997



##########
File path: tests/models/test_taskinstance.py
##########
@@ -1683,6 +1683,21 @@ def test_handle_failure(self, create_dummy_dag, session=None):
         assert context_arg_3 and "task_instance" in context_arg_3
         mock_on_retry_3.assert_not_called()
 
+    def test_handle_failure_updates_queued_task_try_number(self, dag_maker):
+        session = settings.Session()
+        with dag_maker():
+            task = DummyOperator(task_id="mytask", retries=1)
+        dr = dag_maker.create_dagrun()
+        ti = TI(task=task, run_id=dr.run_id)
+        ti.state = State.QUEUED
+        session.merge(ti)
+        session.commit()
+        assert ti.state == State.QUEUED
+        assert ti.try_number == 1
+        ti.handle_failure("test queued ti", test_mode=True)
+        assert ti.state == State.UP_FOR_RETRY
+        assert ti.try_number == 3

Review comment:
       Quite confusing. I have put in breakpoints to see how it happens, but without result. If I remove the increment of the try_number, the try_number remains 1 after running handle_failure.
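
One possible explanation for the jump from 1 to 3, sketched under the assumption that `try_number` is a property that returns the stored `_try_number` as-is while the task is running and `_try_number + 1` otherwise. `ToyTaskInstance` below is a hypothetical stand-in written for this illustration, not Airflow's `TaskInstance`.

```python
class ToyTaskInstance:
    """Toy model of a task instance; only the try-number bookkeeping is modelled."""

    def __init__(self, state="queued"):
        self.state = state
        self._try_number = 0

    @property
    def try_number(self):
        # Assumed behaviour: while running, report the current try;
        # in any other state, report the try that would run next.
        if self.state == "running":
            return self._try_number
        return self._try_number + 1

    @try_number.setter
    def try_number(self, value):
        self._try_number = value


ti = ToyTaskInstance(state="queued")
before = ti.try_number            # getter: 0 + 1
ti.try_number += 1                # getter gives 1, setter stores 1 + 1 = 2
after_property = ti.try_number    # getter: 2 + 1

ti2 = ToyTaskInstance(state="queued")
ti2._try_number += 1              # stores 0 + 1 = 1, bypassing the getter
after_private = ti2.try_number    # getter: 1 + 1
```

Under that assumption, incrementing through the property effectively adds the getter's offset as well, whereas incrementing `_try_number` directly advances the reported value by exactly one.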







[GitHub] [airflow] ephraimbuddy commented on pull request #17819: Handle task callback inside the scheduler

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #17819:
URL: https://github.com/apache/airflow/pull/17819#issuecomment-905047147


   > We don't want to run any user code in Scheduler. That is why callbacks are currently run in the DAG file processor. Long term they should be run in the Worker
   
   Alright, let me think of another way...Thanks!





[GitHub] [airflow] ephraimbuddy closed pull request #17819: Handle task callback inside the scheduler

Posted by GitBox <gi...@apache.org>.
ephraimbuddy closed pull request #17819:
URL: https://github.com/apache/airflow/pull/17819


   





[GitHub] [airflow] ashb commented on a change in pull request #17819: Properly handle ti state difference between executor and scheduler

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17819:
URL: https://github.com/apache/airflow/pull/17819#discussion_r703064312



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -596,13 +597,24 @@ def _process_executor_events(self, session: Session = None) -> int:
                 )
                 self.log.error(msg, ti, state, ti.state, info)
 
+                try:
+                    get_dag(self.subdir, ti.dag_id)

Review comment:
       It's not clear to me why we even need this? What's going on here?

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -49,6 +49,7 @@
 from airflow.ti_deps.dependencies_states import EXECUTION_STATES
 from airflow.utils import timezone
 from airflow.utils.callback_requests import DagCallbackRequest, TaskCallbackRequest
+from airflow.utils.cli import get_dag

Review comment:
       This function loads the actual DAG from the file on disk, which we can't do in the scheduler.
   
   We can't, because Airflow should never load DAGs into any long-running process, which includes the scheduler. This is bad because it could mean a badly written DAG file could bring down the scheduler (and right now the scheduler is isolated from this).
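
The isolation argument can be illustrated with a small standard-library sketch. This is not how Airflow's DAG file processor is implemented; `parse_in_child` and the source strings are hypothetical. The point is only that code executed in a separate interpreter can crash without taking the long-running parent down with it.

```python
import subprocess
import sys


def parse_in_child(source: str, timeout: float = 30.0) -> int:
    """Run untrusted 'DAG file' source in a separate interpreter; return its exit code."""
    completed = subprocess.run(
        [sys.executable, "-c", source],
        capture_output=True,
        timeout=timeout,
    )
    return completed.returncode


# Simulates a badly written DAG file that kills the interpreter running it.
bad_dag_source = "import os; os._exit(3)"
# Simulates a well-behaved DAG file.
good_dag_source = "x = 1"

bad_exit = parse_in_child(bad_dag_source)    # the child dies, the parent keeps going
good_exit = parse_in_child(good_dag_source)
```

Had the same source been executed inside the parent process, the `os._exit` call would have terminated the parent itself.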







[GitHub] [airflow] Tonkonozhenko commented on pull request #17819: Properly handle ti state difference between executor and scheduler

Posted by GitBox <gi...@apache.org>.
Tonkonozhenko commented on pull request #17819:
URL: https://github.com/apache/airflow/pull/17819#issuecomment-920893616


   @ephraimbuddy @ashb I tried the patch; unfortunately it is still not OK.





[GitHub] [airflow] Tonkonozhenko commented on pull request #17819: Properly handle ti state difference between executor and scheduler

Posted by GitBox <gi...@apache.org>.
Tonkonozhenko commented on pull request #17819:
URL: https://github.com/apache/airflow/pull/17819#issuecomment-921006214


   @ephraimbuddy yes, I tried the updated version.
   Both 2.1.2 and 2.1.3 behave badly: in 2.1.3 tasks get stuck in up_for_retry, and in 2.1.2 they get stuck in queued.
   
   We currently have a k8s-related issue, so the task doesn't finish correctly and doesn't send logs to S3. After that, it gets stuck in queued.
   
   ![image](https://user-images.githubusercontent.com/1307646/133640373-45799aba-48cc-4192-836e-d2ef423fadd9.png)
   





[GitHub] [airflow] ephraimbuddy commented on pull request #17819: Promptly handle task callback from _process_executor_events

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #17819:
URL: https://github.com/apache/airflow/pull/17819#issuecomment-905475151


   Hi @kaxil, what do you think about this implementation? I'm still working on tests...





[GitHub] [airflow] kaxil commented on a change in pull request #17819: Properly handle ti state difference between executor and scheduler

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #17819:
URL: https://github.com/apache/airflow/pull/17819#discussion_r713361445



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -521,14 +521,24 @@ def _process_executor_events(self, session: Session = None) -> int:
                 )
                 self.log.error(msg, ti, state, ti.state, info)
 
-                request = TaskCallbackRequest(
-                    full_filepath=ti.dag_model.fileloc,
-                    simple_task_instance=SimpleTaskInstance(ti),
-                    msg=msg % (ti, state, ti.state, info),
-                )
-                self.log.info('Setting task instance %s state to %s as reported by executor', ti, state)
-                ti.set_state(state)
-                self.processor_agent.send_callback_to_execute(request)
+                # Get task from the Serialized DAG
+                try:
+                    dag = self.dagbag.get_dag(ti.dag_id)
+                    task = dag.get_task(ti.task_id)
+                except Exception as ex:
+                    self.log.exception("Marking task instance %s as failed. Reason: %s", ti, ex)
+                    ti.set_state(state)
+                    continue
+                ti.task = task
+                if task.on_retry_callback or task.on_failure_callback:

Review comment:
       Yeah, we currently store it as a string:
   
   ```python
   (Pdb) pp SerializedDAG.serialize_dag(dag)
   {'_dag_id': 'test_process_executor_events_with_callback',
    '_task_group': {'_group_id': None,
                    'children': {'dummy_task': (<DagAttributeTypes.OP: 'operator'>,
                                                'dummy_task')},
                    'downstream_group_ids': [],
                    'downstream_task_ids': [],
                    'prefix_group_id': True,
                    'tooltip': '',
                    'ui_color': 'CornflowerBlue',
                    'ui_fgcolor': '#000',
                    'upstream_group_ids': [],
                    'upstream_task_ids': []},
    'dag_dependencies': [],
    'edge_info': {},
    'fileloc': '/test_path1/',
    'params': {},
    'schedule_interval': {<Encoding.TYPE: '__type'>: <DagAttributeTypes.TIMEDELTA: 'timedelta'>,
                          <Encoding.VAR: '__var'>: 86400.0},
    'start_date': 1451606400.0,
    'tasks': [{'_downstream_task_ids': [],
               '_inlets': [],
               '_is_dummy': True,
               '_outlets': [],
               '_task_module': 'airflow.serialization.serialized_objects',
               '_task_type': 'SerializedBaseOperator',
               'label': 'dummy_task',
               'on_failure_callback': '            task1 = '
                                      'DummyOperator(task_id=task_id_1, '
                                      'on_failure_callback=lambda x: '
                                      'print("hi"))\n',
               'pool': 'default_pool',
               'task_id': 'dummy_task',
               'template_ext': [],
               'template_fields': [],
               'template_fields_renderers': {},
               'ui_color': '#e8f7e4',
               'ui_fgcolor': '#000'}],
    'timezone': 'UTC'}
   ```
   
   or, with `on_retry_callback`:
   
   ```python
   (Pdb) from airflow.serialization.serialized_objects import SerializedBaseOperator, SerializedDAG
   (Pdb) pp SerializedDAG.serialize_dag(dag)
   {'_dag_id': 'test_process_executor_events_with_callback',
    '_task_group': {'_group_id': None,
                    'children': {'dummy_task': (<DagAttributeTypes.OP: 'operator'>,
                                                'dummy_task')},
                    'downstream_group_ids': [],
                    'downstream_task_ids': [],
                    'prefix_group_id': True,
                    'tooltip': '',
                    'ui_color': 'CornflowerBlue',
                    'ui_fgcolor': '#000',
                    'upstream_group_ids': [],
                    'upstream_task_ids': []},
    'dag_dependencies': [],
    'edge_info': {},
    'fileloc': '/test_path1/',
    'params': {},
    'schedule_interval': {<Encoding.TYPE: '__type'>: <DagAttributeTypes.TIMEDELTA: 'timedelta'>,
                          <Encoding.VAR: '__var'>: 86400.0},
    'start_date': 1451606400.0,
    'tasks': [{'_downstream_task_ids': [],
               '_inlets': [],
               '_is_dummy': True,
               '_outlets': [],
               '_task_module': 'airflow.serialization.serialized_objects',
               '_task_type': 'SerializedBaseOperator',
               'label': 'dummy_task',
               'on_retry_callback': '            task1 = '
                                    'DummyOperator(task_id=task_id_1, '
                                    'on_retry_callback=lambda x: print("hi"))\n',
               'pool': 'default_pool',
               'task_id': 'dummy_task',
               'template_ext': [],
               'template_fields': [],
               'template_fields_renderers': {},
               'ui_color': '#e8f7e4',
               'ui_fgcolor': '#000'}],
    'timezone': 'UTC'}
   (Pdb)
   ```
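
As a rough illustration of what the dumps above imply, the sketch below uses a plain dict as a toy stand-in for one serialized task entry. It is an assumption-laden model, not Airflow's serialization API: `serialized_task`, `has_callback`, and `runnable_callbacks` are hypothetical names. It shows that a truthiness check can detect that a callback was configured, while the stored value itself is source text and cannot be called.

```python
from typing import Any, Dict, List

# Toy stand-in for one entry of the 'tasks' list shown in the dump; the
# callback is held as source text, not as a function object.
serialized_task: Dict[str, Any] = {
    "task_id": "dummy_task",
    "on_failure_callback": 'on_failure_callback=lambda x: print("hi")\n',
}

CALLBACK_KEYS = ("on_failure_callback", "on_retry_callback")


def has_callback(task: Dict[str, Any]) -> bool:
    """Return True if any callback field is present and non-empty."""
    return any(task.get(key) for key in CALLBACK_KEYS)


def runnable_callbacks(task: Dict[str, Any]) -> List[str]:
    """Return the callback field names whose values are actually callable."""
    return [key for key in CALLBACK_KEYS if callable(task.get(key))]


if __name__ == "__main__":
    print(has_callback(serialized_task))
    print(runnable_callbacks(serialized_task))
```

Under this model, a check such as `if task.on_retry_callback or task.on_failure_callback` only answers whether a callback exists; actually running it would still require the real callable from the DAG file.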







[GitHub] [airflow] ephraimbuddy commented on pull request #17819: Properly handle ti state difference between executor and scheduler

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #17819:
URL: https://github.com/apache/airflow/pull/17819#issuecomment-925275899


   If I understand correctly, throwing this exception is not a bug?

