You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/12/04 22:23:00 UTC
[airflow] branch main updated: Additional info about Segmentation Fault in LocalTaskJob (#27381)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 76f81cd4a7 Additional info about Segmentation Fault in LocalTaskJob (#27381)
76f81cd4a7 is described below
commit 76f81cd4a7433b7eeddb863b2ae6ee59176cf816
Author: Andrey Anshin <An...@taragol.is>
AuthorDate: Mon Dec 5 01:22:51 2022 +0300
Additional info about Segmentation Fault in LocalTaskJob (#27381)
---
airflow/jobs/local_task_job.py | 38 ++++++++++++++++++++++++++++++++++++++
tests/jobs/test_local_task_job.py | 37 ++++++++++++++++++++++++++++++++++++-
2 files changed, 74 insertions(+), 1 deletion(-)
diff --git a/airflow/jobs/local_task_job.py b/airflow/jobs/local_task_job.py
index fe7fc4a561..07541ff3ca 100644
--- a/airflow/jobs/local_task_job.py
+++ b/airflow/jobs/local_task_job.py
@@ -34,6 +34,36 @@ from airflow.utils.net import get_hostname
from airflow.utils.session import provide_session
from airflow.utils.state import State
+SIGSEGV_MESSAGE = """
+******************************************* Received SIGSEGV *******************************************
+SIGSEGV (Segmentation Violation) signal indicates Segmentation Fault error which refers to
+an attempt by a program/library to write or read outside its allocated memory.
+
+In Python environment usually this signal refers to libraries which use low level C API.
+Make sure that you use use right libraries/Docker Images
+for your architecture (Intel/ARM) and/or Operational System (Linux/macOS).
+
+Suggested way to debug
+======================
+ - Set environment variable 'PYTHONFAULTHANDLER' to 'true'.
+ - Start airflow services.
+ - Restart failed airflow task.
+ - Check 'scheduler' and 'worker' services logs for additional traceback
+ which might contain information about module/library where actual error happen.
+
+Known Issues
+============
+
+Note: Only Linux-based distros supported as "Production" execution environment for Airflow.
+
+macOS
+-----
+ 1. Due to limitations in Apple's libraries not every process might 'fork' safe.
+ One of the general error is unable to query the macOS system configuration for network proxies.
+ If your are not using a proxy you could disable it by set environment variable 'no_proxy' to '*'.
+ See: https://github.com/python/cpython/issues/58037 and https://bugs.python.org/issue30385#msg293958
+********************************************************************************************************"""
+
class LocalTaskJob(BaseJob):
"""LocalTaskJob runs a single task instance."""
@@ -83,6 +113,14 @@ class LocalTaskJob(BaseJob):
self.task_runner.terminate()
self.handle_task_exit(128 + signum)
+ def segfault_signal_handler(signum, frame):
+ """Handle SIGSEGV: log SIGSEGV_MESSAGE, terminate the task runner, record exit code 128 + signum and raise."""
+ self.log.critical(SIGSEGV_MESSAGE)
+ self.task_runner.terminate()
+ self.handle_task_exit(128 + signum)
+ raise AirflowException("Segmentation Fault detected.")
+
+ signal.signal(signal.SIGSEGV, segfault_signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
if not self.task_instance.check_and_change_state_before_execution(
diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py
index ecfb750fd1..f9661d869a 100644
--- a/tests/jobs/test_local_task_job.py
+++ b/tests/jobs/test_local_task_job.py
@@ -18,6 +18,7 @@
from __future__ import annotations
import datetime
+import logging
import os
import re
import signal
@@ -34,7 +35,7 @@ import pytest
from airflow import settings
from airflow.exceptions import AirflowException
from airflow.executors.sequential_executor import SequentialExecutor
-from airflow.jobs.local_task_job import LocalTaskJob
+from airflow.jobs.local_task_job import SIGSEGV_MESSAGE, LocalTaskJob
from airflow.jobs.scheduler_job import SchedulerJob
from airflow.models.dagbag import DagBag
from airflow.models.serialized_dag import SerializedDagModel
@@ -805,6 +806,40 @@ class TestLocalTaskJob:
assert "Received SIGTERM. Terminating subprocesses" in caplog.text
assert "Task exited with return code 143" in caplog.text
+ def test_process_sigsegv_error_message(self, caplog, dag_maker):
+ """Test that a SIGSEGV sent to the task's parent process raises AirflowException and logs SIGSEGV_MESSAGE."""
+ caplog.set_level(logging.CRITICAL, logger="local_task_job.py")
+
+ def task_function(ti):
+ # pytest enables faulthandler by default unless `-p no:faulthandler` is given.
+ # It cannot be disabled at the test level out of the box, which
+ # means the debug traceback would show up in the pytest output.
+ # To avoid this we disable it within the task, which runs in a separate process.
+ import faulthandler
+
+ if faulthandler.is_enabled():
+ faulthandler.disable()
+
+ while not ti.pid:
+ time.sleep(0.1)
+
+ os.kill(psutil.Process(os.getpid()).ppid(), signal.SIGSEGV)
+
+ with dag_maker(dag_id="test_segmentation_fault"):
+ task = PythonOperator(
+ task_id="test_sigsegv",
+ python_callable=task_function,
+ )
+ dag_run = dag_maker.create_dagrun()
+ ti = TaskInstance(task=task, run_id=dag_run.run_id)
+ ti.refresh_from_db()
+ job = LocalTaskJob(task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor())
+ settings.engine.dispose()
+ with timeout(10):
+ with pytest.raises(AirflowException, match=r"Segmentation Fault detected"):
+ job.run()
+ assert SIGSEGV_MESSAGE in caplog.messages
+
@pytest.fixture()
def clean_db_helper():