Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/04/19 12:52:45 UTC

[GitHub] [airflow] ephraimbuddy edited a comment on issue #14422: on_failure_callback does not seem to fire on pod deletion/eviction

ephraimbuddy edited a comment on issue #14422:
URL: https://github.com/apache/airflow/issues/14422#issuecomment-822437153


   Hi @houqp, I tried your suggestion as well as @kaxil's, but neither worked.
   
   This is another solution that worked:
   <details><summary>Changes diff</summary>
   <p>
   
   ```diff
   diff --git a/airflow/jobs/local_task_job.py b/airflow/jobs/local_task_job.py
   index d68bfc722..6d3feb72a 100644
   --- a/airflow/jobs/local_task_job.py
   +++ b/airflow/jobs/local_task_job.py
   @@ -76,7 +76,6 @@ class LocalTaskJob(BaseJob):
                """Setting kill signal handler"""
                self.log.error("Received SIGTERM. Terminating subprocesses")
                self.on_kill()
   -            raise AirflowException("LocalTaskJob received SIGTERM signal")
    
            # pylint: enable=unused-argument
            signal.signal(signal.SIGTERM, signal_handler)
   @@ -191,9 +190,16 @@ class LocalTaskJob(BaseJob):
                if ti.state == State.SUCCESS:
                    error = None
                else:
   +                if ti.state == State.RUNNING:
   +                    ti.state = State.FAILED
   +                    session.merge(ti)
   +                    session.commit()
                    # if ti.state is not set by taskinstance.handle_failure, then
                    # error file will not be populated and it must be updated by
                    # external source suck as web UI
   -                error = self.task_runner.deserialize_run_error() or "task marked as failed externally"
   +                try:
   +                    error = self.task_runner.deserialize_run_error() or "task marked as failed externally"
   +                except ValueError:
   +                    error = 'task killed by signal'
                ti._run_finished_callback(error=error)  # pylint: disable=protected-access
                self.terminating = True
   diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py
   index 7f9445c62..56c5d6b8b 100644
   --- a/tests/jobs/test_local_task_job.py
   +++ b/tests/jobs/test_local_task_job.py
   @@ -29,6 +29,7 @@ import pytest
    
    from airflow import settings
    from airflow.exceptions import AirflowException, AirflowFailException
   +from airflow.executors.kubernetes_executor import KubernetesExecutor
    from airflow.executors.sequential_executor import SequentialExecutor
    from airflow.jobs.local_task_job import LocalTaskJob
    from airflow.models.dag import DAG
   @@ -495,6 +496,70 @@ class TestLocalTaskJob(unittest.TestCase):
            assert task_terminated_externally.value == 1
            assert not process.is_alive()
    
   +    def test_process_kill_call_on_failure_callback(self):
   +        """
   +        Test that ensures that where a task is killed with sigterm
   +        on_failure_callback gets executed
   +        """
   +        # use shared memory value so we can properly track value change even if
   +        # it's been updated across processes.
   +        failure_callback_called = Value('i', 0)
   +        task_terminated_externally = Value('i', 1)
   +        shared_mem_lock = Lock()
   +
   +        def failure_callback(context):
   +            with shared_mem_lock:
   +                failure_callback_called.value += 1
   +            assert context['dag_run'].dag_id == 'test_mark_failure'
   +
   +        dag = DAG(dag_id='test_mark_failure', start_date=DEFAULT_DATE, default_args={'owner': 'owner1'})
   +
   +        def task_function(ti):
   +            # pylint: disable=unused-argument
   +            time.sleep(60)
   +            # This should not happen -- the state change should be noticed and the task should get killed
   +            with shared_mem_lock:
   +                task_terminated_externally.value = 0
   +            raise Exception
   +
   +        task = PythonOperator(
   +            task_id='test_on_failure',
   +            python_callable=task_function,
   +            on_failure_callback=failure_callback,
   +            dag=dag,
   +        )
   +
   +        session = settings.Session()
   +
   +        dag.clear()
   +        dag.create_dagrun(
   +            run_id="test",
   +            state=State.RUNNING,
   +            execution_date=DEFAULT_DATE,
   +            start_date=DEFAULT_DATE,
   +            session=session,
   +        )
   +        ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
   +        ti.refresh_from_db()
   +        job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True, executor=KubernetesExecutor())
   +        job1.task_runner = StandardTaskRunner(job1)
   +
   +        settings.engine.dispose()
   +        process = multiprocessing.Process(target=job1.run)
   +        process.start()
   +
   +        for _ in range(0, 10):
   +            ti.refresh_from_db()
   +            if ti.state == State.RUNNING:
   +                break
   +            time.sleep(0.2)
   +        assert ti.state == State.RUNNING
   +        os.kill(ti.pid, 15)
   +        process.join(timeout=10)
   +        assert failure_callback_called.value == 1
   +        assert task_terminated_externally.value == 1
   +        assert not process.is_alive()
   +
    
    @pytest.fixture()
    def clean_db_helper():
   
   ```
   </p>
   </details>
   Also, I tested the above solution as well as my PR (#15172) against #11086, and both worked fine.
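
The core idea of the diff can be sketched outside Airflow as follows. This is a toy model under stated assumptions, not Airflow code: `ToyJob`, its string states, and `handle_task_exit` are hypothetical stand-ins. The SIGTERM handler only records the signal instead of raising, and the exit path marks a still-running task as failed before invoking the failure callback.

```python
import os
import signal
import time


class ToyJob:
    """Hypothetical stand-in for LocalTaskJob; none of this is Airflow code."""

    def __init__(self, on_failure_callback):
        self.state = "running"
        self.received_sigterm = False
        self.on_failure_callback = on_failure_callback

    def signal_handler(self, signum, frame):
        # Record the signal and return. Raising here would unwind the job
        # before the exit path below gets a chance to run the callback.
        self.received_sigterm = True

    def handle_task_exit(self):
        if self.state == "success":
            return
        if self.state == "running":
            # Nothing else recorded a final state, so treat the task as failed.
            self.state = "failed"
        self.on_failure_callback("task killed by signal")


def run_demo():
    errors = []
    job = ToyJob(on_failure_callback=errors.append)
    previous = signal.signal(signal.SIGTERM, job.signal_handler)
    try:
        # Simulate an external SIGTERM by signalling our own process.
        os.kill(os.getpid(), signal.SIGTERM)
        # Give the interpreter a moment to run the Python-level handler.
        deadline = time.monotonic() + 5
        while not job.received_sigterm and time.monotonic() < deadline:
            time.sleep(0.01)
        job.handle_task_exit()
    finally:
        # Restore whatever handler was installed before the demo.
        signal.signal(signal.SIGTERM, signal.SIG_DFL if previous is None else previous)
    return job, errors


if __name__ == "__main__":
    job, errors = run_demo()
    print(job.state, errors)
```

The sketch assumes a POSIX system and must run in the main thread, since `signal.signal` can only be called there.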
   
   Here's the DAG from #11086, modified to work on Airflow 2.0+:
   
   <details><summary>DAG CODE</summary>
   <p>
   
   ```python
   from datetime import datetime
   
   import numpy as np
   
   from airflow import DAG
   from airflow.api.common.experimental.trigger_dag import trigger_dag
   from airflow.operators.python import PythonOperator
   
   
   def on_failure_callback(context):
       print("===IN ON FAILURE CALLBACK===")
       print("Triggering another run of the task")
       trigger_dag("OOM_test_follower")
   
   
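   def high_memory_task():
       # Keep every array alive so memory usage grows until the task is killed.
       chunks = []
       iteration = 0
       while True:
           print(f"Iteration: {iteration}")
           # Each array holds 10**8 random integers.
           chunks.append(np.random.randint(1_000_000, size=(1000, 1000, 100)))
           iteration += 1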
   
   
   def failure_task():
       raise ValueError("whoops")
   
   
   def print_context(ds, **kwargs):
       print("This DAG was launched by the failure callback")
       print(ds)
       print(kwargs)
   
   
   dag = DAG(
       dag_id="OOM_test",
       schedule_interval=None,
       catchup=False,
       default_args={
           "owner": "madison.bowden",
           "start_date": datetime(year=2019, month=7, day=1),
           "email": "your-email",
       },
   )
   
   with dag:
   
       PythonOperator(
           task_id="oom_task",
           python_callable=high_memory_task,
           on_failure_callback=on_failure_callback,
       )
   
   failure_dag = DAG(
       dag_id="Failure_test",
       schedule_interval=None,
       catchup=False,
       default_args={
           "owner": "madison.bowden",
           "start_date": datetime(year=2019, month=7, day=1),
           "email": "your-email",
       },
   )
   
   with failure_dag:
   
       PythonOperator(
           task_id="failure_task",
           python_callable=failure_task,
           on_failure_callback=on_failure_callback,
       )
   
   dag_follower = DAG(
       dag_id="OOM_test_follower",
       schedule_interval=None,
       catchup=False,
       default_args={
           "owner": "madison.bowden",
           "start_date": datetime(year=2019, month=7, day=1),
           "email": "your-email",
       },
   )
   
   with dag_follower:
   
       PythonOperator(
           task_id="oom_task_failure", python_callable=print_context
       )
   
   ```
   </p>
   </details>
   
   On testing, it looks like we can't catch SIGKILL: https://docs.python.org/3/library/signal.html#signal.SIGKILL. I would appreciate any guidance on this.
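
A minimal sketch of the limitation, assuming a POSIX system (`signal.SIGKILL` is not defined on Windows): installing a handler for SIGTERM succeeds, while the same call for SIGKILL is rejected by the operating system. The helper name `can_install_handler` is made up for this illustration.

```python
import signal


def can_install_handler(signum):
    """Return True if a Python-level handler can be registered for signum."""
    try:
        previous = signal.signal(signum, lambda received, frame: None)
    except (OSError, ValueError):
        # The OS refuses handlers for signals such as SIGKILL and SIGSTOP.
        return False
    # Restore the handler that was in place before the probe.
    signal.signal(signum, signal.SIG_DFL if previous is None else previous)
    return True


if __name__ == "__main__":
    print("SIGTERM catchable:", can_install_handler(signal.SIGTERM))
    print("SIGKILL catchable:", can_install_handler(signal.SIGKILL))
```

Since no handler can run in the killed process, any reaction to SIGKILL would have to come from a different process that notices the task is gone.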
   
   Also, I prefer the solution in the PR (#15172) because it works the same way `email_on_failure` does.
   While this latter solution works, I feel it's not appropriate. I would also appreciate any other solution apart from these two.
   
   Let me know what you think. Thanks
   
   
   

