Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/02/10 16:54:05 UTC
[GitHub] [airflow] mtraynham commented on issue #12771: Instrument Airflow with Opentracing/Opentelemetry
mtraynham commented on issue #12771:
URL: https://github.com/apache/airflow/issues/12771#issuecomment-1035159599
I'm using OpenTelemetry with Celery on our worker processes, so maybe this will help.
We have a Python module that hooks into Celery worker processes directly via a signal, as suggested by the [OpenTelemetry Celery docs](https://opentelemetry-python-contrib.readthedocs.io/en/latest/instrumentation/celery/celery.html).
To hook into Celery from Airflow, we have a Python module that re-exports Airflow's [`DEFAULT_CELERY_CONFIG`](https://github.com/apache/airflow/blob/2.2.3/airflow/config_templates/default_celery.py#L39), and we point the environment variable [`AIRFLOW__CELERY__CELERY_CONFIG_OPTIONS`](https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#celery-config-options) at it. This makes Airflow load our module, which registers the Celery signal handler [before the Celery app is created](https://github.com/apache/airflow/blob/2.2.3/airflow/executors/celery_executor.py#L71-L74). We then instrument the worker processes with whatever OpenTelemetry providers we need in a [`worker_process_init`](https://docs.celeryproject.org/en/stable/userguide/signals.html#worker-process-init) signal handler.
*my_app/tracing.py*
```python
import typing

from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG
from celery.signals import worker_process_init
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.flask import FlaskInstrumentor
from opentelemetry.instrumentation.grpc import GrpcInstrumentorClient
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import sampling
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor

# Re-export Airflow's default Celery config so that
# AIRFLOW__CELERY__CELERY_CONFIG_OPTIONS can point at this module.
CELERY_CONFIG = DEFAULT_CELERY_CONFIG


@worker_process_init.connect(weak=False)  # type: ignore
def instrument_worker(*args: typing.Any, **kwargs: typing.Any) -> None:
    # Runs in each Celery worker process when it starts.
    tracer_provider = TracerProvider(
        resource=Resource(attributes={'service.name': 'my-service'}),
        sampler=sampling.ALWAYS_ON,
    )
    # Export each span synchronously as soon as it ends.
    tracer_provider.add_span_processor(SimpleSpanProcessor(OTLPSpanExporter()))
    trace.set_tracer_provider(tracer_provider)
    GrpcInstrumentorClient().instrument()
    RequestsInstrumentor().instrument()
```
Our worker process is then started with the following configuration option:
```shell
export AIRFLOW__CELERY__CELERY_CONFIG_OPTIONS=my_app.tracing.CELERY_CONFIG
```
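To illustrate why pointing that option at `my_app.tracing.CELERY_CONFIG` is enough to register the signal handler, here is a rough sketch of how a dotted path like this can be resolved. It is a simplified stand-in written for illustration, not Airflow's actual loader, and `resolve_dotted_path` is a hypothetical name. The point is that the module gets imported first, so module-level code (such as the `@worker_process_init.connect` decorator) runs as a side effect before the attribute is returned.

```python
import importlib
from typing import Any


def resolve_dotted_path(dotted_path: str) -> Any:
    """Import `pkg.module` and return `attr` for a path like `pkg.module.attr`.

    Hypothetical helper for illustration only; not Airflow's own code.
    """
    module_path, _, attr_name = dotted_path.rpartition(".")
    if not module_path:
        raise ImportError(f"{dotted_path!r} is not a dotted module path")
    # Importing the module executes its top-level statements, which is
    # where decorators such as @worker_process_init.connect take effect.
    module = importlib.import_module(module_path)
    return getattr(module, attr_name)


# Stand-in example using a stdlib path instead of my_app.tracing.CELERY_CONFIG.
separator = resolve_dotted_path("os.path.sep")
```

In our setup the equivalent lookup would target `my_app.tracing.CELERY_CONFIG`, so `my_app` has to be importable (on `PYTHONPATH`) in the worker's environment.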
There are, however, two strange things that occur with this.
1. We would have preferred the `BatchSpanProcessor`, since it queues up spans and exports them from a separate thread, but with it not all spans were exported to our OTLP collector and records were missing. The `SimpleSpanProcessor` did not have that issue.
2. The `OTLPSpanExporter` may cause the errors below. I haven't been able to pinpoint why that occurs, but the `ConsoleSpanExporter` does not suffer from that issue. No tasks seem to be failing, and this largely seems to occur on workers that are not currently running any tasks. Celery's `worker_process_init` is limited to a 4-second blocking call, so maybe it's a startup timing issue on the worker process that causes it? I tried increasing Celery's timeout ([`worker_proc_alive_timeout`](https://docs.celeryproject.org/en/stable/userguide/configuration.html#worker-proc-alive-timeout)) to something higher, like 20 seconds, and still saw the same issue though.
> [2022-02-07 20:34:21,364: ERROR/MainProcess] Process 'ForkPoolWorker-32' pid:3506 exited with 'signal 11 (SIGSEGV)'
> [2022-02-07 20:34:21,421: ERROR/MainProcess] Task handler raised error: WorkerLostError('Worker exited prematurely: signal 11 (SIGSEGV) Job: 245.')
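On the first point, one possible explanation (not confirmed anywhere in this thread) is that a batching processor holds finished spans in memory until its background thread wakes up, so anything still queued when a worker process exits is lost unless it is flushed first. The toy model below only illustrates that behaviour. `ToyBatchProcessor` and its methods are made up for this sketch and are not the OpenTelemetry implementation.

```python
import queue
import threading
from typing import List


class ToyBatchProcessor:
    """Toy model of a batching exporter; NOT the OpenTelemetry implementation."""

    def __init__(self, export_interval_s: float = 5.0) -> None:
        self._pending = queue.Queue()
        self.exported: List[str] = []
        self._interval = export_interval_s
        self._stop = threading.Event()
        self._lock = threading.Lock()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def on_end(self, span_name: str) -> None:
        # Returns immediately; the span only sits in memory for now.
        self._pending.put(span_name)

    def _run(self) -> None:
        # Wake up once per interval and export whatever has queued up.
        while not self._stop.wait(self._interval):
            self.force_flush()

    def force_flush(self) -> None:
        # Drain the queue; "exporting" here just appends to a list.
        with self._lock:
            while True:
                try:
                    self.exported.append(self._pending.get_nowait())
                except queue.Empty:
                    return

    def shutdown(self) -> None:
        # Stop the background thread, then export anything still queued.
        self._stop.set()
        self._thread.join()
        self.force_flush()


processor = ToyBatchProcessor(export_interval_s=60.0)
processor.on_end("task-a")
processor.on_end("task-b")
exported_before_flush = list(processor.exported)  # still empty: interval not reached
processor.shutdown()
exported_after_flush = list(processor.exported)  # both spans exported on shutdown
```

In the real SDK, `TracerProvider` exposes `force_flush()` and `shutdown()`, and Celery has a `worker_process_shutdown` signal, so calling one from the other might be worth trying. I have not verified that this resolves the missing spans.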