You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/04/28 18:05:57 UTC

[GitHub] [airflow] eladkal commented on issue #15547: Jinja templating throws bug when using a PostgresOperator within the Taskflow API

eladkal commented on issue #15547:
URL: https://github.com/apache/airflow/issues/15547#issuecomment-828666111


   When you execute operator you just need to set it. Airflow handle everything else (initializing constructor, rendering templates, executing, retiring etc..) in essence Airflow handles the whole lifecycle of the operator/task.
   
   When you call operator inside PythonOperator Airflow doesn't handle anything for you. Airflow consider it to be just another Python class thus you need to handle everything on your own. Like you called specifically to `execute()` in order to run the operator you need also call explicitly to a function that will render your template fields, moreover you need explicitly call a function that will read the template extension before that.  
   
   For example this is how I execute a CheckOperator inside a python callable:
   
   ```
   def data_quality_check(**kwargs):
       """
       This callable uses the CheckOperator to conduct data quality tests over the
       final table. If the test result is unsuccessful, a Slack message will
       be posted.
       """
       params = (kwargs['params'])
   
       is_pk_distinct = CheckOperator(
           task_id='data_quality_check_is_pk_distinct_task',
           sql='{}/quality_check_is_pk_distinct.sql'.format(sql_folder),
           conn_id='presto_default',
       )
       try:
           is_pk_distinct.resolve_template_files()
           is_pk_distinct.sql = is_pk_distinct.render_template(content=is_pk_distinct.sql, context=params)
           is_pk_distinct.pre_execute(context=kwargs)
           is_pk_distinct.execute(context=kwargs)
   
       except AirflowException:
           task_instance = kwargs['task_instance']
           params = dict(log_url=task_instance.log_url,
                         dag_id=task_instance.dag_id
                         )
   
           slack = SlackAPIPostOperator(
               task_id='slack_data_quality_alert',
               text="Data quality check has failed.\nDag: %(dag_id)s\n"
                    "Test: Primary key uniqueness\n"
                    "Full log in %(log_url)s" % params,
               slack_conn_id='slack_dags_ci',
               channel=get_slack_channel(SLACK_CHANNEL_TYPE)
           )
   
           return slack.execute(context=kwargs)
           pass
   ```
   
   This is not the nicest code but it gets the job done you need to do something similar in your code


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org