Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/12/21 01:58:12 UTC

[GitHub] [airflow] dm03514 commented on issue #12771: Instrument Airflow with Opentracing/Opentelemetry

dm03514 commented on issue #12771:
URL: https://github.com/apache/airflow/issues/12771#issuecomment-748715017


   👍 For this feature. 
   
   @mik-laj 
   
   I created a small test script that illustrates building a trace from a DAG run:
   
   https://github.com/dm03514/airflow-tracer/blob/main/README.md
   
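   Tracing is a different type of telemetry from the metrics (StatsD) and error reporting (Sentry) that Airflow already supports. It provides a Gantt-chart view of a DAG run: one span per task, laid out on a shared time axis. Some tracing providers (like Lightstep) also provide advanced dashboarding and alerting.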
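To illustrate the Gantt-chart point: a trace is a set of spans, each with a start and end time, so placing them on a shared time axis relative to the root span gives the familiar Gantt layout. The following is a minimal stdlib-only sketch, assuming a hypothetical `(name, start_ns, end_ns)` tuple per span; this is not an OpenTelemetry or Lightstep type, and the timings are made up for illustration.

```python
from typing import List, Tuple

# Hypothetical span record: (name, start_ns, end_ns); not an OpenTelemetry type.
SpanRecord = Tuple[str, int, int]


def render_gantt(spans: List[SpanRecord], width: int = 40) -> str:
    """Render spans as text bars on a shared time axis (one row per span)."""
    t0 = min(start for _, start, _ in spans)
    t1 = max(end for _, _, end in spans)
    total = max(t1 - t0, 1)  # avoid division by zero for an instantaneous trace
    label_width = max(len(name) for name, _, _ in spans)
    rows = []
    for name, start, end in spans:
        # Map the span's start/end onto character columns in [0, width].
        left = (start - t0) * width // total
        right = max((end - t0) * width // total, left + 1)  # draw at least one char
        bar = " " * left + "#" * (right - left)
        rows.append(f"{name.ljust(label_width)} |{bar.ljust(width)}|")
    return "\n".join(rows)


# Illustrative timings in seconds, converted to nanoseconds.
S = 1_000_000_000
spans = [
    ("example_dag", 0 * S, 40 * S),  # root span: the whole DAG run
    ("extract", 0 * S, 10 * S),
    ("transform", 10 * S, 30 * S),
    ("load", 30 * S, 40 * S),
]
print(render_gantt(spans))
```

Each task row sits under the root row, so overlaps, gaps, and the critical path are visible at a glance; a tracing backend draws the same picture from the span start/end times.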
   
   The POC project above builds the trace after the fact by querying Airflow's metadata database. I'm hoping there is a way to add tracing support inside Airflow itself, so that DAGs emit OpenTelemetry traces while they are being executed!
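The main difference between the POC and in-process tracing is when timestamps are captured: the POC reads `start_date`/`end_date` back from the database after the run, while in-process instrumentation would record them as each task starts and finishes. A minimal stdlib-only sketch of the latter follows; `SpanRecorder` and `RecordedSpan` are hypothetical names, not Airflow or OpenTelemetry APIs, and a real integration would call an OpenTelemetry tracer instead of collecting spans in a list.

```python
import time
import uuid
from contextlib import contextmanager
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class RecordedSpan:
    # Hypothetical span record; field names loosely mirror tracing concepts.
    name: str
    span_id: str
    parent_id: Optional[str]
    start_ns: int
    end_ns: int = 0
    attributes: dict = field(default_factory=dict)


class SpanRecorder:
    """Collects spans in memory as code runs (a stand-in for a real tracer/exporter)."""

    def __init__(self) -> None:
        self.finished: List[RecordedSpan] = []

    @contextmanager
    def span(self, name: str, parent: Optional[RecordedSpan] = None):
        s = RecordedSpan(
            name=name,
            span_id=uuid.uuid4().hex[:16],
            parent_id=parent.span_id if parent else None,
            start_ns=time.time_ns(),  # captured live, not read back from a database
        )
        try:
            yield s
        except Exception:
            s.attributes["error"] = True  # mark the span as failed, then re-raise
            raise
        finally:
            s.end_ns = time.time_ns()
            self.finished.append(s)


# Hypothetical DAG run: the root span wraps one child span per task.
recorder = SpanRecorder()
with recorder.span("example_dag") as dag_span:
    for task_id in ("extract", "transform", "load"):
        with recorder.span(task_id, parent=dag_span) as task_span:
            task_span.attributes["airflow.state"] = "success"
```

Because a span is closed in `finally`, children finish (and are recorded) before their parent, and a task that raises still produces a span, flagged with `error`.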
   
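   ```python
   import calendar

   from airflow.models import serialized_dag
   from opentelemetry import trace

   # Assumes an OpenTelemetry SDK TracerProvider and exporter are configured
   # elsewhere; otherwise the API returns a no-op tracer. `cli` holds the
   # script's parsed command-line arguments (dag_id, execution_date).
   tracer = trace.get_tracer(__name__)


   def dt_to_ns_epoch(dt):
       # Assumed helper: datetime -> integer nanoseconds since the Unix epoch,
       # the unit OpenTelemetry uses for start_time/end_time.
       return calendar.timegm(dt.utctimetuple()) * 10**9 + dt.microsecond * 1000


   dag = serialized_dag.SerializedDagModel.get(dag_id=cli.dag_id)
   dagrun = dag.dag.get_dagrun(execution_date=cli.execution_date)
   tis = dagrun.get_task_instances()

   # Root span covering the whole DAG run
   root_span = tracer.start_span(
       name=dag.dag.dag_id,
       start_time=dt_to_ns_epoch(dagrun.start_date),
   )
   root_span.end(end_time=dt_to_ns_epoch(dagrun.end_date))

   # Every task instance becomes a child span of the root span
   ctx = trace.set_span_in_context(root_span)
   for ti in tis:
       span = tracer.start_span(
           name=ti.task_id,
           context=ctx,
           start_time=dt_to_ns_epoch(ti.start_date),
       )
       # span.set_attribute('airflow.pool', ti.pool)
       # span.set_attribute('airflow.queue', ti.queue)
       span.set_attribute('airflow.state', ti.state)
       span.set_attribute('airflow.operator', ti.operator)
       # span.set_attribute('airflow.max_tries', ti.max_tries)
       if ti.job_id is not None:
           span.set_attribute('airflow.job_id', ti.job_id)
       # span.set_attribute('airflow.pool_slots', ti.pool_slots)
       # span.set_attribute('airflow.priority_weight', ti.priority_weight)
       # Any non-success state (including skipped) is flagged as an error
       if ti.state != 'success':
           span.set_attribute('error', True)
       span.end(end_time=dt_to_ns_epoch(ti.end_date))
   ```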
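The mapping in the snippet (one root span for the DAG run, one child span per task instance, `error` set when a task's state is not `success`) does not itself depend on Airflow or OpenTelemetry, so it can be checked in isolation. A stdlib-only sketch, assuming a hypothetical `TaskRecord` in place of `TaskInstance` and plain dicts in place of OpenTelemetry spans; `dt_to_ns_epoch` here is an assumed implementation of the helper the snippet uses:

```python
import calendar
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import List, Optional


def dt_to_ns_epoch(dt: datetime) -> int:
    # Assumed helper: exact integer nanoseconds since the Unix epoch
    # (avoids float rounding from dt.timestamp() * 1e9).
    return calendar.timegm(dt.utctimetuple()) * 10**9 + dt.microsecond * 1000


@dataclass
class TaskRecord:
    # Hypothetical stand-in for the TaskInstance fields the snippet reads.
    task_id: str
    state: Optional[str]
    start_date: datetime
    end_date: datetime


def build_spans(dag_id: str, run_start: datetime, run_end: datetime,
                tasks: List[TaskRecord]) -> List[dict]:
    """Return the root span followed by one child span per task, as plain dicts."""
    spans = [{
        "name": dag_id,
        "parent": None,
        "start": dt_to_ns_epoch(run_start),
        "end": dt_to_ns_epoch(run_end),
        "attributes": {},
    }]
    for t in tasks:
        attributes = {"airflow.state": t.state}
        if t.state != "success":
            attributes["error"] = True  # same rule as the snippet above
        spans.append({
            "name": t.task_id,
            "parent": dag_id,  # every task span hangs off the root span
            "start": dt_to_ns_epoch(t.start_date),
            "end": dt_to_ns_epoch(t.end_date),
            "attributes": attributes,
        })
    return spans


# Illustrative, made-up run: two tasks, the second one failed.
utc = timezone.utc
tasks = [
    TaskRecord("extract", "success",
               datetime(2020, 12, 20, 0, 0, 0, tzinfo=utc),
               datetime(2020, 12, 20, 0, 0, 10, tzinfo=utc)),
    TaskRecord("load", "failed",
               datetime(2020, 12, 20, 0, 0, 10, tzinfo=utc),
               datetime(2020, 12, 20, 0, 0, 25, tzinfo=utc)),
]
spans = build_spans("example_dag",
                    datetime(2020, 12, 20, 0, 0, 0, tzinfo=utc),
                    datetime(2020, 12, 20, 0, 0, 25, tzinfo=utc),
                    tasks)
```

Note that the `state != "success"` rule also flags states such as `skipped` or `upstream_failed` as errors; whether that is desirable is a design choice for the integration.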
   
   Thank you!
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org