You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2021/04/09 20:40:17 UTC

[airflow] branch v2-0-test updated: Fixed #14270: Add error message in OOM situations (#15207)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v2-0-test by this push:
     new 57a8afd  Fixed #14270: Add error message in OOM situations (#15207)
57a8afd is described below

commit 57a8afd6b5f7aee8692add571e580358fc9d4257
Author: Andrew Godwin <an...@astronomer.io>
AuthorDate: Tue Apr 6 13:02:11 2021 -0600

    Fixed #14270: Add error message in OOM situations (#15207)
    
    In the case where a child process is reaped early (before we get to it)
    the presumption in the code is that it is due to an OOM error and we set
    the return code -9. This adds an error message alongside that return
    code to make it more obvious.
    
    (cherry picked from commit 18e2c1de776c8c3bc42c984ea0d31515788b6572)
---
 airflow/task/task_runner/standard_task_runner.py   |  8 +++
 .../task/task_runner/test_standard_task_runner.py  | 59 ++++++++++++++++++----
 2 files changed, 57 insertions(+), 10 deletions(-)

diff --git a/airflow/task/task_runner/standard_task_runner.py b/airflow/task/task_runner/standard_task_runner.py
index 505b225..bb566b2 100644
--- a/airflow/task/task_runner/standard_task_runner.py
+++ b/airflow/task/task_runner/standard_task_runner.py
@@ -121,3 +121,11 @@ class StandardTaskRunner(BaseTaskRunner):
         if self._rc is None:
             # Something else reaped it before we had a chance, so let's just "guess" at an error code.
             self._rc = -9
+
+        if self._rc == -9:
+            # If either we or psutil gives out a -9 return code, it likely means
+            # an OOM happened
+            self.log.error(
+                'Job %s was killed before it finished (likely due to running out of memory)',
+                self._task_instance.job_id,
+            )
diff --git a/tests/task/task_runner/test_standard_task_runner.py b/tests/task/task_runner/test_standard_task_runner.py
index fcd4948..6a3ab5d 100644
--- a/tests/task/task_runner/test_standard_task_runner.py
+++ b/tests/task/task_runner/test_standard_task_runner.py
@@ -19,11 +19,11 @@ import getpass
 import logging
 import os
 import time
-import unittest
 from logging.config import dictConfig
 from unittest import mock
 
 import psutil
+import pytest
 
 from airflow import models, settings
 from airflow.jobs.local_task_job import LocalTaskJob
@@ -48,22 +48,24 @@ LOGGING_CONFIG = {
             'class': 'logging.StreamHandler',
             'formatter': 'airflow.task',
             'stream': 'ext://sys.stdout',
-        }
+        },
     },
-    'loggers': {'airflow': {'handlers': ['console'], 'level': 'INFO', 'propagate': False}},
+    'loggers': {'airflow': {'handlers': ['console'], 'level': 'INFO', 'propagate': True}},
 }
 
 
-class TestStandardTaskRunner(unittest.TestCase):
-    @classmethod
-    def setUpClass(cls):
+class TestStandardTaskRunner:
+    @pytest.fixture(autouse=True, scope="class")
+    def logging_and_db(self):
+        """
+        This fixture sets up logging to have a different setup on the way in
+        (as the test environment does not have enough context for the normal
+        way to run) and ensures they reset back to normal on the way out.
+        """
         dictConfig(LOGGING_CONFIG)
-
-    @classmethod
-    def tearDownClass(cls):
+        yield
         airflow_logger = logging.getLogger('airflow')
         airflow_logger.handlers = []
-        airflow_logger.propagate = True
         try:
             clear_db_runs()
         except Exception:  # noqa pylint: disable=broad-except
@@ -131,6 +133,43 @@ class TestStandardTaskRunner(unittest.TestCase):
 
         assert runner.return_code() is not None
 
+    def test_early_reap_exit(self, caplog):
+        """
+        Tests that when a child process running a task is killed externally
+        (e.g. by an OOM error, which we fake here), then we get return code
+        -9 and a log message.
+        """
+        # Set up mock task
+        local_task_job = mock.Mock()
+        local_task_job.task_instance = mock.MagicMock()
+        local_task_job.task_instance.run_as_user = getpass.getuser()
+        local_task_job.task_instance.command_as_list.return_value = [
+            'airflow',
+            'tasks',
+            'test',
+            'test_on_kill',
+            'task1',
+            '2016-01-01',
+        ]
+
+        # Kick off the runner
+        runner = StandardTaskRunner(local_task_job)
+        runner.start()
+        time.sleep(0.2)
+
+        # Kill the child process externally from the runner
+        # Note that we have to do this from ANOTHER process, as if we just
+        # call os.kill here we're doing it from the parent process and it
+        # won't be the same as an external kill in terms of OS tracking.
+        pgid = os.getpgid(runner.process.pid)
+        os.system(f"kill -s KILL {pgid}")
+        time.sleep(0.2)
+
+        runner.terminate()
+
+        assert runner.return_code() == -9
+        assert "running out of memory" in caplog.text
+
     def test_on_kill(self):
         """
         Test that ensures that clearing in the UI SIGTERMS