Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/01/21 22:16:40 UTC
[GitHub] [airflow] bobhaffner opened a new issue #13823: Postgres Operator task doesn't sequence properly with TaskFlow API
bobhaffner opened a new issue #13823:
URL: https://github.com/apache/airflow/issues/13823
**Apache Airflow version**:
2.1.0.dev0
**Kubernetes version (if you are using kubernetes)** (use `kubectl version`):
NA
**Environment**:
Docker
Apache/Airflow
master-python3.6
**What happened**:
some_pg_task is out of sequence
**What you expected to happen**:
Task sequence should be
some_py_task >> some_pg_task
**How to reproduce it**:
```
from airflow.models import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
import os
from airflow.utils.dates import days_ago

default_args = {'start_date': days_ago(1)}
dag_name = os.path.splitext(os.path.basename(__file__))[0]

with DAG(dag_name, default_args=default_args) as dag:

    @dag.task
    def some_py_task() -> str:
        return some_str

    some_str = some_py_task()
    some_pg_task = PostgresOperator(task_id='some_pg_task',
                                    sql='select * from %s',
                                    parameters=[some_str],
                                    postgres_conn_id='postgres_bob')
```
**Anything else we need to know**:
The MSSQL operator behaves the same way.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] vemikhaylov removed a comment on issue #13823: Postgres Operator task doesn't sequence properly with TaskFlow API
Posted by GitBox <gi...@apache.org>.
vemikhaylov removed a comment on issue #13823:
URL: https://github.com/apache/airflow/issues/13823#issuecomment-766552731
Do you specify the relation `some_str >> some_pg_task` (https://airflow.apache.org/docs/apache-airflow/stable/concepts.html#relations-between-tasks) in the DAG? This isn't in your example.
[GitHub] [airflow] ephraimbuddy closed issue #13823: Postgres Operator task doesn't sequence properly with TaskFlow API
Posted by GitBox <gi...@apache.org>.
ephraimbuddy closed issue #13823:
URL: https://github.com/apache/airflow/issues/13823
[GitHub] [airflow] resdevd edited a comment on issue #13823: Postgres Operator task doesn't sequence properly with TaskFlow API
Posted by GitBox <gi...@apache.org>.
resdevd edited a comment on issue #13823:
URL: https://github.com/apache/airflow/issues/13823#issuecomment-940466144
@ashb I'm not sure the dependencies work even when they are specified with ">>". Below is a sample TaskFlow DAG with PostgresOperator. Tested with Airflow version 2.2.0.
<img width="300" alt="Screen Shot 2021-10-11 at 1 43 19 PM" src="https://user-images.githubusercontent.com/29984943/136859434-8ea02532-6730-4c6f-88b4-e879b842b8cf.png">
This is the DAG file in which the PostgresOperator appears outside the dependency flow. Any advice on a workaround?
```
import json
from datetime import datetime

from airflow.decorators import dag, task
from airflow.providers.postgres.operators.postgres import PostgresOperator


@dag(schedule_interval=None, start_date=datetime(2021, 10, 11), catchup=False, tags=['example'])
def tutorial_taskflow_api_etl():
    test_pg_operator = PostgresOperator(
        task_id="test_pg_operator",
        postgres_conn_id="rd_master_pg",
        sql="SELECT 1;"
    )

    @task()
    def extract():
        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
        order_data_dict = json.loads(data_string)
        return order_data_dict

    @task(multiple_outputs=True)
    def transform(order_data_dict: dict):
        total_order_value = 0
        for value in order_data_dict.values():
            total_order_value += value
        return {"total_order_value": total_order_value}

    @task()
    def load(total_order_value: float):
        print(f"Total order value is: {total_order_value:.2f}")

    order_data = extract()
    order_summary = transform(order_data)
    load_task_flow = load(order_summary["total_order_value"])
    test_pg_operator >> load_task_flow


tutorial_etl_dag = tutorial_taskflow_api_etl()
```
[GitHub] [airflow] vemikhaylov commented on issue #13823: Postgres Operator task doesn't sequence properly with TaskFlow API
Posted by GitBox <gi...@apache.org>.
vemikhaylov commented on issue #13823:
URL: https://github.com/apache/airflow/issues/13823#issuecomment-766552731
Do you specify the relation `some_str >> some_pg_task` (https://airflow.apache.org/docs/apache-airflow/stable/concepts.html#relations-between-tasks) in the DAG? This isn't in your example.
[GitHub] [airflow] ashb commented on issue #13823: Postgres Operator task doesn't sequence properly with TaskFlow API
Posted by GitBox <gi...@apache.org>.
ashb commented on issue #13823:
URL: https://github.com/apache/airflow/issues/13823#issuecomment-808352865
I think this is worse than just not sequencing correctly: even if the sequence were explicitly set, the parameters might not be what you expect (i.e. the output of the py task). _Might_.
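One possible reading of this concern, sketched in plain Python rather than with Airflow itself: the value returned by calling a decorated task is a reference to a future result, not the result. If an operator resolves such references only in some of its fields, a reference passed through any other field stays unresolved. `OutputRef`, `render`, and the field names below are hypothetical, and this thread does not confirm that `parameters` is handled this way.

```python
class OutputRef:
    """Hypothetical placeholder for a task result that does not exist yet."""

    def __init__(self, task_name):
        self.task_name = task_name

    def resolve(self, results):
        # Look up the real value once the upstream task has produced it.
        return results[self.task_name]


def render(fields, resolved_fields, results):
    """Resolve placeholders only in the fields named in resolved_fields."""
    rendered = {}
    for name, value in fields.items():
        if name in resolved_fields and isinstance(value, OutputRef):
            value = value.resolve(results)
        rendered[name] = value
    return rendered


results = {"some_py_task": "my_table"}  # assumed output of the upstream task
fields = {"parameters": OutputRef("some_py_task")}

resolved = render(fields, {"parameters"}, results)
unresolved = render(fields, {"sql"}, results)

print(resolved["parameters"])                            # my_table
print(isinstance(unresolved["parameters"], OutputRef))   # True
```

In the second call the placeholder itself, not the string, would reach the query, which is the kind of surprise the comment warns about.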
[GitHub] [airflow] boring-cyborg[bot] commented on issue #13823: Postgres Operator task doesn't sequence properly with TaskFlow API
Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on issue #13823:
URL: https://github.com/apache/airflow/issues/13823#issuecomment-764976508
Thanks for opening your first issue here! Be sure to follow the issue template!
[GitHub] [airflow] ephraimbuddy commented on issue #13823: Postgres Operator task doesn't sequence properly with TaskFlow API
Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on issue #13823:
URL: https://github.com/apache/airflow/issues/13823#issuecomment-940527332
cc: @alex-astronomer
[GitHub] [airflow] ephraimbuddy commented on issue #13823: Postgres Operator task doesn't sequence properly with TaskFlow API
Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on issue #13823:
URL: https://github.com/apache/airflow/issues/13823#issuecomment-940513910
This is what you need: `test_pg_operator >> order_data >> order_summary >> load_task_flow`
There's a difference between passing xcoms and creating dependency.
We are working on documentation to clarify that. See: https://github.com/apache/airflow/issues/17686#issuecomment-937599461
Closing this for now.
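To illustrate why the single `test_pg_operator >> load_task_flow` edge leaves the operator outside the main flow, here is a toy model in plain Python. It is not Airflow's implementation: the `Task` class, its `upstream` set, and the `roots` and `build` helpers are hypothetical names. The only assumption carried over from TaskFlow is that passing one decorated task's output into another also records an edge between them.

```python
class Task:
    """Toy stand-in for a DAG node; not Airflow's operator classes."""

    def __init__(self, name, *inputs):
        self.name = name
        # Passing another task's output as an input also records an edge,
        # mirroring how TaskFlow-decorated tasks are wired together.
        self.upstream = set(inputs)

    def __rshift__(self, other):
        # a >> b declares "a runs before b" and passes no data.
        other.upstream.add(self)
        return other


def roots(tasks):
    """Return the names of tasks that have no upstream dependency."""
    return sorted(t.name for t in tasks if not t.upstream)


def build(chain_all):
    pg = Task("test_pg_operator")
    extract = Task("extract")
    transform = Task("transform", extract)
    load = Task("load", transform)
    if chain_all:
        pg >> extract >> transform >> load
    else:
        pg >> load
    return [pg, extract, transform, load]


print(roots(build(chain_all=False)))  # ['extract', 'test_pg_operator']
print(roots(build(chain_all=True)))   # ['test_pg_operator']
```

With only `pg >> load`, both `extract` and `test_pg_operator` are starting points, so they sit side by side in the graph. In this model, chaining from `extract` onward makes the operator the single starting point.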
[GitHub] [airflow] ashb commented on issue #13823: Postgres Operator task doesn't sequence properly with TaskFlow API
Posted by GitBox <gi...@apache.org>.
ashb commented on issue #13823:
URL: https://github.com/apache/airflow/issues/13823#issuecomment-809440801
Fixing this properly so it actually does what you want (sets the dependency between tasks _and_ passes the value in the parameter) is more than a bug fix, so I'm pushing this to 2.1.
In the meantime, the way to specify the dependencies is this:
```python
some_str = some_py_task()
some_pg_task = PostgresOperator(task_id='some_pg_task',
                                sql='select * from %s',
                                parameters=[some_str],
                                postgres_conn_id='postgres_bob')
some_str >> some_pg_task
```