Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/12/03 06:34:49 UTC

[GitHub] [airflow] Etienne-Carriere opened a new issue #12771: Instrument Airflow with Opentracing/Opentelemetry

Etienne-Carriere opened a new issue #12771:
URL: https://github.com/apache/airflow/issues/12771


   **Description**
   
   I would like to instrument Airflow with a tracing technology such as OpenTracing (https://opentracing.io/) or OpenTelemetry (https://opentelemetry.io/).
   
   **Use case / motivation**
   
   The motivations are to:
   * have the DAG logic represented in traces
   * propagate the tracing context to downstream requests
   
   **Implementation ideas**
   
   For example, with Celery, we can use Celery signals to hook in before and after each task (https://github.com/uber-common/opentracing-python-instrumentation/blob/master/opentracing_instrumentation/client_hooks/celery.py)
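
   A minimal sketch of that idea (assuming the OpenTelemetry Python API rather than the OpenTracing API used in the linked module; the span bookkeeping shown is purely illustrative):

    ```python
    import typing

    from celery.signals import task_prerun, task_postrun
    from opentelemetry import trace

    tracer = trace.get_tracer(__name__)
    _spans: typing.Dict[str, trace.Span] = {}  # task_id -> span opened in task_prerun


    @task_prerun.connect
    def start_task_span(task_id=None, task=None, **kwargs):
        # open a span just before the Celery task (i.e. the Airflow task) runs
        _spans[task_id] = tracer.start_span(name=task.name)


    @task_postrun.connect
    def end_task_span(task_id=None, state=None, **kwargs):
        # close the matching span once the task has finished
        span = _spans.pop(task_id, None)
        if span is not None:
            span.set_attribute("celery.state", str(state))
            span.end()
    ```

   On its own this only produces isolated per-task spans; the harder part is propagating a parent (DAG-run-level) context into them, which is discussed further down this thread.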
   
   I am happy to contribute the code, but I would appreciate some hints on where to look.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk commented on issue #12771: Instrument Airflow with Opentracing/Opentelemetry

Posted by GitBox <gi...@apache.org>.
potiuk commented on issue #12771:
URL: https://github.com/apache/airflow/issues/12771#issuecomment-869571812


   Indeed, I think OpenTelemetry integration might be a good idea. I've heard (CC: @subashcanapathy) that Amazon has been very involved in OpenTelemetry, so perhaps the Amazon team could help with that?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mjpieters commented on issue #12771: Instrument Airflow with Opentracing/Opentelemetry

Posted by GitBox <gi...@apache.org>.
mjpieters commented on issue #12771:
URL: https://github.com/apache/airflow/issues/12771#issuecomment-753404477


   Interesting little test; but note that it generates a trace from the tasks after the fact, without support for additional spans created __inside__ each task invocation, and the tracing context is not shared over any RPC / REST / etc. calls to other services, so they cannot inherit the tracing context.
   
   I integrated Jaeger tracing (opentracing) support into a production Airflow setup using Celery. Specific challenges I had to overcome:
   
   - The dagrun span is 'virtual', in that it exists from when the dagrun is created until the last task is completed. The scheduler will then update the dagrun state in the database. But tasks need a valid parent span to attach their own spans to.
   
     I solved this by creating a span in a monkey-patch to `DAG.create_dagrun()`, injecting the span info into the dagrun configuration together with the start time, then _discarding the span_. Then, in `DagRun.set_state()`, when the state changes to a finished state, I create a `jaeger_client.span.Span()` object from scratch using the dagrun conf-stored data, and submit that to Jaeger (a rough sketch of this patching approach follows this list).
   
   - Tasks inherit the parent (dagrun) span context from the dagrun config; I patched `Task.run_raw_task()` to run the actual code under a `tracer.start_active_span()` context manager. This captures timing and any exceptions.
   
   - You need an active tracer for traces to be captured and sent on to the tracer agent. So I registered code to run in the `cli_action_loggers.register_pre_exec_callback()` hook when the `scheduler` or `dag_trigger` sub-commands run, which then registers a closer with `cli_action_loggers.register_post_exec_callback`. Closing a tracer in `dag_trigger` takes careful work with the asyncio / tornado loop used by the Jaeger client; you'll lose traces if you don't watch out. I found that hunting down the I/O loop attached to the trace reporter object and calling `tracer.close()` from a callback sent to that loop was the only fail-safe way of getting the last traces out. I don't know if opentracing needs this level of awareness of the implementation details.
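
   A minimal sketch of the monkey-patching approach described in the first two items above (assuming the OpenTracing API; the conf key, wrapper names, and the exact patched methods such as `TaskInstance._run_raw_task` are illustrative and will vary by Airflow version):

    ```python
    # Sketch only: stash the span context in the dagrun conf at creation time,
    # then run each task under a span parented to it.
    import opentracing
    from opentracing.propagation import Format

    from airflow.models.dag import DAG
    from airflow.models.taskinstance import TaskInstance

    _orig_create_dagrun = DAG.create_dagrun
    _orig_run_raw_task = TaskInstance._run_raw_task


    def create_dagrun_with_span(self, *args, **kwargs):
        tracer = opentracing.global_tracer()
        span = tracer.start_span(operation_name=self.dag_id)
        carrier = {}
        tracer.inject(span.context, Format.TEXT_MAP, carrier)
        conf = dict(kwargs.get("conf") or {})
        conf["tracing"] = carrier  # the span context rides along in the dagrun conf
        kwargs["conf"] = conf
        # the span object itself is discarded here; it is re-created and
        # submitted once the dagrun reaches a finished state
        return _orig_create_dagrun(self, *args, **kwargs)


    def run_raw_task_with_span(self, *args, **kwargs):
        tracer = opentracing.global_tracer()
        carrier = (self.get_dagrun().conf or {}).get("tracing") or {}
        parent = tracer.extract(Format.TEXT_MAP, carrier) if carrier else None
        with tracer.start_active_span(self.task_id, child_of=parent):
            return _orig_run_raw_task(self, *args, **kwargs)


    DAG.create_dagrun = create_dagrun_with_span
    TaskInstance._run_raw_task = run_raw_task_with_span
    ```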
   
   But, with that work in place, we now get traces in mostly real time, with full support for tracing contexts being shared with other services called from Airflow tasks. We can trace a job through the frontend, submitting a job to Airflow, then follow any calls from tasks to further REST APIs, all as one system.
   
   I'd prefer it if the tracing context was not shoehorned into the dagrun configuration; I'd have created additional database tables or columns for this in the Airflow database model if I had to do this inside the Airflow project itself.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on issue #12771: Instrument Airflow with Opentracing/Opentelemetry

Posted by GitBox <gi...@apache.org>.
ashb commented on issue #12771:
URL: https://github.com/apache/airflow/issues/12771#issuecomment-747374938


   https://github.com/googleapis/python-bigquery/blob/master/google/cloud/bigquery/opentelemetry_tracing.py


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] boring-cyborg[bot] commented on issue #12771: Instrument Airflow with Opentracing/Opentelemetry

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on issue #12771:
URL: https://github.com/apache/airflow/issues/12771#issuecomment-737698552


   Thanks for opening your first issue here! Be sure to follow the issue template!
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on issue #12771: Instrument Airflow with Opentracing/Opentelemetry

Posted by GitBox <gi...@apache.org>.
ashb commented on issue #12771:
URL: https://github.com/apache/airflow/issues/12771#issuecomment-747373540


   @potiuk Looks like it's from something BigQuery related.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mtraynham edited a comment on issue #12771: Instrument Airflow with Opentracing/Opentelemetry

Posted by GitBox <gi...@apache.org>.
mtraynham edited a comment on issue #12771:
URL: https://github.com/apache/airflow/issues/12771#issuecomment-1035159599


   I'm using OpenTelemetry with Celery on our worker processes, so maybe this will help.
   
   We have a python module that hooks into the Celery worker processes directly via a Signal as suggested by the [OpenTelemetry Celery docs](https://opentelemetry-python-contrib.readthedocs.io/en/latest/instrumentation/celery/celery.html).  
   
   To hook into Celery from Airflow, we have a Python module that re-exports Airflow's [`DEFAULT_CELERY_CONFIG`](https://github.com/apache/airflow/blob/2.2.3/airflow/config_templates/default_celery.py#L39), which we point Airflow at via the environment variable [`AIRFLOW__CELERY__CELERY_CONFIG_OPTIONS`](https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#celery-config-options).  This lets us load our own module and register the Celery signal [before the Celery app is created](https://github.com/apache/airflow/blob/2.2.3/airflow/executors/celery_executor.py#L71-L74).  We then instrument the worker processes with whatever OpenTelemetry providers we need in a [`worker_process_init`](https://docs.celeryproject.org/en/stable/userguide/signals.html#worker-process-init) signal.
   
   *my_app/tracing.py*
   ```python
    import typing

    from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG
   from celery.signals import worker_process_init
   from opentelemetry import trace
   from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
   from opentelemetry.instrumentation.flask import FlaskInstrumentor
   from opentelemetry.instrumentation.grpc import GrpcInstrumentorClient
   from opentelemetry.instrumentation.requests import RequestsInstrumentor
   from opentelemetry.sdk.resources import Resource
   from opentelemetry.sdk.trace import sampling
   from opentelemetry.sdk.trace import TracerProvider
   from opentelemetry.sdk.trace.export import SimpleSpanProcessor
   
   CELERY_CONFIG = DEFAULT_CELERY_CONFIG
   
   
   @worker_process_init.connect(weak=False)  # type: ignore
   def instrument_worker(*args: typing.Any, **kwargs: typing.Any) -> None:
       tracer_provider = TracerProvider(
           resource=Resource(attributes={'service.name': 'my-service'}),
           sampler=sampling.ALWAYS_ON
       )
       tracer_provider.add_span_processor(SimpleSpanProcessor(OTLPSpanExporter()))
       trace.set_tracer_provider(tracer_provider)
       GrpcInstrumentorClient().instrument()
       RequestsInstrumentor().instrument()
   ```
   
   Our worker process is then started with the following configuration option:
   ```shell
   export AIRFLOW__CELERY__CELERY_CONFIG_OPTIONS=my_app.tracing.CELERY_CONFIG
   ```
   
   For parent contexts when creating spans, we embed an OpenTelemetry trace ID into the DAG run configuration from whatever service is executing the DAG.
   
   There are, however, two strange things that occur with this.
   1. We would have preferred using the `BatchSpanProcessor`, as it runs in a separate thread queuing up spans, but it seemed that not all of the spans were being exported to our OTLP collector and records were missing.  Using the `SimpleSpanProcessor` did not have that issue.
   2. The `OTLPSpanExporter` may cause the errors below.  I haven't been able to pinpoint why that occurs, but the `ConsoleSpanExporter` does not suffer from that issue.  It doesn't seem like any tasks are failing, and largely this seems to occur on Workers that have no tasks currently running.  Celery's `worker_process_init` is limited to a 4-second blocking call, so maybe it's a startup timing issue on the Worker process that causes it?  I attempted to increase Celery's timeout ([worker_proc_alive_timeout](https://docs.celeryproject.org/en/stable/userguide/configuration.html#worker-proc-alive-timeout)) to something higher like 20 seconds and still saw the same issue, though.
   
   > [2022-02-07 20:34:21,364: ERROR/MainProcess] Process 'ForkPoolWorker-32' pid:3506 exited with 'signal 11 (SIGSEGV)'
   > [2022-02-07 20:34:21,421: ERROR/MainProcess] Task handler raised error: WorkerLostError('Worker exited prematurely: signal 11 (SIGSEGV) Job: 245.')
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk commented on issue #12771: Instrument Airflow with Opentracing/Opentelemetry

Posted by GitBox <gi...@apache.org>.
potiuk commented on issue #12771:
URL: https://github.com/apache/airflow/issues/12771#issuecomment-747374349


   Yeah. It can likely be disabled by adding some custom logging configuration - that's probably what we should do, I think.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] dm03514 commented on issue #12771: Instrument Airflow with Opentracing/Opentelemetry

Posted by GitBox <gi...@apache.org>.
dm03514 commented on issue #12771:
URL: https://github.com/apache/airflow/issues/12771#issuecomment-748715017


   👍 For this feature. 
   
   @mik-laj 
   
   I created a little test script to illustrate building a trace from a dag run:
   
   https://github.com/dm03514/airflow-tracer/blob/main/README.md
   
   Tracing is a different type of telemetry (alongside statsd and Sentry). It provides a "Gantt chart" view into a DAG. Some tracing providers (like Lightstep) provide advanced dashboarding and alerting. 
   
   The POC project above builds a trace by querying Airflow's database. I'm hoping that there is a way to add tracing support inside Airflow, so that when DAGs are being executed they can emit OpenTelemetry traces! 
   
    ```python
    # excerpt from the POC script: `tracer`, `trace`, `serialized_dag`,
    # `dt_to_ns_epoch` and `cli` are defined elsewhere in that script
    dag = serialized_dag.SerializedDagModel.get(dag_id=cli.dag_id)
    dagrun = dag.dag.get_dagrun(execution_date=cli.execution_date)
    tis = dagrun.get_task_instances()

    root_span = tracer.start_span(
        name=dag.dag.dag_id,
        start_time=dt_to_ns_epoch(dagrun.start_date)
    )
    root_span.end(end_time=dt_to_ns_epoch(dagrun.end_date))

    for ti in tis:
        ctx = trace.set_span_in_context(root_span)

        span = tracer.start_span(
            name=ti.task_id,
            context=ctx,
            start_time=dt_to_ns_epoch(ti.start_date),
        )
        # span.set_attribute('airflow.pool', ti.pool)
        # span.set_attribute('airflow.queue', ti.queue)
        span.set_attribute('airflow.state', ti.state)
        span.set_attribute('airflow.operation', ti.operator)
        # span.set_attribute('airflow.max_tries', ti.max_tries)
        if ti.job_id is not None:
            span.set_attribute('airflow.job_id', ti.job_id)
        # span.set_attribute('airflow.pool_slots', ti.pool_slots)
        # span.set_attribute('airflow.priority_weight', ti.priority_weight)
        if ti.state != 'success':
            span.set_attribute('error', True)
        span.end(end_time=dt_to_ns_epoch(ti.end_date))
    ```
   
   thank you!
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mjpieters edited a comment on issue #12771: Instrument Airflow with Opentracing/Opentelemetry

Posted by GitBox <gi...@apache.org>.
mjpieters edited a comment on issue #12771:
URL: https://github.com/apache/airflow/issues/12771#issuecomment-753404477


   Interesting little test; but note that it generates a trace from the tasks after the fact, without support for additional spans created __inside__ each task invocation, and the tracing context is not shared over any RPC / REST / etc. calls to other services, so they cannot inherit the tracing context.
   
   I integrated Jaeger tracing (opentracing) support into a production Airflow setup using Celery. Specific challenges I had to overcome:
   
   - The dagrun span is 'virtual', in that it exists from when the dagrun is created until the last task is completed. The scheduler will then update the dagrun state in the database. But tasks need a valid parent span to attach their own spans to.
   
     I solved this by creating a span in a monkey-patch to `DAG.create_dagrun()`, injecting the span info into the dagrun configuration together with the start time, then _discarding the span_. Then, in `DagRun.set_state()`, when the state changes to a finished state, I create a `jaeger_client.span.Span()` object from scratch using the dagrun conf-stored data, and submit that to Jaeger.
   
   - Tasks inherit the parent (dagrun) span context from the dagrun config; I patched `Task.run_raw_task()` to run the actual code under a `tracer.start_active_span()` context manager. This captures timing and any exceptions.
   
   - You need an active tracer for traces to be captured and sent on to the tracer agent. So I registered code to run in the `cli_action_loggers.register_pre_exec_callback()` hook when the `scheduler` or `dag_trigger` sub-commands run, which then registers a closer with `cli_action_loggers.register_post_exec_callback`. Closing a tracer in `dag_trigger` takes careful work with the asyncio / tornado loop used by the Jaeger client; you'll lose traces if you don't watch out. I found that hunting down the I/O loop attached to the trace reporter object and calling `tracer.close()` from a callback sent to that loop was the only fail-safe way of getting the last traces out. I don't know if opentracing needs this level of awareness of the implementation details.
   
   - You generally want to start a trace when the Airflow webserver receives a trigger, so instrument the Flask layer too. This is where the sampling decision needs to be taken too; tracing often only instruments a subset of all jobs, but in this project I set the sampling frequency to 100% at this level.
   
   - I added a custom configuration section to airflow.cfg to allow me to tweak Jaeger tracing parameters (a rough sketch of this follows the list).
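
   A minimal sketch of that kind of configuration (the `[tracing]` section and option names are assumed; the `jaeger_client` initialisation is the standard documented pattern rather than the exact project code):

    ```python
    # Sketch only: read assumed [tracing] options from airflow.cfg and initialise
    # a Jaeger tracer that samples 100% of traces.
    from airflow.configuration import conf
    from jaeger_client import Config


    def make_tracer(service_name="airflow"):
        host = conf.get("tracing", "jaeger_agent_host", fallback="localhost")
        port = int(conf.get("tracing", "jaeger_agent_port", fallback="6831"))
        config = Config(
            config={
                "sampler": {"type": "const", "param": 1},  # sample every trace
                "local_agent": {"reporting_host": host, "reporting_port": port},
            },
            service_name=service_name,
            validate=True,
        )
        # also registers the tracer as the global opentracing tracer
        return config.initialize_tracer()
    ```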
   
   But, with that work in place, we now get traces in mostly real time, with full support for tracing contexts being shared with other services called from Airflow tasks. We can trace a job through the frontend, submitting a job to Airflow, then follow any calls from tasks to further REST APIs, all as one system.
   
   I'd prefer it if the tracing context was not shoehorned into the dagrun configuration; I'd have created additional database tables or columns for this in the Airflow database model if I had to do this inside the Airflow project itself.
   
   Note that I did *not* use the Celery task hooks here to track the task spans, because Celery has its own overhead that we wanted to keep separate. The `opentracing_instrumentation` package already has Celery hooks you can use, but I needed the timings to be closer to the actual task invocation.
   
   Another thing to consider is the per-DAG or per-task tags you want to add to the spans. For this project I needed to track specific data from the submitted DAG config so we could compare task runs using the same input configuration.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk commented on issue #12771: Instrument Airflow with Opentracing/Opentelemetry

Posted by GitBox <gi...@apache.org>.
potiuk commented on issue #12771:
URL: https://github.com/apache/airflow/issues/12771#issuecomment-747383652


   Added #13131 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] malthe commented on issue #12771: Instrument Airflow with Opentracing/Opentelemetry

Posted by GitBox <gi...@apache.org>.
malthe commented on issue #12771:
URL: https://github.com/apache/airflow/issues/12771#issuecomment-865902431


   What's a good place to set up instrumentation and exporting in the various components (scheduler, webserver, workers)?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] aladinoss commented on issue #12771: Instrument Airflow with Opentracing/Opentelemetry

Posted by GitBox <gi...@apache.org>.
aladinoss commented on issue #12771:
URL: https://github.com/apache/airflow/issues/12771#issuecomment-937793071


   @potiuk what do you think about injecting this under these modules?
   - `airflow.hooks.base`
   - `airflow.sensors.base`
   - `airflow.models.baseoperator`
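
   As a purely illustrative sketch (assuming the OpenTelemetry API, and ignoring failure paths and operators that override these hooks), instrumentation at the `airflow.models.baseoperator` level could wrap the `pre_execute`/`post_execute` hooks that run around every operator's `execute()`:

    ```python
    # Sketch only: open a span in pre_execute and close it in post_execute.
    # Real instrumentation would also need to handle task failures and retries.
    from airflow.models.baseoperator import BaseOperator
    from opentelemetry import trace

    tracer = trace.get_tracer(__name__)
    _orig_pre_execute = BaseOperator.pre_execute
    _orig_post_execute = BaseOperator.post_execute


    def pre_execute(self, context):
        self._otel_span = tracer.start_span(self.task_id)
        self._otel_span.set_attribute("airflow.operator", type(self).__name__)
        return _orig_pre_execute(self, context)


    def post_execute(self, context, result=None):
        span = getattr(self, "_otel_span", None)
        if span is not None:
            span.end()
        return _orig_post_execute(self, context, result)


    BaseOperator.pre_execute = pre_execute
    BaseOperator.post_execute = post_execute
    ```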


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] Etienne-Carriere commented on issue #12771: Instrument Airflow with Opentracing/Opentelemetry

Posted by GitBox <gi...@apache.org>.
Etienne-Carriere commented on issue #12771:
URL: https://github.com/apache/airflow/issues/12771#issuecomment-738018370


   Hello @mik-laj, thank you for the link about Sentry. It will help in designing a hook (initially in an external repo, and perhaps merged in later).
   OpenTracing/OpenTelemetry is an open-source standard for APM (at the same level as Sentry).


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mtraynham edited a comment on issue #12771: Instrument Airflow with Opentracing/Opentelemetry

Posted by GitBox <gi...@apache.org>.
mtraynham edited a comment on issue #12771:
URL: https://github.com/apache/airflow/issues/12771#issuecomment-1035159599


   I'm using OpenTelemetry with Celery on our worker processes, so maybe this will help.
   
   We have a python module that hooks into the Celery worker processes directly via a Signal as suggested by the [OpenTelemetry Celery docs](https://opentelemetry-python-contrib.readthedocs.io/en/latest/instrumentation/celery/celery.html).  
   
   To hook into Celery from Airflow, we have a Python module that re-exports Airflow's [`DEFAULT_CELERY_CONFIG`](https://github.com/apache/airflow/blob/2.2.3/airflow/config_templates/default_celery.py#L39), which we point Airflow at via the environment variable [`AIRFLOW__CELERY__CELERY_CONFIG_OPTIONS`](https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#celery-config-options).  This lets us load our own module and register the Celery signal [before the Celery app is created](https://github.com/apache/airflow/blob/2.2.3/airflow/executors/celery_executor.py#L71-L74).  We then instrument the worker processes with whatever OpenTelemetry providers we need in a [`worker_process_init`](https://docs.celeryproject.org/en/stable/userguide/signals.html#worker-process-init) signal.
   
   *my_app/tracing.py*
   ```python
    import typing

    from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG
   from celery.signals import worker_process_init
   from opentelemetry import trace
   from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
   from opentelemetry.instrumentation.flask import FlaskInstrumentor
   from opentelemetry.instrumentation.grpc import GrpcInstrumentorClient
   from opentelemetry.instrumentation.requests import RequestsInstrumentor
   from opentelemetry.sdk.resources import Resource
   from opentelemetry.sdk.trace import sampling
   from opentelemetry.sdk.trace import TracerProvider
   from opentelemetry.sdk.trace.export import SimpleSpanProcessor
   
   CELERY_CONFIG = DEFAULT_CELERY_CONFIG
   
   
   @worker_process_init.connect(weak=False)  # type: ignore
   def instrument_worker(*args: typing.Any, **kwargs: typing.Any) -> None:
       tracer_provider = TracerProvider(
           resource=Resource(attributes={'service.name': 'my-service'}),
           sampler=sampling.ALWAYS_ON
       )
       tracer_provider.add_span_processor(SimpleSpanProcessor(OTLPSpanExporter()))
       trace.set_tracer_provider(tracer_provider)
       GrpcInstrumentorClient().instrument()
       RequestsInstrumentor().instrument()
   ```
   
   Our worker process is then started with the following configuration option:
   ```shell
   export AIRFLOW__CELERY__CELERY_CONFIG_OPTIONS=my_app.tracing.CELERY_CONFIG
   ```
   
   For parent contexts when creating spans, we embed an OpenTelemetry trace ID into the DAG run configuration from whatever service is executing the DAG.  Operators simply pull that trace ID out of the DAG run configuration and start their own trace context manager.
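
   For illustration, pulling the parent context back out inside an operator could look roughly like this (the `tracing` conf key and the W3C `traceparent` propagator are assumptions, not necessarily the exact scheme described above):

    ```python
    # Sketch only: rebuild the parent context from the DAG run conf and start a
    # child span for the work done inside the operator.
    from airflow.models.baseoperator import BaseOperator
    from opentelemetry import trace
    from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator

    tracer = trace.get_tracer(__name__)


    class TracedOperator(BaseOperator):
        def execute(self, context):
            # e.g. {"traceparent": "00-<trace-id>-<span-id>-01"} set by the
            # service that triggered the DAG run
            carrier = (context["dag_run"].conf or {}).get("tracing", {})
            parent = TraceContextTextMapPropagator().extract(carrier=carrier)
            with tracer.start_as_current_span(self.task_id, context=parent):
                ...  # the actual task work runs inside the span
    ```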
   
   There are, however, two strange things that occur with this.
   1. We would have preferred using the `BatchSpanProcessor`, as it runs in a separate thread queuing up spans, but it seemed that not all of the spans were being exported to our OTLP collector and records were missing.  Using the `SimpleSpanProcessor` did not have that issue.
   2. The `OTLPSpanExporter` may cause the errors below.  I haven't been able to pinpoint why that occurs, but the `ConsoleSpanExporter` does not suffer from that issue.  It doesn't seem like any tasks are failing, and largely this seems to occur on Workers that have no tasks currently running.  Celery's `worker_process_init` is limited to a 4-second blocking call, so maybe it's a startup timing issue on the Worker process that causes it?  I attempted to increase Celery's timeout ([worker_proc_alive_timeout](https://docs.celeryproject.org/en/stable/userguide/configuration.html#worker-proc-alive-timeout)) to something higher like 20 seconds and still saw the same issue, though.
   
   > [2022-02-07 20:34:21,364: ERROR/MainProcess] Process 'ForkPoolWorker-32' pid:3506 exited with 'signal 11 (SIGSEGV)'
   > [2022-02-07 20:34:21,421: ERROR/MainProcess] Task handler raised error: WorkerLostError('Worker exited prematurely: signal 11 (SIGSEGV) Job: 245.')
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] dm03514 edited a comment on issue #12771: Instrument Airflow with Opentracing/Opentelemetry

Posted by GitBox <gi...@apache.org>.
dm03514 edited a comment on issue #12771:
URL: https://github.com/apache/airflow/issues/12771#issuecomment-748715017


   👍 For this feature. 
   
   @mik-laj 
   
   I created a little test script to illustrate building a trace from a dag run:
   
   https://github.com/dm03514/airflow-tracer/blob/main/README.md
   
   Tracing is a different type of telemetry (alongside statsd and Sentry). It provides a "Gantt chart" view into a DAG. Some tracing providers (like Lightstep) provide advanced dashboarding and alerting. 
   
   The POC project above builds a trace by querying Airflow's database. I'm hoping that there is a way to add tracing support inside Airflow, so that when DAGs are being executed they can emit OpenTelemetry traces! 
   
   With tracing, each task span needs a reference to the global root span, and I'm not sure how easy or hard it is to share this data across tasks.
   
   Start the root span when the dagrun starts:
   ```
       root_span = tracer.start_span(
           name=dag.dag.dag_id
       )
   ```
   
   Start a task specific span when each task starts:
   
   ```
           ctx = trace.set_span_in_context(root_span)
   
           span = tracer.start_span(
               name=ti.task_id,
               context=ctx
           )
   
          ... # invoke task
   
         span.end() # task is complete
   ```
   
   Close the root span when all tasks are complete
   
   ```
   root_span.end()
   ```
   
   ---- 
   
   
   ```python
   def main(cli):
        dag = serialized_dag.SerializedDagModel.get(dag_id=cli.dag_id)
       dagrun = dag.dag.get_dagrun(execution_date=cli.execution_date)
       tis = dagrun.get_task_instances()
   
       root_span = tracer.start_span(
           name=dag.dag.dag_id,
           start_time=dt_to_ns_epoch(dagrun.start_date)
       )
       root_span.end(end_time=dt_to_ns_epoch(dagrun.end_date))
   
       for ti in tis:
           ctx = trace.set_span_in_context(root_span)
   
           span = tracer.start_span(
               name=ti.task_id,
               context=ctx,
               start_time=dt_to_ns_epoch(ti.start_date),
           )
           span.set_attribute('airflow.state', ti.state)
           span.set_attribute('airflow.operation', ti.operator)
           if ti.job_id is not None:
               span.set_attribute('airflow.job_id', ti.job_id)
           if ti.state != 'success':
               span.set_attribute('error', True)
           span.end(end_time=dt_to_ns_epoch(ti.end_date))
   ```
   
   thank you!
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] WattsInABox commented on issue #12771: Instrument Airflow with Opentracing/Opentelemetry

Posted by GitBox <gi...@apache.org>.
WattsInABox commented on issue #12771:
URL: https://github.com/apache/airflow/issues/12771#issuecomment-769223737


   @mjpieters can you share any of the code at all? I think I understand what you're saying but I think also I would be rediscovering problems you have already solved without the code.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] dm03514 edited a comment on issue #12771: Instrument Airflow with Opentracing/Opentelemetry

Posted by GitBox <gi...@apache.org>.
dm03514 edited a comment on issue #12771:
URL: https://github.com/apache/airflow/issues/12771#issuecomment-748715017


   👍 For this feature. 
   
   @mik-laj 
   
   I created a little test script to illustrate building a trace from a dag run:
   
   https://github.com/dm03514/airflow-tracer/blob/main/README.md
   
   Tracing is a different type of telemetry (alongside statsd and Sentry). It provides a "Gantt chart" view into a DAG. Some tracing providers (like Lightstep) provide advanced dashboarding and alerting. 
   
   The POC project above builds a trace by querying Airflow's database. I'm hoping that there is a way to add tracing support inside Airflow, so that when DAGs are being executed they can emit OpenTelemetry traces! 
   
   ```python
   def main(cli):
        dag = serialized_dag.SerializedDagModel.get(dag_id=cli.dag_id)
       dagrun = dag.dag.get_dagrun(execution_date=cli.execution_date)
       tis = dagrun.get_task_instances()
   
       root_span = tracer.start_span(
           name=dag.dag.dag_id,
           start_time=dt_to_ns_epoch(dagrun.start_date)
       )
       root_span.end(end_time=dt_to_ns_epoch(dagrun.end_date))
   
       for ti in tis:
           ctx = trace.set_span_in_context(root_span)
   
           span = tracer.start_span(
               name=ti.task_id,
               context=ctx,
               start_time=dt_to_ns_epoch(ti.start_date),
           )
           span.set_attribute('airflow.state', ti.state)
           span.set_attribute('airflow.operation', ti.operator)
           if ti.job_id is not None:
               span.set_attribute('airflow.job_id', ti.job_id)
           if ti.state != 'success':
               span.set_attribute('error', True)
           span.end(end_time=dt_to_ns_epoch(ti.end_date))
   ```
   
   thank you!
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk commented on issue #12771: Instrument Airflow with Opentracing/Opentelemetry

Posted by GitBox <gi...@apache.org>.
potiuk commented on issue #12771:
URL: https://github.com/apache/airflow/issues/12771#issuecomment-747372741


   Hello @Etienne-Carriere @mik-laj -> we recently started seeing these annoying logs in Airflow 2.0.
   
   ```
   {opentelemetry_tracing.py:29} INFO - This service is instrumented using OpenTelemetry. OpenTelemetry could not be imported; please add opentelemetry-api and opentelemetry-instrumentation packages in order to get BigQuery Tracing data.
   ```
   
   Is this something you know about?
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on issue #12771: Instrument Airflow with Opentracing/Opentelemetry

Posted by GitBox <gi...@apache.org>.
ashb commented on issue #12771:
URL: https://github.com/apache/airflow/issues/12771#issuecomment-747373919


   I know @mjpieters was playing/hacking about with adding opentracing to Airflow too


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mjpieters commented on issue #12771: Instrument Airflow with Opentracing/Opentelemetry

Posted by GitBox <gi...@apache.org>.
mjpieters commented on issue #12771:
URL: https://github.com/apache/airflow/issues/12771#issuecomment-770396790


   @WattsInABox the code is part of a private project, and the majority is either project specific or specific to the `jaeger_client` implementation, neither of which apply here. E.g. the custom configuration section and how to close the client are tied very closely to the `jaeger_client` codebase, as is how you create a `jaeger_client.span.Span()` instance from scratch.
   
   I can see about talking to the client about scrubbing the project-specific details from the code, however.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] aa3pankaj commented on issue #12771: Instrument Airflow with Opentracing/Opentelemetry

Posted by GitBox <gi...@apache.org>.
aa3pankaj commented on issue #12771:
URL: https://github.com/apache/airflow/issues/12771#issuecomment-1030131942


   Sure, I would be happy to contribute to this.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk commented on issue #12771: Instrument Airflow with Opentracing/Opentelemetry

Posted by GitBox <gi...@apache.org>.
potiuk commented on issue #12771:
URL: https://github.com/apache/airflow/issues/12771#issuecomment-1030105796


   We are just about to formalize all the OpenTelemetry experience and proposals into an AIP (Airflow Improvement Proposal). Stay tuned for a draft AIP on that one. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mtraynham edited a comment on issue #12771: Instrument Airflow with Opentracing/Opentelemetry

Posted by GitBox <gi...@apache.org>.
mtraynham edited a comment on issue #12771:
URL: https://github.com/apache/airflow/issues/12771#issuecomment-1035159599


   I'm using OpenTelemetry with Celery on our worker processes, so maybe this will help.
   
   We have a python module that hooks into the Celery worker processes directly via a Signal as suggested by the [OpenTelemetry Celery docs](https://opentelemetry-python-contrib.readthedocs.io/en/latest/instrumentation/celery/celery.html).  
   
   To hook into Celery from Airflow, we have a Python module that re-exports Airflow's [`DEFAULT_CELERY_CONFIG`](https://github.com/apache/airflow/blob/2.2.3/airflow/config_templates/default_celery.py#L39), which we point Airflow at via the environment variable [`AIRFLOW__CELERY__CELERY_CONFIG_OPTIONS`](https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#celery-config-options).  This lets us load our own module and register the Celery signal [before the Celery app is created](https://github.com/apache/airflow/blob/2.2.3/airflow/executors/celery_executor.py#L71-L74).  We then instrument the worker processes with whatever OpenTelemetry providers we need in a [`worker_process_init`](https://docs.celeryproject.org/en/stable/userguide/signals.html#worker-process-init) signal.
   
   *my_app/tracing.py*
   ```python
    import typing

    from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG
   from celery.signals import worker_process_init
   from opentelemetry import trace
   from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
   from opentelemetry.instrumentation.flask import FlaskInstrumentor
   from opentelemetry.instrumentation.grpc import GrpcInstrumentorClient
   from opentelemetry.instrumentation.requests import RequestsInstrumentor
   from opentelemetry.sdk.resources import Resource
   from opentelemetry.sdk.trace import sampling
   from opentelemetry.sdk.trace import TracerProvider
   from opentelemetry.sdk.trace.export import SimpleSpanProcessor
   
   CELERY_CONFIG = DEFAULT_CELERY_CONFIG
   
   
   @worker_process_init.connect(weak=False)  # type: ignore
   def instrument_worker(*args: typing.Any, **kwargs: typing.Any) -> None:
       tracer_provider = TracerProvider(
           resource=Resource(attributes={'service.name': 'my-service'}),
           sampler=sampling.ALWAYS_ON
       )
       tracer_provider.add_span_processor(SimpleSpanProcessor(OTLPSpanExporter()))
       trace.set_tracer_provider(tracer_provider)
       GrpcInstrumentorClient().instrument()
       RequestsInstrumentor().instrument()
   ```
   
   Our worker process is then started with the following configuration option:
   ```shell
   export AIRFLOW__CELERY__CELERY_CONFIG_OPTIONS=my_app.tracing.CELERY_CONFIG
   ```
   
   For parent contexts when creating spans, we embed an OpenTelemetry trace ID into the DAG run configuration from whatever service is executing the DAG.
   
   There are, however, two strange things that occur with this.
   1. We would have preferred using the `BatchSpanProcessor`, as it runs in a separate thread queuing up spans, but it seemed that not all of the spans were being exported to our OTLP collector and records were missing.  Using the `SimpleSpanProcessor` did not have that issue.
   2. The `OTLPSpanExporter` may cause the errors below.  I haven't been able to pinpoint why that occurs, but the `ConsoleSpanExporter` does not suffer from that issue.  It doesn't seem like any tasks are failing, and largely this seems to occur on Workers that have no tasks currently running.  Celery's `worker_process_init` is limited to a 4-second blocking call, so maybe it's a startup timing issue on the Worker process that causes it?  I attempted to increase Celery's timeout ([worker_proc_alive_timeout](https://docs.celeryproject.org/en/stable/userguide/configuration.html#worker-proc-alive-timeout)) to something higher like 20 seconds and still saw the same issue, though.
   
   > [2022-02-07 20:34:21,364: ERROR/MainProcess] Process 'ForkPoolWorker-32' pid:3506 exited with 'signal 11 (SIGSEGV)'
   > [2022-02-07 20:34:21,421: ERROR/MainProcess] Task handler raised error: WorkerLostError('Worker exited prematurely: signal 11 (SIGSEGV) Job: 245.')
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mik-laj commented on issue #12771: Instrument Airflow with Opentracing/Opentelemetry

Posted by GitBox <gi...@apache.org>.
mik-laj commented on issue #12771:
URL: https://github.com/apache/airflow/issues/12771#issuecomment-737733115


   How is OpenTracing / OpenTelemetry different from Sentry? We have native integration with Sentry, and I wonder if we should combine these two features or develop them independently.
   http://apache-airflow-docs.s3-website.eu-central-1.amazonaws.com/docs/apache-airflow/latest/logging-monitoring/errors.html


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] dm03514 edited a comment on issue #12771: Instrument Airflow with Opentracing/Opentelemetry

Posted by GitBox <gi...@apache.org>.
dm03514 edited a comment on issue #12771:
URL: https://github.com/apache/airflow/issues/12771#issuecomment-748715017


   👍 For this feature. 
   
   @mik-laj 
   
   I created a little test script to illustrate building a trace from a dag run:
   
   https://github.com/dm03514/airflow-tracer/blob/main/README.md
   
   Tracing is a different type of telemetry (alongside statsd and Sentry). It provides a "Gantt chart" view into a DAG. Some tracing providers (like Lightstep) provide advanced dashboarding and alerting. 
   
   The POC project above builds a trace by querying Airflow's database. I'm hoping that there is a way to add tracing support inside Airflow, so that when DAGs are being executed they can emit OpenTelemetry traces! 
   
   ```python
   def main(cli):
        dag = serialized_dag.SerializedDagModel.get(dag_id=cli.dag_id)
       dagrun = dag.dag.get_dagrun(execution_date=cli.execution_date)
       tis = dagrun.get_task_instances()
   
       root_span = tracer.start_span(
           name=dag.dag.dag_id,
           start_time=dt_to_ns_epoch(dagrun.start_date)
       )
       root_span.end(end_time=dt_to_ns_epoch(dagrun.end_date))
   
       for ti in tis:
           ctx = trace.set_span_in_context(root_span)
   
           span = tracer.start_span(
               name=ti.task_id,
               context=ctx,
               start_time=dt_to_ns_epoch(ti.start_date),
           )
           # span.set_attribute('airflow.pool', ti.pool)
           # span.set_attribute('airflow.queue', ti.queue)
           span.set_attribute('airflow.state', ti.state)
           span.set_attribute('airflow.operation', ti.operator)
           # span.set_attribute('airflow.max_tries', ti.max_tries)
           if ti.job_id is not None:
               span.set_attribute('airflow.job_id', ti.job_id)
           # span.set_attribute('airflow.pool_slots', ti.pool_slots)
           # span.set_attribute('airflow.priority_weight', ti.priority_weight)
           if ti.state != 'success':
               span.set_attribute('error', True)
           span.end(end_time=dt_to_ns_epoch(ti.end_date))
   ```
   
   thank you!
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] aa3pankaj commented on issue #12771: Instrument Airflow with Opentracing/Opentelemetry

Posted by GitBox <gi...@apache.org>.
aa3pankaj commented on issue #12771:
URL: https://github.com/apache/airflow/issues/12771#issuecomment-1029932364


   @dm03514 thanks, I tried your test code and it exports spans successfully.
   But exporting spans directly from tasks would be much more useful for tracing different operations like hook execution, API calls, etc.
   
   @mjpieters I tried exporting a span from an Airflow task:
   
   packages:
   apache-airflow==2.1.3
   opentelemetry-api==1.9.1
   opentelemetry-sdk==1.9.1
   opentelemetry-exporter-otlp==1.9.1
   opentelemetry-instrumentation==0.28b1
   
   env variables related to opentelemetry:
   OTEL_TRACES_EXPORTER=otlp
   OTEL_EXPORTER_OTLP_ENDPOINT="http://localhost:55680"
   OTEL_RESOURCE_ATTRIBUTES="service.name=test_airflow_worker"
   OTEL_TRACES_SAMPLER="always_on" 
   
   
   Airflow task:
   
    ```python
   from airflow.models import BaseOperator
   from opentelemetry import trace
   from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
   from opentelemetry.sdk.resources import Resource
   from opentelemetry.sdk.trace import TracerProvider
   from opentelemetry.sdk.trace.export import BatchSpanProcessor
   
   
   class TracingTestOperator(BaseOperator): 
   
       def execute(self, context):
   
           resource = Resource(attributes={
               "service.name": "test_airflow_worker"
           })
   
           trace.set_tracer_provider(TracerProvider(resource=resource))
           tracer = trace.get_tracer(__name__)
   
           otlp_exporter = OTLPSpanExporter(endpoint="http://localhost:55680", insecure=True)
   
           span_processor = BatchSpanProcessor(otlp_exporter)
   
           trace.get_tracer_provider().add_span_processor(span_processor)
   
           with tracer.start_as_current_span("test_task_span"):
               print("Hello Airflow!")
   ```
   
   The above code is not exporting spans to the collector, even though the collector (OTLP) is up.
   Strangely, the same code exports spans successfully when I run it as standalone Python (directly invoking the execute method).
   
   
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mtraynham commented on issue #12771: Instrument Airflow with Opentracing/Opentelemetry

Posted by GitBox <gi...@apache.org>.
mtraynham commented on issue #12771:
URL: https://github.com/apache/airflow/issues/12771#issuecomment-1035159599


   I'm using OpenTelemetry with Celery on our worker processes, so maybe this will help.
   
   We have a python module that hooks into Celery worker processes directly via a Signal as suggested by the [OpenTelemetry Celery docs](https://opentelemetry-python-contrib.readthedocs.io/en/latest/instrumentation/celery/celery.html).  
   
   To hook into Celery from Airflow, we have a Python module that re-exports Airflow's [`DEFAULT_CELERY_CONFIG`](https://github.com/apache/airflow/blob/2.2.3/airflow/config_templates/default_celery.py#L39), which we point Airflow at via the environment variable [`AIRFLOW__CELERY__CELERY_CONFIG_OPTIONS`](https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#celery-config-options).  This lets us load our own module and register the Celery signal [before the Celery app is created](https://github.com/apache/airflow/blob/2.2.3/airflow/executors/celery_executor.py#L71-L74).  We then instrument the worker processes with whatever OpenTelemetry providers we need in a [`worker_process_init`](https://docs.celeryproject.org/en/stable/userguide/signals.html#worker-process-init) signal.
   
   *my_app/tracing.py*
   ```python
    import typing

    from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG
   from celery.signals import worker_process_init
   from opentelemetry import trace
   from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
   from opentelemetry.instrumentation.flask import FlaskInstrumentor
   from opentelemetry.instrumentation.grpc import GrpcInstrumentorClient
   from opentelemetry.instrumentation.requests import RequestsInstrumentor
   from opentelemetry.sdk.resources import Resource
   from opentelemetry.sdk.trace import sampling
   from opentelemetry.sdk.trace import TracerProvider
   from opentelemetry.sdk.trace.export import SimpleSpanProcessor
   
   CELERY_CONFIG = DEFAULT_CELERY_CONFIG
   
   
   @worker_process_init.connect(weak=False)  # type: ignore
   def instrument_worker(*args: typing.Any, **kwargs: typing.Any) -> None:
       tracer_provider = TracerProvider(
           resource=Resource(attributes={'service.name': 'my-service'}),
           sampler=sampling.ALWAYS_ON
       )
       tracer_provider.add_span_processor(SimpleSpanProcessor(OTLPSpanExporter()))
       trace.set_tracer_provider(tracer_provider)
       GrpcInstrumentorClient().instrument()
       RequestsInstrumentor().instrument()
   ```
   
   Our worker process is then started with the following configuration option:
   ```shell
   export AIRFLOW__CELERY__CELERY_CONFIG_OPTIONS=my_app.tracing.CELERY_CONFIG
   ```
   
   There are, however, two strange things that occur with this.
   1. We would have preferred using the `BatchSpanProcessor`, as it runs in a separate thread queuing up spans, but it seemed that not all of the spans were being exported to our OTLP collector and records were missing.  Using the `SimpleSpanProcessor` did not have that issue.
   2. The `OTLPSpanExporter` may cause the errors below.  I haven't been able to pinpoint why that occurs, but the `ConsoleSpanExporter` does not suffer from that issue.  It doesn't seem like any tasks are failing, and largely this seems to occur on Workers that have no tasks currently running.  Celery's `worker_process_init` is limited to a 4-second blocking call, so maybe it's a startup timing issue on the Worker process that causes it?  I attempted to increase Celery's timeout ([worker_proc_alive_timeout](https://docs.celeryproject.org/en/stable/userguide/configuration.html#worker-proc-alive-timeout)) to something higher like 20 seconds and still saw the same issue, though.
   
   > [2022-02-07 20:34:21,364: ERROR/MainProcess] Process 'ForkPoolWorker-32' pid:3506 exited with 'signal 11 (SIGSEGV)'
   > [2022-02-07 20:34:21,421: ERROR/MainProcess] Task handler raised error: WorkerLostError('Worker exited prematurely: signal 11 (SIGSEGV) Job: 245.')
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org