Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/02/10 16:54:05 UTC

[GitHub] [airflow] mtraynham commented on issue #12771: Instrument Airflow with Opentracing/Opentelemetry

mtraynham commented on issue #12771:
URL: https://github.com/apache/airflow/issues/12771#issuecomment-1035159599


   I'm using OpenTelemetry with Celery on our worker processes, so maybe this will help.
   
   We have a Python module that hooks into the Celery worker processes directly via a signal, as suggested by the [OpenTelemetry Celery docs](https://opentelemetry-python-contrib.readthedocs.io/en/latest/instrumentation/celery/celery.html).
   
   To hook into Celery from Airflow, we have a Python module that re-exports Airflow's [`DEFAULT_CELERY_CONFIG`](https://github.com/apache/airflow/blob/2.2.3/airflow/config_templates/default_celery.py#L39) and is referenced by the environment variable [`AIRFLOW__CELERY__CELERY_CONFIG_OPTIONS`](https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#celery-config-options).  This lets us load our own module and register the Celery signal [before the Celery app is created](https://github.com/apache/airflow/blob/2.2.3/airflow/executors/celery_executor.py#L71-L74).  We then instrument the worker processes with whatever OpenTelemetry providers we need in a [`worker_process_init`](https://docs.celeryproject.org/en/stable/userguide/signals.html#worker-process-init) signal handler.
   
   *my_app/tracing.py*
   ```python
   import typing

   from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG
   from celery.signals import worker_process_init
   from opentelemetry import trace
   from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
   from opentelemetry.instrumentation.grpc import GrpcInstrumentorClient
   from opentelemetry.instrumentation.requests import RequestsInstrumentor
   from opentelemetry.sdk.resources import Resource
   from opentelemetry.sdk.trace import sampling
   from opentelemetry.sdk.trace import TracerProvider
   from opentelemetry.sdk.trace.export import SimpleSpanProcessor

   # Re-exported so AIRFLOW__CELERY__CELERY_CONFIG_OPTIONS can point at this module,
   # which ensures it is imported before the Celery app is created.
   CELERY_CONFIG = DEFAULT_CELERY_CONFIG


   @worker_process_init.connect(weak=False)  # type: ignore
   def instrument_worker(*args: typing.Any, **kwargs: typing.Any) -> None:
       # Runs once in every forked Celery worker process.
       tracer_provider = TracerProvider(
           resource=Resource(attributes={'service.name': 'my-service'}),
           sampler=sampling.ALWAYS_ON,
       )
       tracer_provider.add_span_processor(SimpleSpanProcessor(OTLPSpanExporter()))
       trace.set_tracer_provider(tracer_provider)
       GrpcInstrumentorClient().instrument()
       RequestsInstrumentor().instrument()
   ```
   
   Our worker process is then started with the following configuration option:
   ```shell
   export AIRFLOW__CELERY__CELERY_CONFIG_OPTIONS=my_app.tracing.CELERY_CONFIG
   ```
   
   There are, however, two strange things that occur with this.
   1. We would have preferred the `BatchSpanProcessor`, which queues spans and exports them from a separate thread, but with it not all spans made it to our OTLP collector and records were missing.  The `SimpleSpanProcessor` did not have that issue (one possible way to keep the batch processor is sketched after the error output below).
   2. The `OTLPSpanExporter` may cause the errors below.  I haven't been able to pinpoint why, but the `ConsoleSpanExporter` does not suffer from the same problem.  No tasks appear to fail, and it mostly happens on workers that have no tasks currently running.  Celery's `worker_process_init` is limited to a 4-second blocking call, so maybe it's a startup timing issue in the worker process.  I tried raising Celery's [`worker_proc_alive_timeout`](https://docs.celeryproject.org/en/stable/userguide/configuration.html#worker-proc-alive-timeout) to something higher like 20 seconds, but still saw the same issue.
   
   > [2022-02-07 20:34:21,364: ERROR/MainProcess] Process 'ForkPoolWorker-32' pid:3506 exited with 'signal 11 (SIGSEGV)'
   > [2022-02-07 20:34:21,421: ERROR/MainProcess] Task handler raised error: WorkerLostError('Worker exited prematurely: signal 11 (SIGSEGV) Job: 245.')
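   
   For reference, one way we might keep the `BatchSpanProcessor` (a sketch I haven't validated, so the `flush_spans` handler and its effect on the missing records are assumptions) is to force-flush the tracer provider from Celery's `worker_process_shutdown` signal, so anything still queued by the batch thread is exported before the forked worker exits:
   
   ```python
   from celery.signals import worker_process_shutdown
   from opentelemetry import trace
   from opentelemetry.sdk.trace.export import BatchSpanProcessor
   
   # In instrument_worker, the SimpleSpanProcessor line would become:
   #   tracer_provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))
   
   
   @worker_process_shutdown.connect(weak=False)  # type: ignore
   def flush_spans(*args, **kwargs) -> None:
       # Hypothetical handler: export any spans still queued by the batch
       # processor before the worker process exits.
       provider = trace.get_tracer_provider()
       if hasattr(provider, 'force_flush'):
           provider.force_flush()
           provider.shutdown()
   ```
   
   Whether that actually accounts for the spans we lost with the batch processor is an open question; it's just the first thing I'd try.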
   

