Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/01/21 22:16:40 UTC

[GitHub] [airflow] bobhaffner opened a new issue #13823: Postgres Operator task doesn't sequence properly with TaskFlow API

bobhaffner opened a new issue #13823:
URL: https://github.com/apache/airflow/issues/13823


   **Apache Airflow version**:
   2.1.0.dev0
   
   **Kubernetes version (if you are using kubernetes)** (use `kubectl version`):
   NA
   
   **Environment**:
   Docker
   Apache/Airflow
   master-python3.6
   
   **What happened**:
   some_pg_task is out of sequence
   
   **What you expected to happen**:\
   Task sequence should be 
   some_py_task >> some_pg_task
   
   <!-- What do you think went wrong? -->
   
   
   **How to reproduce it**:
   ```
   from airflow.models import DAG
   from airflow.providers.postgres.operators.postgres import PostgresOperator
   import os
   
   from airflow.utils.dates import days_ago
   
   default_args = {'start_date': days_ago(1)}
   
   dag_name = os.path.splitext(os.path.basename(__file__))[0]
   
   with DAG(dag_name, default_args=default_args) as dag:
   
       @dag.task
       def some_py_task() -> str:
           return some_str
   
       some_str = some_py_task()
   
       some_pg_task = PostgresOperator(task_id='some_pg_task', 
                        sql='select * from %s',
                        parameters=[some_str],
                        postgres_conn_id='postgres_bob')
   ```
   
   **Anything else we need to know**:
   The MSSQL Op behaves the same way
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] vemikhaylov removed a comment on issue #13823: Postgres Operator task doesn't sequence properly with TaskFlow API

Posted by GitBox <gi...@apache.org>.
vemikhaylov removed a comment on issue #13823:
URL: https://github.com/apache/airflow/issues/13823#issuecomment-766552731


   Do you specify the relation `some_str >> some_pg_task` (https://airflow.apache.org/docs/apache-airflow/stable/concepts.html#relations-between-tasks) in the DAG? This isn't in your example.





[GitHub] [airflow] ephraimbuddy closed issue #13823: Postgres Operator task doesn't sequence properly with TaskFlow API

Posted by GitBox <gi...@apache.org>.
ephraimbuddy closed issue #13823:
URL: https://github.com/apache/airflow/issues/13823


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] resdevd edited a comment on issue #13823: Postgres Operator task doesn't sequence properly with TaskFlow API

Posted by GitBox <gi...@apache.org>.
resdevd edited a comment on issue #13823:
URL: https://github.com/apache/airflow/issues/13823#issuecomment-940466144


   @ashb Not sure if the dependencies are working even when specifying ">>". Below is a sample TaskFlow DAG with PostgresOperator. Tested with Airflow version 2.2.0.
   
   <img width="300" alt="Screen Shot 2021-10-11 at 1 43 19 PM" src="https://user-images.githubusercontent.com/29984943/136859434-8ea02532-6730-4c6f-88b4-e879b842b8cf.png">
   
   This is the DAG file where PostgresOperator seems to sit outside the dependency flow. Any advice on a workaround?
   
   ```python
   import json
   from datetime import datetime
   from airflow.decorators import dag, task
   from airflow.providers.postgres.operators.postgres import PostgresOperator
   
   
   @dag(schedule_interval=None, start_date=datetime(2021, 10, 11), catchup=False, tags=['example'])
   def tutorial_taskflow_api_etl():
       test_pg_operator = PostgresOperator(
           task_id="test_pg_operator",
           postgres_conn_id="rd_master_pg",
           sql="SELECT 1;"
       )
   
       @task()
       def extract():
           data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
   
           order_data_dict = json.loads(data_string)
           return order_data_dict
   
       @task(multiple_outputs=True)
       def transform(order_data_dict: dict):
           total_order_value = 0
   
           for value in order_data_dict.values():
               total_order_value += value
   
           return {"total_order_value": total_order_value}
   
       #
       @task()
       def load(total_order_value: float):
           print(f"Total order value is: {total_order_value:.2f}")
   
       order_data = extract()
       order_summary = transform(order_data)
       load_task_flow = load(order_summary["total_order_value"])
       test_pg_operator >> load_task_flow
   
   
   tutorial_etl_dag = tutorial_taskflow_api_etl()
   ```
   





[GitHub] [airflow] vemikhaylov commented on issue #13823: Postgres Operator task doesn't sequence properly with TaskFlow API

Posted by GitBox <gi...@apache.org>.
vemikhaylov commented on issue #13823:
URL: https://github.com/apache/airflow/issues/13823#issuecomment-766552731


   Do you specify the relation `some_str >> some_pg_task` (https://airflow.apache.org/docs/apache-airflow/stable/concepts.html#relations-between-tasks) in the DAG? This isn't in your example.





[GitHub] [airflow] ashb commented on issue #13823: Postgres Operator task doesn't sequence properly with TaskFlow API

Posted by GitBox <gi...@apache.org>.
ashb commented on issue #13823:
URL: https://github.com/apache/airflow/issues/13823#issuecomment-808352865


   I think this is worse than not sequencing right: even if the sequence was explicitly set, the parameters might not be what you expect (i.e. the output of the py task). _Might_.





[GitHub] [airflow] resdevd edited a comment on issue #13823: Postgres Operator task doesn't sequence properly with TaskFlow API

Posted by GitBox <gi...@apache.org>.
resdevd edited a comment on issue #13823:
URL: https://github.com/apache/airflow/issues/13823#issuecomment-940466144


   @ashb Not sure if the dependencies are working even when specifying ">>". Below is a sample TaskFlow DAG with PostgresOperator.
   
   
   
   <img width="300" alt="Screen Shot 2021-10-11 at 1 43 19 PM" src="https://user-images.githubusercontent.com/29984943/136859434-8ea02532-6730-4c6f-88b4-e879b842b8cf.png">
   
   This is the DAG file where PostgresOperator seems to sit outside the dependency flow.
   
   ```python
   import json
   from datetime import datetime
   from airflow.decorators import dag, task
   from airflow.providers.postgres.operators.postgres import PostgresOperator
   
   
   @dag(schedule_interval=None, start_date=datetime(2021, 10, 11), catchup=False, tags=['example'])
   def tutorial_taskflow_api_etl():
       test_pg_operator = PostgresOperator(
           task_id="test_pg_operator",
           postgres_conn_id="rd_master_pg",
           sql="SELECT 1;"
       )
   
       @task()
       def extract():
           data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
   
           order_data_dict = json.loads(data_string)
           return order_data_dict
   
       @task(multiple_outputs=True)
       def transform(order_data_dict: dict):
           total_order_value = 0
   
           for value in order_data_dict.values():
               total_order_value += value
   
           return {"total_order_value": total_order_value}
   
       #
       @task()
       def load(total_order_value: float):
           print(f"Total order value is: {total_order_value:.2f}")
   
       order_data = extract()
       order_summary = transform(order_data)
       load_task_flow = load(order_summary["total_order_value"])
       test_pg_operator >> load_task_flow
   
   
   tutorial_etl_dag = tutorial_taskflow_api_etl()
   ```
   





[GitHub] [airflow] boring-cyborg[bot] commented on issue #13823: Postgres Operator task doesn't sequence properly with TaskFlow API

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on issue #13823:
URL: https://github.com/apache/airflow/issues/13823#issuecomment-764976508


   Thanks for opening your first issue here! Be sure to follow the issue template!
   





[GitHub] [airflow] resdevd commented on issue #13823: Postgres Operator task doesn't sequence properly with TaskFlow API

Posted by GitBox <gi...@apache.org>.
resdevd commented on issue #13823:
URL: https://github.com/apache/airflow/issues/13823#issuecomment-940466144


   @ashb Not sure if the dependencies are working even when specifying ">>". Below is a sample TaskFlow DAG with PostgresOperator.
   
   ```python
   import json
   from datetime import datetime
   from airflow.decorators import dag, task
   from airflow.providers.postgres.operators.postgres import PostgresOperator
   
   
   @dag(schedule_interval=None, start_date=datetime(2021, 10, 11), catchup=False, tags=['example'])
   def tutorial_taskflow_api_etl():
       test_pg_operator = PostgresOperator(
           task_id="test_pg_operator",
           postgres_conn_id="rd_master_pg",
           sql="SELECT 1;"
       )
   
       @task()
       def extract():
           data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
   
           order_data_dict = json.loads(data_string)
           return order_data_dict
   
       @task(multiple_outputs=True)
       def transform(order_data_dict: dict):
           total_order_value = 0
   
           for value in order_data_dict.values():
               total_order_value += value
   
           return {"total_order_value": total_order_value}
   
       #
       @task()
       def load(total_order_value: float):
           print(f"Total order value is: {total_order_value:.2f}")
   
       order_data = extract()
       order_summary = transform(order_data)
       load_task_flow = load(order_summary["total_order_value"])
       test_pg_operator >> load_task_flow
   
   
   tutorial_etl_dag = tutorial_taskflow_api_etl()
   ```
   
   Even so, PostgresOperator still seems to sit outside the dependency flow.
   
   
   
   <img width="300" alt="Screen Shot 2021-10-11 at 1 43 19 PM" src="https://user-images.githubusercontent.com/29984943/136859434-8ea02532-6730-4c6f-88b4-e879b842b8cf.png">
   
   
   





[GitHub] [airflow] ephraimbuddy commented on issue #13823: Postgres Operator task doesn't sequence properly with TaskFlow API

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on issue #13823:
URL: https://github.com/apache/airflow/issues/13823#issuecomment-940527332


   cc: @alex-astronomer 





[GitHub] [airflow] ephraimbuddy commented on issue #13823: Postgres Operator task doesn't sequence properly with TaskFlow API

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on issue #13823:
URL: https://github.com/apache/airflow/issues/13823#issuecomment-940513910


   This is what you need: `test_pg_operator >> order_data >> order_summary >> load_task_flow`
   There's a difference between passing XComs and creating a dependency. 
   We are working on documentation to clarify that. See: https://github.com/apache/airflow/issues/17686#issuecomment-937599461
   
   Closing this for now
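
The difference between the DAG as reported and the chain suggested above can be illustrated with a toy model. This is only a sketch: the `Task` class and the `roots` and `build` helpers are hypothetical stand-ins written for this example, not Airflow's API or implementation. It assumes that passing one TaskFlow task's output to the next yields the edges extract -> transform -> load, and then compares which tasks end up with no upstream in each variant.

```python
# Toy model only: Task, roots and build are hypothetical, NOT Airflow classes.
from typing import List, Set


class Task:
    """A named node whose `>>` operator records an explicit dependency edge."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.upstream: Set[str] = set()

    def __rshift__(self, other: "Task") -> "Task":
        # `a >> b` means "b runs after a"; returning b lets a >> b >> c chain.
        other.upstream.add(self.name)
        return other


def roots(tasks: List[Task]) -> List[str]:
    """Names of tasks with no upstream, i.e. tasks free to start immediately."""
    return sorted(t.name for t in tasks if not t.upstream)


def build(chain_all: bool) -> List[Task]:
    names = ("test_pg_operator", "extract", "transform", "load")
    pg, extract, transform, load = (Task(n) for n in names)
    # Assumed edges from passing outputs between the decorated tasks.
    extract >> transform >> load
    if chain_all:
        # The suggested fix: put the operator at the head of the whole chain.
        pg >> extract >> transform >> load
    else:
        # The reported DAG: the operator is only upstream of the last task.
        pg >> load
    return [pg, extract, transform, load]


as_reported = build(chain_all=False)
as_suggested = build(chain_all=True)
print(roots(as_reported))   # ['extract', 'test_pg_operator']
print(roots(as_suggested))  # ['test_pg_operator']
```

In the reported variant both `extract` and `test_pg_operator` have no upstream, so nothing orders them relative to each other, which is consistent with the operator appearing beside the flow in the graph view. In the suggested variant `test_pg_operator` is the only starting point.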





[GitHub] [airflow] ashb commented on issue #13823: Postgres Operator task doesn't sequence properly with TaskFlow API

Posted by GitBox <gi...@apache.org>.
ashb commented on issue #13823:
URL: https://github.com/apache/airflow/issues/13823#issuecomment-809440801


   Fixing this properly so it actually does what you want (setting the dep between tasks _and_ passing the value in the parameter) is more than a bug fix, so I'm pushing this to 2.1.
   
   In the meantime, the way to specify the dependencies is this:
   
   ```python
   
       some_str = some_py_task()
   
       some_pg_task = PostgresOperator(task_id='some_pg_task', 
                        sql='select * from %s',
                        parameters=[some_str],
                        postgres_conn_id='postgres_bob')
   
       some_str >> some_pg_task
   ```

