You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by je...@apache.org on 2021/11/03 21:30:15 UTC
[airflow] 16/17: Only mark SchedulerJobs as failed,
not any jobs (#19375)
This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 9b014673f379b910c22ca4902523a49c227115af
Author: Jed Cunningham <66...@users.noreply.github.com>
AuthorDate: Wed Nov 3 00:45:41 2021 -0600
Only mark SchedulerJobs as failed, not any jobs (#19375)
In `adopt_or_reset_orphaned_tasks`, we set any SchedulerJobs that have
failed `scheduler_health_check_threshold` to failed, however a missing
condition was allowing that timeout to apply to all jobs, not just SchedulerJobs.
This is because polymorphic identity isn't included for `update()`:
https://docs.sqlalchemy.org/en/13/orm/query.html#sqlalchemy.orm.query.Query.update
So if we had any running LocalTaskJobs that, for whatever reason, aren't
heartbeating faster than `scheduler_health_check_threshold`, their state
gets set to failed and they subsequently exit with a log line similar to:
State of this instance has been externally set to scheduled. Terminating instance.
Note that the state it is set to can be different (e.g. queued or
up_for_retry) simply depending on how quickly the scheduler has
progressed that task_instance again.
(cherry picked from commit 38d329bd112e8be891f077b4e3300182930cf74d)
---
airflow/jobs/scheduler_job.py | 1 +
tests/jobs/test_scheduler_job.py | 31 +++++++++++++++++++++++++++++++
2 files changed, 32 insertions(+)
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 2c7378d..2a230a7 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -1127,6 +1127,7 @@ class SchedulerJob(BaseJob):
num_failed = (
session.query(SchedulerJob)
.filter(
+ SchedulerJob.job_type == "SchedulerJob",
SchedulerJob.state == State.RUNNING,
SchedulerJob.latest_heartbeat < (timezone.utcnow() - timedelta(seconds=timeout)),
)
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 4922617..5380960 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -37,6 +37,7 @@ from airflow.dag_processing.manager import DagFileProcessorAgent
from airflow.exceptions import AirflowException
from airflow.executors.base_executor import BaseExecutor
from airflow.jobs.backfill_job import BackfillJob
+from airflow.jobs.base_job import BaseJob
from airflow.jobs.scheduler_job import SchedulerJob
from airflow.models import DAG, DagBag, DagModel, Pool, TaskInstance
from airflow.models.dagrun import DagRun
@@ -2445,6 +2446,36 @@ class TestSchedulerJob:
if old_job.processor_agent:
old_job.processor_agent.end()
+ def test_adopt_or_reset_orphaned_tasks_only_fails_scheduler_jobs(self, caplog):
+ """Make sure we only set SchedulerJobs to failed, not all jobs"""
+ session = settings.Session()
+
+ self.scheduler_job = SchedulerJob(subdir=os.devnull)
+ self.scheduler_job.state = State.RUNNING
+ self.scheduler_job.latest_heartbeat = timezone.utcnow()
+ session.add(self.scheduler_job)
+ session.flush()
+
+ old_job = SchedulerJob(subdir=os.devnull)
+ old_job.state = State.RUNNING
+ old_job.latest_heartbeat = timezone.utcnow() - timedelta(minutes=15)
+ session.add(old_job)
+ session.flush()
+
+ old_task_job = BaseJob() # Imagine it's a LocalTaskJob, but this is easier to provision
+ old_task_job.state = State.RUNNING
+ old_task_job.latest_heartbeat = timezone.utcnow() - timedelta(minutes=15)
+ session.add(old_task_job)
+ session.flush()
+
+ with caplog.at_level('INFO', logger='airflow.jobs.scheduler_job'):
+ self.scheduler_job.adopt_or_reset_orphaned_tasks(session=session)
+ session.expire_all()
+
+ assert old_job.state == State.FAILED
+ assert old_task_job.state == State.RUNNING
+ assert 'Marked 1 SchedulerJob instances as failed' in caplog.messages
+
def test_send_sla_callbacks_to_processor_sla_disabled(self, dag_maker):
"""Test SLA Callbacks are not sent when check_slas is False"""
dag_id = 'test_send_sla_callbacks_to_processor_sla_disabled'