You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2021/04/09 20:40:17 UTC
[airflow] branch v2-0-test updated: Fixed #14270: Add error message in OOM situations (#15207)
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v2-0-test by this push:
new 57a8afd Fixed #14270: Add error message in OOM situations (#15207)
57a8afd is described below
commit 57a8afd6b5f7aee8692add571e580358fc9d4257
Author: Andrew Godwin <an...@astronomer.io>
AuthorDate: Tue Apr 6 13:02:11 2021 -0600
Fixed #14270: Add error message in OOM situations (#15207)
When a child process is reaped early (before we get to it), the code
presumes that this is due to an OOM error and sets the return code to
-9. This adds an error message alongside that return code to make the
likely cause more obvious.
(cherry picked from commit 18e2c1de776c8c3bc42c984ea0d31515788b6572)
---
airflow/task/task_runner/standard_task_runner.py | 8 +++
.../task/task_runner/test_standard_task_runner.py | 59 ++++++++++++++++++----
2 files changed, 57 insertions(+), 10 deletions(-)
diff --git a/airflow/task/task_runner/standard_task_runner.py b/airflow/task/task_runner/standard_task_runner.py
index 505b225..bb566b2 100644
--- a/airflow/task/task_runner/standard_task_runner.py
+++ b/airflow/task/task_runner/standard_task_runner.py
@@ -121,3 +121,11 @@ class StandardTaskRunner(BaseTaskRunner):
if self._rc is None:
# Something else reaped it before we had a chance, so let's just "guess" at an error code.
self._rc = -9
+
+ if self._rc == -9:
+ # If either we or psutil gives out a -9 return code, it likely means
+ # an OOM happened
+ self.log.error(
+ 'Job %s was killed before it finished (likely due to running out of memory)',
+ self._task_instance.job_id,
+ )
diff --git a/tests/task/task_runner/test_standard_task_runner.py b/tests/task/task_runner/test_standard_task_runner.py
index fcd4948..6a3ab5d 100644
--- a/tests/task/task_runner/test_standard_task_runner.py
+++ b/tests/task/task_runner/test_standard_task_runner.py
@@ -19,11 +19,11 @@ import getpass
import logging
import os
import time
-import unittest
from logging.config import dictConfig
from unittest import mock
import psutil
+import pytest
from airflow import models, settings
from airflow.jobs.local_task_job import LocalTaskJob
@@ -48,22 +48,24 @@ LOGGING_CONFIG = {
'class': 'logging.StreamHandler',
'formatter': 'airflow.task',
'stream': 'ext://sys.stdout',
- }
+ },
},
- 'loggers': {'airflow': {'handlers': ['console'], 'level': 'INFO', 'propagate': False}},
+ 'loggers': {'airflow': {'handlers': ['console'], 'level': 'INFO', 'propagate': True}},
}
-class TestStandardTaskRunner(unittest.TestCase):
- @classmethod
- def setUpClass(cls):
+class TestStandardTaskRunner:
+ @pytest.fixture(autouse=True, scope="class")
+ def logging_and_db(self):
+ """
+ This fixture sets up logging to have a different setup on the way in
+ (as the test environment does not have enough context for the normal
+ way to run) and ensures they reset back to normal on the way out.
+ """
dictConfig(LOGGING_CONFIG)
-
- @classmethod
- def tearDownClass(cls):
+ yield
airflow_logger = logging.getLogger('airflow')
airflow_logger.handlers = []
- airflow_logger.propagate = True
try:
clear_db_runs()
except Exception: # noqa pylint: disable=broad-except
@@ -131,6 +133,43 @@ class TestStandardTaskRunner(unittest.TestCase):
assert runner.return_code() is not None
+ def test_early_reap_exit(self, caplog):
+ """
+ Tests that when a child process running a task is killed externally
+ (e.g. by an OOM error, which we fake here), then we get return code
+ -9 and a log message.
+ """
+ # Set up mock task
+ local_task_job = mock.Mock()
+ local_task_job.task_instance = mock.MagicMock()
+ local_task_job.task_instance.run_as_user = getpass.getuser()
+ local_task_job.task_instance.command_as_list.return_value = [
+ 'airflow',
+ 'tasks',
+ 'test',
+ 'test_on_kill',
+ 'task1',
+ '2016-01-01',
+ ]
+
+ # Kick off the runner
+ runner = StandardTaskRunner(local_task_job)
+ runner.start()
+ time.sleep(0.2)
+
+ # Kill the child process externally from the runner
+ # Note that we have to do this from ANOTHER process, as if we just
+ # call os.kill here we're doing it from the parent process and it
+ # won't be the same as an external kill in terms of OS tracking.
+ pgid = os.getpgid(runner.process.pid)
+ os.system(f"kill -s KILL {pgid}")
+ time.sleep(0.2)
+
+ runner.terminate()
+
+ assert runner.return_code() == -9
+ assert "running out of memory" in caplog.text
+
def test_on_kill(self):
"""
Test that ensures that clearing in the UI SIGTERMS