Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/04/27 11:05:48 UTC

[GitHub] [airflow] Robstaa opened a new issue #15547: Jinja templating throws bug when using a PostgresOperator within the Taskflow API

Robstaa opened a new issue #15547:
URL: https://github.com/apache/airflow/issues/15547


   **Apache Airflow version**:
   2.0.1
   
   **Environment**:
   - **OS** (e.g. from /etc/os-release): macOS Big Sur 11.2.3
   - **Kernel** (e.g. `uname -a`): Darwin Kernel Version 20.3.0
   
   **What happened**:
   
   When executing a `PostgresOperator` within a `PythonOperator` through the new TaskFlow API, the Jinja templating does not seem to execute. As usual, I am giving the `PostgresOperator` a path to the SQL file as the `sql` parameter. As a standalone task, the `PostgresOperator` does what is expected: it reads the SQL path and then executes the SQL code it reads from the file. It also works with `params`.
   
   **What you expected to happen**:
   I expect that this (Jinja-)templating also works when the PostgresOperator is executed within a `@task` method. 
   
   **How to reproduce it**:
   1. Define a connection `"pg_connection"` to a postgres database, or use the default connection.
   2. Within that database, create a table `test_table`.
   3. In `dags/test_dag.py`:
   ```python
   from airflow.decorators import dag, task
   from airflow.providers.postgres.operators.postgres import PostgresOperator
   from airflow.operators.python import get_current_context
   from airflow.utils.dates import days_ago
   
   
   DEFAULT_ARGS = {
       'owner': 'airflow',
       'depends_on_past': False,
       'email': ['admin@airflow.com'],
       'email_on_failure': True,
       'email_on_retry': False,
       'retries': 1,
       'start_date': days_ago(2)
   }
   
   @dag(default_args=DEFAULT_ARGS, schedule_interval=None)
   def test_dag():
   
       outside_pg = PostgresOperator(
           task_id='outside_pg',
           postgres_conn_id='pg_connection',
           sql='sql/test_sql.sql'
       )
   
       @task()
       def inside_pg():
           context = get_current_context()
           inside_pg = PostgresOperator(
               task_id='inside_pg',
               postgres_conn_id='pg_connection',
               sql='sql/test_sql.sql'
           )
           inside_pg.execute(context=context)
   
       execute_inside_pg = inside_pg()
       outside_pg >> execute_inside_pg
   
   dag = test_dag()
   ```
   4. In `dags/sql/test_sql.sql`:
   ```sql
   SELECT * FROM test_table;
   ```
   
   **The error logs I get**:
   ```
   airflow dags test test_dag -1
   [2021-04-27 13:01:25,488] {dagbag.py:448} INFO - Filling up the DagBag from /Users/robinzuschke/code/valyria/airflow/dags
   [2021-04-27 13:01:25,649] {base_executor.py:82} INFO - Adding to queue: ['<TaskInstance: test_dag.outside_pg 2021-04-01 00:00:00+00:00 [queued]>']
   [2021-04-27 13:01:30,669] {taskinstance.py:1257} INFO - Exporting the following env vars:
   AIRFLOW_CTX_DAG_EMAIL=admin@airflow.com
   AIRFLOW_CTX_DAG_OWNER=airflow
   AIRFLOW_CTX_DAG_ID=test_dag
   AIRFLOW_CTX_TASK_ID=outside_pg
   AIRFLOW_CTX_EXECUTION_DATE=2021-04-01T00:00:00+00:00
   AIRFLOW_CTX_DAG_RUN_ID=backfill__2021-04-01T00:00:00+00:00
   [2021-04-27 13:01:30,675] {base.py:74} INFO - Using connection to: id: pg_connection. Host: localhost, Port: 5432, Schema: postgres, Login: postgres, Password: None, extra: None
   [2021-04-27 13:01:30,682] {dbapi.py:180} INFO - Running statement: SELECT * FROM test_table;, parameters: None
   [2021-04-27 13:01:30,686] {dbapi.py:186} INFO - Rows affected: 0
   [2021-04-27 13:01:30,690] {taskinstance.py:1166} INFO - Marking task as SUCCESS. dag_id=test_dag, task_id=outside_pg, execution_date=20210401T000000, start_date=20210427T104518, end_date=20210427T110130
   [2021-04-27 13:01:30,704] {taskinstance.py:1220} INFO - 0 downstream tasks scheduled from follow-on schedule check
   [2021-04-27 13:01:30,722] {backfill_job.py:388} INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 1 | succeeded: 1 | running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 1
   [2021-04-27 13:01:30,744] {base_executor.py:82} INFO - Adding to queue: ['<TaskInstance: test_dag.inside_pg 2021-04-01 00:00:00+00:00 [queued]>']
   [2021-04-27 13:01:35,615] {taskinstance.py:1257} INFO - Exporting the following env vars:
   AIRFLOW_CTX_DAG_EMAIL=admin@airflow.com
   AIRFLOW_CTX_DAG_OWNER=airflow
   AIRFLOW_CTX_DAG_ID=test_dag
   AIRFLOW_CTX_TASK_ID=inside_pg
   AIRFLOW_CTX_EXECUTION_DATE=2021-04-01T00:00:00+00:00
   AIRFLOW_CTX_DAG_RUN_ID=backfill__2021-04-01T00:00:00+00:00
   [2021-04-27 13:01:35,619] {base.py:74} INFO - Using connection to: id: pg_connection. Host: localhost, Port: 5432, Schema: postgres, Login: postgres, Password: None, extra: None
   [2021-04-27 13:01:35,624] {dbapi.py:180} INFO - Running statement: sql/test_sql.sql, parameters: None
   [2021-04-27 13:01:35,625] {taskinstance.py:1455} ERROR - syntax error at or near "sql"
   LINE 1: sql/test_sql.sql
           ^
   
   Traceback (most recent call last):
     File "/Users/robinzuschke/.pyenv/versions/3.7.7/envs/airflow/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1112, in _run_raw_task
       self._prepare_and_execute_task_with_callbacks(context, task)
     File "/Users/robinzuschke/.pyenv/versions/3.7.7/envs/airflow/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1285, in _prepare_and_execute_task_with_callbacks
       result = self._execute_task(context, task_copy)
     File "/Users/robinzuschke/.pyenv/versions/3.7.7/envs/airflow/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1315, in _execute_task
       result = task_copy.execute(context=context)
     File "/Users/robinzuschke/.pyenv/versions/3.7.7/envs/airflow/lib/python3.7/site-packages/airflow/operators/python.py", line 233, in execute
       return_value = self.python_callable(*self.op_args, **self.op_kwargs)
     File "/Users/robinzuschke/code/valyria/airflow/dags/test_dag.py", line 34, in inside_pg
       inside_pg.execute(context=context)
     File "/Users/robinzuschke/.pyenv/versions/3.7.7/envs/airflow/lib/python3.7/site-packages/airflow/providers/postgres/operators/postgres.py", line 71, in execute
       self.hook.run(self.sql, self.autocommit, parameters=self.parameters)
     File "/Users/robinzuschke/.pyenv/versions/3.7.7/envs/airflow/lib/python3.7/site-packages/airflow/hooks/dbapi.py", line 184, in run
       cur.execute(sql_statement)
   psycopg2.errors.SyntaxError: syntax error at or near "sql"
   LINE 1: sql/test_sql.sql
           ^
   
   [2021-04-27 13:01:35,630] {taskinstance.py:1503} INFO - Marking task as UP_FOR_RETRY. dag_id=test_dag, task_id=inside_pg, execution_date=20210401T000000, start_date=20210427T104518, end_date=20210427T110135
   [2021-04-27 13:01:35,643] {debug_executor.py:87} ERROR - Failed to execute task: syntax error at or near "sql"
   LINE 1: sql/test_sql.sql
           ^
   .
   Traceback (most recent call last):
     File "/Users/robinzuschke/.pyenv/versions/3.7.7/envs/airflow/lib/python3.7/site-packages/airflow/executors/debug_executor.py", line 79, in _run_task
       ti._run_raw_task(job_id=ti.job_id, **params)  # pylint: disable=protected-access
     File "/Users/robinzuschke/.pyenv/versions/3.7.7/envs/airflow/lib/python3.7/site-packages/airflow/utils/session.py", line 65, in wrapper
       return func(*args, session=session, **kwargs)
     File "/Users/robinzuschke/.pyenv/versions/3.7.7/envs/airflow/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1112, in _run_raw_task
       self._prepare_and_execute_task_with_callbacks(context, task)
     File "/Users/robinzuschke/.pyenv/versions/3.7.7/envs/airflow/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1285, in _prepare_and_execute_task_with_callbacks
       result = self._execute_task(context, task_copy)
     File "/Users/robinzuschke/.pyenv/versions/3.7.7/envs/airflow/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1315, in _execute_task
       result = task_copy.execute(context=context)
     File "/Users/robinzuschke/.pyenv/versions/3.7.7/envs/airflow/lib/python3.7/site-packages/airflow/operators/python.py", line 233, in execute
       return_value = self.python_callable(*self.op_args, **self.op_kwargs)
     File "/Users/robinzuschke/code/valyria/airflow/dags/test_dag.py", line 34, in inside_pg
       inside_pg.execute(context=context)
     File "/Users/robinzuschke/.pyenv/versions/3.7.7/envs/airflow/lib/python3.7/site-packages/airflow/providers/postgres/operators/postgres.py", line 71, in execute
       self.hook.run(self.sql, self.autocommit, parameters=self.parameters)
     File "/Users/robinzuschke/.pyenv/versions/3.7.7/envs/airflow/lib/python3.7/site-packages/airflow/hooks/dbapi.py", line 184, in run
       cur.execute(sql_statement)
   psycopg2.errors.SyntaxError: syntax error at or near "sql"
   LINE 1: sql/test_sql.sql
           ^
   
   [2021-04-27 13:01:35,651] {backfill_job.py:219} ERROR - Task instance <TaskInstance: test_dag.inside_pg 2021-04-01 00:00:00+00:00 [failed]> failed
   [2021-04-27 13:01:35,655] {dagrun.py:430} ERROR - Marking run <DagRun test_dag @ 2021-04-01 00:00:00+00:00: backfill__2021-04-01T00:00:00+00:00, externally triggered: False> failed
   [2021-04-27 13:01:35,657] {backfill_job.py:388} INFO - [backfill progress] | finished run 1 of 1 | tasks waiting: 0 | succeeded: 1 | running: 0 | failed: 1 | skipped: 0 | deadlocked: 0 | not ready: 0
   Some task instances failed:
   DAG ID    Task ID    Execution date               Try number
   --------  ---------  -------------------------  ------------
   test_dag  inside_pg  2021-04-01 00:00:00+00:00             1
   (3.7.7/envs/airflow)
   ```
   Any help is appreciated!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] eladkal commented on issue #15547: Jinja templating throws bug when using a PostgresOperator within the Taskflow API

Posted by GitBox <gi...@apache.org>.
eladkal commented on issue #15547:
URL: https://github.com/apache/airflow/issues/15547#issuecomment-831087945


   I wrote this as an explanation of why operators are not templated inside a PythonOperator. I agree completely that the preferred way is to use hooks (or perhaps a custom operator). This piece of code is old :)
   
   By the way, what is the action item for this issue?



[GitHub] [airflow] eladkal commented on issue #15547: Jinja templating throws bug when using a PostgresOperator within the Taskflow API

Posted by GitBox <gi...@apache.org>.
eladkal commented on issue #15547:
URL: https://github.com/apache/airflow/issues/15547#issuecomment-828666111


   When you run an operator as a task, you just need to define it. Airflow handles everything else (instantiating the operator, rendering templates, executing, retrying, etc.). In essence, Airflow manages the whole lifecycle of the operator/task.
   
   When you call an operator inside a PythonOperator, Airflow doesn't handle anything for you. Airflow considers it to be just another Python class, so you need to handle everything on your own. Just as you explicitly called `execute()` to run the operator, you also need to explicitly call a function that renders your template fields, and before that, a function that reads the template files (fields ending in a `template_ext` such as `.sql`).
   
   For example, this is how I execute a CheckOperator inside a Python callable:
   
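   ```python
   # Airflow 1.10-era import paths (this snippet predates Airflow 2.0).
   from airflow.exceptions import AirflowException
   from airflow.operators.check_operator import CheckOperator
   from airflow.operators.slack_operator import SlackAPIPostOperator
   
   # sql_folder, get_slack_channel and SLACK_CHANNEL_TYPE are defined elsewhere
   # in the original DAG file and are not shown here.
   
   
   def data_quality_check(**kwargs):
       """
       This callable uses the CheckOperator to conduct data quality tests over the
       final table. If the test result is unsuccessful, a Slack message will
       be posted.
       """
       params = kwargs['params']
   
       is_pk_distinct = CheckOperator(
           task_id='data_quality_check_is_pk_distinct_task',
           sql='{}/quality_check_is_pk_distinct.sql'.format(sql_folder),
           conn_id='presto_default',
       )
       try:
           # Replace the .sql path stored in `sql` with the file's contents.
           is_pk_distinct.resolve_template_files()
           # Render the Jinja template, using the params dict as the template context.
           is_pk_distinct.sql = is_pk_distinct.render_template(content=is_pk_distinct.sql, context=params)
           # Run the lifecycle steps Airflow would normally run for a scheduled task.
           is_pk_distinct.pre_execute(context=kwargs)
           is_pk_distinct.execute(context=kwargs)
   
       except AirflowException:
           task_instance = kwargs['task_instance']
           params = dict(log_url=task_instance.log_url,
                         dag_id=task_instance.dag_id
                         )
   
           slack = SlackAPIPostOperator(
               task_id='slack_data_quality_alert',
               text="Data quality check has failed.\nDag: %(dag_id)s\n"
                    "Test: Primary key uniqueness\n"
                    "Full log in %(log_url)s" % params,
               slack_conn_id='slack_dags_ci',
               channel=get_slack_channel(SLACK_CHANNEL_TYPE)
           )
   
           return slack.execute(context=kwargs)
   ```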
   
   This is not the nicest code, but it gets the job done. You need to do something similar in your code.
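To make the failure mode in the issue concrete, here is a toy, stdlib-only model of the steps described above. It is not Airflow's code: `ToySqlOperator` is hypothetical, its two methods are only named after `BaseOperator.resolve_template_files()` and `BaseOperator.render_template_fields()`, a temporary folder stands in for the DAG folder that Airflow's template loader searches, and `str.format_map` stands in for Jinja rendering. It illustrates why calling `execute()` directly sent the literal string `sql/test_sql.sql` to Postgres: the file-resolution step never ran.

```python
import os
import tempfile


class ToySqlOperator:
    """Hypothetical toy operator; NOT Airflow's BaseOperator."""

    template_fields = ("sql",)
    template_ext = (".sql",)

    def __init__(self, sql, search_path):
        self.sql = sql
        # Stands in for the DAG folder that Airflow's template loader searches.
        self.search_path = search_path

    def resolve_template_files(self):
        # Step 1: a templated field ending in a template extension is treated
        # as a file path and replaced by the file's contents.
        for field in self.template_fields:
            value = getattr(self, field)
            if isinstance(value, str) and value.endswith(self.template_ext):
                with open(os.path.join(self.search_path, value)) as f:
                    setattr(self, field, f.read())

    def render_template_fields(self, context):
        # Step 2: render each templated field (str.format_map stands in for Jinja).
        for field in self.template_fields:
            setattr(self, field, getattr(self, field).format_map(context))

    def execute(self, context):
        # Step 3: a real SQL operator would pass self.sql to a hook here;
        # the toy just returns the statement it would have sent.
        return self.sql


with tempfile.TemporaryDirectory() as dag_folder:
    os.makedirs(os.path.join(dag_folder, "sql"))
    with open(os.path.join(dag_folder, "sql", "test_sql.sql"), "w") as f:
        f.write("SELECT * FROM {table};")

    # What the issue's @task did: call execute() directly, skipping steps 1-2.
    direct_result = ToySqlOperator("sql/test_sql.sql", dag_folder).execute(context={})

    # What happens for a scheduled task: resolve, render, then execute.
    op = ToySqlOperator("sql/test_sql.sql", dag_folder)
    op.resolve_template_files()
    op.render_template_fields({"table": "test_table"})
    full_result = op.execute(context={})

print(direct_result)  # sql/test_sql.sql
print(full_result)    # SELECT * FROM test_table;
```

The first result matches the `Running statement: sql/test_sql.sql` line in the error log; the second matches what the standalone `outside_pg` task sent.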



[GitHub] [airflow] Robstaa edited a comment on issue #15547: Jinja templating throws bug when using a PostgresOperator within the Taskflow API

Posted by GitBox <gi...@apache.org>.
Robstaa edited a comment on issue #15547:
URL: https://github.com/apache/airflow/issues/15547#issuecomment-828281996


   In my original codebase (not the example to reproduce it), one TaskFlow task returns a list of dictionaries. I pass this list to another TaskFlow task (the one with the PostgresOperator). This task loops over each dictionary, manipulates it, and passes it to the PostgresOperator to save it in the database.
   
   I am quite new to Airflow; do you have a better approach? From what I take from your answer, using an operator within a TaskFlow task is not considered best practice.



[GitHub] [airflow] boring-cyborg[bot] commented on issue #15547: Jinja templating throws bug when using a PostgresOperator within the Taskflow API

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on issue #15547:
URL: https://github.com/apache/airflow/issues/15547#issuecomment-827522419


   Thanks for opening your first issue here! Be sure to follow the issue template!
   



[GitHub] [airflow] eladkal closed issue #15547: Jinja templating throws bug when using a PostgresOperator within the Taskflow API

Posted by GitBox <gi...@apache.org>.
eladkal closed issue #15547:
URL: https://github.com/apache/airflow/issues/15547


   



[GitHub] [airflow] xinbinhuang edited a comment on issue #15547: Jinja templating throws bug when using a PostgresOperator within the Taskflow API

Posted by GitBox <gi...@apache.org>.
xinbinhuang edited a comment on issue #15547:
URL: https://github.com/apache/airflow/issues/15547#issuecomment-830841726


   Though the approach suggested by @eladkal works, I would recommend using hooks directly, as suggested by @uranusjr. An operator represents a single task in the Airflow DAG graph (along with other Airflow-specific metadata), and it uses hooks to do the actual work. If you are looking for something to "do the work", you should use hooks directly.
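Applied to the reproduction in the issue, "use the hook directly" means reading the `.sql` file yourself and handing its *contents* to the hook, since a hook does no template-file resolution. The sketch below assumes a hook exposing `run(sql, autocommit=False, parameters=None)` and `get_records(sql, parameters=None)`, as `DbApiHook` (and therefore `PostgresHook`) does. So that it runs without Postgres, `SqliteStandInHook` is a hypothetical stand-in backed by in-memory SQLite (SQLite uses `?` placeholders where psycopg2 uses `%s`); the helper name `query_sql_file` is also hypothetical.

```python
import os
import sqlite3
import tempfile


class SqliteStandInHook:
    """Hypothetical stand-in for PostgresHook, backed by in-memory SQLite.

    Mimics two DbApiHook methods: run() and get_records().
    """

    def __init__(self):
        self.conn = sqlite3.connect(":memory:")

    def run(self, sql, autocommit=False, parameters=None):
        self.conn.execute(sql, parameters or ())
        self.conn.commit()

    def get_records(self, sql, parameters=None):
        return self.conn.execute(sql, parameters or ()).fetchall()


def query_sql_file(hook, relative_path, base_dir):
    """Read a .sql file and pass its contents (not its path) to the hook."""
    with open(os.path.join(base_dir, relative_path)) as f:
        sql = f.read()
    return hook.get_records(sql)


# In a DAG, base_dir would typically be os.path.dirname(os.path.abspath(__file__))
# and the hook PostgresHook(postgres_conn_id="pg_connection").
hook = SqliteStandInHook()
hook.run("CREATE TABLE test_table (id INTEGER, name TEXT)")
hook.run("INSERT INTO test_table VALUES (?, ?)", parameters=(1, "a"))

with tempfile.TemporaryDirectory() as dag_folder:
    os.makedirs(os.path.join(dag_folder, "sql"))
    with open(os.path.join(dag_folder, "sql", "test_sql.sql"), "w") as f:
        f.write("SELECT * FROM test_table;")
    rows = query_sql_file(hook, "sql/test_sql.sql", dag_folder)

print(rows)  # [(1, 'a')]
```

Use `get_records()` when the task needs the rows back and `run()` when it only executes statements.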



[GitHub] [airflow] xinbinhuang edited a comment on issue #15547: Jinja templating throws bug when using a PostgresOperator within the Taskflow API

Posted by GitBox <gi...@apache.org>.
xinbinhuang edited a comment on issue #15547:
URL: https://github.com/apache/airflow/issues/15547#issuecomment-828166742


   I don't think mixing an operator inside another operator (i.e. PythonOperator/TaskFlow) is supported, or will be supported. What are you trying to achieve?



[GitHub] [airflow] xinbinhuang commented on issue #15547: Jinja templating throws bug when using a PostgresOperator within the Taskflow API

Posted by GitBox <gi...@apache.org>.
xinbinhuang commented on issue #15547:
URL: https://github.com/apache/airflow/issues/15547#issuecomment-830841726


   Though the approach suggested by @eladkal works, I would recommend using hooks directly, as suggested by @uranusjr. An operator represents a single task in the Airflow DAG graph (along with other Airflow-specific metadata), and it uses hooks to do the actual work. If you are looking for something to do the "work", you should use hooks directly.



[GitHub] [airflow] Robstaa commented on issue #15547: Jinja templating throws bug when using a PostgresOperator within the Taskflow API

Posted by GitBox <gi...@apache.org>.
Robstaa commented on issue #15547:
URL: https://github.com/apache/airflow/issues/15547#issuecomment-828281996


   One TaskFlow task returns a list of dictionaries. I pass this list to another task (the one with the PostgresOperator). This task loops over each dictionary, manipulates it, and passes it to the PostgresOperator to save it in the database.
   
   I am quite new to Airflow; do you have a better approach? From what I take from your answer, using an operator within a TaskFlow task is not considered best practice.



[GitHub] [airflow] xinbinhuang commented on issue #15547: Jinja templating throws bug when using a PostgresOperator within the Taskflow API

Posted by GitBox <gi...@apache.org>.
xinbinhuang commented on issue #15547:
URL: https://github.com/apache/airflow/issues/15547#issuecomment-828166742


   I don't think mixing an operator inside the TaskFlow API (PythonOperator) is supported, or will be supported. What are you trying to achieve?



[GitHub] [airflow] potiuk commented on issue #15547: Jinja templating throws bug when using a PostgresOperator within the Taskflow API

Posted by GitBox <gi...@apache.org>.
potiuk commented on issue #15547:
URL: https://github.com/apache/airflow/issues/15547#issuecomment-830843697


   Yep. Using a hook is the way to go.



[GitHub] [airflow] uranusjr commented on issue #15547: Jinja templating throws bug when using a PostgresOperator within the Taskflow API

Posted by GitBox <gi...@apache.org>.
uranusjr commented on issue #15547:
URL: https://github.com/apache/airflow/issues/15547#issuecomment-828285461


   So the operator inside the task is only used to directly access the database *while the task is running*, and is not scheduled as part of a DAG? You probably want `PostgresHook` instead.
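Following that suggestion, the use case described earlier in the thread (looping over a list of dictionaries, manipulating each one, and saving it) can be sketched with a hook instead of an operator. This assumes `PostgresHook` from `airflow.providers.postgres.hooks.postgres`, whose inherited `DbApiHook.run(sql, autocommit=False, parameters=None)` executes a statement with driver-side parameters. So that the sketch runs without Airflow or a database, `RecordingHook` is a hypothetical stand-in that only records calls; the table name, column names, and the per-record manipulation are hypothetical too.

```python
# In a DAG, pass PostgresHook(postgres_conn_id="pg_connection") instead of
# RecordingHook, e.g. inside a @task function:
#     from airflow.providers.postgres.hooks.postgres import PostgresHook
#     save_records(PostgresHook(postgres_conn_id="pg_connection"), records)


class RecordingHook:
    """Hypothetical stand-in that mimics DbApiHook.run's signature."""

    def __init__(self):
        self.calls = []

    def run(self, sql, autocommit=False, parameters=None):
        self.calls.append((sql, parameters))


# Hypothetical table and columns; %(name)s is psycopg2's named-parameter style.
INSERT_SQL = "INSERT INTO test_table (name, value) VALUES (%(name)s, %(value)s);"


def save_records(hook, records):
    """Manipulate each record, then insert it with a parameterized query."""
    for record in records:
        # Hypothetical manipulation step.
        row = {"name": record["name"].strip().lower(), "value": int(record["value"])}
        # Values travel as parameters rather than string formatting,
        # so the database driver handles quoting and escaping.
        hook.run(INSERT_SQL, parameters=row)
    return len(records)


hook = RecordingHook()
saved = save_records(hook, [{"name": " Foo ", "value": "1"}, {"name": "BAR", "value": 2}])
print(saved)             # 2
print(hook.calls[0][1])  # {'name': 'foo', 'value': 1}
```

Each `run()` call opens its own connection. For many rows, `DbApiHook.insert_rows(table, rows, target_fields=...)` may be a better fit, though its exact behavior is worth checking against the installed provider version.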



[GitHub] [airflow] Robstaa commented on issue #15547: Jinja templating throws bug when using a PostgresOperator within the Taskflow API

Posted by GitBox <gi...@apache.org>.
Robstaa commented on issue #15547:
URL: https://github.com/apache/airflow/issues/15547#issuecomment-830796404


   That makes sense, thank you so much! 

