Posted to commits@airflow.apache.org by po...@apache.org on 2022/12/04 22:23:00 UTC

[airflow] branch main updated: Additional info about Segmentation Fault in LocalTaskJob (#27381)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 76f81cd4a7 Additional info about Segmentation Fault in LocalTaskJob (#27381)
76f81cd4a7 is described below

commit 76f81cd4a7433b7eeddb863b2ae6ee59176cf816
Author: Andrey Anshin <An...@taragol.is>
AuthorDate: Mon Dec 5 01:22:51 2022 +0300

    Additional info about Segmentation Fault in LocalTaskJob (#27381)
---
 airflow/jobs/local_task_job.py    | 38 ++++++++++++++++++++++++++++++++++++++
 tests/jobs/test_local_task_job.py | 37 ++++++++++++++++++++++++++++++++++++-
 2 files changed, 74 insertions(+), 1 deletion(-)

diff --git a/airflow/jobs/local_task_job.py b/airflow/jobs/local_task_job.py
index fe7fc4a561..07541ff3ca 100644
--- a/airflow/jobs/local_task_job.py
+++ b/airflow/jobs/local_task_job.py
@@ -34,6 +34,36 @@ from airflow.utils.net import get_hostname
 from airflow.utils.session import provide_session
 from airflow.utils.state import State
 
+SIGSEGV_MESSAGE = """
+******************************************* Received SIGSEGV *******************************************
+The SIGSEGV (Segmentation Violation) signal indicates a Segmentation Fault error, which refers to
+an attempt by a program/library to write or read outside its allocated memory.
+
+In a Python environment this signal usually originates in libraries which use the low-level C API.
+Make sure that you use the right libraries/Docker Images
+for your architecture (Intel/ARM) and/or Operating System (Linux/macOS).
+
+Suggested way to debug
+======================
+  - Set the environment variable 'PYTHONFAULTHANDLER' to 'true'.
+  - Start the airflow services.
+  - Restart the failed airflow task.
+  - Check the 'scheduler' and 'worker' service logs for an additional traceback,
+    which might contain information about the module/library where the actual error happened.
+
+Known Issues
+============
+
+Note: Only Linux-based distros are supported as a "Production" execution environment for Airflow.
+
+macOS
+-----
+ 1. Due to limitations in Apple's libraries, not every process can 'fork' safely.
+    One common error is being unable to query the macOS system configuration for network proxies.
+    If you are not using a proxy, you can avoid this by setting the environment variable 'no_proxy' to '*'.
+    See: https://github.com/python/cpython/issues/58037 and https://bugs.python.org/issue30385#msg293958
+********************************************************************************************************"""
+
 
 class LocalTaskJob(BaseJob):
     """LocalTaskJob runs a single task instance."""
@@ -83,6 +113,14 @@ class LocalTaskJob(BaseJob):
             self.task_runner.terminate()
             self.handle_task_exit(128 + signum)
 
+        def segfault_signal_handler(signum, frame):
+            """Setting sigmentation violation signal handler"""
+            self.log.critical(SIGSEGV_MESSAGE)
+            self.task_runner.terminate()
+            self.handle_task_exit(128 + signum)
+            raise AirflowException("Segmentation Fault detected.")
+
+        signal.signal(signal.SIGSEGV, segfault_signal_handler)
         signal.signal(signal.SIGTERM, signal_handler)
 
         if not self.task_instance.check_and_change_state_before_execution(
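
The banner's "Suggested way to debug" section relies on CPython's standard faulthandler
module: setting PYTHONFAULTHANDLER to any non-empty value (such as 'true') before the
interpreter starts has the same effect as calling faulthandler.enable() at startup.
A minimal sketch of what that enables, runnable outside Airflow (the ctypes null-pointer
read is just one assumed way to provoke a real fault for demonstration):

    import ctypes
    import faulthandler

    faulthandler.enable()  # equivalent to launching with PYTHONFAULTHANDLER=true
    assert faulthandler.is_enabled()

    # Any hard crash in C code now dumps a Python-level traceback to stderr
    # before the process dies, pointing at the frame where the fault occurred:
    ctypes.string_at(0)  # dereference NULL -> SIGSEGV, reported with a traceback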
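
The macOS workaround under "Known Issues" works because urllib's getproxies() returns
any proxy settings it finds in the environment before ever querying the fork-unsafe
macOS SystemConfiguration framework. A sketch of setting it from Python instead of the
shell; where exactly to put this is environment-specific, it just has to run before
anything forks:

    import os

    # With a *_proxy variable present in the environment, urllib's getproxies()
    # short-circuits and never calls the macOS proxy-lookup API that can crash
    # forked child processes.
    os.environ["no_proxy"] = "*"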
diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py
index ecfb750fd1..f9661d869a 100644
--- a/tests/jobs/test_local_task_job.py
+++ b/tests/jobs/test_local_task_job.py
@@ -18,6 +18,7 @@
 from __future__ import annotations
 
 import datetime
+import logging
 import os
 import re
 import signal
@@ -34,7 +35,7 @@ import pytest
 from airflow import settings
 from airflow.exceptions import AirflowException
 from airflow.executors.sequential_executor import SequentialExecutor
-from airflow.jobs.local_task_job import LocalTaskJob
+from airflow.jobs.local_task_job import SIGSEGV_MESSAGE, LocalTaskJob
 from airflow.jobs.scheduler_job import SchedulerJob
 from airflow.models.dagbag import DagBag
 from airflow.models.serialized_dag import SerializedDagModel
@@ -805,6 +806,40 @@ class TestLocalTaskJob:
         assert "Received SIGTERM. Terminating subprocesses" in caplog.text
         assert "Task exited with return code 143" in caplog.text
 
+    def test_process_sigsegv_error_message(self, caplog, dag_maker):
+        """Test that shows error if process failed with segmentation fault."""
+        caplog.set_level(logging.CRITICAL, logger="local_task_job.py")
+
+        def task_function(ti):
+            # pytest enables faulthandler by default unless `-p no:faulthandler` is given.
+            # It cannot be disabled at the test level out of the box, which means
+            # the debug traceback would show up in the pytest output.
+            # To avoid this we disable it within the task, which runs in a separate process.
+            import faulthandler
+
+            if faulthandler.is_enabled():
+                faulthandler.disable()
+
+            while not ti.pid:
+                time.sleep(0.1)
+
+            os.kill(psutil.Process(os.getpid()).ppid(), signal.SIGSEGV)
+
+        with dag_maker(dag_id="test_segmentation_fault"):
+            task = PythonOperator(
+                task_id="test_sigsegv",
+                python_callable=task_function,
+            )
+        dag_run = dag_maker.create_dagrun()
+        ti = TaskInstance(task=task, run_id=dag_run.run_id)
+        ti.refresh_from_db()
+        job = LocalTaskJob(task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor())
+        settings.engine.dispose()
+        with timeout(10):
+            with pytest.raises(AirflowException, match=r"Segmentation Fault detected"):
+                job.run()
+        assert SIGSEGV_MESSAGE in caplog.messages
+
 
 @pytest.fixture()
 def clean_db_helper():
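
The new test provokes the handler by having the task (a child process) deliver SIGSEGV
to its parent LocalTaskJob process, rather than by crashing natively. The mechanism in
isolation, assuming psutil is installed as it is in Airflow's test environment
(os.getppid() would work equally well):

    import os
    import signal

    import psutil

    # From inside the child process, locate the parent and signal it. The
    # parent's Python-level SIGSEGV handler runs normally here because the
    # signal is kill-delivered rather than raised by a genuine memory fault.
    parent_pid = psutil.Process(os.getpid()).ppid()
    os.kill(parent_pid, signal.SIGSEGV)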