Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/04/03 08:02:42 UTC

[GitHub] [airflow] ephraimbuddy opened a new pull request #15172: Execute on failure callback when a pod is marked for deletion

ephraimbuddy opened a new pull request #15172:
URL: https://github.com/apache/airflow/pull/15172


   Closes: https://github.com/apache/airflow/issues/14422
   
   Currently, on_failure_callback is only called when a task finishes
   executing, not while it is still executing. When a pod is deleted, a
   SIGTERM is sent to the task and the task is stopped immediately. Because
   the task is still running when it is killed, on_failure_callback is
   never called.
   
   This PR makes sure that when a pod is marked for deletion and the task
   is killed, the task's on_failure_callback, if it has one, is called.
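A minimal, Airflow-independent sketch of the idea, assuming a POSIX system and that the task body runs in the main thread (Python only runs signal handlers there): a SIGTERM handler turns the signal into an ordinary exception, so the same failure path that calls the failure callback also runs when the process is killed externally. `TaskTerminated`, `run_task` and `long_running_task` are hypothetical names; this is not the code merged in this PR.

```python
import os
import signal
import time


class TaskTerminated(Exception):
    """Hypothetical exception used to turn SIGTERM into a normal task failure."""


def _raise_on_sigterm(signum, frame):
    # Python runs this handler in the main thread; raising here unwinds
    # whatever the task was doing (e.g. time.sleep) like any other error.
    raise TaskTerminated(f"Task received signal {signum}")


def run_task(task_fn, on_failure_callback):
    """Hypothetical runner: executes task_fn, calls on_failure_callback on any failure."""
    previous = signal.signal(signal.SIGTERM, _raise_on_sigterm)
    try:
        task_fn()
        return "success"
    except Exception as exc:
        # Reached for ordinary errors *and* for an external SIGTERM.
        on_failure_callback({"exception": exc})
        return "failed"
    finally:
        # Restore whatever handler was installed before (default if unknown).
        signal.signal(signal.SIGTERM, previous if previous is not None else signal.SIG_DFL)


def long_running_task():
    # Simulate the pod being deleted while the task is still running.
    os.kill(os.getpid(), signal.SIGTERM)
    time.sleep(5)


callback_contexts = []
state = run_task(long_running_task, callback_contexts.append)
print(state, type(callback_contexts[0]["exception"]).__name__)  # failed TaskTerminated
```

Raising from the handler, rather than doing the cleanup inside it, keeps a single code path for "task failed", which is the property the PR description asks for.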
   
   I have struggled to test this in code and would appreciate any help.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ephraimbuddy commented on pull request #15172: Execute on failure callback when a pod is marked for deletion

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #15172:
URL: https://github.com/apache/airflow/pull/15172#issuecomment-816057934


   > I don't think this is a correct fix check my comments in [#14422 (comment)](https://github.com/apache/airflow/issues/14422#issuecomment-815368180)
   
   I have updated it with a test





[GitHub] [airflow] houqp commented on a change in pull request #15172: Execute on failure callback when a pod is marked for deletion

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #15172:
URL: https://github.com/apache/airflow/pull/15172#discussion_r614495815



##########
File path: tests/jobs/test_local_task_job.py
##########
@@ -495,6 +496,70 @@ def task_function(ti):
         assert task_terminated_externally.value == 1
         assert not process.is_alive()
 
+    def test_process_kill_call_on_failure_callback(self):
+        """
+        Test that ensures that where a task is killed with sigterm
+        on_failure_callback gets executed
+        """
+        # use shared memory value so we can properly track value change even if
+        # it's been updated across processes.
+        failure_callback_called = Value('i', 0)
+        task_terminated_externally = Value('i', 1)
+        shared_mem_lock = Lock()
+
+        def failure_callback(context):
+            with shared_mem_lock:
+                failure_callback_called.value += 1
+            assert context['dag_run'].dag_id == 'test_mark_failure'
+
+        dag = DAG(dag_id='test_mark_failure', start_date=DEFAULT_DATE, default_args={'owner': 'owner1'})
+
+        def task_function(ti):
+            # pylint: disable=unused-argument
+            time.sleep(60)
+            # This should not happen -- the state change should be noticed and the task should get killed
+            with shared_mem_lock:
+                task_terminated_externally.value = 0
+            raise Exception
+
+        task = PythonOperator(
+            task_id='test_on_failure',
+            python_callable=task_function,
+            on_failure_callback=failure_callback,
+            dag=dag,
+        )
+
+        session = settings.Session()
+
+        dag.clear()
+        dag.create_dagrun(
+            run_id="test",
+            state=State.RUNNING,
+            execution_date=DEFAULT_DATE,
+            start_date=DEFAULT_DATE,
+            session=session,
+        )
+        ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
+        ti.refresh_from_db()
+        job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True, executor=KubernetesExecutor())
+        job1.task_runner = StandardTaskRunner(job1)
+
+        settings.engine.dispose()
+        process = multiprocessing.Process(target=job1.run)
+        process.start()
+
+        for _ in range(0, 10):
+            ti.refresh_from_db()
+            if ti.state == State.RUNNING:
+                break
+            time.sleep(0.2)
+        assert ti.state == State.RUNNING
+        os.kill(ti.pid, 15)

Review comment:
       good unit test, thanks for adding it 👍 
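The test relies on `multiprocessing.Value` because the task runs in a separate process, so an ordinary Python variable changed there is never seen by the test process. A small standalone sketch of that difference, assuming a POSIX platform where the `fork` start method is available; `failure_callback`, `child` and `plain_counter` are illustrative names only:

```python
import multiprocessing

# Assumes the "fork" start method (POSIX), so the child can use objects
# and functions defined before it was started.
ctx = multiprocessing.get_context("fork")

callback_calls = ctx.Value("i", 0)  # a C int living in shared memory
lock = ctx.Lock()

plain_counter = 0  # an ordinary module-level variable


def failure_callback():
    # Runs in the child; the increment is visible to the parent because
    # the Value is backed by shared memory, not by the child's private copy.
    with lock:
        callback_calls.value += 1


def child():
    global plain_counter
    plain_counter += 1  # only changes the child's copy of the variable
    failure_callback()


p = ctx.Process(target=child)
p.start()
p.join()
print(callback_calls.value, plain_counter)  # 1 0
```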







[GitHub] [airflow] github-actions[bot] commented on pull request #15172: Execute `on_failure_callback` when SIGTERM is received

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #15172:
URL: https://github.com/apache/airflow/pull/15172#issuecomment-823524562


   The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest master at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] ephraimbuddy closed pull request #15172: Execute on failure callback when a pod is marked for deletion

Posted by GitBox <gi...@apache.org>.
ephraimbuddy closed pull request #15172:
URL: https://github.com/apache/airflow/pull/15172


   





[GitHub] [airflow] ephraimbuddy commented on a change in pull request #15172: Execute on failure callback when a pod is marked for deletion

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #15172:
URL: https://github.com/apache/airflow/pull/15172#discussion_r616926404



##########
File path: tests/jobs/test_local_task_job.py
##########
@@ -495,6 +496,69 @@ def task_function(ti):
         assert task_terminated_externally.value == 1
         assert not process.is_alive()
 
+    def test_process_kill_call_on_failure_callback(self):
+        """
+        Test that ensures that where a task is killed with sigterm
+        on_failure_callback gets executed
+        """
+        # use shared memory value so we can properly track value change even if
+        # it's been updated across processes.
+        failure_callback_called = Value('i', 0)
+        task_terminated_externally = Value('i', 1)
+        shared_mem_lock = Lock()
+
+        def failure_callback(context):
+            with shared_mem_lock:
+                failure_callback_called.value += 1
+            assert context['dag_run'].dag_id == 'test_mark_failure'
+
+        dag = DAG(dag_id='test_mark_failure', start_date=DEFAULT_DATE, default_args={'owner': 'owner1'})
+
+        def task_function(ti):
+            # pylint: disable=unused-argument
+            time.sleep(60)
+            # This should not happen -- the state change should be noticed and the task should get killed
+            with shared_mem_lock:
+                task_terminated_externally.value = 0
+
+        task = PythonOperator(
+            task_id='test_on_failure',
+            python_callable=task_function,
+            on_failure_callback=failure_callback,
+            dag=dag,
+        )
+
+        session = settings.Session()
+
+        dag.clear()
+        dag.create_dagrun(
+            run_id="test",
+            state=State.RUNNING,
+            execution_date=DEFAULT_DATE,
+            start_date=DEFAULT_DATE,
+            session=session,
+        )
+        ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
+        ti.refresh_from_db()
+        job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True, executor=KubernetesExecutor())

Review comment:
       Sure







[GitHub] [airflow] kaxil commented on pull request #15172: Execute `on_failure_callback` when SIGTERM is received

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #15172:
URL: https://github.com/apache/airflow/pull/15172#issuecomment-825979531


   The failing Helm chart test is already fixed in master by https://github.com/apache/airflow/commit/17c38be872ed1d5b16f61a9e433e4b907b396c8a





[GitHub] [airflow] github-actions[bot] commented on pull request #15172: Execute `on_failure_callback` when SIGTERM is received

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #15172:
URL: https://github.com/apache/airflow/pull/15172#issuecomment-823819235


   [The Workflow run](https://github.com/apache/airflow/actions/runs/769708031) is cancelling this PR. Building images for the PR has failed. Follow the workflow link to check the reason.





[GitHub] [airflow] github-actions[bot] commented on pull request #15172: Execute on failure callback when a pod is marked for deletion

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #15172:
URL: https://github.com/apache/airflow/pull/15172#issuecomment-823130598


   [The Workflow run](https://github.com/apache/airflow/actions/runs/766623253) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] github-actions[bot] commented on pull request #15172: Execute on failure callback when a pod is marked for deletion

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #15172:
URL: https://github.com/apache/airflow/pull/15172#issuecomment-813100721


   [The Workflow run](https://github.com/apache/airflow/actions/runs/717368051) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] github-actions[bot] commented on pull request #15172: Execute on failure callback when a pod is marked for deletion

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #15172:
URL: https://github.com/apache/airflow/pull/15172#issuecomment-823130327


   [The Workflow run](https://github.com/apache/airflow/actions/runs/766601101) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] houqp commented on pull request #15172: Execute on failure callback when a pod is marked for deletion

Posted by GitBox <gi...@apache.org>.
houqp commented on pull request #15172:
URL: https://github.com/apache/airflow/pull/15172#issuecomment-820842583


   @ephraimbuddy to simulate the scenario in #11086, we could send a SIGKILL (9) instead of a SIGTERM (15) in the unit test to force-kill the task_runner subprocess.
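A standalone sketch of why the two signals exercise different code paths, assuming a POSIX system: a child process can install a handler for SIGTERM and run cleanup (such as a failure callback) before exiting, whereas SIGKILL cannot be caught, so no handler runs at all. The child program and the exit code 42 are arbitrary choices for illustration.

```python
import signal
import subprocess
import sys

# Child program: installs a SIGTERM handler, reports readiness, then sleeps.
CHILD_CODE = r"""
import signal, sys, time

def on_sigterm(signum, frame):
    # SIGTERM is catchable: cleanup such as a failure callback could run here.
    sys.exit(42)

signal.signal(signal.SIGTERM, on_sigterm)
print("ready", flush=True)
time.sleep(30)
"""


def kill_child(sig):
    """Start the child, wait until its handler is installed, send `sig`, return its exit status."""
    with subprocess.Popen(
        [sys.executable, "-c", CHILD_CODE], stdout=subprocess.PIPE, text=True
    ) as proc:
        if proc.stdout.readline().strip() != "ready":
            raise RuntimeError("child did not start")
        proc.send_signal(sig)
        return proc.wait(timeout=10)


print(kill_child(signal.SIGTERM))  # 42: the handler ran before exit
print(kill_child(signal.SIGKILL))  # -9: killed outright, no handler ran
```

`Popen.returncode` is the negated signal number when the child is terminated by a signal, which is how the SIGKILL case shows that no cleanup code ran.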





[GitHub] [airflow] kaxil commented on pull request #15172: Execute on failure callback when a pod is marked for deletion

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #15172:
URL: https://github.com/apache/airflow/pull/15172#issuecomment-819525688


   @houqp Did you get a chance to take a look?





[GitHub] [airflow] ephraimbuddy commented on pull request #15172: Execute on failure callback when a pod is marked for deletion

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #15172:
URL: https://github.com/apache/airflow/pull/15172#issuecomment-817081964


   > Sent you an invite to talk about this on Monday
   
   Accepted. Thanks





[GitHub] [airflow] houqp commented on a change in pull request #15172: Execute on failure callback when a pod is marked for deletion

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #15172:
URL: https://github.com/apache/airflow/pull/15172#discussion_r616877936



##########
File path: tests/jobs/test_local_task_job.py
##########
@@ -495,6 +496,69 @@ def task_function(ti):
         assert task_terminated_externally.value == 1
         assert not process.is_alive()
 
+    def test_process_kill_call_on_failure_callback(self):
+        """
+        Test that ensures that where a task is killed with sigterm
+        on_failure_callback gets executed
+        """
+        # use shared memory value so we can properly track value change even if
+        # it's been updated across processes.
+        failure_callback_called = Value('i', 0)
+        task_terminated_externally = Value('i', 1)
+        shared_mem_lock = Lock()
+
+        def failure_callback(context):
+            with shared_mem_lock:
+                failure_callback_called.value += 1
+            assert context['dag_run'].dag_id == 'test_mark_failure'
+
+        dag = DAG(dag_id='test_mark_failure', start_date=DEFAULT_DATE, default_args={'owner': 'owner1'})
+
+        def task_function(ti):
+            # pylint: disable=unused-argument
+            time.sleep(60)
+            # This should not happen -- the state change should be noticed and the task should get killed
+            with shared_mem_lock:
+                task_terminated_externally.value = 0
+
+        task = PythonOperator(
+            task_id='test_on_failure',
+            python_callable=task_function,
+            on_failure_callback=failure_callback,
+            dag=dag,
+        )
+
+        session = settings.Session()
+
+        dag.clear()
+        dag.create_dagrun(
+            run_id="test",
+            state=State.RUNNING,
+            execution_date=DEFAULT_DATE,
+            start_date=DEFAULT_DATE,
+            session=session,
+        )
+        ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
+        ti.refresh_from_db()
+        job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True, executor=KubernetesExecutor())

Review comment:
       BTW, I think using `SequentialExecutor` here should be enough to reproduce the failure :)







[GitHub] [airflow] houqp commented on pull request #15172: Execute on failure callback when a pod is marked for deletion

Posted by GitBox <gi...@apache.org>.
houqp commented on pull request #15172:
URL: https://github.com/apache/airflow/pull/15172#issuecomment-820841006


   @kaxil @ephraimbuddy sorry, I was on vacation. I posted my analysis and recommended fix in https://github.com/apache/airflow/issues/14422#issuecomment-820839115. @ephraimbuddy could you give that a try?
   
   As far as I can tell, this fix will not cause a race condition, because each callback is still executed in only one process (the success callback in the local task job and the failure callback in run_raw_task). However, it will cause a regression for https://github.com/apache/airflow/issues/11086.





[GitHub] [airflow] houqp commented on a change in pull request #15172: Execute `on_failure_callback` when SIGTERM is received

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #15172:
URL: https://github.com/apache/airflow/pull/15172#discussion_r617863627



##########
File path: tests/jobs/test_local_task_job.py
##########
@@ -495,6 +496,69 @@ def task_function(ti):
         assert task_terminated_externally.value == 1
         assert not process.is_alive()
 
+    def test_process_kill_call_on_failure_callback(self):
+        """
+        Test that ensures that when a task is killed with sigterm

Review comment:
       nitpick
   
   ```suggestion
           Test that ensures that when the local_task_job is killed with sigterm
   ```







[GitHub] [airflow] github-actions[bot] commented on pull request #15172: Execute `on_failure_callback` when SIGTERM is received

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #15172:
URL: https://github.com/apache/airflow/pull/15172#issuecomment-823536687


   [The Workflow run](https://github.com/apache/airflow/actions/runs/768184459) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] kaxil commented on pull request #15172: Execute on failure callback when a pod is marked for deletion

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #15172:
URL: https://github.com/apache/airflow/pull/15172#issuecomment-817958022


   @houqp Can you take a look at this, please, since you worked on #10917?





[GitHub] [airflow] ephraimbuddy closed pull request #15172: Execute `on_failure_callback` when SIGTERM is received

Posted by GitBox <gi...@apache.org>.
ephraimbuddy closed pull request #15172:
URL: https://github.com/apache/airflow/pull/15172


   





[GitHub] [airflow] github-actions[bot] commented on pull request #15172: Execute on failure callback when a pod is marked for deletion

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #15172:
URL: https://github.com/apache/airflow/pull/15172#issuecomment-816277666


   [The Workflow run](https://github.com/apache/airflow/actions/runs/731160970) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] kaxil commented on a change in pull request #15172: Execute on failure callback when a sigterm is received

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #15172:
URL: https://github.com/apache/airflow/pull/15172#discussion_r616938204



##########
File path: tests/jobs/test_local_task_job.py
##########
@@ -495,6 +495,69 @@ def task_function(ti):
         assert task_terminated_externally.value == 1
         assert not process.is_alive()
 
+    def test_process_kill_call_on_failure_callback(self):
+        """
+        Test that ensures that where a task is killed with sigterm
+        on_failure_callback gets executed

Review comment:
       ```suggestion
           Test that ensures that when a task is killed with sigterm
           on_failure_callback gets executed
   ```

##########
File path: tests/jobs/test_local_task_job.py
##########
@@ -495,6 +495,69 @@ def task_function(ti):
         assert task_terminated_externally.value == 1
         assert not process.is_alive()
 
+    def test_process_kill_call_on_failure_callback(self):
+        """
+        Test that ensures that where a task is killed with sigterm
+        on_failure_callback gets executed
+        """
+        # use shared memory value so we can properly track value change even if
+        # it's been updated across processes.
+        failure_callback_called = Value('i', 0)
+        task_terminated_externally = Value('i', 1)
+        shared_mem_lock = Lock()
+
+        def failure_callback(context):
+            with shared_mem_lock:
+                failure_callback_called.value += 1
+            assert context['dag_run'].dag_id == 'test_mark_failure'
+
+        dag = DAG(dag_id='test_mark_failure', start_date=DEFAULT_DATE, default_args={'owner': 'owner1'})
+
+        def task_function(ti):
+            # pylint: disable=unused-argument
+            time.sleep(60)
+            # This should not happen -- the state change should be noticed and the task should get killed
+            with shared_mem_lock:
+                task_terminated_externally.value = 0
+
+        task = PythonOperator(
+            task_id='test_on_failure',
+            python_callable=task_function,
+            on_failure_callback=failure_callback,
+            dag=dag,
+        )
+
+        session = settings.Session()
+
+        dag.clear()
+        dag.create_dagrun(
+            run_id="test",
+            state=State.RUNNING,
+            execution_date=DEFAULT_DATE,
+            start_date=DEFAULT_DATE,
+            session=session,
+        )
+        ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
+        ti.refresh_from_db()
+        job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor())
+        job1.task_runner = StandardTaskRunner(job1)
+
+        settings.engine.dispose()
+        process = multiprocessing.Process(target=job1.run)
+        process.start()
+
+        for _ in range(0, 10):
+            ti.refresh_from_db()
+            if ti.state == State.RUNNING:
+                break
+            time.sleep(0.2)
+        assert ti.state == State.RUNNING
+        os.kill(ti.pid, 15)

Review comment:
       ```suggestion
           os.kill(ti.pid, signal.SIGTERM)
   ```
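A standalone sketch of the two patterns in this part of the test, assuming a bounded poll is what the `for _ in range(0, 10)` loop is meant to express: a hypothetical `wait_for` helper that polls a predicate until a deadline, and the named `signal.SIGTERM` constant, whose numeric value is platform-defined (15 on Linux), used instead of a bare literal.

```python
import signal
import time


def wait_for(predicate, timeout=2.0, interval=0.2):
    """Hypothetical helper: poll predicate() until it is truthy or `timeout` seconds pass.

    Returns True on success and the final (falsy) result as a bool on timeout,
    so callers can simply assert on the return value.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if predicate():
            return True
        time.sleep(interval)
    return bool(predicate())


# The named constant documents intent; the number behind it is platform-defined.
print(signal.Signals.SIGTERM.name, int(signal.SIGTERM))

# Illustrative predicate that becomes true on the third check.
checks = iter([False, False, True])
print(wait_for(lambda: next(checks), timeout=2.0, interval=0.01))  # True
```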







[GitHub] [airflow] kaxil edited a comment on pull request #15172: Execute on failure callback when a pod is marked for deletion

Posted by GitBox <gi...@apache.org>.
kaxil edited a comment on pull request #15172:
URL: https://github.com/apache/airflow/pull/15172#issuecomment-817958022


   @houqp Can you take a look at this, please, since you worked on #10917? This can _probably_ lead to the same race condition as the one you fixed.





[GitHub] [airflow] kaxil merged pull request #15172: Execute `on_failure_callback` when SIGTERM is received

Posted by GitBox <gi...@apache.org>.
kaxil merged pull request #15172:
URL: https://github.com/apache/airflow/pull/15172


   

