Posted to commits@airflow.apache.org by je...@apache.org on 2021/11/03 21:30:15 UTC

[airflow] 16/17: Only mark SchedulerJobs as failed, not any jobs (#19375)

This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 9b014673f379b910c22ca4902523a49c227115af
Author: Jed Cunningham <66...@users.noreply.github.com>
AuthorDate: Wed Nov 3 00:45:41 2021 -0600

    Only mark SchedulerJobs as failed, not any jobs (#19375)
    
    In `adopt_or_reset_orphaned_tasks`, we set any SchedulerJobs whose latest
    heartbeat is older than `scheduler_health_check_threshold` to failed.
    However, a missing condition allowed that timeout to apply to all jobs,
    not just SchedulerJobs.
    This is because SQLAlchemy's bulk `update()` does not include the
    polymorphic identity criteria:
    https://docs.sqlalchemy.org/en/13/orm/query.html#sqlalchemy.orm.query.Query.update
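    
    For illustration, here is a minimal, self-contained sketch of that
    behaviour (hypothetical `Job`/`SchedulerJob` models on SQLite, not
    Airflow's real ones, using the SQLAlchemy 1.3-style declarative API):
    
        from sqlalchemy import Column, Integer, String, create_engine
        from sqlalchemy.ext.declarative import declarative_base
        from sqlalchemy.orm import sessionmaker
        
        Base = declarative_base()
        
        class Job(Base):
            __tablename__ = "job"
            id = Column(Integer, primary_key=True)
            job_type = Column(String(30))
            state = Column(String(20))
            __mapper_args__ = {"polymorphic_on": job_type, "polymorphic_identity": "Job"}
        
        class SchedulerJob(Job):
            __mapper_args__ = {"polymorphic_identity": "SchedulerJob"}
        
        engine = create_engine("sqlite://")
        Base.metadata.create_all(engine)
        session = sessionmaker(bind=engine)()
        session.add_all([SchedulerJob(state="running"), Job(state="running")])
        session.commit()
        
        # A SELECT through the subclass adds the discriminator automatically:
        assert session.query(SchedulerJob).count() == 1
        
        # ...but a bulk update() does not, so without an explicit job_type
        # filter it rewrites every matching row, whatever its type:
        session.query(SchedulerJob).filter(SchedulerJob.state == "running").update(
            {SchedulerJob.state: "failed"}, synchronize_session=False
        )
        session.commit()
        assert session.query(Job).filter(Job.state == "failed").count() == 2  # both rows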
    
    So if we had any running LocalTaskJobs that, for whatever reason, were
    not heartbeating faster than `scheduler_health_check_threshold`, their
    state was set to failed and they subsequently exited with a log line
    similar to:
    
        State of this instance has been externally set to scheduled. Terminating instance.
    
    Note that the state it is set to can differ (e.g. queued or
    up_for_retry), depending simply on how quickly the scheduler has
    progressed that task_instance again.
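    
    For context, that log line comes from LocalTaskJob's heartbeat callback,
    which self-destructs when the task instance's state has been changed
    externally. Roughly paraphrased (a simplified sketch, not the actual
    Airflow source):
    
        def heartbeat_callback(self, session=None):
            # Re-read the task instance to see if its state changed externally.
            self.task_instance.refresh_from_db()
            ti = self.task_instance
            if ti.state != State.RUNNING:
                # Here: the scheduler has already re-queued the task that
                # belonged to this wrongly-failed job, so shut the job down.
                self.log.warning(
                    "State of this instance has been externally set to %s. "
                    "Terminating instance.",
                    ti.state,
                )
                self.task_runner.terminate()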
    
    (cherry picked from commit 38d329bd112e8be891f077b4e3300182930cf74d)
---
 airflow/jobs/scheduler_job.py    |  1 +
 tests/jobs/test_scheduler_job.py | 31 +++++++++++++++++++++++++++++++
 2 files changed, 32 insertions(+)

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 2c7378d..2a230a7 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -1127,6 +1127,7 @@ class SchedulerJob(BaseJob):
                     num_failed = (
                         session.query(SchedulerJob)
                         .filter(
+                            SchedulerJob.job_type == "SchedulerJob",
                             SchedulerJob.state == State.RUNNING,
                             SchedulerJob.latest_heartbeat < (timezone.utcnow() - timedelta(seconds=timeout)),
                         )
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 4922617..5380960 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -37,6 +37,7 @@ from airflow.dag_processing.manager import DagFileProcessorAgent
 from airflow.exceptions import AirflowException
 from airflow.executors.base_executor import BaseExecutor
 from airflow.jobs.backfill_job import BackfillJob
+from airflow.jobs.base_job import BaseJob
 from airflow.jobs.scheduler_job import SchedulerJob
 from airflow.models import DAG, DagBag, DagModel, Pool, TaskInstance
 from airflow.models.dagrun import DagRun
@@ -2445,6 +2446,36 @@ class TestSchedulerJob:
         if old_job.processor_agent:
             old_job.processor_agent.end()
 
+    def test_adopt_or_reset_orphaned_tasks_only_fails_scheduler_jobs(self, caplog):
+        """Make sure we only set SchedulerJobs to failed, not all jobs"""
+        session = settings.Session()
+
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.state = State.RUNNING
+        self.scheduler_job.latest_heartbeat = timezone.utcnow()
+        session.add(self.scheduler_job)
+        session.flush()
+
+        old_job = SchedulerJob(subdir=os.devnull)
+        old_job.state = State.RUNNING
+        old_job.latest_heartbeat = timezone.utcnow() - timedelta(minutes=15)
+        session.add(old_job)
+        session.flush()
+
+        old_task_job = BaseJob()  # Imagine it's a LocalTaskJob, but this is easier to provision
+        old_task_job.state = State.RUNNING
+        old_task_job.latest_heartbeat = timezone.utcnow() - timedelta(minutes=15)
+        session.add(old_task_job)
+        session.flush()
+
+        with caplog.at_level('INFO', logger='airflow.jobs.scheduler_job'):
+            self.scheduler_job.adopt_or_reset_orphaned_tasks(session=session)
+        session.expire_all()
+
+        assert old_job.state == State.FAILED
+        assert old_task_job.state == State.RUNNING
+        assert 'Marked 1 SchedulerJob instances as failed' in caplog.messages
+
     def test_send_sla_callbacks_to_processor_sla_disabled(self, dag_maker):
         """Test SLA Callbacks are not sent when check_slas is False"""
         dag_id = 'test_send_sla_callbacks_to_processor_sla_disabled'